In between commit domain restruturing

This commit is contained in:
2025-05-19 17:17:57 +02:00
parent cc3f5d75ea
commit 37dcc5f74e
63 changed files with 771 additions and 242 deletions

View File

@@ -1,6 +1,6 @@
def is_feature_enabled(applications, feature: str, application_id: str) -> bool:
def is_feature_enabled(applications: dict, feature: str, application_id: str) -> bool:
"""
Check if a generic feature is enabled for the given application.
Return True if applications[application_id].features[feature] is truthy.
"""
app = applications.get(application_id, {})
return bool(app.get('features', {}).get(feature, False))
@@ -31,4 +31,4 @@ class FilterModule(object):
return {
'is_feature_enabled': is_feature_enabled,
'get_docker_compose': get_docker_compose,
}
}

View File

@@ -12,9 +12,10 @@ class FilterModule(object):
'build_csp_header': self.build_csp_header,
}
def is_feature_enabled(self, applications, feature: str, application_id: str) -> bool:
@staticmethod
def is_feature_enabled(applications: dict, feature: str, application_id: str) -> bool:
"""
Check if a generic feature is enabled for the given application.
Return True if applications[application_id].features[feature] is truthy.
"""
app = applications.get(application_id, {})
return bool(app.get('features', {}).get(feature, False))
@@ -99,7 +100,7 @@ class FilterModule(object):
for directive in directives:
tokens = ["'self'"]
# unsafe-eval / unsafe-inline flags
flags = self.get_csp_flags(applications, application_id, directive)
tokens += flags

View File

@@ -3,16 +3,101 @@ from ansible.errors import AnsibleFilterError
class FilterModule(object):
"""
Custom Ansible filter plugin:
- generate_all_domains: Flatten, dedupe, sort domains with optional www prefixes
- generate_base_sld_domains: Extract unique sld.tld domains from values and redirect sources
Ansible Filter Plugin for Domain Processing
This plugin provides filters to manage and transform domain configurations for applications:
- generate_all_domains(domains_dict, include_www=True):
Flattens nested domain values (string, list, or dict), optionally adds 'www.' prefixes,
removes duplicates, and returns a sorted list of unique domains.
- generate_base_sld_domains(domains_dict, redirect_mappings):
Flattens domains and redirect mappings, extracts second-level + top-level domains (SLDs),
deduplicates, and returns a sorted list of base domains.
- canonical_domains_map(apps, primary_domain):
Builds a mapping of application IDs to their canonical domains using
DomainUtils.canonical_list, enforcing uniqueness and detecting conflicts.
- alias_domains_map(apps, primary_domain):
Generates alias domains for each application via DomainUtils.alias_list,
based on their canonical domains and provided configurations.
"""
def filters(self):
return {
'generate_all_domains': self.generate_all_domains,
'generate_base_sld_domains': self.generate_base_sld_domains,
'generate_all_domains': self.generate_all_domains,
'generate_base_sld_domains': self.generate_base_sld_domains,
'canonical_domains_map': self.canonical_domains_map,
'alias_domains_map': self.alias_domains_map,
}
@staticmethod
def parse_entry(domains_cfg, key, app_id):
"""
Extract list of strings from domains_cfg[key], which may be dict or list.
Returns None if key not in domains_cfg.
Raises AnsibleFilterError on invalid type or empty/invalid values.
"""
if key not in domains_cfg:
return None
entry = domains_cfg[key]
if isinstance(entry, dict):
values = list(entry.values())
elif isinstance(entry, list):
values = entry
else:
raise AnsibleFilterError(
f"Unexpected type for 'domains.{key}' in application '{app_id}': {type(entry).__name__}"
)
for d in values:
if not isinstance(d, str) or not d.strip():
raise AnsibleFilterError(
f"Invalid domain entry in '{key}' for application '{app_id}': {d!r}"
)
return values
@staticmethod
def default_domain(app_id, primary_domain):
"""
Returns the default domain string for an application.
"""
return f"{app_id}.{primary_domain}"
@classmethod
def canonical_list(cls, domains_cfg, app_id, primary_domain):
"""
Returns the list of canonical domains: parsed entry or default.
"""
domains = cls.parse_entry(domains_cfg, 'canonical', app_id)
if domains is None:
return [cls.default_domain(app_id, primary_domain)]
return domains
@classmethod
def alias_list(cls, domains_cfg, app_id, primary_domain, canonical_domains=None):
"""
Returns the list of alias domains based on:
- explicit aliases entry
- presence of canonical entry and default not in canonical
Always ensures default domain in aliases when appropriate.
"""
default = cls.default_domain(app_id, primary_domain)
aliases = cls.parse_entry(domains_cfg, 'aliases', app_id) or []
has_aliases = 'aliases' in domains_cfg
has_canonical = 'canonical' in domains_cfg
if has_aliases:
if default not in aliases:
aliases.append(default)
elif has_canonical:
# use provided canonical_domains if given otherwise parse
canon = canonical_domains if canonical_domains is not None else cls.parse_entry(domains_cfg, 'canonical', app_id)
if default not in (canon or []):
aliases.append(default)
# else: neither defined -> empty list
return aliases
@staticmethod
def generate_all_domains(domains_dict, include_www=True):
@@ -67,3 +152,33 @@ class FilterModule(object):
elif isinstance(val, dict):
flat.extend(val.values())
return flat
def canonical_domains_map(self, apps, primary_domain):
result = {}
seen = {}
for app_id, app_cfg in apps.items():
domains_cfg = app_cfg.get('domains', {}) or {}
domains = self.canonical_list(domains_cfg, app_id, primary_domain)
for d in domains:
if d in seen:
raise AnsibleFilterError(
f"Domain '{d}' is configured for both '{seen[d]}' and '{app_id}'"
)
seen[d] = app_id
result[app_id] = domains
return result
def alias_domains_map(self, apps, primary_domain):
result = {}
# wir können die canonical_map vorab holen…
canonical_map = self.canonical_domains_map(apps, primary_domain)
for app_id, app_cfg in apps.items():
domains_cfg = app_cfg.get('domains', {}) or {}
aliases = self.alias_list(
domains_cfg,
app_id,
primary_domain,
canonical_domains=canonical_map.get(app_id),
)
result[app_id] = aliases
return result

