Optimized replace logic for oc
This commit is contained in:
26
main.py
26
main.py
@@ -200,18 +200,21 @@ def main():
|
|||||||
result = conn.search_s(schema_dn, ldap.SCOPE_BASE,
|
result = conn.search_s(schema_dn, ldap.SCOPE_BASE,
|
||||||
attrlist=['olcObjectClasses'])
|
attrlist=['olcObjectClasses'])
|
||||||
existing = result[0][1].get('olcObjectClasses', [])
|
existing = result[0][1].get('olcObjectClasses', [])
|
||||||
if encoded in existing:
|
norm_existing = [normalize(v) for v in existing]
|
||||||
print(f"ℹ️ ObjectClass already exists, replacing (DELETE + ADD): {ocdef}")
|
norm_encoded = normalize(encoded)
|
||||||
mods = [
|
|
||||||
(ldap.MOD_DELETE, 'olcObjectClasses', [encoded]),
|
if norm_encoded in norm_existing:
|
||||||
(ldap.MOD_ADD, 'olcObjectClasses', [encoded])
|
print(f"✅ ObjectClass already up to date: {ocdef}")
|
||||||
]
|
continue
|
||||||
try:
|
|
||||||
conn.modify_s(schema_dn, mods)
|
elif any(extract_oid(oc.decode()) == extract_oid(ocdef) for oc in existing):
|
||||||
|
print(f"⚠️ ObjectClass with same OID exists, replacing...")
|
||||||
|
to_delete = [oc for oc in existing if extract_oid(oc.decode()) == extract_oid(ocdef)]
|
||||||
|
for oc in to_delete:
|
||||||
|
conn.modify_s(schema_dn, [(ldap.MOD_DELETE, 'olcObjectClasses', [oc])])
|
||||||
|
conn.modify_s(schema_dn, [(ldap.MOD_ADD, 'olcObjectClasses', [encoded])])
|
||||||
print(f"🔄 Replaced ObjectClass: {ocdef}")
|
print(f"🔄 Replaced ObjectClass: {ocdef}")
|
||||||
except ldap.LDAPError as e:
|
|
||||||
print(f"❌ LDAP error replacing ObjectClass '{ocdef}': {e}")
|
|
||||||
sys.exit(2)
|
|
||||||
else:
|
else:
|
||||||
conn.modify_s(schema_dn, [
|
conn.modify_s(schema_dn, [
|
||||||
(ldap.MOD_ADD, 'olcObjectClasses', [encoded])
|
(ldap.MOD_ADD, 'olcObjectClasses', [encoded])
|
||||||
@@ -221,6 +224,7 @@ def main():
|
|||||||
print(f"❌ LDAP error for ObjectClass '{ocdef}': {e}", file=sys.stderr)
|
print(f"❌ LDAP error for ObjectClass '{ocdef}': {e}", file=sys.stderr)
|
||||||
sys.exit(3)
|
sys.exit(3)
|
||||||
|
|
||||||
|
|
||||||
conn.unbind_s()
|
conn.unbind_s()
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import unittest
|
import unittest
|
||||||
from unittest.mock import patch, MagicMock
|
from unittest.mock import patch, MagicMock
|
||||||
import main as manage_schema
|
import main as manage_schema
|
||||||
|
import sys
|
||||||
|
|
||||||
class TestManageSchema(unittest.TestCase):
|
class TestManageSchema(unittest.TestCase):
|
||||||
|
|
||||||
@@ -53,6 +54,21 @@ class TestManageSchema(unittest.TestCase):
|
|||||||
dn, [(0, 'olcObjectClasses', [encoded])]
|
dn, [(0, 'olcObjectClasses', [encoded])]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Test no double add
|
||||||
|
mock_conn.modify_s.reset_mock()
|
||||||
|
mock_conn.search_s.return_value = [("cn={0}nextcloud,cn=schema,cn=config", {'olcObjectClasses': [encoded]})]
|
||||||
|
with patch.object(sys, 'argv', [
|
||||||
|
'manage_schema.py',
|
||||||
|
'-s', 'ldap://localhost',
|
||||||
|
'-D', 'cn=admin,dc=example,dc=org',
|
||||||
|
'-W', 'secret',
|
||||||
|
'-n', 'nextcloud',
|
||||||
|
'-c', ocdef
|
||||||
|
]):
|
||||||
|
manage_schema.main()
|
||||||
|
mock_conn.modify_s.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
def test_extract_oid_from_definition(self):
|
def test_extract_oid_from_definition(self):
|
||||||
ldif = "( 1.3.6.1.4.1.99999.1 NAME 'example' DESC 'something' )"
|
ldif = "( 1.3.6.1.4.1.99999.1 NAME 'example' DESC 'something' )"
|
||||||
oid = manage_schema.extract_oid(ldif)
|
oid = manage_schema.extract_oid(ldif)
|
||||||
|
|||||||
Reference in New Issue
Block a user