From 1c0224d1df43c4843ead0cca156eea84fb564c8e Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Sat, 17 May 2025 12:19:50 +0200 Subject: [PATCH] Added base_sld_domains filter --- filter_plugins/domain_filters.py | 69 +++++++++++++++++++ filter_plugins/generate_all_domains.py | 47 ------------- tasks/constructor.yml | 15 ++-- tests/unit/test_domain_filters_all_domains.py | 48 +++++++++++++ .../test_domain_filters_base_sld_domains.py | 49 +++++++++++++ tests/unit/test_generate_all_domains.py | 52 -------------- 6 files changed, 171 insertions(+), 109 deletions(-) create mode 100644 filter_plugins/domain_filters.py delete mode 100644 filter_plugins/generate_all_domains.py create mode 100644 tests/unit/test_domain_filters_all_domains.py create mode 100644 tests/unit/test_domain_filters_base_sld_domains.py delete mode 100644 tests/unit/test_generate_all_domains.py diff --git a/filter_plugins/domain_filters.py b/filter_plugins/domain_filters.py new file mode 100644 index 00000000..abb24043 --- /dev/null +++ b/filter_plugins/domain_filters.py @@ -0,0 +1,69 @@ +import re +from ansible.errors import AnsibleFilterError + +class FilterModule(object): + """ + Custom Ansible filter plugin: + - generate_all_domains: Flatten, dedupe, sort domains with optional www prefixes + - generate_base_sld_domains: Extract unique sld.tld domains from values and redirect sources + """ + + def filters(self): + return { + 'generate_all_domains': self.generate_all_domains, + 'generate_base_sld_domains': self.generate_base_sld_domains, + } + + @staticmethod + def generate_all_domains(domains_dict, include_www=True): + """ + Transform a dict of domains (values: str, list, dict) into a flat list, + optionally add 'www.' prefixes, dedupe and sort alphabetically. + + Avoids infinite loops by snapshotting initial domain list for www prefixes. + """ + try: + flat = FilterModule._flatten_domain_values(domains_dict) + if include_www: + # Snapshot original list to avoid extending while iterating + original = list(flat) + flat.extend([f"www.{d}" for d in original]) + return sorted(set(flat)) + except Exception as exc: + raise AnsibleFilterError(f"generate_all_domains failed: {exc}") + + @staticmethod + def generate_base_sld_domains(domains_dict, redirect_mappings): + """ + Flatten domains_dict and redirect_mappings, extract second-level + top-level domains. + redirect_mappings: list of dicts with key 'source' + """ + try: + flat = FilterModule._flatten_domain_values(domains_dict) + for mapping in redirect_mappings or []: + src = mapping.get('source') + if isinstance(src, str): + flat.append(src) + elif isinstance(src, list): + flat.extend(src) + + pattern = re.compile(r'^(?:.*\.)?([^.]+\.[^.]+)$') + slds = {m.group(1) for d in flat if (m := pattern.match(d))} + return sorted(slds) + except Exception as exc: + raise AnsibleFilterError(f"generate_base_sld_domains failed: {exc}") + + @staticmethod + def _flatten_domain_values(domains_dict): + """ + Helper to extract domain strings from dict values (str, list, dict). + """ + flat = [] + for val in (domains_dict or {}).values(): + if isinstance(val, str): + flat.append(val) + elif isinstance(val, list): + flat.extend(val) + elif isinstance(val, dict): + flat.extend(val.values()) + return flat diff --git a/filter_plugins/generate_all_domains.py b/filter_plugins/generate_all_domains.py deleted file mode 100644 index b9084352..00000000 --- a/filter_plugins/generate_all_domains.py +++ /dev/null @@ -1,47 +0,0 @@ -import os -from ansible.errors import AnsibleFilterError - -class FilterModule(object): - """ - Custom Ansible filter to generate a flattened, deduplicated, - and sorted list of domains, with optional 'www.' prefixes. - """ - - def filters(self): - return { - 'generate_all_domains': self.generate_all_domains, - } - - @staticmethod - def generate_all_domains(domains_dict, include_www=True): - """ - Transform a dict of domains into a flat list of domain strings. - Values in domains_dict may be strings, lists, or dicts. - If include_www is True, also generate "www." variants. - The final list is deduplicated and sorted alphabetically. - - :param domains_dict: dict where each value is str, list, or dict of domains - :param include_www: bool indicating if 'www.' prefixes should be added - :return: sorted list of unique domain names - """ - try: - flat = [] - for val in domains_dict.values(): - if isinstance(val, str): - flat.append(val) - elif isinstance(val, list): - flat.extend(val) - elif isinstance(val, dict): - flat.extend(val.values()) - else: - # skip unsupported types - continue - - if include_www: - flat.extend(['www.' + d for d in flat]) - - # dedupe and sort - return sorted(set(flat)) - - except Exception as exc: - raise AnsibleFilterError(f"generate_all_domains failed: {exc}") \ No newline at end of file diff --git a/tasks/constructor.yml b/tasks/constructor.yml index 9964d5ea..bb218357 100644 --- a/tasks/constructor.yml +++ b/tasks/constructor.yml @@ -62,20 +62,15 @@ set_fact: service_provider: "{{ defaults_service_provider | combine(service_provider | default({}, true), recursive=True) }}" - - name: Gather base domains (without www) + - name: Build base_sld_domains (sld.tld) in one go set_fact: - base_domains: >- - {{ - domains.values() - | flatten - + (redirect_domain_mappings | map(attribute='source') | list) + base_sld_domains: >- + {{ domains + | generate_base_sld_domains(redirect_domain_mappings) }} - - name: Extract sld.tld from base_domains + - name: Set all domains incl. www redirect if enabled set_fact: - base_sld_domains: "{{ base_domains | map('regex_replace', '^(?:.*\\.)?([^.]+\\.[^.]+)$', '\\1') | list | unique | sort }}" - - - set_fact: all_domains: >- {{ domains | generate_all_domains( diff --git a/tests/unit/test_domain_filters_all_domains.py b/tests/unit/test_domain_filters_all_domains.py new file mode 100644 index 00000000..5a17f7c0 --- /dev/null +++ b/tests/unit/test_domain_filters_all_domains.py @@ -0,0 +1,48 @@ +import unittest +import sys +import os + +# Ensure filter_plugins directory is on the path +sys.path.insert( + 0, + os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins')) +) + +from domain_filters import FilterModule + +class TestGenerateAllDomains(unittest.TestCase): + def setUp(self): + self.filter = FilterModule().generate_all_domains + + def test_simple_string_values(self): + domains = {'app': 'example.com'} + result = self.filter(domains) + expected = ['example.com', 'www.example.com'] + self.assertEqual(result, expected) + + def test_list_and_dict_values(self): + domains = { + 'app1': ['one.com', 'two.com'], + 'app2': {'x': 'x.com', 'y': 'y.com'} + } + result = self.filter(domains) + expected = sorted([ + 'one.com', 'two.com', 'x.com', 'y.com', + 'www.one.com', 'www.two.com', 'www.x.com', 'www.y.com' + ]) + self.assertEqual(result, expected) + + def test_include_www_false(self): + domains = {'app': 'no-www.com'} + result = self.filter(domains, include_www=False) + self.assertEqual(result, ['no-www.com']) + + def test_deduplicate_and_sort(self): + domains = { + 'a': 'dup.com', + 'b': 'dup.com', + 'c': ['b.com', 'a.com'], + } + result = self.filter(domains) + expected = ['a.com', 'b.com', 'dup.com', 'www.a.com', 'www.b.com', 'www.dup.com'] + self.assertEqual(result, expected) \ No newline at end of file diff --git a/tests/unit/test_domain_filters_base_sld_domains.py b/tests/unit/test_domain_filters_base_sld_domains.py new file mode 100644 index 00000000..81a8253f --- /dev/null +++ b/tests/unit/test_domain_filters_base_sld_domains.py @@ -0,0 +1,49 @@ +import unittest +import sys +import os + +# Ensure filter_plugins directory is on the path +sys.path.insert( + 0, + os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins')) +) + +from domain_filters import FilterModule + +class TestGenerateBaseSldDomains(unittest.TestCase): + def setUp(self): + self.filter = FilterModule().generate_base_sld_domains + + def test_simple_string_and_redirect(self): + domains = {'app': 'sub.example.com'} + redirects = [{'source': 'alias.example.com'}] + result = self.filter(domains, redirects) + self.assertEqual(result, ['example.com']) + + def test_without_redirect_mappings(self): + domains = { + 'a': 'a.co', + 'b': ['b.co', 'sub.c.co'], + 'c': {'x': 'x.co'} + } + result = self.filter(domains, None) + self.assertEqual(result, ['a.co', 'b.co', 'c.co', 'x.co']) + + def test_redirect_list_sources(self): + domains = {'app': 'app.domain.org'} + redirects = [{'source': ['alias.domain.org', 'deep.sub.example.net']}] + result = self.filter(domains, redirects) + self.assertEqual(result, ['domain.org', 'example.net']) + + def test_duplicate_entries_and_sorting(self): + domains = { + 'x': ['one.com', 'sub.one.com'], + 'y': 'two.com', + 'z': {'k': 'one.com'} + } + redirects = [{'source': 'deep.two.com'}] + result = self.filter(domains, redirects) + self.assertEqual(result, ['one.com', 'two.com']) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit/test_generate_all_domains.py b/tests/unit/test_generate_all_domains.py deleted file mode 100644 index 64d9c382..00000000 --- a/tests/unit/test_generate_all_domains.py +++ /dev/null @@ -1,52 +0,0 @@ -import sys -import os -import pytest - -from filter_plugins.generate_all_domains import FilterModule - -@pytest.fixture -def generate_filter(): - """ - Fixture to return the generate_all_domains filter function. - """ - fm = FilterModule() - return fm.generate_all_domains - -def test_simple_string_values(generate_filter): - domains = {'app': 'example.com'} - result = generate_filter(domains) - # Expect original and www-prefixed, deduped and sorted - expected = ['example.com', 'www.example.com'] - assert result == expected - -def test_list_and_dict_values(generate_filter): - domains = { - 'app1': ['one.com', 'two.com'], - 'app2': {'x': 'x.com', 'y': 'y.com'} - } - result = generate_filter(domains) - expected = sorted([ - 'one.com', 'two.com', 'x.com', 'y.com', - 'www.one.com', 'www.two.com', 'www.x.com', 'www.y.com' - ]) - assert result == expected - -def test_include_www_false(generate_filter): - domains = {'app': 'no-www.com'} - result = generate_filter(domains, include_www=False) - # Only the original domain - assert result == ['no-www.com'] - -def test_deduplicate_and_sort(generate_filter): - domains = { - 'a': 'dup.com', - 'b': 'dup.com', - 'c': ['b.com', 'a.com'], - } - result = generate_filter(domains) - # Should contain unique domains sorted alphabetically - expected = ['a.com', 'b.com', 'dup.com', 'www.a.com', 'www.b.com', 'www.dup.com'] - assert result == expected - -if __name__ == '__main__': - pytest.main()