Added base_sld_domains filter

This commit is contained in:
Kevin Veen-Birkenbach 2025-05-17 12:19:50 +02:00
parent 76f303da27
commit 1c0224d1df
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E
6 changed files with 171 additions and 109 deletions

View File

@ -0,0 +1,69 @@
import re
from ansible.errors import AnsibleFilterError
class FilterModule(object):
"""
Custom Ansible filter plugin:
- generate_all_domains: Flatten, dedupe, sort domains with optional www prefixes
- generate_base_sld_domains: Extract unique sld.tld domains from values and redirect sources
"""
def filters(self):
return {
'generate_all_domains': self.generate_all_domains,
'generate_base_sld_domains': self.generate_base_sld_domains,
}
@staticmethod
def generate_all_domains(domains_dict, include_www=True):
"""
Transform a dict of domains (values: str, list, dict) into a flat list,
optionally add 'www.' prefixes, dedupe and sort alphabetically.
Avoids infinite loops by snapshotting initial domain list for www prefixes.
"""
try:
flat = FilterModule._flatten_domain_values(domains_dict)
if include_www:
# Snapshot original list to avoid extending while iterating
original = list(flat)
flat.extend([f"www.{d}" for d in original])
return sorted(set(flat))
except Exception as exc:
raise AnsibleFilterError(f"generate_all_domains failed: {exc}")
@staticmethod
def generate_base_sld_domains(domains_dict, redirect_mappings):
"""
Flatten domains_dict and redirect_mappings, extract second-level + top-level domains.
redirect_mappings: list of dicts with key 'source'
"""
try:
flat = FilterModule._flatten_domain_values(domains_dict)
for mapping in redirect_mappings or []:
src = mapping.get('source')
if isinstance(src, str):
flat.append(src)
elif isinstance(src, list):
flat.extend(src)
pattern = re.compile(r'^(?:.*\.)?([^.]+\.[^.]+)$')
slds = {m.group(1) for d in flat if (m := pattern.match(d))}
return sorted(slds)
except Exception as exc:
raise AnsibleFilterError(f"generate_base_sld_domains failed: {exc}")
@staticmethod
def _flatten_domain_values(domains_dict):
"""
Helper to extract domain strings from dict values (str, list, dict).
"""
flat = []
for val in (domains_dict or {}).values():
if isinstance(val, str):
flat.append(val)
elif isinstance(val, list):
flat.extend(val)
elif isinstance(val, dict):
flat.extend(val.values())
return flat

View File

@ -1,47 +0,0 @@
import os
from ansible.errors import AnsibleFilterError
class FilterModule(object):
"""
Custom Ansible filter to generate a flattened, deduplicated,
and sorted list of domains, with optional 'www.' prefixes.
"""
def filters(self):
return {
'generate_all_domains': self.generate_all_domains,
}
@staticmethod
def generate_all_domains(domains_dict, include_www=True):
"""
Transform a dict of domains into a flat list of domain strings.
Values in domains_dict may be strings, lists, or dicts.
If include_www is True, also generate "www." variants.
The final list is deduplicated and sorted alphabetically.
:param domains_dict: dict where each value is str, list, or dict of domains
:param include_www: bool indicating if 'www.' prefixes should be added
:return: sorted list of unique domain names
"""
try:
flat = []
for val in domains_dict.values():
if isinstance(val, str):
flat.append(val)
elif isinstance(val, list):
flat.extend(val)
elif isinstance(val, dict):
flat.extend(val.values())
else:
# skip unsupported types
continue
if include_www:
flat.extend(['www.' + d for d in flat])
# dedupe and sort
return sorted(set(flat))
except Exception as exc:
raise AnsibleFilterError(f"generate_all_domains failed: {exc}")

View File

