Optimized code and solved bugs

This commit is contained in:
2025-05-20 04:19:10 +02:00
parent 2f1d6a5178
commit d5dd568994
21 changed files with 167 additions and 144 deletions

View File

@@ -1,8 +1,8 @@
from ansible.errors import AnsibleFilterError
import os
import sys
import yaml
class FilterModule(object):
def filters(self):
return {
@@ -15,48 +15,67 @@ class FilterModule(object):
1) directly in group_names, or
2) the application_id of any role reachable (recursively)
from any group in group_names via meta/dependencies.
Expects:
- applications: dict mapping application_id → config
- group_names: list of active role names
"""
# validate inputs
self._validate_inputs(applications, group_names)
roles_dir = self._get_roles_directory()
included_roles = self._collect_reachable_roles(group_names, roles_dir)
included_app_ids = self._gather_application_ids(included_roles, roles_dir)
return self._filter_applications(applications, group_names, included_app_ids)
def _validate_inputs(self, applications, group_names):
"""Validate the inputs for correct types."""
if not isinstance(applications, dict):
raise AnsibleFilterError(f"Expected applications as dict, got {type(applications).__name__}")
if not isinstance(group_names, (list, tuple)):
raise AnsibleFilterError(f"Expected group_names as list/tuple, got {type(group_names).__name__}")
# locate roles directory (assume plugin sits in filter_plugins/)
def _get_roles_directory(self):
"""Locate and return the roles directory."""
plugin_dir = os.path.dirname(__file__)
project_root = os.path.abspath(os.path.join(plugin_dir, '..'))
roles_dir = os.path.join(project_root, 'roles')
# recursively collect all roles reachable from the given groups
def collect_roles(role, seen):
if role in seen:
return
seen.add(role)
meta_file = os.path.join(roles_dir, role, 'meta', 'main.yml')
if not os.path.isfile(meta_file):
return
try:
with open(meta_file) as f:
meta = yaml.safe_load(f) or {}
except Exception:
return
for dep in meta.get('dependencies', []):
if isinstance(dep, str):
dep_name = dep
elif isinstance(dep, dict):
dep_name = dep.get('role') or dep.get('name')
else:
continue
collect_roles(dep_name, seen)
return os.path.join(project_root, 'roles')
def _collect_reachable_roles(self, group_names, roles_dir):
"""Recursively collect all roles reachable from the given groups via meta/dependencies."""
included_roles = set()
for grp in group_names:
collect_roles(grp, included_roles)
for group in group_names:
self._collect_roles_from_group(group, included_roles, roles_dir)
return included_roles
# gather application_ids from those roles
def _collect_roles_from_group(self, group, seen, roles_dir):
"""Recursively collect roles from a specific group."""
if group in seen:
return
seen.add(group)
meta_file = os.path.join(roles_dir, group, 'meta', 'main.yml')
if not os.path.isfile(meta_file):
return
try:
with open(meta_file) as f:
meta = yaml.safe_load(f) or {}
except Exception:
return
for dep in meta.get('dependencies', []):
dep_name = self._get_dependency_name(dep)
if dep_name:
self._collect_roles_from_group(dep_name, seen, roles_dir)
def _get_dependency_name(self, dependency):
"""Extract the dependency role name from the meta data."""
if isinstance(dependency, str):
return dependency
elif isinstance(dependency, dict):
return dependency.get('role') or dependency.get('name')
return None
def _gather_application_ids(self, included_roles, roles_dir):
"""Gather application_ids from the roles."""
included_app_ids = set()
for role in included_roles:
vars_file = os.path.join(roles_dir, role, 'vars', 'main.yml')
@@ -67,14 +86,17 @@ class FilterModule(object):
vars_data = yaml.safe_load(f) or {}
except Exception:
continue
app_id = vars_data.get('application_id')
if isinstance(app_id, str) and app_id:
included_app_ids.add(app_id)
# build filtered result: include any application whose key is in group_names or in included_app_ids
return included_app_ids
def _filter_applications(self, applications, group_names, included_app_ids):
"""Filter and return the applications that match the conditions."""
result = {}
for app_key, cfg in applications.items():
if app_key in group_names or app_key in included_app_ids:
result[app_key] = cfg
return result
return result

View File

@@ -5,71 +5,79 @@ class FilterModule(object):
return {'canonical_domains_map': self.canonical_domains_map}
def canonical_domains_map(self, apps, primary_domain):
def parse_entry(domains_cfg, key, app_id):
if key not in domains_cfg:
return None
entry = domains_cfg[key]
if isinstance(entry, dict):
values = list(entry.values())
elif isinstance(entry, list):
values = entry
else:
raise AnsibleFilterError(
f"Unexpected type for 'domains.{key}' in application '{app_id}': {type(entry).__name__}"
)
for d in values:
if not isinstance(d, str) or not d.strip():
raise AnsibleFilterError(
f"Invalid domain entry in '{key}' for application '{app_id}': {d!r}"
)
return values
"""
Maps applications to their canonical domains, checking for conflicts
and ensuring all domains are valid and unique across applications.
"""
result = {}
seen = {}
seen_domains = {}
for app_id, cfg in apps.items():
domains_cfg = cfg.get('domains')
if not domains_cfg or 'canonical' not in domains_cfg:
default = f"{app_id}.{primary_domain}"
if default in seen:
raise AnsibleFilterError(
f"Domain '{default}' is already configured for '{seen[default]}' and '{app_id}'"
)
seen[default] = app_id
result[app_id] = [default]
self._add_default_domain(app_id, primary_domain, seen_domains, result)
continue
entry = domains_cfg['canonical']
if isinstance(entry, dict):
for name, domain in entry.items():
if not isinstance(domain, str) or not domain.strip():
raise AnsibleFilterError(
f"Invalid domain entry in 'canonical' for application '{app_id}': {domain!r}"
)
if domain in seen:
raise AnsibleFilterError(
f"Domain '{domain}' is already configured for '{seen[domain]}' and '{app_id}'"
)
seen[domain] = app_id
result[app_id] = entry.copy()
elif isinstance(entry, list):
for domain in entry:
if not isinstance(domain, str) or not domain.strip():
raise AnsibleFilterError(
f"Invalid domain entry in 'canonical' for application '{app_id}': {domain!r}"
)
if domain in seen:
raise AnsibleFilterError(
f"Domain '{domain}' is already configured for '{seen[domain]}' and '{app_id}'"
)
seen[domain] = app_id
result[app_id] = list(entry)
else:
raise AnsibleFilterError(
f"Unexpected type for 'domains.canonical' in application '{app_id}': {type(entry).__name__}"
)
canonical_domains = domains_cfg['canonical']
self._process_canonical_domains(app_id, canonical_domains, seen_domains, result)
return result
def _add_default_domain(self, app_id, primary_domain, seen_domains, result):
"""
Add the default domain for an application if no canonical domains are defined.
Ensures the domain is unique across applications.
"""
default_domain = f"{app_id}.{primary_domain}"
if default_domain in seen_domains:
raise AnsibleFilterError(
f"Domain '{default_domain}' is already configured for "
f"'{seen_domains[default_domain]}' and '{app_id}'"
)
seen_domains[default_domain] = app_id
result[app_id] = [default_domain]
def _process_canonical_domains(self, app_id, canonical_domains, seen_domains, result):
"""
Process the canonical domains for an application, handling both lists and dicts,
and ensuring each domain is unique.
"""
if isinstance(canonical_domains, dict):
self._process_canonical_domains_dict(app_id, canonical_domains, seen_domains, result)
elif isinstance(canonical_domains, list):
self._process_canonical_domains_list(app_id, canonical_domains, seen_domains, result)
else:
raise AnsibleFilterError(
f"Unexpected type for 'domains.canonical' in application '{app_id}': "
f"{type(canonical_domains).__name__}"
)
def _process_canonical_domains_dict(self, app_id, domains_dict, seen_domains, result):
"""
Process a dictionary of canonical domains for an application.
"""
for name, domain in domains_dict.items():
self._validate_and_check_domain(app_id, domain, seen_domains)
result[app_id] = domains_dict.copy()
def _process_canonical_domains_list(self, app_id, domains_list, seen_domains, result):
"""
Process a list of canonical domains for an application.
"""
for domain in domains_list:
self._validate_and_check_domain(app_id, domain, seen_domains)
result[app_id] = list(domains_list)
def _validate_and_check_domain(self, app_id, domain, seen_domains):
"""
Validate the domain and check if it has already been assigned to another application.
"""
if not isinstance(domain, str) or not domain.strip():
raise AnsibleFilterError(
f"Invalid domain entry in 'canonical' for application '{app_id}': {domain!r}"
)
if domain in seen_domains:
raise AnsibleFilterError(
f"Domain '{domain}' is already configured for '{seen_domains[domain]}' and '{app_id}'"
)
seen_domains[domain] = app_id