mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-06-25 11:45:32 +02:00
Compare commits
4 Commits
03f3a31d21
...
3c7825fd23
Author | SHA1 | Date | |
---|---|---|---|
3c7825fd23 | |||
865f3577d4 | |||
f748f9cef1 | |||
efe994a4c5 |
@ -3,6 +3,7 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import os
|
import os
|
||||||
import yaml
|
import yaml
|
||||||
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
def load_yaml_file(path):
|
def load_yaml_file(path):
|
||||||
@ -36,7 +37,13 @@ def main():
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
vars_data = load_yaml_file(vars_main)
|
vars_data = load_yaml_file(vars_main)
|
||||||
|
try:
|
||||||
application_id = vars_data.get("application_id")
|
application_id = vars_data.get("application_id")
|
||||||
|
except Exception as e:
|
||||||
|
# print the exception message
|
||||||
|
print(f"Warning: failed to read application_id from {vars_data} in {vars_main}.\nException: {e}", file=sys.stderr)
|
||||||
|
# exit with status 0
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
if not application_id:
|
if not application_id:
|
||||||
print(f"[!] Skipping {role_name}: application_id not defined in vars/main.yml")
|
print(f"[!] Skipping {role_name}: application_id not defined in vars/main.yml")
|
||||||
|
80
filter_plugins/applications_if_group_and_deps.py
Normal file
80
filter_plugins/applications_if_group_and_deps.py
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
class FilterModule(object):
|
||||||
|
def filters(self):
|
||||||
|
return {
|
||||||
|
'applications_if_group_and_deps': self.applications_if_group_and_deps,
|
||||||
|
}
|
||||||
|
|
||||||
|
def applications_if_group_and_deps(self, applications, group_names):
|
||||||
|
"""
|
||||||
|
Return only those applications whose key is either:
|
||||||
|
1) directly in group_names, or
|
||||||
|
2) the application_id of any role reachable (recursively)
|
||||||
|
from any group in group_names via meta/dependencies.
|
||||||
|
Expects:
|
||||||
|
- applications: dict mapping application_id → config
|
||||||
|
- group_names: list of active role names
|
||||||
|
"""
|
||||||
|
# validate inputs
|
||||||
|
if not isinstance(applications, dict):
|
||||||
|
raise AnsibleFilterError(f"Expected applications as dict, got {type(applications).__name__}")
|
||||||
|
if not isinstance(group_names, (list, tuple)):
|
||||||
|
raise AnsibleFilterError(f"Expected group_names as list/tuple, got {type(group_names).__name__}")
|
||||||
|
|
||||||
|
# locate roles directory (assume plugin sits in filter_plugins/)
|
||||||
|
plugin_dir = os.path.dirname(__file__)
|
||||||
|
project_root = os.path.abspath(os.path.join(plugin_dir, '..'))
|
||||||
|
roles_dir = os.path.join(project_root, 'roles')
|
||||||
|
|
||||||
|
# recursively collect all roles reachable from the given groups
|
||||||
|
def collect_roles(role, seen):
|
||||||
|
if role in seen:
|
||||||
|
return
|
||||||
|
seen.add(role)
|
||||||
|
meta_file = os.path.join(roles_dir, role, 'meta', 'main.yml')
|
||||||
|
if not os.path.isfile(meta_file):
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
with open(meta_file) as f:
|
||||||
|
meta = yaml.safe_load(f) or {}
|
||||||
|
except Exception:
|
||||||
|
return
|
||||||
|
for dep in meta.get('dependencies', []):
|
||||||
|
if isinstance(dep, str):
|
||||||
|
dep_name = dep
|
||||||
|
elif isinstance(dep, dict):
|
||||||
|
dep_name = dep.get('role') or dep.get('name')
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
collect_roles(dep_name, seen)
|
||||||
|
|
||||||
|
included_roles = set()
|
||||||
|
for grp in group_names:
|
||||||
|
collect_roles(grp, included_roles)
|
||||||
|
|
||||||
|
# gather application_ids from those roles
|
||||||
|
included_app_ids = set()
|
||||||
|
for role in included_roles:
|
||||||
|
vars_file = os.path.join(roles_dir, role, 'vars', 'main.yml')
|
||||||
|
if not os.path.isfile(vars_file):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
with open(vars_file) as f:
|
||||||
|
vars_data = yaml.safe_load(f) or {}
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
app_id = vars_data.get('application_id')
|
||||||
|
if isinstance(app_id, str) and app_id:
|
||||||
|
included_app_ids.add(app_id)
|
||||||
|
|
||||||
|
# build filtered result: include any application whose key is in group_names or in included_app_ids
|
||||||
|
result = {}
|
||||||
|
for app_key, cfg in applications.items():
|
||||||
|
if app_key in group_names or app_key in included_app_ids:
|
||||||
|
result[app_key] = cfg
|
||||||
|
|
||||||
|
return result
|
@ -126,7 +126,7 @@ class FilterModule(object):
|
|||||||
self.is_feature_enabled(applications, 'portfolio_iframe', application_id)
|
self.is_feature_enabled(applications, 'portfolio_iframe', application_id)
|
||||||
and directive == 'frame-ancestors'
|
and directive == 'frame-ancestors'
|
||||||
):
|
):
|
||||||
domain = domains.get(application_id) # e.g. "sub.example.com" or "example.com"
|
domain = domains.get('portfolio')[0] # e.g. "sub.example.com" or "example.com"
|
||||||
# Extract the second-level + top-level domain and prefix with "*."
|
# Extract the second-level + top-level domain and prefix with "*."
|
||||||
sld_tld = ".".join(domain.split(".")[-2:]) # yields "example.com"
|
sld_tld = ".".join(domain.split(".")[-2:]) # yields "example.com"
|
||||||
tokens.append(f"*.{sld_tld}") # yields "*.example.com"
|
tokens.append(f"*.{sld_tld}") # yields "*.example.com"
|
||||||
|
@ -8,9 +8,8 @@ class FilterModule(object):
|
|||||||
"""
|
"""
|
||||||
Build a flat list of redirect mappings for all apps:
|
Build a flat list of redirect mappings for all apps:
|
||||||
- source: each alias domain
|
- source: each alias domain
|
||||||
- target: the first canonical domain (app.domains.canonical[0] or default)
|
- target: the first canonical domain
|
||||||
|
Skip mappings where source == target, since they make no sense.
|
||||||
Logic for computing aliases and canonicals is identical to alias_domains_map + canonical_domains_map.
|
|
||||||
"""
|
"""
|
||||||
def parse_entry(domains_cfg, key, app_id):
|
def parse_entry(domains_cfg, key, app_id):
|
||||||
if key not in domains_cfg:
|
if key not in domains_cfg:
|
||||||
@ -55,12 +54,9 @@ class FilterModule(object):
|
|||||||
for app_id, cfg in apps.items():
|
for app_id, cfg in apps.items():
|
||||||
domains_cfg = cfg.get('domains')
|
domains_cfg = cfg.get('domains')
|
||||||
if domains_cfg is None:
|
if domains_cfg is None:
|
||||||
# no domains key → no aliases
|
|
||||||
alias_map[app_id] = []
|
alias_map[app_id] = []
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if isinstance(domains_cfg, dict) and not domains_cfg:
|
if isinstance(domains_cfg, dict) and not domains_cfg:
|
||||||
# empty domains dict → only default
|
|
||||||
alias_map[app_id] = [default_domain(app_id, primary_domain)]
|
alias_map[app_id] = [default_domain(app_id, primary_domain)]
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@ -79,13 +75,16 @@ class FilterModule(object):
|
|||||||
|
|
||||||
alias_map[app_id] = aliases
|
alias_map[app_id] = aliases
|
||||||
|
|
||||||
# 3) Build flat list of {source, target} entries
|
# 3) Build flat list of {source, target} entries,
|
||||||
|
# skipping self-mappings
|
||||||
mappings = []
|
mappings = []
|
||||||
for app_id, sources in alias_map.items():
|
for app_id, sources in alias_map.items():
|
||||||
# pick first canonical domain as target
|
|
||||||
canon_list = canonical_map.get(app_id, [])
|
canon_list = canonical_map.get(app_id, [])
|
||||||
target = canon_list[0] if canon_list else default_domain(app_id, primary_domain)
|
target = canon_list[0] if canon_list else default_domain(app_id, primary_domain)
|
||||||
for src in sources:
|
for src in sources:
|
||||||
|
if src == target:
|
||||||
|
# skip self-redirects
|
||||||
|
continue
|
||||||
mappings.append({
|
mappings.append({
|
||||||
'source': src,
|
'source': src,
|
||||||
'target': target
|
'target': target
|
||||||
|
@ -5,33 +5,40 @@ class FilterModule(object):
|
|||||||
def filters(self):
|
def filters(self):
|
||||||
return {'generate_base_sld_domains': self.generate_base_sld_domains}
|
return {'generate_base_sld_domains': self.generate_base_sld_domains}
|
||||||
|
|
||||||
def generate_base_sld_domains(self, domains_dict, redirect_mappings):
|
def generate_base_sld_domains(self, domains_list):
|
||||||
"""
|
"""
|
||||||
Flatten domains_dict und redirect_mappings, extrahiere SLDs (z.B. example.com),
|
Given a list of hostnames, extract the second-level domain (SLD.TLD) for any hostname
|
||||||
dedupe und sortiere.
|
with two or more labels, return single-label hostnames as-is, and reject IPs,
|
||||||
|
empty or malformed strings, and non-strings. Deduplicate and sort.
|
||||||
"""
|
"""
|
||||||
def _flatten(domains):
|
if not isinstance(domains_list, list):
|
||||||
flat = []
|
raise AnsibleFilterError(
|
||||||
for v in (domains or {}).values():
|
f"generate_base_sld_domains expected a list, got {type(domains_list).__name__}"
|
||||||
if isinstance(v, str):
|
)
|
||||||
flat.append(v)
|
|
||||||
elif isinstance(v, list):
|
|
||||||
flat.extend(v)
|
|
||||||
elif isinstance(v, dict):
|
|
||||||
flat.extend(v.values())
|
|
||||||
return flat
|
|
||||||
|
|
||||||
try:
|
ip_pattern = re.compile(r'^\d{1,3}(?:\.\d{1,3}){3}$')
|
||||||
flat = _flatten(domains_dict)
|
results = set()
|
||||||
for mapping in redirect_mappings or []:
|
|
||||||
src = mapping.get('source')
|
|
||||||
if isinstance(src, str):
|
|
||||||
flat.append(src)
|
|
||||||
elif isinstance(src, list):
|
|
||||||
flat.extend(src)
|
|
||||||
|
|
||||||
pattern = re.compile(r'^(?:.*\.)?([^.]+\.[^.]+)$')
|
for hostname in domains_list:
|
||||||
slds = {m.group(1) for d in flat if (m := pattern.match(d))}
|
# type check
|
||||||
return sorted(slds)
|
if not isinstance(hostname, str):
|
||||||
except Exception as exc:
|
raise AnsibleFilterError(f"Invalid domain entry (not a string): {hostname!r}")
|
||||||
raise AnsibleFilterError(f"generate_base_sld_domains failed: {exc}")
|
|
||||||
|
# malformed or empty
|
||||||
|
if not hostname or hostname.startswith('.') or hostname.endswith('.') or '..' in hostname:
|
||||||
|
raise AnsibleFilterError(f"Invalid domain entry (malformed): {hostname!r}")
|
||||||
|
|
||||||
|
# IP addresses disallowed
|
||||||
|
if ip_pattern.match(hostname):
|
||||||
|
raise AnsibleFilterError(f"IP addresses not allowed: {hostname!r}")
|
||||||
|
|
||||||
|
# single-label hostnames
|
||||||
|
labels = hostname.split('.')
|
||||||
|
if len(labels) == 1:
|
||||||
|
results.add(hostname)
|
||||||
|
else:
|
||||||
|
# always keep only the last two labels (SLD.TLD)
|
||||||
|
sld = ".".join(labels[-2:])
|
||||||
|
results.add(sld)
|
||||||
|
|
||||||
|
return sorted(results)
|
@ -1,97 +0,0 @@
|
|||||||
from ansible.errors import AnsibleFilterError
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
class FilterModule(object):
|
|
||||||
|
|
||||||
def filters(self):
|
|
||||||
return {
|
|
||||||
"add_domain_if_group": self.add_domain_if_group,
|
|
||||||
}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def add_domain_if_group(domains_dict, domain_key, domain_value, group_names):
|
|
||||||
"""
|
|
||||||
Add {domain_key: domain_value} to domains_dict if either:
|
|
||||||
1) domain_key is in group_names (direct inclusion), or
|
|
||||||
2) domain_key is among collected application_id values of roles
|
|
||||||
reachable from any group in group_names via recursive dependencies.
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
domains_dict: existing dict of domains
|
|
||||||
domain_key: name of the application to check
|
|
||||||
domain_value: domain or dict/list of domains to assign
|
|
||||||
group_names: list of active group (role/application) names
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
result = dict(domains_dict)
|
|
||||||
|
|
||||||
# Direct group match: if the application name itself is in group_names
|
|
||||||
if domain_key in group_names:
|
|
||||||
result[domain_key] = domain_value
|
|
||||||
return result
|
|
||||||
|
|
||||||
# Determine plugin directory based on filter plugin module if available
|
|
||||||
plugin_dir = None
|
|
||||||
for module in sys.modules.values():
|
|
||||||
fm = getattr(module, 'FilterModule', None)
|
|
||||||
if fm is not None:
|
|
||||||
try:
|
|
||||||
# Access staticmethod, compare underlying function
|
|
||||||
if getattr(fm, 'add_domain_if_group') is DomainFilterUtil.add_domain_if_group:
|
|
||||||
plugin_dir = os.path.dirname(module.__file__)
|
|
||||||
break
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if plugin_dir:
|
|
||||||
# The plugin_dir is the filter_plugins directory; project_root is one level up
|
|
||||||
project_root = os.path.abspath(os.path.join(plugin_dir, '..'))
|
|
||||||
else:
|
|
||||||
# Fallback: locate project root relative to this utility file
|
|
||||||
plugin_dir = os.path.dirname(__file__)
|
|
||||||
project_root = os.path.abspath(os.path.join(plugin_dir, '..'))
|
|
||||||
|
|
||||||
roles_dir = os.path.join(project_root, 'roles')
|
|
||||||
|
|
||||||
# Collect all roles reachable from the active groups
|
|
||||||
def collect_roles(role_name, collected):
|
|
||||||
if role_name in collected:
|
|
||||||
return
|
|
||||||
collected.add(role_name)
|
|
||||||
meta_path = os.path.join(roles_dir, role_name, 'meta', 'main.yml')
|
|
||||||
if os.path.isfile(meta_path):
|
|
||||||
with open(meta_path) as f:
|
|
||||||
meta = yaml.safe_load(f) or {}
|
|
||||||
for dep in meta.get('dependencies', []):
|
|
||||||
if isinstance(dep, str):
|
|
||||||
dep_name = dep
|
|
||||||
elif isinstance(dep, dict):
|
|
||||||
dep_name = dep.get('role') or dep.get('name')
|
|
||||||
else:
|
|
||||||
continue
|
|
||||||
collect_roles(dep_name, collected)
|
|
||||||
|
|
||||||
included_roles = set()
|
|
||||||
for grp in group_names:
|
|
||||||
collect_roles(grp, included_roles)
|
|
||||||
|
|
||||||
# Gather application_ids from each included role
|
|
||||||
app_ids = set()
|
|
||||||
for role in included_roles:
|
|
||||||
vars_main = os.path.join(roles_dir, role, 'vars', 'main.yml')
|
|
||||||
if os.path.isfile(vars_main):
|
|
||||||
with open(vars_main) as f:
|
|
||||||
vars_data = yaml.safe_load(f) or {}
|
|
||||||
app_id = vars_data.get('application_id')
|
|
||||||
if app_id:
|
|
||||||
app_ids.add(app_id)
|
|
||||||
|
|
||||||
# Indirect inclusion: match by application_id
|
|
||||||
if domain_key in app_ids:
|
|
||||||
result[domain_key] = domain_value
|
|
||||||
|
|
||||||
return result
|
|
||||||
except Exception as exc:
|
|
||||||
raise AnsibleFilterError(f"add_domain_if_group failed: {exc}")
|
|
122
filter_plugins/load_configuration.py
Normal file
122
filter_plugins/load_configuration.py
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
import os
|
||||||
|
import yaml
|
||||||
|
import re
|
||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
# in-memory cache: application_id → (parsed_yaml, is_nested)
|
||||||
|
_cfg_cache = {}
|
||||||
|
|
||||||
|
def load_configuration(application_id, key):
|
||||||
|
if not isinstance(key, str):
|
||||||
|
raise AnsibleFilterError("Key must be a dotted-string, e.g. 'features.matomo'")
|
||||||
|
|
||||||
|
# locate roles/
|
||||||
|
here = os.path.dirname(__file__)
|
||||||
|
root = os.path.abspath(os.path.join(here, '..'))
|
||||||
|
roles_dir = os.path.join(root, 'roles')
|
||||||
|
if not os.path.isdir(roles_dir):
|
||||||
|
raise AnsibleFilterError(f"Roles directory not found at {roles_dir}")
|
||||||
|
|
||||||
|
# first time? load & cache
|
||||||
|
if application_id not in _cfg_cache:
|
||||||
|
config_path = None
|
||||||
|
|
||||||
|
# 1) primary: vars/main.yml declares it
|
||||||
|
for role in os.listdir(roles_dir):
|
||||||
|
mv = os.path.join(roles_dir, role, 'vars', 'main.yml')
|
||||||
|
if os.path.exists(mv):
|
||||||
|
try:
|
||||||
|
md = yaml.safe_load(open(mv)) or {}
|
||||||
|
except Exception:
|
||||||
|
md = {}
|
||||||
|
if md.get('application_id') == application_id:
|
||||||
|
cf = os.path.join(roles_dir, role, 'vars', 'configuration.yml')
|
||||||
|
if not os.path.exists(cf):
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Role '{role}' declares '{application_id}' but missing configuration.yml"
|
||||||
|
)
|
||||||
|
config_path = cf
|
||||||
|
break
|
||||||
|
|
||||||
|
# 2) fallback nested
|
||||||
|
if config_path is None:
|
||||||
|
for role in os.listdir(roles_dir):
|
||||||
|
cf = os.path.join(roles_dir, role, 'vars', 'configuration.yml')
|
||||||
|
if not os.path.exists(cf):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
dd = yaml.safe_load(open(cf)) or {}
|
||||||
|
except Exception:
|
||||||
|
dd = {}
|
||||||
|
if isinstance(dd, dict) and application_id in dd:
|
||||||
|
config_path = cf
|
||||||
|
break
|
||||||
|
|
||||||
|
# 3) fallback flat
|
||||||
|
if config_path is None:
|
||||||
|
for role in os.listdir(roles_dir):
|
||||||
|
cf = os.path.join(roles_dir, role, 'vars', 'configuration.yml')
|
||||||
|
if not os.path.exists(cf):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
dd = yaml.safe_load(open(cf)) or {}
|
||||||
|
except Exception:
|
||||||
|
dd = {}
|
||||||
|
# flat style: dict with all non-dict values
|
||||||
|
if isinstance(dd, dict) and not any(isinstance(v, dict) for v in dd.values()):
|
||||||
|
config_path = cf
|
||||||
|
break
|
||||||
|
|
||||||
|
if config_path is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# parse once
|
||||||
|
try:
|
||||||
|
parsed = yaml.safe_load(open(config_path)) or {}
|
||||||
|
except Exception as e:
|
||||||
|
raise AnsibleFilterError(f"Error loading configuration.yml at {config_path}: {e}")
|
||||||
|
|
||||||
|
# detect nested vs flat
|
||||||
|
is_nested = isinstance(parsed, dict) and (application_id in parsed)
|
||||||
|
_cfg_cache[application_id] = (parsed, is_nested)
|
||||||
|
|
||||||
|
parsed, is_nested = _cfg_cache[application_id]
|
||||||
|
|
||||||
|
# pick base entry
|
||||||
|
entry = parsed[application_id] if is_nested else parsed
|
||||||
|
|
||||||
|
# resolve dotted key
|
||||||
|
key_parts = key.split('.')
|
||||||
|
for part in key_parts:
|
||||||
|
# Check if part has an index (e.g., domains.canonical[0])
|
||||||
|
match = re.match(r'([^\[]+)\[([0-9]+)\]', part)
|
||||||
|
if match:
|
||||||
|
part, index = match.groups()
|
||||||
|
index = int(index)
|
||||||
|
if isinstance(entry, dict) and part in entry:
|
||||||
|
entry = entry[part]
|
||||||
|
# Check if entry is a list and access the index
|
||||||
|
if isinstance(entry, list) and 0 <= index < len(entry):
|
||||||
|
entry = entry[index]
|
||||||
|
else:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Index '{index}' out of range for key '{part}' in application '{application_id}'"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Key '{part}' not found under application '{application_id}'"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
if isinstance(entry, dict) and part in entry:
|
||||||
|
entry = entry[part]
|
||||||
|
else:
|
||||||
|
raise AnsibleFilterError(
|
||||||
|
f"Key '{part}' not found under application '{application_id}'"
|
||||||
|
)
|
||||||
|
|
||||||
|
return entry
|
||||||
|
|
||||||
|
|
||||||
|
class FilterModule(object):
|
||||||
|
def filters(self):
|
||||||
|
return {'load_configuration': load_configuration}
|
42
filter_plugins/merge_mapping.py
Normal file
42
filter_plugins/merge_mapping.py
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
# filter_plugins/merge_mapping.py
|
||||||
|
|
||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
def merge_mapping(list1, list2, key_name='source'):
|
||||||
|
"""
|
||||||
|
Merge two lists of dicts on a given key.
|
||||||
|
- list1, list2: each must be a List[Dict]
|
||||||
|
- key_name: the field to match on
|
||||||
|
If both lists contain an item with the same key_name value,
|
||||||
|
their dictionaries are merged (fields from list2 overwrite or add to list1).
|
||||||
|
"""
|
||||||
|
if not isinstance(list1, list) or not isinstance(list2, list):
|
||||||
|
raise AnsibleFilterError("merge_mapping expects two lists")
|
||||||
|
|
||||||
|
merged = {}
|
||||||
|
# First, copy items from list1
|
||||||
|
for item in list1:
|
||||||
|
if key_name not in item:
|
||||||
|
raise AnsibleFilterError(f"Item {item} is missing the key '{key_name}'")
|
||||||
|
merged[item[key_name]] = item.copy()
|
||||||
|
|
||||||
|
# Then merge in items from list2
|
||||||
|
for item in list2:
|
||||||
|
if key_name not in item:
|
||||||
|
raise AnsibleFilterError(f"Item {item} is missing the key '{key_name}'")
|
||||||
|
k = item[key_name]
|
||||||
|
if k in merged:
|
||||||
|
# update will overwrite existing fields or add new ones
|
||||||
|
merged[k].update(item)
|
||||||
|
else:
|
||||||
|
merged[k] = item.copy()
|
||||||
|
|
||||||
|
# Return as a list of dicts again
|
||||||
|
return list(merged.values())
|
||||||
|
|
||||||
|
|
||||||
|
class FilterModule(object):
|
||||||
|
def filters(self):
|
||||||
|
return {
|
||||||
|
'merge_mapping': merge_mapping,
|
||||||
|
}
|
@ -1,37 +0,0 @@
|
|||||||
# roles/<your-role>/filter_plugins/redirect_filters.py
|
|
||||||
from ansible.errors import AnsibleFilterError
|
|
||||||
|
|
||||||
class FilterModule(object):
|
|
||||||
"""
|
|
||||||
Custom filters for redirect domain mappings
|
|
||||||
"""
|
|
||||||
|
|
||||||
def filters(self):
|
|
||||||
return {
|
|
||||||
"add_redirect_if_group": self.add_redirect_if_group,
|
|
||||||
}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def add_redirect_if_group(redirect_list, group, source, target, group_names):
|
|
||||||
"""
|
|
||||||
Append {"source": source, "target": target} to *redirect_list*
|
|
||||||
**only** if *group* is contained in *group_names*.
|
|
||||||
|
|
||||||
Usage in Jinja:
|
|
||||||
{{ redirect_list
|
|
||||||
| add_redirect_if_group('lam',
|
|
||||||
'ldap.' ~ primary_domain,
|
|
||||||
domains | get_domain('lam'),
|
|
||||||
group_names) }}
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Make a copy so we don’t mutate the original list in place
|
|
||||||
redirects = list(redirect_list)
|
|
||||||
|
|
||||||
if group in group_names:
|
|
||||||
redirects.append({"source": source, "target": target})
|
|
||||||
|
|
||||||
return redirects
|
|
||||||
|
|
||||||
except Exception as exc:
|
|
||||||
raise AnsibleFilterError(f"add_redirect_if_group failed: {exc}")
|
|
@ -50,7 +50,7 @@ ports:
|
|||||||
keycloak: 8032
|
keycloak: 8032
|
||||||
lam: 8033
|
lam: 8033
|
||||||
phpmyadmin: 8034
|
phpmyadmin: 8034
|
||||||
snipe_it: 8035
|
snipe-it: 8035
|
||||||
sphinx: 8036
|
sphinx: 8036
|
||||||
phpldapadmin: 8037
|
phpldapadmin: 8037
|
||||||
fusiondirectory: 8038
|
fusiondirectory: 8038
|
||||||
|
@ -62,7 +62,7 @@ defaults_networks:
|
|||||||
subnet: 192.168.102.128/28
|
subnet: 192.168.102.128/28
|
||||||
pgadmin:
|
pgadmin:
|
||||||
subnet: 192.168.102.144/28
|
subnet: 192.168.102.144/28
|
||||||
snipe_it:
|
snipe-it:
|
||||||
subnet: 192.168.102.160/28
|
subnet: 192.168.102.160/28
|
||||||
taiga:
|
taiga:
|
||||||
subnet: 192.168.102.176/28
|
subnet: 192.168.102.176/28
|
||||||
|
@ -1,6 +1,2 @@
|
|||||||
defaults_domains: "{{ defaults_applications | canonical_domains_map(primary_domain) }}"
|
|
||||||
|
|
||||||
defaults_redirect_domain_mappings: "{{ applications | domain_mappings(primary_domain) }}"
|
|
||||||
|
|
||||||
# Domains which are deprecated and should be cleaned up
|
# Domains which are deprecated and should be cleaned up
|
||||||
deprecated_domains: []
|
deprecated_domains: []
|
@ -9,12 +9,12 @@ defaults_service_provider:
|
|||||||
city: "Cybertown"
|
city: "Cybertown"
|
||||||
postal_code: "00001"
|
postal_code: "00001"
|
||||||
country: "Nexusland"
|
country: "Nexusland"
|
||||||
logo: "{{ applications.assets_server.url | safe_var | safe_join('img/logo.png') }}"
|
logo: "{{ applications['assets-server'].url | safe_var | safe_join('img/logo.png') }}"
|
||||||
platform:
|
platform:
|
||||||
titel: "CyMaIS Demo"
|
titel: "CyMaIS Demo"
|
||||||
subtitel: "The Future of Self-Hosted Infrastructure. Secure. Automated. Sovereign."
|
subtitel: "The Future of Self-Hosted Infrastructure. Secure. Automated. Sovereign."
|
||||||
logo: "{{ applications.assets_server.url | safe_var | safe_join('img/logo.png') }}"
|
logo: "{{ applications['assets-server'].url | safe_var | safe_join('img/logo.png') }}"
|
||||||
favicon: "{{ applications.assets_server.url | safe_var | safe_join('img/favicon.ico') }}"
|
favicon: "{{ applications['assets-server'].url | safe_var | safe_join('img/favicon.ico') }}"
|
||||||
contact:
|
contact:
|
||||||
bluesky: >-
|
bluesky: >-
|
||||||
{{ ('@' ~ users.administrator.username ~ '.' ~ domains.bluesky.api)
|
{{ ('@' ~ users.administrator.username ~ '.' ~ domains.bluesky.api)
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
---
|
|
||||||
- name: "include docker-compose role"
|
- name: "include docker-compose role"
|
||||||
include_role:
|
include_role:
|
||||||
name: docker-compose
|
name: docker-compose
|
||||||
@ -10,8 +9,8 @@
|
|||||||
domain: "{{ item.domain }}"
|
domain: "{{ item.domain }}"
|
||||||
http_port: "{{ item.http_port }}"
|
http_port: "{{ item.http_port }}"
|
||||||
loop:
|
loop:
|
||||||
- { domain: domains.[application_id].api, http_port: ports.localhost.http.bluesky_api }
|
- { domain: "{{domains.[application_id].api", http_port: "{{ports.localhost.http.bluesky_api}}" }
|
||||||
- { domain: domains.[application_id].web, http_port: ports.localhost.http.bluesky_web }
|
- { domain: "{{domains.[application_id].web}}", http_port: "{{ports.localhost.http.bluesky_web}}" }
|
||||||
|
|
||||||
# The following lines should be removed when the following issue is closed:
|
# The following lines should be removed when the following issue is closed:
|
||||||
# https://github.com/bluesky-social/pds/issues/52
|
# https://github.com/bluesky-social/pds/issues/52
|
||||||
|
@ -1,5 +1,3 @@
|
|||||||
{# receives https certificate and setup proxy with domain replace #}
|
|
||||||
|
|
||||||
- name: "include role receive certbot certificate"
|
- name: "include role receive certbot certificate"
|
||||||
include_role:
|
include_role:
|
||||||
name: nginx-https-get-cert
|
name: nginx-https-get-cert
|
||||||
|
@ -116,7 +116,7 @@ portfolio_menu_categories:
|
|||||||
- accounting
|
- accounting
|
||||||
- invoices
|
- invoices
|
||||||
- akaunting
|
- akaunting
|
||||||
- snipe_it
|
- snipe-it
|
||||||
|
|
||||||
Events:
|
Events:
|
||||||
description: "Event and ticket management tools"
|
description: "Event and ticket management tools"
|
||||||
|
@ -5,12 +5,12 @@ services:
|
|||||||
{% include 'roles/docker-central-database/templates/services/' + database_type + '.yml.j2' %}
|
{% include 'roles/docker-central-database/templates/services/' + database_type + '.yml.j2' %}
|
||||||
|
|
||||||
application:
|
application:
|
||||||
image: grokability/snipe-it:{{applications.snipe_it.version}}
|
image: grokability/snipe-it:{{applications[application_id].version}}
|
||||||
{% include 'roles/docker-compose/templates/services/base.yml.j2' %}
|
{% include 'roles/docker-compose/templates/services/base.yml.j2' %}
|
||||||
volumes:
|
volumes:
|
||||||
- data:/var/lib/snipeit
|
- data:/var/lib/snipeit
|
||||||
ports:
|
ports:
|
||||||
- "127.0.0.1:{{ports.localhost.http.snipe_it}}:80"
|
- "127.0.0.1:{{ports.localhost.http[application_id]}}:80"
|
||||||
{% include 'templates/docker/container/depends-on-database-redis.yml.j2' %}
|
{% include 'templates/docker/container/depends-on-database-redis.yml.j2' %}
|
||||||
{% include 'templates/docker/container/networks.yml.j2' %}
|
{% include 'templates/docker/container/networks.yml.j2' %}
|
||||||
healthcheck:
|
healthcheck:
|
3
roles/docker-snipe-it/vars/main.yml
Normal file
3
roles/docker-snipe-it/vars/main.yml
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
application_id: "snipe-it"
|
||||||
|
database_password: "{{applications[application_id].credentials.database_password}}"
|
||||||
|
database_type: "mariadb"
|
@ -1,3 +0,0 @@
|
|||||||
application_id: "snipe_it"
|
|
||||||
database_password: "{{applications.snipe_it.credentials.database_password}}"
|
|
||||||
database_type: "mariadb"
|
|
@ -47,7 +47,7 @@ for filename in os.listdir(config_path):
|
|||||||
# Prepare the URL and expected status codes
|
# Prepare the URL and expected status codes
|
||||||
url = f"{{ web_protocol }}://{domain}"
|
url = f"{{ web_protocol }}://{domain}"
|
||||||
|
|
||||||
redirected_domains = [domain['source'] for domain in {{redirect_domain_mappings}}]
|
redirected_domains = [domain['source'] for domain in {{current_play_redirect_domain_mappings}}]
|
||||||
{%- if domains.mailu | safe_var | bool %}
|
{%- if domains.mailu | safe_var | bool %}
|
||||||
redirected_domains.append("{{domains | get_domain('mailu')}}")
|
redirected_domains.append("{{domains | get_domain('mailu')}}")
|
||||||
{%- endif %}
|
{%- endif %}
|
||||||
|
@ -5,3 +5,4 @@ caa_entries:
|
|||||||
# value: "letsencrypt.org"
|
# value: "letsencrypt.org"
|
||||||
# - tag: iodef
|
# - tag: iodef
|
||||||
# value: "mailto:{{ users.administrator.email }}"
|
# value: "mailto:{{ users.administrator.email }}"
|
||||||
|
base_sld_domains: "{{ current_play_domains_all | generate_base_sld_domains }}"
|
@ -8,7 +8,7 @@
|
|||||||
- name: Generate SAN certificate with certbundle
|
- name: Generate SAN certificate with certbundle
|
||||||
command: >-
|
command: >-
|
||||||
certbundle
|
certbundle
|
||||||
--domains "{{ all_domains | join(',') }}"
|
--domains "{{ current_play_domains_all | join(',') }}"
|
||||||
--certbot-email "{{ users.administrator.email }}"
|
--certbot-email "{{ users.administrator.email }}"
|
||||||
--certbot-acme-challenge-method "{{ certbot_acme_challenge_method }}"
|
--certbot-acme-challenge-method "{{ certbot_acme_challenge_method }}"
|
||||||
--chunk-size 100
|
--chunk-size 100
|
||||||
|
@ -7,7 +7,7 @@ _paq.push(["trackPageView"]);
|
|||||||
_paq.push(["trackAllContentImpressions"]);
|
_paq.push(["trackAllContentImpressions"]);
|
||||||
_paq.push(["enableLinkTracking"]);
|
_paq.push(["enableLinkTracking"]);
|
||||||
(function() {
|
(function() {
|
||||||
var u="//{{domains.matomo}}/";
|
var u="//{{ domains | get_domain('matomo') }}/";
|
||||||
_paq.push(["setTrackerUrl", u+"matomo.php"]);
|
_paq.push(["setTrackerUrl", u+"matomo.php"]);
|
||||||
_paq.push(["setSiteId", "{{matomo_site_id}}"]);
|
_paq.push(["setSiteId", "{{matomo_site_id}}"]);
|
||||||
var d=document, g=d.createElement("script"), s=d.getElementsByTagName("script")[0];
|
var d=document, g=d.createElement("script"), s=d.getElementsByTagName("script")[0];
|
||||||
|
@ -1,2 +1,2 @@
|
|||||||
base_domain: "{{ domain | regex_replace('^(?:.*\\.)?(.+\\..+)$', '\\1') }}"
|
base_domain: "{{ domain | regex_replace('^(?:.*\\.)?(.+\\..+)$', '\\1') }}"
|
||||||
verification_url: "{{ web_protocol }}://{{domains.matomo}}/index.php?module=API&method=SitesManager.getSitesIdFromSiteUrl&url=https://{{base_domain}}&format=json&token_auth={{applications.matomo.credentials.auth_token}}"
|
verification_url: "{{ web_protocol }}://{{domains | get_domain('matomo')}}/index.php?module=API&method=SitesManager.getSitesIdFromSiteUrl&url=https://{{base_domain}}&format=json&token_auth={{applications.matomo.credentials.auth_token}}"
|
@ -1,7 +1,7 @@
|
|||||||
---
|
---
|
||||||
- name: Filter www-prefixed domains from all_domains
|
- name: Filter www-prefixed domains from current_play_domains_all
|
||||||
set_fact:
|
set_fact:
|
||||||
www_domains: "{{ all_domains | select('match', '^www\\.') | list }}"
|
www_domains: "{{ current_play_domains_all | select('match', '^www\\.') | list }}"
|
||||||
|
|
||||||
- name: Include nginx-redirect-domains role for www-to-bare redirects
|
- name: Include nginx-redirect-domains role for www-to-bare redirects
|
||||||
include_role:
|
include_role:
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
source_directory: "{{ playbook_dir }}/assets" # Directory from which the assets will be copied
|
source_directory: "{{ playbook_dir }}/assets"
|
||||||
url: >-
|
url: "{{ web_protocol ~ '://' ~ 'file-server'
|
||||||
{{ (web_protocol ~ '://' ~ domains.file_server | safe_var ~ '/assets')
|
| load_configuration('domains.canonical[0]') ~ '/assets' }}"
|
||||||
if domains.file_server | safe_var else '' }}
|
|
@ -1,3 +1,3 @@
|
|||||||
application_id: "assets_server" # Application identifier
|
application_id: "assets-server" # Application identifier
|
||||||
source_directory: "{{ applications[application_id].source_directory }}/" # Source directory from which the files are coming from
|
source_directory: "{{ applications[application_id].source_directory }}/" # Source directory from which the files are coming from
|
||||||
target_directory: "{{ nginx.directories.data.files }}assets" # Directory to which the files will be copied
|
target_directory: "{{ nginx.directories.data.files }}assets" # Directory to which the files will be copied
|
||||||
|
@ -31,5 +31,5 @@ The Nginx File Server role is ideal for hosting static files, sharing resources
|
|||||||
|
|
||||||
- [Nginx Official Website](https://nginx.org/)
|
- [Nginx Official Website](https://nginx.org/)
|
||||||
- [Let's Encrypt](https://letsencrypt.org/)
|
- [Let's Encrypt](https://letsencrypt.org/)
|
||||||
- [HTTP File Server (Wikipedia)](https://en.wikipedia.org/wiki/HTTP_File_Server)
|
- [HTTP File Server (Wikipedia)](https://en.wikipedia.org/wiki/HTTP_file-server)
|
||||||
- [HTTPS (Wikipedia)](https://en.wikipedia.org/wiki/HTTPS)
|
- [HTTPS (Wikipedia)](https://en.wikipedia.org/wiki/HTTPS)
|
||||||
|
@ -1,2 +1,2 @@
|
|||||||
application_id: "file_server"
|
application_id: "file-server"
|
||||||
domain: "{{ domains | get_domain(application_id) }}"
|
domain: "{{ domains | get_domain(application_id) }}"
|
@ -9,44 +9,55 @@
|
|||||||
set_fact:
|
set_fact:
|
||||||
system_email: "{{ default_system_email | combine(system_email | default({}, true), recursive=True) }}"
|
system_email: "{{ default_system_email | combine(system_email | default({}, true), recursive=True) }}"
|
||||||
|
|
||||||
|
- name: Merge current play applications
|
||||||
|
set_fact:
|
||||||
|
current_play_applications: >-
|
||||||
|
{{
|
||||||
|
defaults_applications |
|
||||||
|
combine(applications | default({}, true), recursive=True) |
|
||||||
|
applications_if_group_and_deps(group_names)
|
||||||
|
}}
|
||||||
|
|
||||||
|
- name: Merge current play domain definitions
|
||||||
|
set_fact:
|
||||||
|
current_play_domains: >-
|
||||||
|
{{ current_play_applications |
|
||||||
|
canonical_domains_map(primary_domain) |
|
||||||
|
combine(domains | default({}, true), recursive=True)
|
||||||
|
}}
|
||||||
|
|
||||||
|
- name: Set current play all domains incl. www redirect if enabled
|
||||||
|
set_fact:
|
||||||
|
current_play_domains_all: >-
|
||||||
|
{{
|
||||||
|
current_play_domains |
|
||||||
|
generate_all_domains(
|
||||||
|
('www_redirect' in group_names)
|
||||||
|
)
|
||||||
|
}}
|
||||||
|
|
||||||
|
- name: Set current play redirect domain mappings
|
||||||
|
set_fact:
|
||||||
|
current_play_redirect_domain_mappings: >-
|
||||||
|
{{
|
||||||
|
current_play_applications |
|
||||||
|
domain_mappings(primary_domain) |
|
||||||
|
merge_mapping(redirect_domain_mappings, 'source')
|
||||||
|
}}
|
||||||
|
|
||||||
- name: Merge application definitions
|
- name: Merge application definitions
|
||||||
set_fact:
|
set_fact:
|
||||||
applications: "{{ defaults_applications | combine(applications | default({}, true), recursive=True) }}"
|
applications: "{{ defaults_applications | combine(applications | default({}, true), recursive=True) }}"
|
||||||
|
|
||||||
- name: Merge domain definitions
|
- name: Merge domain definitions for all domains
|
||||||
set_fact:
|
set_fact:
|
||||||
domains: "{{ defaults_domains | combine(domains | default({}, true), recursive=True) }}"
|
domains: >-
|
||||||
|
|
||||||
- name: Merge redirect domain definitions into dictionary
|
|
||||||
set_fact:
|
|
||||||
combined_mapping: >-
|
|
||||||
{{
|
{{
|
||||||
(defaults_redirect_domain_mappings | items2dict(key_name='source', value_name='target'))
|
defaults_applications |
|
||||||
| combine(
|
canonical_domains_map(primary_domain) |
|
||||||
(redirect_domain_mappings | default([]) | items2dict(key_name='source', value_name='target')),
|
combine(domains | default({}, true), recursive=True)
|
||||||
recursive=True
|
|
||||||
)
|
|
||||||
}}
|
}}
|
||||||
|
|
||||||
- name: Transform combined mapping to list with source and target keys
|
|
||||||
set_fact:
|
|
||||||
redirect_domain_mappings: "{{ redirect_domain_mappings | default([]) + [ {'source': item.key, 'target': item.value} ] }}"
|
|
||||||
loop: "{{ combined_mapping | dict2items }}"
|
|
||||||
|
|
||||||
# @todo implement
|
|
||||||
# - name: Ensure features.integrated is set based on group membership
|
|
||||||
# set_fact:
|
|
||||||
# applications: "{{ applications | combine({ item.key: updated_app }, recursive=True) }}"
|
|
||||||
# vars:
|
|
||||||
# original_app: "{{ applications[item.key] | default({}) }}"
|
|
||||||
# original_features: "{{ original_app.features | default({}) }}"
|
|
||||||
# needs_integration: original_features.integrated is not defined
|
|
||||||
# updated_features: >-
|
|
||||||
# {{ original_features | combine({'integrated': (item.key in group_names)}) if needs_integration else original_features }}
|
|
||||||
# updated_app: >-
|
|
||||||
# {{ original_app | combine({'features': updated_features}) }}
|
|
||||||
# loop: "{{ applications | dict2items }}"
|
|
||||||
|
|
||||||
- name: Merge networks definitions
|
- name: Merge networks definitions
|
||||||
set_fact:
|
set_fact:
|
||||||
networks: "{{ defaults_networks | combine(networks | default({}, true), recursive=True) }}"
|
networks: "{{ defaults_networks | combine(networks | default({}, true), recursive=True) }}"
|
||||||
@ -63,34 +74,6 @@
|
|||||||
set_fact:
|
set_fact:
|
||||||
service_provider: "{{ defaults_service_provider | combine(service_provider | default({}, true), recursive=True) }}"
|
service_provider: "{{ defaults_service_provider | combine(service_provider | default({}, true), recursive=True) }}"
|
||||||
|
|
||||||
- name: Build base_sld_domains (sld.tld) in one go
|
|
||||||
set_fact:
|
|
||||||
base_sld_domains: >-
|
|
||||||
{{ domains
|
|
||||||
| generate_base_sld_domains(redirect_domain_mappings)
|
|
||||||
}}
|
|
||||||
|
|
||||||
- name: Set all domains incl. www redirect if enabled
|
|
||||||
set_fact:
|
|
||||||
all_domains: >-
|
|
||||||
{{ domains
|
|
||||||
| generate_all_domains(
|
|
||||||
('www_redirect' in group_names)
|
|
||||||
)
|
|
||||||
}}
|
|
||||||
|
|
||||||
- name: "Merged Variables"
|
|
||||||
# Add new merged variables here
|
|
||||||
debug:
|
|
||||||
msg:
|
|
||||||
domains: "{{ domains }}"
|
|
||||||
applications: "{{ applications }}"
|
|
||||||
oidc: "{{ oidc }}"
|
|
||||||
service_provider: "{{ service_provider }}"
|
|
||||||
users: "{{ users }}"
|
|
||||||
all_domains: "{{ all_domains }}"
|
|
||||||
when: enable_debug | bool
|
|
||||||
|
|
||||||
- name: init root user
|
- name: init root user
|
||||||
include_role:
|
include_role:
|
||||||
name: user-root
|
name: user-root
|
||||||
|
@ -32,7 +32,7 @@
|
|||||||
include_role:
|
include_role:
|
||||||
name: nginx-redirect-domains
|
name: nginx-redirect-domains
|
||||||
vars:
|
vars:
|
||||||
domain_mappings: "{{redirect_domain_mappings}}"
|
domain_mappings: "{{current_play_redirect_domain_mappings}}"
|
||||||
|
|
||||||
- name: setup www redirect
|
- name: setup www redirect
|
||||||
when: ("www_redirect" in group_names)
|
when: ("www_redirect" in group_names)
|
||||||
|
47
tests/integration/test_yaml_syntax.py
Normal file
47
tests/integration/test_yaml_syntax.py
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
class TestYamlSyntax(unittest.TestCase):
|
||||||
|
def test_all_yml_files_are_valid_yaml(self):
|
||||||
|
"""
|
||||||
|
Walk the entire repository, find all *.yml files and try to parse them
|
||||||
|
with yaml.safe_load(). Fail the test if any file contains invalid YAML.
|
||||||
|
"""
|
||||||
|
repo_root = os.path.abspath(
|
||||||
|
os.path.join(os.path.dirname(__file__), '..', '..')
|
||||||
|
)
|
||||||
|
|
||||||
|
invalid = []
|
||||||
|
|
||||||
|
for dirpath, dirnames, filenames in os.walk(repo_root):
|
||||||
|
# skip hidden directories (like .git, .venv, etc.)
|
||||||
|
dirnames[:] = [d for d in dirnames if not d.startswith('.')]
|
||||||
|
for fname in filenames:
|
||||||
|
if not fname.endswith('.yml'):
|
||||||
|
continue
|
||||||
|
full = os.path.join(dirpath, fname)
|
||||||
|
# skip any large auto‐generated files if needed:
|
||||||
|
# if 'some/path/to/skip' in full: continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(full, 'r') as f:
|
||||||
|
yaml.safe_load(f)
|
||||||
|
except yaml.YAMLError as e:
|
||||||
|
invalid.append((full, str(e)))
|
||||||
|
except Exception as e:
|
||||||
|
invalid.append((full, f"Unexpected error: {e}"))
|
||||||
|
|
||||||
|
if invalid:
|
||||||
|
msg_lines = [
|
||||||
|
f"{path}: {err.splitlines()[0]}" # just the first line of the error
|
||||||
|
for path, err in invalid
|
||||||
|
]
|
||||||
|
self.fail(
|
||||||
|
"Found invalid YAML in the following files:\n" +
|
||||||
|
"\n".join(msg_lines)
|
||||||
|
)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
83
tests/unit/test_applications_if_group_and_deps.py
Normal file
83
tests/unit/test_applications_if_group_and_deps.py
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import patch, mock_open
|
||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
# ensure filter_plugins is on the path
|
||||||
|
dir_path = os.path.abspath(
|
||||||
|
os.path.join(os.path.dirname(__file__), '../../filter_plugins')
|
||||||
|
)
|
||||||
|
sys.path.insert(0, dir_path)
|
||||||
|
|
||||||
|
from applications_if_group_and_deps import FilterModule
|
||||||
|
|
||||||
|
class TestApplicationsIfGroupAndDeps(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.filter = FilterModule()
|
||||||
|
# minimal applications dict
|
||||||
|
self.apps = {
|
||||||
|
'app1': {'foo': 'bar'},
|
||||||
|
'app2': {'baz': 'qux'},
|
||||||
|
'roleA': {'some': 'cfg'},
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_invalid_inputs(self):
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
self.filter.applications_if_group_and_deps('not a dict', [])
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
self.filter.applications_if_group_and_deps({}, 'not a list')
|
||||||
|
|
||||||
|
def test_direct_inclusion(self):
|
||||||
|
# if an app key is directly in group_names it should be returned
|
||||||
|
groups = ['app1', 'unrelated']
|
||||||
|
result = self.filter.applications_if_group_and_deps(self.apps, groups)
|
||||||
|
self.assertEqual(set(result.keys()), {'app1'})
|
||||||
|
|
||||||
|
@patch('applications_if_group_and_deps.yaml.safe_load')
|
||||||
|
@patch('applications_if_group_and_deps.open', new_callable=mock_open)
|
||||||
|
@patch('applications_if_group_and_deps.os.path.isfile')
|
||||||
|
def test_indirect_inclusion_via_dependencies(self, mock_isfile, mock_file, mock_yaml):
|
||||||
|
"""
|
||||||
|
Simulate that group 'groupX' has a dependency on 'roleA', and that
|
||||||
|
roleA's vars/main.yml contains application_id: 'roleA'.
|
||||||
|
Then passing group_names=['groupX'] should include 'roleA'.
|
||||||
|
"""
|
||||||
|
# pretend both meta/main.yml and vars/main.yml exist
|
||||||
|
mock_isfile.return_value = True
|
||||||
|
|
||||||
|
# safe_load() calls:
|
||||||
|
# 1) groupX/meta/main.yml → dependencies ['roleA']
|
||||||
|
# 2) roleA/meta/main.yml → dependencies []
|
||||||
|
# 3) roleA/vars/main.yml → application_id 'roleA'
|
||||||
|
mock_yaml.side_effect = [
|
||||||
|
{'dependencies': ['roleA']},
|
||||||
|
{'dependencies': []},
|
||||||
|
{'application_id': 'roleA'}
|
||||||
|
]
|
||||||
|
|
||||||
|
result = self.filter.applications_if_group_and_deps(self.apps, ['groupX'])
|
||||||
|
self.assertEqual(set(result.keys()), {'roleA'})
|
||||||
|
|
||||||
|
@patch('applications_if_group_and_deps.yaml.safe_load')
|
||||||
|
@patch('applications_if_group_and_deps.open', new_callable=mock_open)
|
||||||
|
@patch('applications_if_group_and_deps.os.path.isfile')
|
||||||
|
def test_no_vars_file(self, mock_isfile, mock_file, mock_yaml):
|
||||||
|
"""
|
||||||
|
If a meta/main.yml dependency exists but vars/main.yml is missing,
|
||||||
|
that role won't contribute an application_id, so nothing is returned.
|
||||||
|
"""
|
||||||
|
# meta exists, vars does not
|
||||||
|
def isfile_side(path):
|
||||||
|
return path.endswith('meta/main.yml')
|
||||||
|
mock_isfile.side_effect = isfile_side
|
||||||
|
|
||||||
|
# meta declares dependency
|
||||||
|
mock_yaml.return_value = {'dependencies': ['roleA']}
|
||||||
|
|
||||||
|
result = self.filter.applications_if_group_and_deps(self.apps, ['groupX'])
|
||||||
|
self.assertEqual(result, {})
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
@ -175,7 +175,7 @@ class TestCspFilters(unittest.TestCase):
|
|||||||
# Ensure feature enabled and domain set
|
# Ensure feature enabled and domain set
|
||||||
self.apps['app1']['features']['portfolio_iframe'] = True
|
self.apps['app1']['features']['portfolio_iframe'] = True
|
||||||
# simulate a subdomain for the application
|
# simulate a subdomain for the application
|
||||||
self.domains['app1'] = 'sub.domain-example.com'
|
self.domains['portfolio'] = ['domain-example.com']
|
||||||
|
|
||||||
header = self.filter.build_csp_header(self.apps, 'app1', self.domains, web_protocol='https')
|
header = self.filter.build_csp_header(self.apps, 'app1', self.domains, web_protocol='https')
|
||||||
# Expect '*.domain-example.com' in the frame-ancestors directive
|
# Expect '*.domain-example.com' in the frame-ancestors directive
|
||||||
|
@ -5,44 +5,66 @@ import os
|
|||||||
# Ensure filter_plugins directory is on the path
|
# Ensure filter_plugins directory is on the path
|
||||||
sys.path.insert(
|
sys.path.insert(
|
||||||
0,
|
0,
|
||||||
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
|
os.path.abspath(os.path.join(os.path.dirname(__file__), '../filter_plugins'))
|
||||||
)
|
)
|
||||||
|
|
||||||
from generate_base_sld_domains import FilterModule
|
from generate_base_sld_domains import FilterModule
|
||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
class TestGenerateBaseSldDomains(unittest.TestCase):
|
class TestGenerateBaseSldDomains(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.filter = FilterModule().generate_base_sld_domains
|
self.filter = FilterModule().generate_base_sld_domains
|
||||||
|
|
||||||
def test_simple_string_and_redirect(self):
|
def test_simple_list(self):
|
||||||
domains = {'app': 'sub.example.com'}
|
domains = [
|
||||||
redirects = [{'source': 'alias.example.com'}]
|
'sub.example.com',
|
||||||
result = self.filter(domains, redirects)
|
'alias.example.com',
|
||||||
|
'example.com'
|
||||||
|
]
|
||||||
|
result = self.filter(domains)
|
||||||
self.assertEqual(result, ['example.com'])
|
self.assertEqual(result, ['example.com'])
|
||||||
|
|
||||||
def test_without_redirect_mappings(self):
|
def test_mixed_tlds_and_subdomains(self):
|
||||||
domains = {
|
domains = [
|
||||||
'a': 'a.co',
|
'a.co', 'b.co', 'sub.b.co', 'x.co', 'www.x.co'
|
||||||
'b': ['b.co', 'sub.c.co'],
|
]
|
||||||
'c': {'x': 'x.co'}
|
result = self.filter(domains)
|
||||||
}
|
self.assertEqual(result, ['a.co', 'b.co', 'x.co'])
|
||||||
result = self.filter(domains, None)
|
|
||||||
self.assertEqual(result, ['a.co', 'b.co', 'c.co', 'x.co'])
|
|
||||||
|
|
||||||
def test_redirect_list_sources(self):
|
def test_invalid_non_string_raise(self):
|
||||||
domains = {'app': 'app.domain.org'}
|
for bad in [42, None]:
|
||||||
redirects = [{'source': ['alias.domain.org', 'deep.sub.example.net']}]
|
with self.assertRaises(AnsibleFilterError):
|
||||||
result = self.filter(domains, redirects)
|
self.filter([bad])
|
||||||
self.assertEqual(result, ['domain.org', 'example.net'])
|
|
||||||
|
|
||||||
def test_duplicate_entries_and_sorting(self):
|
def test_localhost_allowed(self):
|
||||||
domains = {
|
domains = ['localhost']
|
||||||
'x': ['one.com', 'sub.one.com'],
|
result = self.filter(domains)
|
||||||
'y': 'two.com',
|
self.assertEqual(result, ['localhost'])
|
||||||
'z': {'k': 'one.com'}
|
|
||||||
}
|
def test_ip_raises(self):
|
||||||
redirects = [{'source': 'deep.two.com'}]
|
with self.assertRaises(AnsibleFilterError):
|
||||||
result = self.filter(domains, redirects)
|
self.filter(['127.0.0.1'])
|
||||||
|
|
||||||
|
def test_nested_subdomains(self):
|
||||||
|
domains = ['sub.sub2.one']
|
||||||
|
result = self.filter(domains)
|
||||||
|
self.assertEqual(result, ['sub2.one'])
|
||||||
|
|
||||||
|
def test_deeply_nested_subdomains(self):
|
||||||
|
domains = ['sub3.sub2.sub1.one']
|
||||||
|
result = self.filter(domains)
|
||||||
|
self.assertEqual(result, ['sub1.one'])
|
||||||
|
|
||||||
|
def test_empty_and_malformed_raise(self):
|
||||||
|
for bad in ['', '.', '...']:
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
self.filter([bad])
|
||||||
|
|
||||||
|
def test_sorting_and_duplicates(self):
|
||||||
|
domains = [
|
||||||
|
'one.com', 'sub.one.com', 'two.com', 'deep.two.com', 'one.com'
|
||||||
|
]
|
||||||
|
result = self.filter(domains)
|
||||||
self.assertEqual(result, ['one.com', 'two.com'])
|
self.assertEqual(result, ['one.com', 'two.com'])
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
@ -30,9 +30,7 @@ class TestDomainMappings(unittest.TestCase):
|
|||||||
def test_empty_domains_cfg(self):
|
def test_empty_domains_cfg(self):
|
||||||
apps = {'app1': {'domains': {}}}
|
apps = {'app1': {'domains': {}}}
|
||||||
default = 'app1.example.com'
|
default = 'app1.example.com'
|
||||||
expected = [
|
expected = []
|
||||||
{'source': default, 'target': default}
|
|
||||||
]
|
|
||||||
result = self.filter.domain_mappings(apps, self.primary)
|
result = self.filter.domain_mappings(apps, self.primary)
|
||||||
self.assertEqual(result, expected)
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
@ -45,7 +43,6 @@ class TestDomainMappings(unittest.TestCase):
|
|||||||
default = 'app1.example.com'
|
default = 'app1.example.com'
|
||||||
expected = [
|
expected = [
|
||||||
{'source': 'alias.com', 'target': default},
|
{'source': 'alias.com', 'target': default},
|
||||||
{'source': default, 'target': default},
|
|
||||||
]
|
]
|
||||||
result = self.filter.domain_mappings(apps, self.primary)
|
result = self.filter.domain_mappings(apps, self.primary)
|
||||||
# order not important
|
# order not important
|
||||||
@ -84,10 +81,7 @@ class TestDomainMappings(unittest.TestCase):
|
|||||||
'app2': {'domains': {'canonical': ['c2.com']}},
|
'app2': {'domains': {'canonical': ['c2.com']}},
|
||||||
}
|
}
|
||||||
expected = [
|
expected = [
|
||||||
# app1
|
|
||||||
{'source': 'a1.com', 'target': 'app1.example.com'},
|
{'source': 'a1.com', 'target': 'app1.example.com'},
|
||||||
{'source': 'app1.example.com', 'target': 'app1.example.com'},
|
|
||||||
# app2
|
|
||||||
{'source': 'app2.example.com', 'target': 'c2.com'},
|
{'source': 'app2.example.com', 'target': 'c2.com'},
|
||||||
]
|
]
|
||||||
result = self.filter.domain_mappings(apps, self.primary)
|
result = self.filter.domain_mappings(apps, self.primary)
|
||||||
|
@ -1,51 +0,0 @@
|
|||||||
import unittest
|
|
||||||
|
|
||||||
from filter_plugins.group_domain_filters import FilterModule
|
|
||||||
|
|
||||||
class TestAddDomainIfGroup(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
self.filter = FilterModule().filters()["add_domain_if_group"]
|
|
||||||
|
|
||||||
def test_add_string_value(self):
|
|
||||||
result = self.filter({}, "akaunting", "accounting.example.org", ["akaunting"])
|
|
||||||
self.assertEqual(result, {"akaunting": "accounting.example.org"})
|
|
||||||
|
|
||||||
def test_add_list_value(self):
|
|
||||||
result = self.filter({}, "mastodon", ["microblog.example.org"], ["mastodon"])
|
|
||||||
self.assertEqual(result, {"mastodon": ["microblog.example.org"]})
|
|
||||||
|
|
||||||
def test_add_dict_value(self):
|
|
||||||
result = self.filter({}, "bluesky", {"web": "bskyweb.example.org", "api": "bluesky.example.org"}, ["bluesky"])
|
|
||||||
self.assertEqual(result, {"bluesky": {"web": "bskyweb.example.org", "api": "bluesky.example.org"}})
|
|
||||||
|
|
||||||
def test_ignore_if_not_in_group(self):
|
|
||||||
result = self.filter({}, "akaunting", "accounting.example.org", ["wordpress"])
|
|
||||||
self.assertEqual(result, {})
|
|
||||||
|
|
||||||
def test_merge_with_existing(self):
|
|
||||||
initial = {"wordpress": ["blog.example.org"]}
|
|
||||||
result = self.filter(initial, "akaunting", "accounting.example.org", ["akaunting"])
|
|
||||||
self.assertEqual(result, {
|
|
||||||
"wordpress": ["blog.example.org"],
|
|
||||||
"akaunting": "accounting.example.org"
|
|
||||||
})
|
|
||||||
|
|
||||||
def test_dict_is_not_mutated(self):
|
|
||||||
base = {"keycloak": "auth.example.org"}
|
|
||||||
copy = dict(base) # make a copy for comparison
|
|
||||||
_ = self.filter(base, "akaunting", "accounting.example.org", ["akaunting"])
|
|
||||||
self.assertEqual(base, copy) # original must stay unchanged
|
|
||||||
|
|
||||||
def test_multiple_adds_accumulate(self):
|
|
||||||
result = {}
|
|
||||||
result = self.filter(result, "akaunting", "accounting.example.org", ["akaunting", "wordpress"])
|
|
||||||
result = self.filter(result, "wordpress", ["blog.example.org"], ["akaunting", "wordpress"])
|
|
||||||
result = self.filter(result, "bluesky", {"web": "bskyweb.example.org", "api": "bluesky.example.org"}, ["bluesky"])
|
|
||||||
self.assertEqual(result, {
|
|
||||||
"akaunting": "accounting.example.org",
|
|
||||||
"wordpress": ["blog.example.org"],
|
|
||||||
"bluesky": {"web": "bskyweb.example.org", "api": "bluesky.example.org"},
|
|
||||||
})
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
@ -1,79 +0,0 @@
|
|||||||
import os
|
|
||||||
import tempfile
|
|
||||||
import shutil
|
|
||||||
import yaml
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
# Import the filter module
|
|
||||||
import filter_plugins.group_domain_filters as gdf_module
|
|
||||||
|
|
||||||
class TestAddDomainIfGroupRecursive(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
# Create a temporary project structure
|
|
||||||
self.tempdir = tempfile.mkdtemp()
|
|
||||||
fp_dir = os.path.join(self.tempdir, 'filter_plugins')
|
|
||||||
roles_dir = os.path.join(self.tempdir, 'roles')
|
|
||||||
os.makedirs(fp_dir, exist_ok=True)
|
|
||||||
os.makedirs(roles_dir, exist_ok=True)
|
|
||||||
# Point module __file__ so plugin_dir resolves correctly
|
|
||||||
gdf_module.__file__ = os.path.join(fp_dir, 'group_domain_filters.py')
|
|
||||||
self.roles_dir = roles_dir
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
shutil.rmtree(self.tempdir)
|
|
||||||
|
|
||||||
def write_role(self, role_name, dependencies, application_id):
|
|
||||||
"""
|
|
||||||
Helper: write a role directory with meta/main.yml and vars/main.yml
|
|
||||||
"""
|
|
||||||
meta_dir = os.path.join(self.roles_dir, role_name, 'meta')
|
|
||||||
vars_dir = os.path.join(self.roles_dir, role_name, 'vars')
|
|
||||||
os.makedirs(meta_dir, exist_ok=True)
|
|
||||||
os.makedirs(vars_dir, exist_ok=True)
|
|
||||||
# Write meta/main.yml
|
|
||||||
with open(os.path.join(meta_dir, 'main.yml'), 'w') as f:
|
|
||||||
yaml.safe_dump({'dependencies': dependencies}, f)
|
|
||||||
# Write vars/main.yml
|
|
||||||
with open(os.path.join(vars_dir, 'main.yml'), 'w') as f:
|
|
||||||
yaml.safe_dump({'application_id': application_id}, f)
|
|
||||||
|
|
||||||
def test_direct_application_id_in_group_names(self):
|
|
||||||
# If domain_key (application_id) is directly in group_names
|
|
||||||
result = gdf_module.FilterModule.add_domain_if_group({}, 'app1', 'domain1', ['app1'])
|
|
||||||
self.assertEqual(result, {'app1': 'domain1'})
|
|
||||||
|
|
||||||
def test_indirect_dependency_application_id(self):
|
|
||||||
# roleA depends on roleB; roleB has application_id 'appB'
|
|
||||||
self.write_role('roleA', ['roleB'], 'appA')
|
|
||||||
self.write_role('roleB', [], 'appB')
|
|
||||||
# group_names includes roleA, so appB should be reachable
|
|
||||||
result = gdf_module.FilterModule.add_domain_if_group({}, 'appB', 'domainB', ['roleA'])
|
|
||||||
self.assertEqual(result, {'appB': 'domainB'})
|
|
||||||
|
|
||||||
def test_multi_level_dependency_application_id(self):
|
|
||||||
# roleX -> roleY -> roleZ; roleZ id is 'appZ'
|
|
||||||
self.write_role('roleX', ['roleY'], 'appX')
|
|
||||||
self.write_role('roleY', ['roleZ'], 'appY')
|
|
||||||
self.write_role('roleZ', [], 'appZ')
|
|
||||||
# Starting from roleX, appZ reachable
|
|
||||||
result = gdf_module.FilterModule.add_domain_if_group({}, 'appZ', 'domainZ', ['roleX'])
|
|
||||||
self.assertEqual(result, {'appZ': 'domainZ'})
|
|
||||||
|
|
||||||
def test_domain_key_for_parent_role(self):
|
|
||||||
# roleParent has app 'appP', and depends on roleChild('appC')
|
|
||||||
self.write_role('roleParent', ['roleChild'], 'appP')
|
|
||||||
self.write_role('roleChild', [], 'appC')
|
|
||||||
# Even appP reachable via deps of roleParent (including itself)
|
|
||||||
result = gdf_module.FilterModule.add_domain_if_group({}, 'appP', 'domainP', ['roleParent'])
|
|
||||||
self.assertEqual(result, {'appP': 'domainP'})
|
|
||||||
|
|
||||||
def test_no_inclusion_for_unrelated(self):
|
|
||||||
# Unrelated roles
|
|
||||||
self.write_role('roleC', ['roleD'], 'appC')
|
|
||||||
self.write_role('roleD', [], 'appD')
|
|
||||||
# group_names does not include 'roleC' or 'roleD'
|
|
||||||
result = gdf_module.FilterModule.add_domain_if_group({}, 'appC', 'domainC', ['otherRole'])
|
|
||||||
self.assertEqual(result, {})
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main()
|
|
118
tests/unit/test_load_configuration.py
Normal file
118
tests/unit/test_load_configuration.py
Normal file
@ -0,0 +1,118 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import patch, mock_open
|
||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
# make sure our plugin is on PYTHONPATH
|
||||||
|
root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
|
||||||
|
sys.path.insert(0, root)
|
||||||
|
|
||||||
|
import load_configuration
|
||||||
|
from load_configuration import FilterModule, _cfg_cache
|
||||||
|
|
||||||
|
class TestLoadConfigurationFilter(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
_cfg_cache.clear()
|
||||||
|
self.f = FilterModule().filters()['load_configuration']
|
||||||
|
self.app = 'html_server'
|
||||||
|
self.nested_cfg = {
|
||||||
|
'html_server': {
|
||||||
|
'features': {'matomo': True},
|
||||||
|
'domains': {'canonical': ['html.example.com']}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.flat_cfg = {
|
||||||
|
'features': {'matomo': False},
|
||||||
|
'domains': {'canonical': ['flat.example.com']}
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_invalid_key(self):
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
self.f(self.app, None)
|
||||||
|
|
||||||
|
@patch('load_configuration.os.path.isdir', return_value=False)
|
||||||
|
def test_no_roles_dir(self, _):
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
self.f(self.app, 'features.matomo')
|
||||||
|
|
||||||
|
@patch('load_configuration.os.listdir', return_value=['r1'])
|
||||||
|
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||||
|
@patch('load_configuration.os.path.exists', return_value=False)
|
||||||
|
def test_no_matching_role(self, *_):
|
||||||
|
self.assertIsNone(self.f(self.app, 'features.matomo'))
|
||||||
|
|
||||||
|
@patch('load_configuration.os.listdir', return_value=['r1'])
|
||||||
|
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||||
|
@patch('load_configuration.os.path.exists')
|
||||||
|
@patch('load_configuration.open', new_callable=mock_open)
|
||||||
|
@patch('load_configuration.yaml.safe_load')
|
||||||
|
def test_primary_missing_conf(self, mock_yaml, mock_file, mock_exists, *_):
|
||||||
|
mock_exists.side_effect = lambda p: p.endswith('vars/main.yml')
|
||||||
|
mock_yaml.return_value = {'application_id': self.app}
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
self.f(self.app, 'features.matomo')
|
||||||
|
|
||||||
|
@patch('load_configuration.os.listdir', return_value=['r1'])
|
||||||
|
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||||
|
@patch('load_configuration.os.path.exists')
|
||||||
|
@patch('load_configuration.open', new_callable=mock_open)
|
||||||
|
@patch('load_configuration.yaml.safe_load')
|
||||||
|
def test_primary_and_cache(self, mock_yaml, mock_file, mock_exists, *_):
|
||||||
|
mock_exists.side_effect = lambda p: p.endswith('vars/main.yml') or p.endswith('vars/configuration.yml')
|
||||||
|
mock_yaml.side_effect = [
|
||||||
|
{'application_id': self.app}, # main.yml
|
||||||
|
self.nested_cfg # configuration.yml
|
||||||
|
]
|
||||||
|
# first load
|
||||||
|
self.assertTrue(self.f(self.app, 'features.matomo'))
|
||||||
|
self.assertIn(self.app, _cfg_cache)
|
||||||
|
mock_yaml.reset_mock()
|
||||||
|
# from cache
|
||||||
|
self.assertEqual(self.f(self.app, 'domains.canonical'),
|
||||||
|
['html.example.com'])
|
||||||
|
mock_yaml.assert_not_called()
|
||||||
|
|
||||||
|
@patch('load_configuration.os.listdir', return_value=['r1'])
|
||||||
|
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||||
|
@patch('load_configuration.os.path.exists', return_value=True)
|
||||||
|
@patch('load_configuration.open', mock_open(read_data="html_server: {}"))
|
||||||
|
@patch('load_configuration.yaml.safe_load', return_value={'html_server': {}})
|
||||||
|
def test_key_not_found_after_load(self, *_):
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
self.f(self.app, 'does.not.exist')
|
||||||
|
|
||||||
|
@patch('load_configuration.os.listdir', return_value=['r2'])
|
||||||
|
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||||
|
@patch('load_configuration.os.path.exists')
|
||||||
|
@patch('load_configuration.open', new_callable=mock_open)
|
||||||
|
@patch('load_configuration.yaml.safe_load')
|
||||||
|
def test_fallback_nested(self, mock_yaml, mock_file, mock_exists, *_):
|
||||||
|
mock_exists.side_effect = lambda p: p.endswith('vars/configuration.yml')
|
||||||
|
mock_yaml.return_value = self.nested_cfg
|
||||||
|
# nested fallback must work
|
||||||
|
self.assertTrue(self.f(self.app, 'features.matomo'))
|
||||||
|
self.assertEqual(self.f(self.app, 'domains.canonical'),
|
||||||
|
['html.example.com'])
|
||||||
|
|
||||||
|
@patch('load_configuration.os.listdir', return_value=['r4'])
|
||||||
|
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||||
|
@patch('load_configuration.os.path.exists')
|
||||||
|
@patch('load_configuration.open', new_callable=mock_open)
|
||||||
|
@patch('load_configuration.yaml.safe_load')
|
||||||
|
def test_fallback_with_indexed_key(self, mock_yaml, mock_file, mock_exists, *_):
|
||||||
|
# Testing with an indexed key like domains.canonical[0]
|
||||||
|
mock_exists.side_effect = lambda p: p.endswith('vars/configuration.yml')
|
||||||
|
mock_yaml.return_value = {
|
||||||
|
'file-server': {
|
||||||
|
'domains': {
|
||||||
|
'canonical': ['files.example.com', 'extra.example.com']
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
# should get the first element of the canonical domains list
|
||||||
|
self.assertEqual(self.f('file-server', 'domains.canonical[0]'),
|
||||||
|
'files.example.com')
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
45
tests/unit/test_merge_mapping.py
Normal file
45
tests/unit/test_merge_mapping.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
import unittest
|
||||||
|
from filter_plugins.merge_mapping import merge_mapping
|
||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
class TestMergeMappingFilter(unittest.TestCase):
|
||||||
|
def test_basic_merge_overwrites_and_adds(self):
|
||||||
|
list1 = [
|
||||||
|
{'source': 'a', 'target': 1},
|
||||||
|
{'source': 'b', 'target': 2},
|
||||||
|
]
|
||||||
|
list2 = [
|
||||||
|
{'source': 'b', 'target': 3},
|
||||||
|
{'source': 'c', 'target': 4},
|
||||||
|
]
|
||||||
|
result = merge_mapping(list1, list2, 'source')
|
||||||
|
result_dict = {item['source']: item['target'] for item in result}
|
||||||
|
self.assertEqual(result_dict, {'a': 1, 'b': 3, 'c': 4})
|
||||||
|
|
||||||
|
def test_merge_preserves_and_overwrites_fields(self):
|
||||||
|
list1 = [{'source': 'x', 'value': 100, 'flag': True}]
|
||||||
|
list2 = [{'source': 'x', 'value': 200, 'note': 'updated'}]
|
||||||
|
result = merge_mapping(list1, list2, 'source')
|
||||||
|
self.assertEqual(len(result), 1)
|
||||||
|
merged = result[0]
|
||||||
|
self.assertEqual(merged['value'], 200)
|
||||||
|
self.assertTrue(merged['flag'])
|
||||||
|
self.assertEqual(merged['note'], 'updated')
|
||||||
|
|
||||||
|
def test_empty_lists_return_empty(self):
|
||||||
|
self.assertEqual(merge_mapping([], [], 'source'), [])
|
||||||
|
|
||||||
|
def test_missing_key_raises_error(self):
|
||||||
|
list1 = [{'target': 'no_source'}]
|
||||||
|
list2 = []
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
merge_mapping(list1, list2, 'source')
|
||||||
|
|
||||||
|
def test_non_list_inputs_raise_error(self):
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
merge_mapping("not a list", [], 'source')
|
||||||
|
with self.assertRaises(AnsibleFilterError):
|
||||||
|
merge_mapping([], "not a list", 'source')
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
@ -1,57 +0,0 @@
|
|||||||
import os
|
|
||||||
import sys
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
sys.path.insert(
|
|
||||||
0,
|
|
||||||
os.path.abspath(
|
|
||||||
os.path.join(os.path.dirname(__file__), "../../")
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
from filter_plugins.redirect_filters import FilterModule
|
|
||||||
|
|
||||||
|
|
||||||
class TestAddRedirectIfGroup(unittest.TestCase):
|
|
||||||
"""Unit-tests for the add_redirect_if_group filter."""
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
# Obtain the callable once for reuse
|
|
||||||
self.add_redirect = FilterModule().filters()["add_redirect_if_group"]
|
|
||||||
|
|
||||||
def test_appends_redirect_when_group_present(self):
|
|
||||||
original = [{"source": "a", "target": "b"}]
|
|
||||||
result = self.add_redirect(
|
|
||||||
original,
|
|
||||||
group="lam",
|
|
||||||
source="ldap.example.com",
|
|
||||||
target="lam.example.com",
|
|
||||||
group_names=["lam", "other"],
|
|
||||||
)
|
|
||||||
|
|
||||||
# Original list must stay unchanged
|
|
||||||
self.assertEqual(len(original), 1)
|
|
||||||
# Result list must contain the extra entry
|
|
||||||
self.assertEqual(len(result), 2)
|
|
||||||
self.assertIn(
|
|
||||||
{"source": "ldap.example.com", "target": "lam.example.com"}, result
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_keeps_list_unchanged_when_group_absent(self):
|
|
||||||
original = [{"source": "a", "target": "b"}]
|
|
||||||
result = self.add_redirect(
|
|
||||||
original,
|
|
||||||
group="lam",
|
|
||||||
source="ldap.example.com",
|
|
||||||
target="lam.example.com",
|
|
||||||
group_names=["unrelated"],
|
|
||||||
)
|
|
||||||
|
|
||||||
# No new entries
|
|
||||||
self.assertEqual(result, original)
|
|
||||||
# But ensure a new list object was returned (no in-place mutation)
|
|
||||||
self.assertIsNot(result, original)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
Loading…
x
Reference in New Issue
Block a user