diff --git a/filter_plugins/alias_domains_map.py b/filter_plugins/alias_domains_map.py new file mode 100644 index 00000000..d2b51972 --- /dev/null +++ b/filter_plugins/alias_domains_map.py @@ -0,0 +1,86 @@ +from ansible.errors import AnsibleFilterError + +class FilterModule(object): + def filters(self): + return {'alias_domains_map': self.alias_domains_map} + + def alias_domains_map(self, apps, primary_domain): + """ + Build a map of application IDs to their alias domains. + + - If no `domains` key → [] + - If `domains` exists but is an empty dict → return the original cfg + - Explicit `aliases` are used (default appended if missing) + - If only `canonical` defined and it doesn't include default, default is added + - Invalid types raise AnsibleFilterError + """ + def parse_entry(domains_cfg, key, app_id): + if key not in domains_cfg: + return None + entry = domains_cfg[key] + if isinstance(entry, dict): + values = list(entry.values()) + elif isinstance(entry, list): + values = entry + else: + raise AnsibleFilterError( + f"Unexpected type for 'domains.{key}' in application '{app_id}': {type(entry).__name__}" + ) + for d in values: + if not isinstance(d, str) or not d.strip(): + raise AnsibleFilterError( + f"Invalid domain entry in '{key}' for application '{app_id}': {d!r}" + ) + return values + + def default_domain(app_id, primary): + return f"{app_id}.{primary}" + + # 1) Precompute canonical domains per app (fallback to default) + canonical_map = {} + for app_id, cfg in apps.items(): + domains_cfg = cfg.get('domains') or {} + entry = domains_cfg.get('canonical') + if entry is None: + canonical_map[app_id] = [default_domain(app_id, primary_domain)] + elif isinstance(entry, dict): + canonical_map[app_id] = list(entry.values()) + elif isinstance(entry, list): + canonical_map[app_id] = list(entry) + else: + raise AnsibleFilterError( + f"Unexpected type for 'domains.canonical' in application '{app_id}': {type(entry).__name__}" + ) + + # 2) Build alias list per app + result = {} + for app_id, cfg in apps.items(): + domains_cfg = cfg.get('domains') + + # no domains key → no aliases + if domains_cfg is None: + result[app_id] = [] + continue + + # empty domains dict → return the original cfg + if isinstance(domains_cfg, dict) and not domains_cfg: + result[app_id] = cfg + continue + + # otherwise, compute aliases + aliases = parse_entry(domains_cfg, 'aliases', app_id) or [] + default = default_domain(app_id, primary_domain) + has_aliases = 'aliases' in domains_cfg + has_canon = 'canonical' in domains_cfg + + if has_aliases: + if default not in aliases: + aliases.append(default) + elif has_canon: + canon = canonical_map.get(app_id, []) + if default not in canon and default not in aliases: + aliases.append(default) + + result[app_id] = aliases + + return result \ No newline at end of file diff --git a/filter_plugins/canonical_domains_map.py b/filter_plugins/canonical_domains_map.py new file mode 100644 index 00000000..afbb0c10 --- /dev/null +++ b/filter_plugins/canonical_domains_map.py @@ -0,0 +1,75 @@ +from ansible.errors import AnsibleFilterError + +class FilterModule(object): + def filters(self): + return {'canonical_domains_map': self.canonical_domains_map} + + def canonical_domains_map(self, apps, primary_domain): + def parse_entry(domains_cfg, key, app_id): + if key not in domains_cfg: + return None + entry = domains_cfg[key] + if isinstance(entry, dict): + values = list(entry.values()) + elif isinstance(entry, list): + values = entry + else: + raise AnsibleFilterError( + f"Unexpected type for 'domains.{key}' in application '{app_id}': {type(entry).__name__}" + ) + for d in values: + if not isinstance(d, str) or not d.strip(): + raise AnsibleFilterError( + f"Invalid domain entry in '{key}' for application '{app_id}': {d!r}" + ) + return values + + result = {} + seen = {} + + for app_id, cfg in apps.items(): + domains_cfg = cfg.get('domains') + if not domains_cfg or 'canonical' not in domains_cfg: + default = f"{app_id}.{primary_domain}" + if default in seen: + raise AnsibleFilterError( + f"Domain '{default}' is already configured for '{seen[default]}' and '{app_id}'" + ) + seen[default] = app_id + result[app_id] = [default] + continue + + entry = domains_cfg['canonical'] + + if isinstance(entry, dict): + for name, domain in entry.items(): + if not isinstance(domain, str) or not domain.strip(): + raise AnsibleFilterError( + f"Invalid domain entry in 'canonical' for application '{app_id}': {domain!r}" + ) + if domain in seen: + raise AnsibleFilterError( + f"Domain '{domain}' is already configured for '{seen[domain]}' and '{app_id}'" + ) + seen[domain] = app_id + result[app_id] = entry.copy() + + elif isinstance(entry, list): + for domain in entry: + if not isinstance(domain, str) or not domain.strip(): + raise AnsibleFilterError( + f"Invalid domain entry in 'canonical' for application '{app_id}': {domain!r}" + ) + if domain in seen: + raise AnsibleFilterError( + f"Domain '{domain}' is already configured for '{seen[domain]}' and '{app_id}'" + ) + seen[domain] = app_id + result[app_id] = list(entry) + + else: + raise AnsibleFilterError( + f"Unexpected type for 'domains.canonical' in application '{app_id}': {type(entry).__name__}" + ) + + return result diff --git a/filter_plugins/domain_filters.py b/filter_plugins/domain_filters.py deleted file mode 100644 index d6ac6c30..00000000 --- a/filter_plugins/domain_filters.py +++ /dev/null @@ -1,184 +0,0 @@ -import re -from ansible.errors import AnsibleFilterError - -class FilterModule(object): - """ - Ansible Filter Plugin for Domain Processing - - This plugin provides filters to manage and transform domain configurations for applications: - - - generate_all_domains(domains_dict, include_www=True): - Flattens nested domain values (string, list, or dict), optionally adds 'www.' prefixes, - removes duplicates, and returns a sorted list of unique domains. - - - generate_base_sld_domains(domains_dict, redirect_mappings): - Flattens domains and redirect mappings, extracts second-level + top-level domains (SLDs), - deduplicates, and returns a sorted list of base domains. - - - canonical_domains_map(apps, primary_domain): - Builds a mapping of application IDs to their canonical domains using - DomainUtils.canonical_list, enforcing uniqueness and detecting conflicts. - - - alias_domains_map(apps, primary_domain): - Generates alias domains for each application via DomainUtils.alias_list, - based on their canonical domains and provided configurations. - """ - - def filters(self): - return { - 'generate_all_domains': self.generate_all_domains, - 'generate_base_sld_domains': self.generate_base_sld_domains, - 'canonical_domains_map': self.canonical_domains_map, - 'alias_domains_map': self.alias_domains_map, - } - - @staticmethod - def parse_entry(domains_cfg, key, app_id): - """ - Extract list of strings from domains_cfg[key], which may be dict or list. - Returns None if key not in domains_cfg. - Raises AnsibleFilterError on invalid type or empty/invalid values. - """ - if key not in domains_cfg: - return None - entry = domains_cfg[key] - if isinstance(entry, dict): - values = list(entry.values()) - elif isinstance(entry, list): - values = entry - else: - raise AnsibleFilterError( - f"Unexpected type for 'domains.{key}' in application '{app_id}': {type(entry).__name__}" - ) - for d in values: - if not isinstance(d, str) or not d.strip(): - raise AnsibleFilterError( - f"Invalid domain entry in '{key}' for application '{app_id}': {d!r}" - ) - return values - - @staticmethod - def default_domain(app_id, primary_domain): - """ - Returns the default domain string for an application. - """ - return f"{app_id}.{primary_domain}" - - @classmethod - def canonical_list(cls, domains_cfg, app_id, primary_domain): - """ - Returns the list of canonical domains: parsed entry or default. - """ - domains = cls.parse_entry(domains_cfg, 'canonical', app_id) - if domains is None: - return [cls.default_domain(app_id, primary_domain)] - return domains - - @classmethod - def alias_list(cls, domains_cfg, app_id, primary_domain, canonical_domains=None): - """ - Returns the list of alias domains based on: - - explicit aliases entry - - presence of canonical entry and default not in canonical - Always ensures default domain in aliases when appropriate. - """ - default = cls.default_domain(app_id, primary_domain) - aliases = cls.parse_entry(domains_cfg, 'aliases', app_id) or [] - has_aliases = 'aliases' in domains_cfg - has_canonical = 'canonical' in domains_cfg - - if has_aliases: - if default not in aliases: - aliases.append(default) - elif has_canonical: - # use provided canonical_domains if given otherwise parse - canon = canonical_domains if canonical_domains is not None else cls.parse_entry(domains_cfg, 'canonical', app_id) - if default not in (canon or []): - aliases.append(default) - # else: neither defined -> empty list - return aliases - - - @staticmethod - def generate_all_domains(domains_dict, include_www=True): - """ - Transform a dict of domains (values: str, list, dict) into a flat list, - optionally add 'www.' prefixes, dedupe and sort alphabetically. - - Avoids infinite loops by snapshotting initial domain list for www prefixes. - """ - try: - flat = FilterModule._flatten_domain_values(domains_dict) - if include_www: - # Snapshot original list to avoid extending while iterating - original = list(flat) - flat.extend([f"www.{d}" for d in original]) - return sorted(set(flat)) - except Exception as exc: - raise AnsibleFilterError(f"generate_all_domains failed: {exc}") - - @staticmethod - def generate_base_sld_domains(domains_dict, redirect_mappings): - """ - Flatten domains_dict and redirect_mappings, extract second-level + top-level domains. - redirect_mappings: list of dicts with key 'source' - """ - try: - flat = FilterModule._flatten_domain_values(domains_dict) - for mapping in redirect_mappings or []: - src = mapping.get('source') - if isinstance(src, str): - flat.append(src) - elif isinstance(src, list): - flat.extend(src) - - pattern = re.compile(r'^(?:.*\.)?([^.]+\.[^.]+)$') - slds = {m.group(1) for d in flat if (m := pattern.match(d))} - return sorted(slds) - except Exception as exc: - raise AnsibleFilterError(f"generate_base_sld_domains failed: {exc}") - - @staticmethod - def _flatten_domain_values(domains_dict): - """ - Helper to extract domain strings from dict values (str, list, dict). - """ - flat = [] - for val in (domains_dict or {}).values(): - if isinstance(val, str): - flat.append(val) - elif isinstance(val, list): - flat.extend(val) - elif isinstance(val, dict): - flat.extend(val.values()) - return flat - - def canonical_domains_map(self, apps, primary_domain): - result = {} - seen = {} - for app_id, app_cfg in apps.items(): - domains_cfg = app_cfg.get('domains', {}) or {} - domains = self.canonical_list(domains_cfg, app_id, primary_domain) - for d in domains: - if d in seen: - raise AnsibleFilterError( - f"Domain '{d}' is configured for both '{seen[d]}' and '{app_id}'" - ) - seen[d] = app_id - result[app_id] = domains - return result - - def alias_domains_map(self, apps, primary_domain): - result = {} - # wir können die canonical_map vorab holen… - canonical_map = self.canonical_domains_map(apps, primary_domain) - for app_id, app_cfg in apps.items(): - domains_cfg = app_cfg.get('domains', {}) or {} - aliases = self.alias_list( - domains_cfg, - app_id, - primary_domain, - canonical_domains=canonical_map.get(app_id), - ) - result[app_id] = aliases - return result diff --git a/filter_plugins/generate_all_domains.py b/filter_plugins/generate_all_domains.py new file mode 100644 index 00000000..2bf25ef2 --- /dev/null +++ b/filter_plugins/generate_all_domains.py @@ -0,0 +1,31 @@ +from ansible.errors import AnsibleFilterError + +class FilterModule(object): + def filters(self): + return {'generate_all_domains': self.generate_all_domains} + + def generate_all_domains(self, domains_dict, include_www=True): + """ + Transform a dict of domains (values: str, list, dict) into a flat list, + optionally add 'www.' prefixes, dedupe and sort alphabetically. + """ + # lokaler Helfer zum Flatten + def _flatten(domains): + flat = [] + for v in (domains or {}).values(): + if isinstance(v, str): + flat.append(v) + elif isinstance(v, list): + flat.extend(v) + elif isinstance(v, dict): + flat.extend(v.values()) + return flat + + try: + flat = _flatten(domains_dict) + if include_www: + original = list(flat) + flat.extend([f"www.{d}" for d in original]) + return sorted(set(flat)) + except Exception as exc: + raise AnsibleFilterError(f"generate_all_domains failed: {exc}") diff --git a/filter_plugins/generate_base_sld_domains.py b/filter_plugins/generate_base_sld_domains.py new file mode 100644 index 00000000..6c72e332 --- /dev/null +++ b/filter_plugins/generate_base_sld_domains.py @@ -0,0 +1,37 @@ +import re +from ansible.errors import AnsibleFilterError + +class FilterModule(object): + def filters(self): + return {'generate_base_sld_domains': self.generate_base_sld_domains} + + def generate_base_sld_domains(self, domains_dict, redirect_mappings): + """ + Flatten domains_dict und redirect_mappings, extrahiere SLDs (z.B. example.com), + dedupe und sortiere. + """ + def _flatten(domains): + flat = [] + for v in (domains or {}).values(): + if isinstance(v, str): + flat.append(v) + elif isinstance(v, list): + flat.extend(v) + elif isinstance(v, dict): + flat.extend(v.values()) + return flat + + try: + flat = _flatten(domains_dict) + for mapping in redirect_mappings or []: + src = mapping.get('source') + if isinstance(src, str): + flat.append(src) + elif isinstance(src, list): + flat.extend(src) + + pattern = re.compile(r'^(?:.*\.)?([^.]+\.[^.]+)$') + slds = {m.group(1) for d in flat if (m := pattern.match(d))} + return sorted(slds) + except Exception as exc: + raise AnsibleFilterError(f"generate_base_sld_domains failed: {exc}") diff --git a/tasks/constructor.yml b/tasks/constructor.yml index febf1471..ca1e8bf8 100644 --- a/tasks/constructor.yml +++ b/tasks/constructor.yml @@ -17,6 +17,13 @@ set_fact: domains: "{{ defaults_domains | combine(domains | default({}, true), recursive=True) }}" + - name: "Merged Variables" + # Add new merged variables here + debug: + msg: + domains: "{{ defaults_domains }}" + when: enable_debug | bool + - name: Merge redirect domain definitions into dictionary set_fact: combined_mapping: >- diff --git a/tests/unit/test_domain_filters.py b/tests/unit/test_domain_filters_alias.py similarity index 65% rename from tests/unit/test_domain_filters.py rename to tests/unit/test_domain_filters_alias.py index c42310b0..75b18ad2 100644 --- a/tests/unit/test_domain_filters.py +++ b/tests/unit/test_domain_filters_alias.py @@ -9,7 +9,7 @@ dir_path = os.path.abspath( sys.path.insert(0, dir_path) from ansible.errors import AnsibleFilterError -from domain_filters import FilterModule +from alias_domains_map import FilterModule class TestDomainFilters(unittest.TestCase): def setUp(self): @@ -17,45 +17,6 @@ class TestDomainFilters(unittest.TestCase): # Sample primary domain self.primary = 'example.com' - def test_canonical_empty_apps(self): - apps = {} - expected = {} - result = self.filter_module.canonical_domains_map(apps, self.primary) - self.assertEqual(result, expected) - - def test_canonical_without_domains(self): - apps = {'app1': {}} - expected = {'app1': ['app1.example.com']} - result = self.filter_module.canonical_domains_map(apps, self.primary) - self.assertEqual(result, expected) - - def test_canonical_with_list(self): - apps = { - 'app1': { - 'domains': {'canonical': ['foo.com', 'bar.com']} - } - } - result = self.filter_module.canonical_domains_map(apps, self.primary) - self.assertCountEqual(result['app1'], ['foo.com', 'bar.com']) - - def test_canonical_with_dict(self): - apps = { - 'app1': { - 'domains': {'canonical': {'one': 'one.com', 'two': 'two.com'}} - } - } - result = self.filter_module.canonical_domains_map(apps, self.primary) - self.assertCountEqual(result['app1'], ['one.com', 'two.com']) - - def test_canonical_duplicate_raises(self): - apps = { - 'app1': {'domains': {'canonical': ['dup.com']}}, - 'app2': {'domains': {'canonical': ['dup.com']}}, - } - with self.assertRaises(AnsibleFilterError) as cm: - self.filter_module.canonical_domains_map(apps, self.primary) - self.assertIn("configured for both", str(cm.exception)) - def test_alias_empty_apps(self): apps = {} expected = {} @@ -105,13 +66,6 @@ class TestDomainFilters(unittest.TestCase): result = self.filter_module.alias_domains_map(apps, self.primary) self.assertEqual(result, expected) - def test_invalid_canonical_type(self): - apps = { - 'app1': {'domains': {'canonical': 123}} - } - with self.assertRaises(AnsibleFilterError): - self.filter_module.canonical_domains_map(apps, self.primary) - def test_invalid_aliases_type(self): apps = { 'app1': {'domains': {'aliases': 123}} @@ -119,5 +73,31 @@ class TestDomainFilters(unittest.TestCase): with self.assertRaises(AnsibleFilterError): self.filter_module.alias_domains_map(apps, self.primary) + def test_alias_with_empty_domains_cfg(self): + apps = { + 'app1': { + 'domains': {} + } + } + expected = apps + result = self.filter_module.alias_domains_map(apps, self.primary) + self.assertEqual(result, expected) + + def test_alias_with_canonical_dict_not_default(self): + apps = { + 'app1': { + 'domains': { + 'canonical': { + 'one': 'one.com', + 'two': 'two.com' + } + } + } + } + expected = {'app1': ['app1.example.com']} + result = self.filter_module.alias_domains_map(apps, self.primary) + self.assertEqual(result, expected) + + if __name__ == "__main__": - unittest.main() + unittest.main() \ No newline at end of file diff --git a/tests/unit/test_domain_filters_all_domains.py b/tests/unit/test_domain_filters_all_domains.py index 5a17f7c0..d8b5a984 100644 --- a/tests/unit/test_domain_filters_all_domains.py +++ b/tests/unit/test_domain_filters_all_domains.py @@ -8,7 +8,7 @@ sys.path.insert( os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins')) ) -from domain_filters import FilterModule +from generate_all_domains import FilterModule class TestGenerateAllDomains(unittest.TestCase): def setUp(self): diff --git a/tests/unit/test_domain_filters_base_sld_domains.py b/tests/unit/test_domain_filters_base_sld_domains.py index 81a8253f..fd4ce1ff 100644 --- a/tests/unit/test_domain_filters_base_sld_domains.py +++ b/tests/unit/test_domain_filters_base_sld_domains.py @@ -8,7 +8,7 @@ sys.path.insert( os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins')) ) -from domain_filters import FilterModule +from generate_base_sld_domains import FilterModule class TestGenerateBaseSldDomains(unittest.TestCase): def setUp(self): diff --git a/tests/unit/test_domain_filters_canonical.py b/tests/unit/test_domain_filters_canonical.py new file mode 100644 index 00000000..f25eed83 --- /dev/null +++ b/tests/unit/test_domain_filters_canonical.py @@ -0,0 +1,74 @@ +import os +import sys +import unittest + +# Add the filter_plugins directory to the import path +dir_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), '../../filter_plugins') +) +sys.path.insert(0, dir_path) + +from ansible.errors import AnsibleFilterError +from canonical_domains_map import FilterModule + +class TestDomainFilters(unittest.TestCase): + def setUp(self): + self.filter_module = FilterModule() + # Sample primary domain + self.primary = 'example.com' + + def test_canonical_empty_apps(self): + apps = {} + expected = {} + result = self.filter_module.canonical_domains_map(apps, self.primary) + self.assertEqual(result, expected) + + def test_canonical_without_domains(self): + apps = {'app1': {}} + expected = {'app1': ['app1.example.com']} + result = self.filter_module.canonical_domains_map(apps, self.primary) + self.assertEqual(result, expected) + + def test_canonical_with_list(self): + apps = { + 'app1': { + 'domains': {'canonical': ['foo.com', 'bar.com']} + } + } + result = self.filter_module.canonical_domains_map(apps, self.primary) + self.assertCountEqual( + result['app1'], + ['foo.com', 'bar.com'] + ) + + def test_canonical_with_dict(self): + apps = { + 'app1': { + 'domains': {'canonical': {'one': 'one.com', 'two': 'two.com'}} + } + } + result = self.filter_module.canonical_domains_map(apps, self.primary) + self.assertEqual( + result['app1'], + {'one': 'one.com', 'two': 'two.com'} + ) + + def test_canonical_duplicate_raises(self): + apps = { + 'app1': {'domains': {'canonical': ['dup.com']}}, + 'app2': {'domains': {'canonical': ['dup.com']}}, + } + with self.assertRaises(AnsibleFilterError) as cm: + self.filter_module.canonical_domains_map(apps, self.primary) + # Updated to match new exception message + self.assertIn("already configured for", str(cm.exception)) + + def test_invalid_canonical_type(self): + apps = { + 'app1': {'domains': {'canonical': 123}} + } + with self.assertRaises(AnsibleFilterError): + self.filter_module.canonical_domains_map(apps, self.primary) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file