@ -62,20 +62,15 @@
set_fact:
service_provider: "{{ defaults_service_provider | combine(service_provider | default({}, true), recursive=True) }}"
- name: Gather base domains (without www)
- name: Build base_sld_domains (sld.tld) in one go
set_fact:
base_domains: >-
{{
domains.values()
| flatten
+ (redirect_domain_mappings | map(attribute='source') | list)
base_sld_domains: >-
{{ domains
| generate_base_sld_domains(redirect_domain_mappings)
}}
- name: Extract sld.tld from base_domains
- name: Set all domains incl. www redirect if enabled
set_fact:
base_sld_domains: "{{ base_domains | map('regex_replace', '^(?:.*\\.)?([^.]+\\.[^.]+)$', '\\1') | list | unique | sort }}"
- set_fact:
all_domains: >-
{{ domains
| generate_all_domains(

View File

@ -0,0 +1,48 @@
import unittest
import sys
import os
# Ensure filter_plugins directory is on the path
sys.path.insert(
0,
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
)
from domain_filters import FilterModule
class TestGenerateAllDomains(unittest.TestCase):
def setUp(self):
self.filter = FilterModule().generate_all_domains
def test_simple_string_values(self):
domains = {'app': 'example.com'}
result = self.filter(domains)
expected = ['example.com', 'www.example.com']
self.assertEqual(result, expected)
def test_list_and_dict_values(self):
domains = {
'app1': ['one.com', 'two.com'],
'app2': {'x': 'x.com', 'y': 'y.com'}
}
result = self.filter(domains)
expected = sorted([
'one.com', 'two.com', 'x.com', 'y.com',
'www.one.com', 'www.two.com', 'www.x.com', 'www.y.com'
])
self.assertEqual(result, expected)
def test_include_www_false(self):
domains = {'app': 'no-www.com'}
result = self.filter(domains, include_www=False)
self.assertEqual(result, ['no-www.com'])
def test_deduplicate_and_sort(self):
domains = {
'a': 'dup.com',
'b': 'dup.com',
'c': ['b.com', 'a.com'],
}
result = self.filter(domains)
expected = ['a.com', 'b.com', 'dup.com', 'www.a.com', 'www.b.com', 'www.dup.com']
self.assertEqual(result, expected)

View File

@ -0,0 +1,49 @@
import unittest
import sys
import os
# Ensure filter_plugins directory is on the path
sys.path.insert(
0,
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
)
from domain_filters import FilterModule
class TestGenerateBaseSldDomains(unittest.TestCase):
def setUp(self):
self.filter = FilterModule().generate_base_sld_domains
def test_simple_string_and_redirect(self):
domains = {'app': 'sub.example.com'}
redirects = [{'source': 'alias.example.com'}]
result = self.filter(domains, redirects)
self.assertEqual(result, ['example.com'])
def test_without_redirect_mappings(self):
domains = {
'a': 'a.co',
'b': ['b.co', 'sub.c.co'],
'c': {'x': 'x.co'}
}
result = self.filter(domains, None)
self.assertEqual(result, ['a.co', 'b.co', 'c.co', 'x.co'])
def test_redirect_list_sources(self):
domains = {'app': 'app.domain.org'}
redirects = [{'source': ['alias.domain.org', 'deep.sub.example.net']}]
result = self.filter(domains, redirects)
self.assertEqual(result, ['domain.org', 'example.net'])
def test_duplicate_entries_and_sorting(self):
domains = {
'x': ['one.com', 'sub.one.com'],
'y': 'two.com',
'z': {'k': 'one.com'}
}
redirects = [{'source': 'deep.two.com'}]
result = self.filter(domains, redirects)
self.assertEqual(result, ['one.com', 'two.com'])
if __name__ == '__main__':
unittest.main()

View File

@ -1,52 +0,0 @@
import sys
import os
import pytest
from filter_plugins.generate_all_domains import FilterModule
@pytest.fixture
def generate_filter():
"""
Fixture to return the generate_all_domains filter function.
"""
fm = FilterModule()
return fm.generate_all_domains
def test_simple_string_values(generate_filter):
domains = {'app': 'example.com'}
result = generate_filter(domains)
# Expect original and www-prefixed, deduped and sorted
expected = ['example.com', 'www.example.com']
assert result == expected
def test_list_and_dict_values(generate_filter):
domains = {
'app1': ['one.com', 'two.com'],
'app2': {'x': 'x.com', 'y': 'y.com'}
}
result = generate_filter(domains)
expected = sorted([
'one.com', 'two.com', 'x.com', 'y.com',
'www.one.com', 'www.two.com', 'www.x.com', 'www.y.com'
])
assert result == expected
def test_include_www_false(generate_filter):
domains = {'app': 'no-www.com'}
result = generate_filter(domains, include_www=False)
# Only the original domain
assert result == ['no-www.com']
def test_deduplicate_and_sort(generate_filter):
domains = {
'a': 'dup.com',
'b': 'dup.com',
'c': ['b.com', 'a.com'],
}
result = generate_filter(domains)
# Should contain unique domains sorted alphabetically
expected = ['a.com', 'b.com', 'dup.com', 'www.a.com', 'www.b.com', 'www.dup.com']
assert result == expected
if __name__ == '__main__':
pytest.main()