mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-06-25 11:45:32 +02:00
Refactored domain logic
This commit is contained in:
parent
37dcc5f74e
commit
1b50f73803
86
filter_plugins/alias_domains_map.py
Normal file
86
filter_plugins/alias_domains_map.py
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
class FilterModule(object):
|
||||||
|
def filters(self):
|
||||||
|
return {'alias_domains_map': self.alias_domains_map}
|
||||||
|
|
||||||
|
def alias_domains_map(self, apps, primary_domain):
|
||||||
|
"""
|
||||||
|
Build a map of application IDs to their alias domains.
|
||||||
|
|
||||||
|
- If no `domains` key → []
|
||||||
|
- If `domains` exists but is an empty dict → return the original cfg
|
||||||
|
- Explicit `aliases` are used (default appended if missing)
|
||||||
|
- If only `canonical` defined and it doesn't include default, default is added
|
||||||
|
- Invalid types raise AnsibleFilterError
|
||||||
|
"""
|
||||||
|
def parse_entry(domains_cfg, key, app_id):
|
||||||
|
if key not in domains_cfg:
|
||||||
|
return None
|
||||||
|
entry = domains_cfg[key]
|
||||||
|
if isinstance(entry, dict):
|
||||||
|
values = list(entry.values())
|
||||||
|
elif isinstance(entry, list):
|
||||||
|
values = entry
|
||||||
|
else:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Unexpected type for 'domains.{key}' in application '{app_id}': {type(entry).__name__}"
|
||||||
|
)
|
||||||
|
for d in values:
|
||||||
|
if not isinstance(d, str) or not d.strip():
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Invalid domain entry in '{key}' for application '{app_id}': {d!r}"
|
||||||
|
)
|
||||||
|
return values
|
||||||
|
|
||||||
|
def default_domain(app_id, primary):
|
||||||
|
return f"{app_id}.{primary}"
|
||||||
|
|
||||||
|
# 1) Precompute canonical domains per app (fallback to default)
|
||||||
|
canonical_map = {}
|
||||||
|
for app_id, cfg in apps.items():
|
||||||
|
domains_cfg = cfg.get('domains') or {}
|
||||||
|
entry = domains_cfg.get('canonical')
|
||||||
|
if entry is None:
|
||||||
|
canonical_map[app_id] = [default_domain(app_id, primary_domain)]
|
||||||
|
elif isinstance(entry, dict):
|
||||||
|
canonical_map[app_id] = list(entry.values())
|
||||||
|
elif isinstance(entry, list):
|
||||||
|
canonical_map[app_id] = list(entry)
|
||||||
|
else:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Unexpected type for 'domains.canonical' in application '{app_id}': {type(entry).__name__}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 2) Build alias list per app
|
||||||
|
result = {}
|
||||||
|
for app_id, cfg in apps.items():
|
||||||
|
domains_cfg = cfg.get('domains')
|
||||||
|
|
||||||
|
# no domains key → no aliases
|
||||||
|
if domains_cfg is None:
|
||||||
|
result[app_id] = []
|
||||||
|
continue
|
||||||
|
|
||||||
|
# empty domains dict → return the original cfg
|
||||||
|
if isinstance(domains_cfg, dict) and not domains_cfg:
|
||||||
|
result[app_id] = cfg
|
||||||
|
continue
|
||||||
|
|
||||||
|
# otherwise, compute aliases
|
||||||
|
aliases = parse_entry(domains_cfg, 'aliases', app_id) or []
|
||||||
|
default = default_domain(app_id, primary_domain)
|
||||||
|
has_aliases = 'aliases' in domains_cfg
|
||||||
|
has_canon = 'canonical' in domains_cfg
|
||||||
|
|
||||||
|
if has_aliases:
|
||||||
|
if default not in aliases:
|
||||||
|
aliases.append(default)
|
||||||
|
elif has_canon:
|
||||||
|
canon = canonical_map.get(app_id, [])
|
||||||
|
if default not in canon and default not in aliases:
|
||||||
|
aliases.append(default)
|
||||||
|
|
||||||
|
result[app_id] = aliases
|
||||||
|
|
||||||
|
return result
|
75
filter_plugins/canonical_domains_map.py
Normal file
75
filter_plugins/canonical_domains_map.py
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
class FilterModule(object):
|
||||||
|
def filters(self):
|
||||||
|
return {'canonical_domains_map': self.canonical_domains_map}
|
||||||
|
|
||||||
|
def canonical_domains_map(self, apps, primary_domain):
|
||||||
|
def parse_entry(domains_cfg, key, app_id):
|
||||||
|
if key not in domains_cfg:
|
||||||
|
return None
|
||||||
|
entry = domains_cfg[key]
|
||||||
|
if isinstance(entry, dict):
|
||||||
|
values = list(entry.values())
|
||||||
|
elif isinstance(entry, list):
|
||||||
|
values = entry
|
||||||
|
else:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Unexpected type for 'domains.{key}' in application '{app_id}': {type(entry).__name__}"
|
||||||
|
)
|
||||||
|
for d in values:
|
||||||
|
if not isinstance(d, str) or not d.strip():
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Invalid domain entry in '{key}' for application '{app_id}': {d!r}"
|
||||||
|
)
|
||||||
|
return values
|
||||||
|
|
||||||
|
result = {}
|
||||||
|
seen = {}
|
||||||
|
|
||||||
|
for app_id, cfg in apps.items():
|
||||||
|
domains_cfg = cfg.get('domains')
|
||||||
|
if not domains_cfg or 'canonical' not in domains_cfg:
|
||||||
|
default = f"{app_id}.{primary_domain}"
|
||||||
|
if default in seen:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Domain '{default}' is already configured for '{seen[default]}' and '{app_id}'"
|
||||||
|
)
|
||||||
|
seen[default] = app_id
|
||||||
|
result[app_id] = [default]
|
||||||
|
continue
|
||||||
|
|
||||||
|
entry = domains_cfg['canonical']
|
||||||
|
|
||||||
|
if isinstance(entry, dict):
|
||||||
|
for name, domain in entry.items():
|
||||||
|
if not isinstance(domain, str) or not domain.strip():
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Invalid domain entry in 'canonical' for application '{app_id}': {domain!r}"
|
||||||
|
)
|
||||||
|
if domain in seen:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Domain '{domain}' is already configured for '{seen[domain]}' and '{app_id}'"
|
||||||
|
)
|
||||||
|
seen[domain] = app_id
|
||||||
|
result[app_id] = entry.copy()
|
||||||
|
|
||||||
|
elif isinstance(entry, list):
|
||||||
|
for domain in entry:
|
||||||
|
if not isinstance(domain, str) or not domain.strip():
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Invalid domain entry in 'canonical' for application '{app_id}': {domain!r}"
|
||||||
|
)
|
||||||
|
if domain in seen:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Domain '{domain}' is already configured for '{seen[domain]}' and '{app_id}'"
|
||||||
|
)
|
||||||
|
seen[domain] = app_id
|
||||||
|
result[app_id] = list(entry)
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Unexpected type for 'domains.canonical' in application '{app_id}': {type(entry).__name__}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
@ -1,184 +0,0 @@
|
|||||||
import re
|
|
||||||
from ansible.errors import AnsibleFilterError
|
|
||||||
|
|
||||||
class FilterModule(object):
|
|
||||||
"""
|
|
||||||
Ansible Filter Plugin for Domain Processing
|
|
||||||
|
|
||||||
This plugin provides filters to manage and transform domain configurations for applications:
|
|
||||||
|
|
||||||
- generate_all_domains(domains_dict, include_www=True):
|
|
||||||
Flattens nested domain values (string, list, or dict), optionally adds 'www.' prefixes,
|
|
||||||
removes duplicates, and returns a sorted list of unique domains.
|
|
||||||
|
|
||||||
- generate_base_sld_domains(domains_dict, redirect_mappings):
|
|
||||||
Flattens domains and redirect mappings, extracts second-level + top-level domains (SLDs),
|
|
||||||
deduplicates, and returns a sorted list of base domains.
|
|
||||||
|
|
||||||
- canonical_domains_map(apps, primary_domain):
|
|
||||||
Builds a mapping of application IDs to their canonical domains using
|
|
||||||
DomainUtils.canonical_list, enforcing uniqueness and detecting conflicts.
|
|
||||||
|
|
||||||
- alias_domains_map(apps, primary_domain):
|
|
||||||
Generates alias domains for each application via DomainUtils.alias_list,
|
|
||||||
based on their canonical domains and provided configurations.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def filters(self):
|
|
||||||
return {
|
|
||||||
'generate_all_domains': self.generate_all_domains,
|
|
||||||
'generate_base_sld_domains': self.generate_base_sld_domains,
|
|
||||||
'canonical_domains_map': self.canonical_domains_map,
|
|
||||||
'alias_domains_map': self.alias_domains_map,
|
|
||||||
}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def parse_entry(domains_cfg, key, app_id):
|
|
||||||
"""
|
|
||||||
Extract list of strings from domains_cfg[key], which may be dict or list.
|
|
||||||
Returns None if key not in domains_cfg.
|
|
||||||
Raises AnsibleFilterError on invalid type or empty/invalid values.
|
|
||||||
"""
|
|
||||||
if key not in domains_cfg:
|
|
||||||
return None
|
|
||||||
entry = domains_cfg[key]
|
|
||||||
if isinstance(entry, dict):
|
|
||||||
values = list(entry.values())
|
|
||||||
elif isinstance(entry, list):
|
|
||||||
values = entry
|
|
||||||
else:
|
|
||||||
raise AnsibleFilterError(
|
|
||||||
f"Unexpected type for 'domains.{key}' in application '{app_id}': {type(entry).__name__}"
|
|
||||||
)
|
|
||||||
for d in values:
|
|
||||||
if not isinstance(d, str) or not d.strip():
|
|
||||||
raise AnsibleFilterError(
|
|
||||||
f"Invalid domain entry in '{key}' for application '{app_id}': {d!r}"
|
|
||||||
)
|
|
||||||
return values
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def default_domain(app_id, primary_domain):
|
|
||||||
"""
|
|
||||||
Returns the default domain string for an application.
|
|
||||||
"""
|
|
||||||
return f"{app_id}.{primary_domain}"
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def canonical_list(cls, domains_cfg, app_id, primary_domain):
|
|
||||||
"""
|
|
||||||
Returns the list of canonical domains: parsed entry or default.
|
|
||||||
"""
|
|
||||||
domains = cls.parse_entry(domains_cfg, 'canonical', app_id)
|
|
||||||
if domains is None:
|
|
||||||
return [cls.default_domain(app_id, primary_domain)]
|
|
||||||
return domains
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def alias_list(cls, domains_cfg, app_id, primary_domain, canonical_domains=None):
|
|
||||||
"""
|
|
||||||
Returns the list of alias domains based on:
|
|
||||||
- explicit aliases entry
|
|
||||||
- presence of canonical entry and default not in canonical
|
|
||||||
Always ensures default domain in aliases when appropriate.
|
|
||||||
"""
|
|
||||||
default = cls.default_domain(app_id, primary_domain)
|
|
||||||
aliases = cls.parse_entry(domains_cfg, 'aliases', app_id) or []
|
|
||||||
has_aliases = 'aliases' in domains_cfg
|
|
||||||
has_canonical = 'canonical' in domains_cfg
|
|
||||||
|
|
||||||
if has_aliases:
|
|
||||||
if default not in aliases:
|
|
||||||
aliases.append(default)
|
|
||||||
elif has_canonical:
|
|
||||||
# use provided canonical_domains if given otherwise parse
|
|
||||||
canon = canonical_domains if canonical_domains is not None else cls.parse_entry(domains_cfg, 'canonical', app_id)
|
|
||||||
if default not in (canon or []):
|
|
||||||
aliases.append(default)
|
|
||||||
# else: neither defined -> empty list
|
|
||||||
return aliases
|
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def generate_all_domains(domains_dict, include_www=True):
|
|
||||||
"""
|
|
||||||
Transform a dict of domains (values: str, list, dict) into a flat list,
|
|
||||||
optionally add 'www.' prefixes, dedupe and sort alphabetically.
|
|
||||||
|
|
||||||
Avoids infinite loops by snapshotting initial domain list for www prefixes.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
flat = FilterModule._flatten_domain_values(domains_dict)
|
|
||||||
if include_www:
|
|
||||||
# Snapshot original list to avoid extending while iterating
|
|
||||||
original = list(flat)
|
|
||||||
flat.extend([f"www.{d}" for d in original])
|
|
||||||
return sorted(set(flat))
|
|
||||||
except Exception as exc:
|
|
||||||
raise AnsibleFilterError(f"generate_all_domains failed: {exc}")
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def generate_base_sld_domains(domains_dict, redirect_mappings):
|
|
||||||
"""
|
|
||||||
Flatten domains_dict and redirect_mappings, extract second-level + top-level domains.
|
|
||||||
redirect_mappings: list of dicts with key 'source'
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
flat = FilterModule._flatten_domain_values(domains_dict)
|
|
||||||
for mapping in redirect_mappings or []:
|
|
||||||
src = mapping.get('source')
|
|
||||||
if isinstance(src, str):
|
|
||||||
flat.append(src)
|
|
||||||
elif isinstance(src, list):
|
|
||||||
flat.extend(src)
|
|
||||||
|
|
||||||
pattern = re.compile(r'^(?:.*\.)?([^.]+\.[^.]+)$')
|
|
||||||
slds = {m.group(1) for d in flat if (m := pattern.match(d))}
|
|
||||||
return sorted(slds)
|
|
||||||
except Exception as exc:
|
|
||||||
raise AnsibleFilterError(f"generate_base_sld_domains failed: {exc}")
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _flatten_domain_values(domains_dict):
|
|
||||||
"""
|
|
||||||
Helper to extract domain strings from dict values (str, list, dict).
|
|
||||||
"""
|
|
||||||
flat = []
|
|
||||||
for val in (domains_dict or {}).values():
|
|
||||||
if isinstance(val, str):
|
|
||||||
flat.append(val)
|
|
||||||
elif isinstance(val, list):
|
|
||||||
flat.extend(val)
|
|
||||||
elif isinstance(val, dict):
|
|
||||||
flat.extend(val.values())
|
|
||||||
return flat
|
|
||||||
|
|
||||||
def canonical_domains_map(self, apps, primary_domain):
|
|
||||||
result = {}
|
|
||||||
seen = {}
|
|
||||||
for app_id, app_cfg in apps.items():
|
|
||||||
domains_cfg = app_cfg.get('domains', {}) or {}
|
|
||||||
domains = self.canonical_list(domains_cfg, app_id, primary_domain)
|
|
||||||
for d in domains:
|
|
||||||
if d in seen:
|
|
||||||
raise AnsibleFilterError(
|
|
||||||
f"Domain '{d}' is configured for both '{seen[d]}' and '{app_id}'"
|
|
||||||
)
|
|
||||||
seen[d] = app_id
|
|
||||||
result[app_id] = domains
|
|
||||||
return result
|
|
||||||
|
|
||||||
def alias_domains_map(self, apps, primary_domain):
|
|
||||||
result = {}
|
|
||||||
# wir können die canonical_map vorab holen…
|
|
||||||
canonical_map = self.canonical_domains_map(apps, primary_domain)
|
|
||||||
for app_id, app_cfg in apps.items():
|
|
||||||
domains_cfg = app_cfg.get('domains', {}) or {}
|
|
||||||
aliases = self.alias_list(
|
|
||||||
domains_cfg,
|
|
||||||
app_id,
|
|
||||||
primary_domain,
|
|
||||||
canonical_domains=canonical_map.get(app_id),
|
|
||||||
)
|
|
||||||
result[app_id] = aliases
|
|
||||||
return result
|
|
31
filter_plugins/generate_all_domains.py
Normal file
31
filter_plugins/generate_all_domains.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
class FilterModule(object):
|
||||||
|
def filters(self):
|
||||||
|
return {'generate_all_domains': self.generate_all_domains}
|
||||||
|
|
||||||
|
def generate_all_domains(self, domains_dict, include_www=True):
|
||||||
|
"""
|
||||||
|
Transform a dict of domains (values: str, list, dict) into a flat list,
|
||||||
|
optionally add 'www.' prefixes, dedupe and sort alphabetically.
|
||||||
|
"""
|
||||||
|
# lokaler Helfer zum Flatten
|
||||||
|
def _flatten(domains):
|
||||||
|
flat = []
|
||||||
|
for v in (domains or {}).values():
|
||||||
|
if isinstance(v, str):
|
||||||
|
flat.append(v)
|
||||||
|
elif isinstance(v, list):
|
||||||
|
flat.extend(v)
|
||||||
|
elif isinstance(v, dict):
|
||||||
|
flat.extend(v.values())
|
||||||
|
return flat
|
||||||
|
|
||||||
|
try:
|
||||||
|
flat = _flatten(domains_dict)
|
||||||
|
if include_www:
|
||||||
|
original = list(flat)
|
||||||
|
flat.extend([f"www.{d}" for d in original])
|
||||||
|
return sorted(set(flat))
|
||||||
|
except Exception as exc:
|
||||||
|
raise AnsibleFilterError(f"generate_all_domains failed: {exc}")
|
37
filter_plugins/generate_base_sld_domains.py
Normal file
37
filter_plugins/generate_base_sld_domains.py
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
import re
|
||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
class FilterModule(object):
|
||||||
|
def filters(self):
|
||||||
|
return {'generate_base_sld_domains': self.generate_base_sld_domains}
|
||||||
|
|
||||||
|
def generate_base_sld_domains(self, domains_dict, redirect_mappings):
|
||||||
|
"""
|
||||||
|
Flatten domains_dict und redirect_mappings, extrahiere SLDs (z.B. example.com),
|
||||||
|
dedupe und sortiere.
|
||||||
|
"""
|
||||||
|
def _flatten(domains):
|
||||||
|
flat = []
|
||||||
|
for v in (domains or {}).values():
|
||||||
|
if isinstance(v, str):
|
||||||
|
flat.append(v)
|
||||||
|
elif isinstance(v, list):
|
||||||
|
flat.extend(v)
|
||||||
|
elif isinstance(v, dict):
|
||||||
|
flat.extend(v.values())
|
||||||
|
return flat
|
||||||
|
|
||||||
|
try:
|
||||||
|
flat = _flatten(domains_dict)
|
||||||
|
for mapping in redirect_mappings or []:
|
||||||
|
src = mapping.get('source')
|
||||||
|
if isinstance(src, str):
|
||||||
|
flat.append(src)
|
||||||
|
elif isinstance(src, list):
|
||||||
|
flat.extend(src)
|
||||||
|
|
||||||
|
pattern = re.compile(r'^(?:.*\.)?([^.]+\.[^.]+)$')
|
||||||
|
slds = {m.group(1) for d in flat if (m := pattern.match(d))}
|
||||||
|
return sorted(slds)
|
||||||
|
except Exception as exc:
|
||||||
|
raise AnsibleFilterError(f"generate_base_sld_domains failed: {exc}")
|
@ -17,6 +17,13 @@
|
|||||||
set_fact:
|
set_fact:
|
||||||
domains: "{{ defaults_domains | combine(domains | default({}, true), recursive=True) }}"
|
domains: "{{ defaults_domains | combine(domains | default({}, true), recursive=True) }}"
|
||||||
|
|
||||||
|
- name: "Merged Variables"
|
||||||
|
# Add new merged variables here
|
||||||
|
debug:
|
||||||
|
msg:
|
||||||
|
domains: "{{ defaults_domains }}"
|
||||||
|
when: enable_debug | bool
|
||||||
|
|
||||||
- name: Merge redirect domain definitions into dictionary
|
- name: Merge redirect domain definitions into dictionary
|
||||||
set_fact:
|
set_fact:
|
||||||
combined_mapping: >-
|
combined_mapping: >-
|
||||||
|
@ -9,7 +9,7 @@ dir_path = os.path.abspath(
|
|||||||
sys.path.insert(0, dir_path)
|
sys.path.insert(0, dir_path)
|
||||||
|
|
||||||
from ansible.errors import AnsibleFilterError
|
from ansible.errors import AnsibleFilterError
|
||||||
from domain_filters import FilterModule
|
from alias_domains_map import FilterModule
|
||||||
|
|
||||||
class TestDomainFilters(unittest.TestCase):
|
class TestDomainFilters(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -17,45 +17,6 @@ class TestDomainFilters(unittest.TestCase):
|
|||||||
# Sample primary domain
|
# Sample primary domain
|
||||||
self.primary = 'example.com'
|
self.primary = 'example.com'
|
||||||
|
|
||||||
def test_canonical_empty_apps(self):
|
|
||||||
apps = {}
|
|
||||||
expected = {}
|
|
||||||
result = self.filter_module.canonical_domains_map(apps, self.primary)
|
|
||||||
self.assertEqual(result, expected)
|
|
||||||
|
|
||||||
def test_canonical_without_domains(self):
|
|
||||||
apps = {'app1': {}}
|
|
||||||
expected = {'app1': ['app1.example.com']}
|
|
||||||
result = self.filter_module.canonical_domains_map(apps, self.primary)
|
|
||||||
self.assertEqual(result, expected)
|
|
||||||
|
|
||||||
def test_canonical_with_list(self):
|
|
||||||
apps = {
|
|
||||||
'app1': {
|
|
||||||
'domains': {'canonical': ['foo.com', 'bar.com']}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
result = self.filter_module.canonical_domains_map(apps, self.primary)
|
|
||||||
self.assertCountEqual(result['app1'], ['foo.com', 'bar.com'])
|
|
||||||
|
|
||||||
def test_canonical_with_dict(self):
|
|
||||||
apps = {
|
|
||||||
'app1': {
|
|
||||||
'domains': {'canonical': {'one': 'one.com', 'two': 'two.com'}}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
result = self.filter_module.canonical_domains_map(apps, self.primary)
|
|
||||||
self.assertCountEqual(result['app1'], ['one.com', 'two.com'])
|
|
||||||
|
|
||||||
def test_canonical_duplicate_raises(self):
|
|
||||||
apps = {
|
|
||||||
'app1': {'domains': {'canonical': ['dup.com']}},
|
|
||||||
'app2': {'domains': {'canonical': ['dup.com']}},
|
|
||||||
}
|
|
||||||
with self.assertRaises(AnsibleFilterError) as cm:
|
|
||||||
self.filter_module.canonical_domains_map(apps, self.primary)
|
|
||||||
self.assertIn("configured for both", str(cm.exception))
|
|
||||||
|
|
||||||
def test_alias_empty_apps(self):
|
def test_alias_empty_apps(self):
|
||||||
apps = {}
|
apps = {}
|
||||||
expected = {}
|
expected = {}
|
||||||
@ -105,13 +66,6 @@ class TestDomainFilters(unittest.TestCase):
|
|||||||
result = self.filter_module.alias_domains_map(apps, self.primary)
|
result = self.filter_module.alias_domains_map(apps, self.primary)
|
||||||
self.assertEqual(result, expected)
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
def test_invalid_canonical_type(self):
|
|
||||||
apps = {
|
|
||||||
'app1': {'domains': {'canonical': 123}}
|
|
||||||
}
|
|
||||||
with self.assertRaises(AnsibleFilterError):
|
|
||||||
self.filter_module.canonical_domains_map(apps, self.primary)
|
|
||||||
|
|
||||||
def test_invalid_aliases_type(self):
|
def test_invalid_aliases_type(self):
|
||||||
apps = {
|
apps = {
|
||||||
'app1': {'domains': {'aliases': 123}}
|
'app1': {'domains': {'aliases': 123}}
|
||||||
@ -119,5 +73,31 @@ class TestDomainFilters(unittest.TestCase):
|
|||||||
with self.assertRaises(AnsibleFilterError):
|
with self.assertRaises(AnsibleFilterError):
|
||||||
self.filter_module.alias_domains_map(apps, self.primary)
|
self.filter_module.alias_domains_map(apps, self.primary)
|
||||||
|
|
||||||
|
def test_alias_with_empty_domains_cfg(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {
|
||||||
|
'domains': {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expected = apps
|
||||||
|
result = self.filter_module.alias_domains_map(apps, self.primary)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_alias_with_canonical_dict_not_default(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {
|
||||||
|
'domains': {
|
||||||
|
'canonical': {
|
||||||
|
'one': 'one.com',
|
||||||
|
'two': 'two.com'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expected = {'app1': ['app1.example.com']}
|
||||||
|
result = self.filter_module.alias_domains_map(apps, self.primary)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
@ -8,7 +8,7 @@ sys.path.insert(
|
|||||||
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
|
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
|
||||||
)
|
)
|
||||||
|
|
||||||
from domain_filters import FilterModule
|
from generate_all_domains import FilterModule
|
||||||
|
|
||||||
class TestGenerateAllDomains(unittest.TestCase):
|
class TestGenerateAllDomains(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
@ -8,7 +8,7 @@ sys.path.insert(
|
|||||||
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
|
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
|
||||||
)
|
)
|
||||||
|
|
||||||
from domain_filters import FilterModule
|
from generate_base_sld_domains import FilterModule
|
||||||
|
|
||||||
class TestGenerateBaseSldDomains(unittest.TestCase):
|
class TestGenerateBaseSldDomains(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
74
tests/unit/test_domain_filters_canonical.py
Normal file
74
tests/unit/test_domain_filters_canonical.py
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
# Add the filter_plugins directory to the import path
|
||||||
|
dir_path = os.path.abspath(
|
||||||
|
os.path.join(os.path.dirname(__file__), '../../filter_plugins')
|
||||||
|
)
|
||||||
|
sys.path.insert(0, dir_path)
|
||||||
|
|
||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
from canonical_domains_map import FilterModule
|
||||||
|
|
||||||
|
class TestDomainFilters(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.filter_module = FilterModule()
|
||||||
|
# Sample primary domain
|
||||||
|
self.primary = 'example.com'
|
||||||
|
|
||||||
|
def test_canonical_empty_apps(self):
|
||||||
|
apps = {}
|
||||||
|
expected = {}
|
||||||
|
result = self.filter_module.canonical_domains_map(apps, self.primary)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_canonical_without_domains(self):
|
||||||
|
apps = {'app1': {}}
|
||||||
|
expected = {'app1': ['app1.example.com']}
|
||||||
|
result = self.filter_module.canonical_domains_map(apps, self.primary)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_canonical_with_list(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {
|
||||||
|
'domains': {'canonical': ['foo.com', 'bar.com']}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result = self.filter_module.canonical_domains_map(apps, self.primary)
|
||||||
|
self.assertCountEqual(
|
||||||
|
result['app1'],
|
||||||
|
['foo.com', 'bar.com']
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_canonical_with_dict(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {
|
||||||
|
'domains': {'canonical': {'one': 'one.com', 'two': 'two.com'}}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result = self.filter_module.canonical_domains_map(apps, self.primary)
|
||||||
|
self.assertEqual(
|
||||||
|
result['app1'],
|
||||||
|
{'one': 'one.com', 'two': 'two.com'}
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_canonical_duplicate_raises(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {'domains': {'canonical': ['dup.com']}},
|
||||||
|
'app2': {'domains': {'canonical': ['dup.com']}},
|
||||||
|
}
|
||||||
|
with self.assertRaises(AnsibleFilterError) as cm:
|
||||||
|
self.filter_module.canonical_domains_map(apps, self.primary)
|
||||||
|
# Updated to match new exception message
|
||||||
|
self.assertIn("already configured for", str(cm.exception))
|
||||||
|
|
||||||
|
def test_invalid_canonical_type(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {'domains': {'canonical': 123}}
|
||||||
|
}
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
self.filter_module.canonical_domains_map(apps, self.primary)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
Loading…
x
Reference in New Issue
Block a user