mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-06-25 11:45:32 +02:00
Optimized redirect function
This commit is contained in:
parent
1b50f73803
commit
03f3a31d21
94
filter_plugins/domain_redirect_mappings.py
Normal file
94
filter_plugins/domain_redirect_mappings.py
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
class FilterModule(object):
|
||||||
|
def filters(self):
|
||||||
|
return {'domain_mappings': self.domain_mappings}
|
||||||
|
|
||||||
|
def domain_mappings(self, apps, primary_domain):
|
||||||
|
"""
|
||||||
|
Build a flat list of redirect mappings for all apps:
|
||||||
|
- source: each alias domain
|
||||||
|
- target: the first canonical domain (app.domains.canonical[0] or default)
|
||||||
|
|
||||||
|
Logic for computing aliases and canonicals is identical to alias_domains_map + canonical_domains_map.
|
||||||
|
"""
|
||||||
|
def parse_entry(domains_cfg, key, app_id):
|
||||||
|
if key not in domains_cfg:
|
||||||
|
return None
|
||||||
|
entry = domains_cfg[key]
|
||||||
|
if isinstance(entry, dict):
|
||||||
|
values = list(entry.values())
|
||||||
|
elif isinstance(entry, list):
|
||||||
|
values = entry
|
||||||
|
else:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Unexpected type for 'domains.{key}' in application '{app_id}': {type(entry).__name__}"
|
||||||
|
)
|
||||||
|
for d in values:
|
||||||
|
if not isinstance(d, str) or not d.strip():
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Invalid domain entry in '{key}' for application '{app_id}': {d!r}"
|
||||||
|
)
|
||||||
|
return values
|
||||||
|
|
||||||
|
def default_domain(app_id, primary):
|
||||||
|
return f"{app_id}.{primary}"
|
||||||
|
|
||||||
|
# 1) Compute canonical domains per app (always as a list)
|
||||||
|
canonical_map = {}
|
||||||
|
for app_id, cfg in apps.items():
|
||||||
|
domains_cfg = cfg.get('domains') or {}
|
||||||
|
entry = domains_cfg.get('canonical')
|
||||||
|
if entry is None:
|
||||||
|
canonical_map[app_id] = [default_domain(app_id, primary_domain)]
|
||||||
|
elif isinstance(entry, dict):
|
||||||
|
canonical_map[app_id] = list(entry.values())
|
||||||
|
elif isinstance(entry, list):
|
||||||
|
canonical_map[app_id] = list(entry)
|
||||||
|
else:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Unexpected type for 'domains.canonical' in application '{app_id}': {type(entry).__name__}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 2) Compute alias domains per app
|
||||||
|
alias_map = {}
|
||||||
|
for app_id, cfg in apps.items():
|
||||||
|
domains_cfg = cfg.get('domains')
|
||||||
|
if domains_cfg is None:
|
||||||
|
# no domains key → no aliases
|
||||||
|
alias_map[app_id] = []
|
||||||
|
continue
|
||||||
|
|
||||||
|
if isinstance(domains_cfg, dict) and not domains_cfg:
|
||||||
|
# empty domains dict → only default
|
||||||
|
alias_map[app_id] = [default_domain(app_id, primary_domain)]
|
||||||
|
continue
|
||||||
|
|
||||||
|
aliases = parse_entry(domains_cfg, 'aliases', app_id) or []
|
||||||
|
default = default_domain(app_id, primary_domain)
|
||||||
|
has_aliases = 'aliases' in domains_cfg
|
||||||
|
has_canonical = 'canonical' in domains_cfg
|
||||||
|
|
||||||
|
if has_aliases:
|
||||||
|
if default not in aliases:
|
||||||
|
aliases.append(default)
|
||||||
|
elif has_canonical:
|
||||||
|
canon = canonical_map.get(app_id, [])
|
||||||
|
if default not in canon and default not in aliases:
|
||||||
|
aliases.append(default)
|
||||||
|
|
||||||
|
alias_map[app_id] = aliases
|
||||||
|
|
||||||
|
# 3) Build flat list of {source, target} entries
|
||||||
|
mappings = []
|
||||||
|
for app_id, sources in alias_map.items():
|
||||||
|
# pick first canonical domain as target
|
||||||
|
canon_list = canonical_map.get(app_id, [])
|
||||||
|
target = canon_list[0] if canon_list else default_domain(app_id, primary_domain)
|
||||||
|
for src in sources:
|
||||||
|
mappings.append({
|
||||||
|
'source': src,
|
||||||
|
'target': target
|
||||||
|
})
|
||||||
|
|
||||||
|
return mappings
|
@ -1,10 +1,6 @@
|
|||||||
defaults_domains: "{{ defaults_applications | canonical_domains_map(primary_domain) }}"
|
defaults_domains: "{{ defaults_applications | canonical_domains_map(primary_domain) }}"
|
||||||
|
|
||||||
defaults_redirect_domain_mappings: >-
|
defaults_redirect_domain_mappings: "{{ applications | domain_mappings(primary_domain) }}"
|
||||||
{{ []
|
|
||||||
| add_redirect_if_group('lam', domains.ldap, domains.lam, group_names)
|
|
||||||
| add_redirect_if_group('phpmyldapadmin', domains.ldap, domains.phpmyldapadmin,group_names)
|
|
||||||
}}
|
|
||||||
|
|
||||||
# Domains which are deprecated and should be cleaned up
|
# Domains which are deprecated and should be cleaned up
|
||||||
deprecated_domains: []
|
deprecated_domains: []
|
@ -1,12 +1,10 @@
|
|||||||
version: "latest"
|
version: "latest"
|
||||||
credentials:
|
credentials:
|
||||||
# database_password: Password for the database
|
|
||||||
|
|
||||||
features:
|
features:
|
||||||
matomo: true
|
matomo: true
|
||||||
css: true
|
css: true
|
||||||
portfolio_iframe: false
|
portfolio_iframe: false
|
||||||
central_database: true
|
central_database: true
|
||||||
|
|
||||||
domains:
|
domains:
|
||||||
canonical:
|
canonical:
|
||||||
|
@ -17,13 +17,6 @@
|
|||||||
set_fact:
|
set_fact:
|
||||||
domains: "{{ defaults_domains | combine(domains | default({}, true), recursive=True) }}"
|
domains: "{{ defaults_domains | combine(domains | default({}, true), recursive=True) }}"
|
||||||
|
|
||||||
- name: "Merged Variables"
|
|
||||||
# Add new merged variables here
|
|
||||||
debug:
|
|
||||||
msg:
|
|
||||||
domains: "{{ defaults_domains }}"
|
|
||||||
when: enable_debug | bool
|
|
||||||
|
|
||||||
- name: Merge redirect domain definitions into dictionary
|
- name: Merge redirect domain definitions into dictionary
|
||||||
set_fact:
|
set_fact:
|
||||||
combined_mapping: >-
|
combined_mapping: >-
|
||||||
|
105
tests/unit/test_domain_mappings.py
Normal file
105
tests/unit/test_domain_mappings.py
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
# Add the filter_plugins directory to the import path
|
||||||
|
dir_path = os.path.abspath(
|
||||||
|
os.path.join(os.path.dirname(__file__), '../../filter_plugins')
|
||||||
|
)
|
||||||
|
sys.path.insert(0, dir_path)
|
||||||
|
|
||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
from domain_redirect_mappings import FilterModule
|
||||||
|
|
||||||
|
class TestDomainMappings(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.filter = FilterModule()
|
||||||
|
self.primary = 'example.com'
|
||||||
|
|
||||||
|
def test_empty_apps(self):
|
||||||
|
apps = {}
|
||||||
|
result = self.filter.domain_mappings(apps, self.primary)
|
||||||
|
self.assertEqual(result, [])
|
||||||
|
|
||||||
|
def test_app_without_domains(self):
|
||||||
|
apps = {'app1': {}}
|
||||||
|
# no domains key → no mappings
|
||||||
|
result = self.filter.domain_mappings(apps, self.primary)
|
||||||
|
self.assertEqual(result, [])
|
||||||
|
|
||||||
|
def test_empty_domains_cfg(self):
|
||||||
|
apps = {'app1': {'domains': {}}}
|
||||||
|
default = 'app1.example.com'
|
||||||
|
expected = [
|
||||||
|
{'source': default, 'target': default}
|
||||||
|
]
|
||||||
|
result = self.filter.domain_mappings(apps, self.primary)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_explicit_aliases(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {
|
||||||
|
'domains': {'aliases': ['alias.com']}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default = 'app1.example.com'
|
||||||
|
expected = [
|
||||||
|
{'source': 'alias.com', 'target': default},
|
||||||
|
{'source': default, 'target': default},
|
||||||
|
]
|
||||||
|
result = self.filter.domain_mappings(apps, self.primary)
|
||||||
|
# order not important
|
||||||
|
self.assertCountEqual(result, expected)
|
||||||
|
|
||||||
|
def test_canonical_not_default(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {
|
||||||
|
'domains': {'canonical': ['foo.com']}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expected = [
|
||||||
|
{'source': 'app1.example.com', 'target': 'foo.com'}
|
||||||
|
]
|
||||||
|
result = self.filter.domain_mappings(apps, self.primary)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_canonical_dict(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {
|
||||||
|
'domains': {
|
||||||
|
'canonical': {'one': 'one.com', 'two': 'two.com'}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
# first canonical key 'one' → one.com
|
||||||
|
expected = [
|
||||||
|
{'source': 'app1.example.com', 'target': 'one.com'}
|
||||||
|
]
|
||||||
|
result = self.filter.domain_mappings(apps, self.primary)
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_multiple_apps(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {'domains': {'aliases': ['a1.com']}},
|
||||||
|
'app2': {'domains': {'canonical': ['c2.com']}},
|
||||||
|
}
|
||||||
|
expected = [
|
||||||
|
# app1
|
||||||
|
{'source': 'a1.com', 'target': 'app1.example.com'},
|
||||||
|
{'source': 'app1.example.com', 'target': 'app1.example.com'},
|
||||||
|
# app2
|
||||||
|
{'source': 'app2.example.com', 'target': 'c2.com'},
|
||||||
|
]
|
||||||
|
result = self.filter.domain_mappings(apps, self.primary)
|
||||||
|
self.assertCountEqual(result, expected)
|
||||||
|
|
||||||
|
def test_invalid_aliases_type(self):
|
||||||
|
apps = {
|
||||||
|
'app1': {'domains': {'aliases': 123}}
|
||||||
|
}
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
self.filter.domain_mappings(apps, self.primary)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
Loading…
x
Reference in New Issue
Block a user