diff --git a/main.py b/main.py index e284fcc..6264c4f 100755 --- a/main.py +++ b/main.py @@ -198,20 +198,23 @@ def main(): encoded = ocdef.encode() try: result = conn.search_s(schema_dn, ldap.SCOPE_BASE, - attrlist=['olcObjectClasses']) + attrlist=['olcObjectClasses']) existing = result[0][1].get('olcObjectClasses', []) - if encoded in existing: - print(f"â„šī¸ ObjectClass already exists, replacing (DELETE + ADD): {ocdef}") - mods = [ - (ldap.MOD_DELETE, 'olcObjectClasses', [encoded]), - (ldap.MOD_ADD, 'olcObjectClasses', [encoded]) - ] - try: - conn.modify_s(schema_dn, mods) - print(f"🔄 Replaced ObjectClass: {ocdef}") - except ldap.LDAPError as e: - print(f"❌ LDAP error replacing ObjectClass '{ocdef}': {e}") - sys.exit(2) + norm_existing = [normalize(v) for v in existing] + norm_encoded = normalize(encoded) + + if norm_encoded in norm_existing: + print(f"✅ ObjectClass already up to date: {ocdef}") + continue + + elif any(extract_oid(oc.decode()) == extract_oid(ocdef) for oc in existing): + print(f"âš ī¸ ObjectClass with same OID exists, replacing...") + to_delete = [oc for oc in existing if extract_oid(oc.decode()) == extract_oid(ocdef)] + for oc in to_delete: + conn.modify_s(schema_dn, [(ldap.MOD_DELETE, 'olcObjectClasses', [oc])]) + conn.modify_s(schema_dn, [(ldap.MOD_ADD, 'olcObjectClasses', [encoded])]) + print(f"🔄 Replaced ObjectClass: {ocdef}") + else: conn.modify_s(schema_dn, [ (ldap.MOD_ADD, 'olcObjectClasses', [encoded]) @@ -221,6 +224,7 @@ def main(): print(f"❌ LDAP error for ObjectClass '{ocdef}': {e}", file=sys.stderr) sys.exit(3) + conn.unbind_s() if __name__ == '__main__': diff --git a/test_manage_schema.py b/test_manage_schema.py index 8365375..81fdaf7 100644 --- a/test_manage_schema.py +++ b/test_manage_schema.py @@ -1,6 +1,7 @@ import unittest from unittest.mock import patch, MagicMock import main as manage_schema +import sys class TestManageSchema(unittest.TestCase): @@ -53,6 +54,21 @@ class TestManageSchema(unittest.TestCase): dn, [(0, 'olcObjectClasses', [encoded])] ) + # Test no double add + mock_conn.modify_s.reset_mock() + mock_conn.search_s.return_value = [("cn={0}nextcloud,cn=schema,cn=config", {'olcObjectClasses': [encoded]})] + with patch.object(sys, 'argv', [ + 'manage_schema.py', + '-s', 'ldap://localhost', + '-D', 'cn=admin,dc=example,dc=org', + '-W', 'secret', + '-n', 'nextcloud', + '-c', ocdef + ]): + manage_schema.main() + mock_conn.modify_s.assert_not_called() + + def test_extract_oid_from_definition(self): ldif = "( 1.3.6.1.4.1.99999.1 NAME 'example' DESC 'something' )" oid = manage_schema.extract_oid(ldif)