View File

@@ -1,16 +1,9 @@
from ansible.errors import AnsibleFilterError
import sys
import os
import yaml
from ansible.errors import AnsibleFilterError
class FilterModule(object):
"""
Custom filters for conditional domain assignments, handling both direct group matches
and recursive role dependency resolution.
Determines if a given application_id (domain_key) should have its domain added by checking:
- If domain_key is explicitly listed in group_names, or
- If domain_key matches any application_id of roles reachable from active groups via dependencies.
"""
def filters(self):
return {
@@ -39,9 +32,27 @@ class FilterModule(object):
result[domain_key] = domain_value
return result
# Setup roles directory path
plugin_dir = os.path.dirname(__file__)
project_root = os.path.abspath(os.path.join(plugin_dir, '..'))
# Determine plugin directory based on filter plugin module if available
plugin_dir = None
for module in sys.modules.values():
fm = getattr(module, 'FilterModule', None)
if fm is not None:
try:
# Access staticmethod, compare underlying function
if getattr(fm, 'add_domain_if_group') is DomainFilterUtil.add_domain_if_group:
plugin_dir = os.path.dirname(module.__file__)
break
except Exception:
continue
if plugin_dir:
# The plugin_dir is the filter_plugins directory; project_root is one level up
project_root = os.path.abspath(os.path.join(plugin_dir, '..'))
else:
# Fallback: locate project root relative to this utility file
plugin_dir = os.path.dirname(__file__)
project_root = os.path.abspath(os.path.join(plugin_dir, '..'))
roles_dir = os.path.join(project_root, 'roles')
# Collect all roles reachable from the active groups
@@ -83,4 +94,4 @@ class FilterModule(object):
return result
except Exception as exc:
raise AnsibleFilterError(f"add_domain_if_group failed: {exc}")
raise AnsibleFilterError(f"add_domain_if_group failed: {exc}")