mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-06-25 11:45:32 +02:00
Compare commits
4 Commits
03f3a31d21
...
3c7825fd23
Author | SHA1 | Date | |
---|---|---|---|
3c7825fd23 | |||
865f3577d4 | |||
f748f9cef1 | |||
efe994a4c5 |
@ -3,6 +3,7 @@
|
||||
import argparse
|
||||
import os
|
||||
import yaml
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
def load_yaml_file(path):
|
||||
@ -36,7 +37,13 @@ def main():
|
||||
continue
|
||||
|
||||
vars_data = load_yaml_file(vars_main)
|
||||
application_id = vars_data.get("application_id")
|
||||
try:
|
||||
application_id = vars_data.get("application_id")
|
||||
except Exception as e:
|
||||
# print the exception message
|
||||
print(f"Warning: failed to read application_id from {vars_data} in {vars_main}.\nException: {e}", file=sys.stderr)
|
||||
# exit with status 0
|
||||
sys.exit(1)
|
||||
|
||||
if not application_id:
|
||||
print(f"[!] Skipping {role_name}: application_id not defined in vars/main.yml")
|
||||
|
80
filter_plugins/applications_if_group_and_deps.py
Normal file
80
filter_plugins/applications_if_group_and_deps.py
Normal file
@ -0,0 +1,80 @@
|
||||
from ansible.errors import AnsibleFilterError
|
||||
import os
|
||||
import sys
|
||||
import yaml
|
||||
|
||||
class FilterModule(object):
|
||||
def filters(self):
|
||||
return {
|
||||
'applications_if_group_and_deps': self.applications_if_group_and_deps,
|
||||
}
|
||||
|
||||
def applications_if_group_and_deps(self, applications, group_names):
|
||||
"""
|
||||
Return only those applications whose key is either:
|
||||
1) directly in group_names, or
|
||||
2) the application_id of any role reachable (recursively)
|
||||
from any group in group_names via meta/dependencies.
|
||||
Expects:
|
||||
- applications: dict mapping application_id → config
|
||||
- group_names: list of active role names
|
||||
"""
|
||||
# validate inputs
|
||||
if not isinstance(applications, dict):
|
||||
raise AnsibleFilterError(f"Expected applications as dict, got {type(applications).__name__}")
|
||||
if not isinstance(group_names, (list, tuple)):
|
||||
raise AnsibleFilterError(f"Expected group_names as list/tuple, got {type(group_names).__name__}")
|
||||
|
||||
# locate roles directory (assume plugin sits in filter_plugins/)
|
||||
plugin_dir = os.path.dirname(__file__)
|
||||
project_root = os.path.abspath(os.path.join(plugin_dir, '..'))
|
||||
roles_dir = os.path.join(project_root, 'roles')
|
||||
|
||||
# recursively collect all roles reachable from the given groups
|
||||
def collect_roles(role, seen):
|
||||
if role in seen:
|
||||
return
|
||||
seen.add(role)
|
||||
meta_file = os.path.join(roles_dir, role, 'meta', 'main.yml')
|
||||
if not os.path.isfile(meta_file):
|
||||
return
|
||||
try:
|
||||
with open(meta_file) as f:
|
||||
meta = yaml.safe_load(f) or {}
|
||||
except Exception:
|
||||
return
|
||||
for dep in meta.get('dependencies', []):
|
||||
if isinstance(dep, str):
|
||||
dep_name = dep
|
||||
elif isinstance(dep, dict):
|
||||
dep_name = dep.get('role') or dep.get('name')
|
||||
else:
|
||||
continue
|
||||
collect_roles(dep_name, seen)
|
||||
|
||||
included_roles = set()
|
||||
for grp in group_names:
|
||||
collect_roles(grp, included_roles)
|
||||
|
||||
# gather application_ids from those roles
|
||||
included_app_ids = set()
|
||||
for role in included_roles:
|
||||
vars_file = os.path.join(roles_dir, role, 'vars', 'main.yml')
|
||||
if not os.path.isfile(vars_file):
|
||||
continue
|
||||
try:
|
||||
with open(vars_file) as f:
|
||||
vars_data = yaml.safe_load(f) or {}
|
||||
except Exception:
|
||||
continue
|
||||
app_id = vars_data.get('application_id')
|
||||
if isinstance(app_id, str) and app_id:
|
||||
included_app_ids.add(app_id)
|
||||
|
||||
# build filtered result: include any application whose key is in group_names or in included_app_ids
|
||||
result = {}
|
||||
for app_key, cfg in applications.items():
|
||||
if app_key in group_names or app_key in included_app_ids:
|
||||
result[app_key] = cfg
|
||||
|
||||
return result
|
@ -126,7 +126,7 @@ class FilterModule(object):
|
||||
self.is_feature_enabled(applications, 'portfolio_iframe', application_id)
|
||||
and directive == 'frame-ancestors'
|
||||
):
|
||||
domain = domains.get(application_id) # e.g. "sub.example.com" or "example.com"
|
||||
domain = domains.get('portfolio')[0] # e.g. "sub.example.com" or "example.com"
|
||||
# Extract the second-level + top-level domain and prefix with "*."
|
||||
sld_tld = ".".join(domain.split(".")[-2:]) # yields "example.com"
|
||||
tokens.append(f"*.{sld_tld}") # yields "*.example.com"
|
||||
|
@ -8,9 +8,8 @@ class FilterModule(object):
|
||||
"""
|
||||
Build a flat list of redirect mappings for all apps:
|
||||
- source: each alias domain
|
||||
- target: the first canonical domain (app.domains.canonical[0] or default)
|
||||
|
||||
Logic for computing aliases and canonicals is identical to alias_domains_map + canonical_domains_map.
|
||||
- target: the first canonical domain
|
||||
Skip mappings where source == target, since they make no sense.
|
||||
"""
|
||||
def parse_entry(domains_cfg, key, app_id):
|
||||
if key not in domains_cfg:
|
||||
@ -55,12 +54,9 @@ class FilterModule(object):
|
||||
for app_id, cfg in apps.items():
|
||||
domains_cfg = cfg.get('domains')
|
||||
if domains_cfg is None:
|
||||
# no domains key → no aliases
|
||||
alias_map[app_id] = []
|
||||
continue
|
||||
|
||||
if isinstance(domains_cfg, dict) and not domains_cfg:
|
||||
# empty domains dict → only default
|
||||
alias_map[app_id] = [default_domain(app_id, primary_domain)]
|
||||
continue
|
||||
|
||||
@ -79,13 +75,16 @@ class FilterModule(object):
|
||||
|
||||
alias_map[app_id] = aliases
|
||||
|
||||
# 3) Build flat list of {source, target} entries
|
||||
# 3) Build flat list of {source, target} entries,
|
||||
# skipping self-mappings
|
||||
mappings = []
|
||||
for app_id, sources in alias_map.items():
|
||||
# pick first canonical domain as target
|
||||
canon_list = canonical_map.get(app_id, [])
|
||||
target = canon_list[0] if canon_list else default_domain(app_id, primary_domain)
|
||||
for src in sources:
|
||||
if src == target:
|
||||
# skip self-redirects
|
||||
continue
|
||||
mappings.append({
|
||||
'source': src,
|
||||
'target': target
|
||||
|
@ -5,33 +5,40 @@ class FilterModule(object):
|
||||
def filters(self):
|
||||
return {'generate_base_sld_domains': self.generate_base_sld_domains}
|
||||
|
||||
def generate_base_sld_domains(self, domains_dict, redirect_mappings):
|
||||
def generate_base_sld_domains(self, domains_list):
|
||||
"""
|
||||
Flatten domains_dict und redirect_mappings, extrahiere SLDs (z.B. example.com),
|
||||
dedupe und sortiere.
|
||||
Given a list of hostnames, extract the second-level domain (SLD.TLD) for any hostname
|
||||
with two or more labels, return single-label hostnames as-is, and reject IPs,
|
||||
empty or malformed strings, and non-strings. Deduplicate and sort.
|
||||
"""
|
||||
def _flatten(domains):
|
||||
flat = []
|
||||
for v in (domains or {}).values():
|
||||
if isinstance(v, str):
|
||||
flat.append(v)
|
||||
elif isinstance(v, list):
|
||||
flat.extend(v)
|
||||
elif isinstance(v, dict):
|
||||
flat.extend(v.values())
|
||||
return flat
|
||||
if not isinstance(domains_list, list):
|
||||
raise AnsibleFilterError(
|
||||
f"generate_base_sld_domains expected a list, got {type(domains_list).__name__}"
|
||||
)
|
||||
|
||||
try:
|
||||
flat = _flatten(domains_dict)
|
||||
for mapping in redirect_mappings or []:
|
||||
src = mapping.get('source')
|
||||
if isinstance(src, str):
|
||||
flat.append(src)
|
||||
elif isinstance(src, list):
|
||||
flat.extend(src)
|
||||
ip_pattern = re.compile(r'^\d{1,3}(?:\.\d{1,3}){3}$')
|
||||
results = set()
|
||||
|
||||
pattern = re.compile(r'^(?:.*\.)?([^.]+\.[^.]+)$')
|
||||
slds = {m.group(1) for d in flat if (m := pattern.match(d))}
|
||||
return sorted(slds)
|
||||
except Exception as exc:
|
||||
raise AnsibleFilterError(f"generate_base_sld_domains failed: {exc}")
|
||||
for hostname in domains_list:
|
||||
# type check
|
||||
if not isinstance(hostname, str):
|
||||
raise AnsibleFilterError(f"Invalid domain entry (not a string): {hostname!r}")
|
||||
|
||||
# malformed or empty
|
||||
if not hostname or hostname.startswith('.') or hostname.endswith('.') or '..' in hostname:
|
||||
raise AnsibleFilterError(f"Invalid domain entry (malformed): {hostname!r}")
|
||||
|
||||
# IP addresses disallowed
|
||||
if ip_pattern.match(hostname):
|
||||
raise AnsibleFilterError(f"IP addresses not allowed: {hostname!r}")
|
||||
|
||||
# single-label hostnames
|
||||
labels = hostname.split('.')
|
||||
if len(labels) == 1:
|
||||
results.add(hostname)
|
||||
else:
|
||||
# always keep only the last two labels (SLD.TLD)
|
||||
sld = ".".join(labels[-2:])
|
||||
results.add(sld)
|
||||
|
||||
return sorted(results)
|
@ -1,97 +0,0 @@
|
||||
from ansible.errors import AnsibleFilterError
|
||||
import sys
|
||||
import os
|
||||
import yaml
|
||||
|
||||
class FilterModule(object):
|
||||
|
||||
def filters(self):
|
||||
return {
|
||||
"add_domain_if_group": self.add_domain_if_group,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def add_domain_if_group(domains_dict, domain_key, domain_value, group_names):
|
||||
"""
|
||||
Add {domain_key: domain_value} to domains_dict if either:
|
||||
1) domain_key is in group_names (direct inclusion), or
|
||||
2) domain_key is among collected application_id values of roles
|
||||
reachable from any group in group_names via recursive dependencies.
|
||||
|
||||
Parameters:
|
||||
domains_dict: existing dict of domains
|
||||
domain_key: name of the application to check
|
||||
domain_value: domain or dict/list of domains to assign
|
||||
group_names: list of active group (role/application) names
|
||||
"""
|
||||
try:
|
||||
result = dict(domains_dict)
|
||||
|
||||
# Direct group match: if the application name itself is in group_names
|
||||
if domain_key in group_names:
|
||||
result[domain_key] = domain_value
|
||||
return result
|
||||
|
||||
# Determine plugin directory based on filter plugin module if available
|
||||
plugin_dir = None
|
||||
for module in sys.modules.values():
|
||||
fm = getattr(module, 'FilterModule', None)
|
||||
if fm is not None:
|
||||
try:
|
||||
# Access staticmethod, compare underlying function
|
||||
if getattr(fm, 'add_domain_if_group') is DomainFilterUtil.add_domain_if_group:
|
||||
plugin_dir = os.path.dirname(module.__file__)
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if plugin_dir:
|
||||
# The plugin_dir is the filter_plugins directory; project_root is one level up
|
||||
project_root = os.path.abspath(os.path.join(plugin_dir, '..'))
|
||||
else:
|
||||
# Fallback: locate project root relative to this utility file
|
||||
plugin_dir = os.path.dirname(__file__)
|
||||
project_root = os.path.abspath(os.path.join(plugin_dir, '..'))
|
||||
|
||||
roles_dir = os.path.join(project_root, 'roles')
|
||||
|
||||
# Collect all roles reachable from the active groups
|
||||
def collect_roles(role_name, collected):
|
||||
if role_name in collected:
|
||||
return
|
||||
collected.add(role_name)
|
||||
meta_path = os.path.join(roles_dir, role_name, 'meta', 'main.yml')
|
||||
if os.path.isfile(meta_path):
|
||||
with open(meta_path) as f:
|
||||
meta = yaml.safe_load(f) or {}
|
||||
for dep in meta.get('dependencies', []):
|
||||
if isinstance(dep, str):
|
||||
dep_name = dep
|
||||
elif isinstance(dep, dict):
|
||||
dep_name = dep.get('role') or dep.get('name')
|
||||
else:
|
||||
continue
|
||||
collect_roles(dep_name, collected)
|
||||
|
||||
included_roles = set()
|
||||
for grp in group_names:
|
||||
collect_roles(grp, included_roles)
|
||||
|
||||
# Gather application_ids from each included role
|
||||
app_ids = set()
|
||||
for role in included_roles:
|
||||
vars_main = os.path.join(roles_dir, role, 'vars', 'main.yml')
|
||||
if os.path.isfile(vars_main):
|
||||
with open(vars_main) as f:
|
||||
vars_data = yaml.safe_load(f) or {}
|
||||
app_id = vars_data.get('application_id')
|
||||
if app_id:
|
||||
app_ids.add(app_id)
|
||||
|
||||
# Indirect inclusion: match by application_id
|
||||
if domain_key in app_ids:
|
||||
result[domain_key] = domain_value
|
||||
|
||||
return result
|
||||
except Exception as exc:
|
||||
raise AnsibleFilterError(f"add_domain_if_group failed: {exc}")
|
122
filter_plugins/load_configuration.py
Normal file
122
filter_plugins/load_configuration.py
Normal file
@ -0,0 +1,122 @@
|
||||
import os
|
||||
import yaml
|
||||
import re
|
||||
from ansible.errors import AnsibleFilterError
|
||||
|
||||
# in-memory cache: application_id → (parsed_yaml, is_nested)
|
||||
_cfg_cache = {}
|
||||
|
||||
def load_configuration(application_id, key):
|
||||
if not isinstance(key, str):
|
||||
raise AnsibleFilterError("Key must be a dotted-string, e.g. 'features.matomo'")
|
||||
|
||||
# locate roles/
|
||||
here = os.path.dirname(__file__)
|
||||
root = os.path.abspath(os.path.join(here, '..'))
|
||||
roles_dir = os.path.join(root, 'roles')
|
||||
if not os.path.isdir(roles_dir):
|
||||
raise AnsibleFilterError(f"Roles directory not found at {roles_dir}")
|
||||
|
||||
# first time? load & cache
|
||||
if application_id not in _cfg_cache:
|
||||
config_path = None
|
||||
|
||||
# 1) primary: vars/main.yml declares it
|
||||
for role in os.listdir(roles_dir):
|
||||
mv = os.path.join(roles_dir, role, 'vars', 'main.yml')
|
||||
if os.path.exists(mv):
|
||||
try:
|
||||
md = yaml.safe_load(open(mv)) or {}
|
||||
except Exception:
|
||||
md = {}
|
||||
if md.get('application_id') == application_id:
|
||||
cf = os.path.join(roles_dir, role, 'vars', 'configuration.yml')
|
||||
if not os.path.exists(cf):
|
||||
raise AnsibleFilterError(
|
||||
f"Role '{role}' declares '{application_id}' but missing configuration.yml"
|
||||
)
|
||||
config_path = cf
|
||||
break
|
||||
|
||||
# 2) fallback nested
|
||||
if config_path is None:
|
||||
for role in os.listdir(roles_dir):
|
||||
cf = os.path.join(roles_dir, role, 'vars', 'configuration.yml')
|
||||
if not os.path.exists(cf):
|
||||
continue
|
||||
try:
|
||||
dd = yaml.safe_load(open(cf)) or {}
|
||||
except Exception:
|
||||
dd = {}
|
||||
if isinstance(dd, dict) and application_id in dd:
|
||||
config_path = cf
|
||||
break
|
||||
|
||||
# 3) fallback flat
|
||||
if config_path is None:
|
||||
for role in os.listdir(roles_dir):
|
||||
cf = os.path.join(roles_dir, role, 'vars', 'configuration.yml')
|
||||
if not os.path.exists(cf):
|
||||
continue
|
||||
try:
|
||||
dd = yaml.safe_load(open(cf)) or {}
|
||||
except Exception:
|
||||
dd = {}
|
||||
# flat style: dict with all non-dict values
|
||||
if isinstance(dd, dict) and not any(isinstance(v, dict) for v in dd.values()):
|
||||
config_path = cf
|
||||
break
|
||||
|
||||
if config_path is None:
|
||||
return None
|
||||
|
||||
# parse once
|
||||
try:
|
||||
parsed = yaml.safe_load(open(config_path)) or {}
|
||||
except Exception as e:
|
||||
raise AnsibleFilterError(f"Error loading configuration.yml at {config_path}: {e}")
|
||||
|
||||
# detect nested vs flat
|
||||
is_nested = isinstance(parsed, dict) and (application_id in parsed)
|
||||
_cfg_cache[application_id] = (parsed, is_nested)
|
||||
|
||||
parsed, is_nested = _cfg_cache[application_id]
|
||||
|
||||
# pick base entry
|
||||
entry = parsed[application_id] if is_nested else parsed
|
||||
|
||||
# resolve dotted key
|
||||
key_parts = key.split('.')
|
||||
for part in key_parts:
|
||||
# Check if part has an index (e.g., domains.canonical[0])
|
||||
match = re.match(r'([^\[]+)\[([0-9]+)\]', part)
|
||||
if match:
|
||||
part, index = match.groups()
|
||||
index = int(index)
|
||||
if isinstance(entry, dict) and part in entry:
|
||||
entry = entry[part]
|
||||
# Check if entry is a list and access the index
|
||||
if isinstance(entry, list) and 0 <= index < len(entry):
|
||||
entry = entry[index]
|
||||
else:
|
||||
raise AnsibleFilterError(
|
||||
f"Index '{index}' out of range for key '{part}' in application '{application_id}'"
|
||||
)
|
||||
else:
|
||||
raise AnsibleFilterError(
|
||||
f"Key '{part}' not found under application '{application_id}'"
|
||||
)
|
||||
else:
|
||||
if isinstance(entry, dict) and part in entry:
|
||||
entry = entry[part]
|
||||
else:
|
||||
raise AnsibleFilterError(
|
||||
f"Key '{part}' not found under application '{application_id}'"
|
||||
)
|
||||
|
||||
return entry
|
||||
|
||||
|
||||
class FilterModule(object):
|
||||
def filters(self):
|
||||
return {'load_configuration': load_configuration}
|
42
filter_plugins/merge_mapping.py
Normal file
42
filter_plugins/merge_mapping.py
Normal file
@ -0,0 +1,42 @@
|
||||
# filter_plugins/merge_mapping.py
|
||||
|
||||
from ansible.errors import AnsibleFilterError
|
||||
|
||||
def merge_mapping(list1, list2, key_name='source'):
|
||||
"""
|
||||
Merge two lists of dicts on a given key.
|
||||
- list1, list2: each must be a List[Dict]
|
||||
- key_name: the field to match on
|
||||
If both lists contain an item with the same key_name value,
|
||||
their dictionaries are merged (fields from list2 overwrite or add to list1).
|
||||
"""
|
||||
if not isinstance(list1, list) or not isinstance(list2, list):
|
||||
raise AnsibleFilterError("merge_mapping expects two lists")
|
||||
|
||||
merged = {}
|
||||
# First, copy items from list1
|
||||
for item in list1:
|
||||
if key_name not in item:
|
||||
raise AnsibleFilterError(f"Item {item} is missing the key '{key_name}'")
|
||||
merged[item[key_name]] = item.copy()
|
||||
|
||||
# Then merge in items from list2
|
||||
for item in list2:
|
||||
if key_name not in item:
|
||||
raise AnsibleFilterError(f"Item {item} is missing the key '{key_name}'")
|
||||
k = item[key_name]
|
||||
if k in merged:
|
||||
# update will overwrite existing fields or add new ones
|
||||
merged[k].update(item)
|
||||
else:
|
||||
merged[k] = item.copy()
|
||||
|
||||
# Return as a list of dicts again
|
||||
return list(merged.values())
|
||||
|
||||
|
||||
class FilterModule(object):
|
||||
def filters(self):
|
||||
return {
|
||||
'merge_mapping': merge_mapping,
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
# roles/<your-role>/filter_plugins/redirect_filters.py
|
||||
from ansible.errors import AnsibleFilterError
|
||||
|
||||
class FilterModule(object):
|
||||
"""
|
||||
Custom filters for redirect domain mappings
|
||||
"""
|
||||
|
||||
def filters(self):
|
||||
return {
|
||||
"add_redirect_if_group": self.add_redirect_if_group,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def add_redirect_if_group(redirect_list, group, source, target, group_names):
|
||||
"""
|
||||
Append {"source": source, "target": target} to *redirect_list*
|
||||
**only** if *group* is contained in *group_names*.
|
||||
|
||||
Usage in Jinja:
|
||||
{{ redirect_list
|
||||
| add_redirect_if_group('lam',
|
||||
'ldap.' ~ primary_domain,
|
||||
domains | get_domain('lam'),
|
||||
group_names) }}
|
||||
"""
|
||||
try:
|
||||
# Make a copy so we don’t mutate the original list in place
|
||||
redirects = list(redirect_list)
|
||||
|
||||
if group in group_names:
|
||||
redirects.append({"source": source, "target": target})
|
||||
|
||||
return redirects
|
||||
|
||||
except Exception as exc:
|
||||
raise AnsibleFilterError(f"add_redirect_if_group failed: {exc}")
|
@ -50,7 +50,7 @@ ports:
|
||||
keycloak: 8032
|
||||
lam: 8033
|
||||
phpmyadmin: 8034
|
||||
snipe_it: 8035
|
||||
snipe-it: 8035
|
||||
sphinx: 8036
|
||||
phpldapadmin: 8037
|
||||
fusiondirectory: 8038
|
||||
|
@ -62,7 +62,7 @@ defaults_networks:
|
||||
subnet: 192.168.102.128/28
|
||||
pgadmin:
|
||||
subnet: 192.168.102.144/28
|
||||
snipe_it:
|
||||
snipe-it:
|
||||
subnet: 192.168.102.160/28
|
||||
taiga:
|
||||
subnet: 192.168.102.176/28
|
||||
|
@ -1,6 +1,2 @@
|
||||
defaults_domains: "{{ defaults_applications | canonical_domains_map(primary_domain) }}"
|
||||
|
||||
defaults_redirect_domain_mappings: "{{ applications | domain_mappings(primary_domain) }}"
|
||||
|
||||
# Domains which are deprecated and should be cleaned up
|
||||
deprecated_domains: []
|
@ -9,12 +9,12 @@ defaults_service_provider:
|
||||
city: "Cybertown"
|
||||
postal_code: "00001"
|
||||
country: "Nexusland"
|
||||
logo: "{{ applications.assets_server.url | safe_var | safe_join('img/logo.png') }}"
|
||||
logo: "{{ applications['assets-server'].url | safe_var | safe_join('img/logo.png') }}"
|
||||
platform:
|
||||
titel: "CyMaIS Demo"
|
||||
subtitel: "The Future of Self-Hosted Infrastructure. Secure. Automated. Sovereign."
|
||||
logo: "{{ applications.assets_server.url | safe_var | safe_join('img/logo.png') }}"
|
||||
favicon: "{{ applications.assets_server.url | safe_var | safe_join('img/favicon.ico') }}"
|
||||
logo: "{{ applications['assets-server'].url | safe_var | safe_join('img/logo.png') }}"
|
||||
favicon: "{{ applications['assets-server'].url | safe_var | safe_join('img/favicon.ico') }}"
|
||||
contact:
|
||||
bluesky: >-
|
||||
{{ ('@' ~ users.administrator.username ~ '.' ~ domains.bluesky.api)
|
||||
|
@ -1,4 +1,3 @@
|
||||
---
|
||||
- name: "include docker-compose role"
|
||||
include_role:
|
||||
name: docker-compose
|
||||
@ -10,8 +9,8 @@
|
||||
domain: "{{ item.domain }}"
|
||||
http_port: "{{ item.http_port }}"
|
||||
loop:
|
||||
- { domain: domains.[application_id].api, http_port: ports.localhost.http.bluesky_api }
|
||||
- { domain: domains.[application_id].web, http_port: ports.localhost.http.bluesky_web }
|
||||
- { domain: "{{domains.[application_id].api", http_port: "{{ports.localhost.http.bluesky_api}}" }
|
||||
- { domain: "{{domains.[application_id].web}}", http_port: "{{ports.localhost.http.bluesky_web}}" }
|
||||
|
||||
# The following lines should be removed when the following issue is closed:
|
||||
# https://github.com/bluesky-social/pds/issues/52
|
||||
|
@ -5,4 +5,4 @@ social_app_path: "{{ docker_compose.directories.services }}/social-
|
||||
# https://github.com/bluesky-social/pds/issues/52
|
||||
pdsadmin_folder_path: "{{ docker_compose.directories.volumes }}/pdsadmin"
|
||||
pdsadmin_file_path: "{{pdsadmin_folder_path}}/pdsadmin"
|
||||
pdsadmin_temporary_tar_path: "/tmp/pdsadmin.tar.gz"
|
||||
pdsadmin_temporary_tar_path: "/tmp/pdsadmin.tar.gz"
|
||||
|
@ -1,5 +1,3 @@
|
||||
{# receives https certificate and setup proxy with domain replace #}
|
||||
|
||||
- name: "include role receive certbot certificate"
|
||||
include_role:
|
||||
name: nginx-https-get-cert
|
||||
|
@ -116,7 +116,7 @@ portfolio_menu_categories:
|
||||
- accounting
|
||||
- invoices
|
||||
- akaunting
|
||||
- snipe_it
|
||||
- snipe-it
|
||||
|
||||
Events:
|
||||
description: "Event and ticket management tools"
|
||||
|
@ -5,12 +5,12 @@ services:
|
||||
{% include 'roles/docker-central-database/templates/services/' + database_type + '.yml.j2' %}
|
||||
|
||||
application:
|
||||
image: grokability/snipe-it:{{applications.snipe_it.version}}
|
||||
image: grokability/snipe-it:{{applications[application_id].version}}
|
||||
{% include 'roles/docker-compose/templates/services/base.yml.j2' %}
|
||||
volumes:
|
||||
- data:/var/lib/snipeit
|
||||
ports:
|
||||
- "127.0.0.1:{{ports.localhost.http.snipe_it}}:80"
|
||||
- "127.0.0.1:{{ports.localhost.http[application_id]}}:80"
|
||||
{% include 'templates/docker/container/depends-on-database-redis.yml.j2' %}
|
||||
{% include 'templates/docker/container/networks.yml.j2' %}
|
||||
healthcheck:
|
3
roles/docker-snipe-it/vars/main.yml
Normal file
3
roles/docker-snipe-it/vars/main.yml
Normal file
@ -0,0 +1,3 @@
|
||||
application_id: "snipe-it"
|
||||
database_password: "{{applications[application_id].credentials.database_password}}"
|
||||
database_type: "mariadb"
|
@ -1,3 +0,0 @@
|
||||
application_id: "snipe_it"
|
||||
database_password: "{{applications.snipe_it.credentials.database_password}}"
|
||||
database_type: "mariadb"
|
@ -47,7 +47,7 @@ for filename in os.listdir(config_path):
|
||||
# Prepare the URL and expected status codes
|
||||
url = f"{{ web_protocol }}://{domain}"
|
||||
|
||||
redirected_domains = [domain['source'] for domain in {{redirect_domain_mappings}}]
|
||||
redirected_domains = [domain['source'] for domain in {{current_play_redirect_domain_mappings}}]
|
||||
{%- if domains.mailu | safe_var | bool %}
|
||||
redirected_domains.append("{{domains | get_domain('mailu')}}")
|
||||
{%- endif %}
|
||||
|
@ -4,4 +4,5 @@ caa_entries:
|
||||
# - tag: issuewild
|
||||
# value: "letsencrypt.org"
|
||||
# - tag: iodef
|
||||
# value: "mailto:{{ users.administrator.email }}"
|
||||
# value: "mailto:{{ users.administrator.email }}"
|
||||
base_sld_domains: "{{ current_play_domains_all | generate_base_sld_domains }}"
|
@ -8,7 +8,7 @@
|
||||
- name: Generate SAN certificate with certbundle
|
||||
command: >-
|
||||
certbundle
|
||||
--domains "{{ all_domains | join(',') }}"
|
||||
--domains "{{ current_play_domains_all | join(',') }}"
|
||||
--certbot-email "{{ users.administrator.email }}"
|
||||
--certbot-acme-challenge-method "{{ certbot_acme_challenge_method }}"
|
||||
--chunk-size 100
|
||||
|
@ -7,7 +7,7 @@ _paq.push(["trackPageView"]);
|
||||
_paq.push(["trackAllContentImpressions"]);
|
||||
_paq.push(["enableLinkTracking"]);
|
||||
(function() {
|
||||
var u="//{{domains.matomo}}/";
|
||||
var u="//{{ domains | get_domain('matomo') }}/";
|
||||
_paq.push(["setTrackerUrl", u+"matomo.php"]);
|
||||
_paq.push(["setSiteId", "{{matomo_site_id}}"]);
|
||||
var d=document, g=d.createElement("script"), s=d.getElementsByTagName("script")[0];
|
||||
|
@ -1,2 +1,2 @@
|
||||
base_domain: "{{ domain | regex_replace('^(?:.*\\.)?(.+\\..+)$', '\\1') }}"
|
||||
verification_url: "{{ web_protocol }}://{{domains.matomo}}/index.php?module=API&method=SitesManager.getSitesIdFromSiteUrl&url=https://{{base_domain}}&format=json&token_auth={{applications.matomo.credentials.auth_token}}"
|
||||
verification_url: "{{ web_protocol }}://{{domains | get_domain('matomo')}}/index.php?module=API&method=SitesManager.getSitesIdFromSiteUrl&url=https://{{base_domain}}&format=json&token_auth={{applications.matomo.credentials.auth_token}}"
|
@ -1,7 +1,7 @@
|
||||
---
|
||||
- name: Filter www-prefixed domains from all_domains
|
||||
- name: Filter www-prefixed domains from current_play_domains_all
|
||||
set_fact:
|
||||
www_domains: "{{ all_domains | select('match', '^www\\.') | list }}"
|
||||
www_domains: "{{ current_play_domains_all | select('match', '^www\\.') | list }}"
|
||||
|
||||
- name: Include nginx-redirect-domains role for www-to-bare redirects
|
||||
include_role:
|
||||
|
@ -1,4 +1,3 @@
|
||||
source_directory: "{{ playbook_dir }}/assets" # Directory from which the assets will be copied
|
||||
url: >-
|
||||
{{ (web_protocol ~ '://' ~ domains.file_server | safe_var ~ '/assets')
|
||||
if domains.file_server | safe_var else '' }}
|
||||
source_directory: "{{ playbook_dir }}/assets"
|
||||
url: "{{ web_protocol ~ '://' ~ 'file-server'
|
||||
| load_configuration('domains.canonical[0]') ~ '/assets' }}"
|
@ -1,3 +1,3 @@
|
||||
application_id: "assets_server" # Application identifier
|
||||
application_id: "assets-server" # Application identifier
|
||||
source_directory: "{{ applications[application_id].source_directory }}/" # Source directory from which the files are coming from
|
||||
target_directory: "{{ nginx.directories.data.files }}assets" # Directory to which the files will be copied
|
||||
|
@ -31,5 +31,5 @@ The Nginx File Server role is ideal for hosting static files, sharing resources
|
||||
|
||||
- [Nginx Official Website](https://nginx.org/)
|
||||
- [Let's Encrypt](https://letsencrypt.org/)
|
||||
- [HTTP File Server (Wikipedia)](https://en.wikipedia.org/wiki/HTTP_File_Server)
|
||||
- [HTTP File Server (Wikipedia)](https://en.wikipedia.org/wiki/HTTP_file-server)
|
||||
- [HTTPS (Wikipedia)](https://en.wikipedia.org/wiki/HTTPS)
|
||||
|
@ -1,2 +1,2 @@
|
||||
application_id: "file_server"
|
||||
domain: "{{ domains | get_domain(application_id) }}"
|
||||
application_id: "file-server"
|
||||
domain: "{{ domains | get_domain(application_id) }}"
|
||||
|
@ -9,44 +9,55 @@
|
||||
set_fact:
|
||||
system_email: "{{ default_system_email | combine(system_email | default({}, true), recursive=True) }}"
|
||||
|
||||
- name: Merge current play applications
|
||||
set_fact:
|
||||
current_play_applications: >-
|
||||
{{
|
||||
defaults_applications |
|
||||
combine(applications | default({}, true), recursive=True) |
|
||||
applications_if_group_and_deps(group_names)
|
||||
}}
|
||||
|
||||
- name: Merge current play domain definitions
|
||||
set_fact:
|
||||
current_play_domains: >-
|
||||
{{ current_play_applications |
|
||||
canonical_domains_map(primary_domain) |
|
||||
combine(domains | default({}, true), recursive=True)
|
||||
}}
|
||||
|
||||
- name: Set current play all domains incl. www redirect if enabled
|
||||
set_fact:
|
||||
current_play_domains_all: >-
|
||||
{{
|
||||
current_play_domains |
|
||||
generate_all_domains(
|
||||
('www_redirect' in group_names)
|
||||
)
|
||||
}}
|
||||
|
||||
- name: Set current play redirect domain mappings
|
||||
set_fact:
|
||||
current_play_redirect_domain_mappings: >-
|
||||
{{
|
||||
current_play_applications |
|
||||
domain_mappings(primary_domain) |
|
||||
merge_mapping(redirect_domain_mappings, 'source')
|
||||
}}
|
||||
|
||||
- name: Merge application definitions
|
||||
set_fact:
|
||||
applications: "{{ defaults_applications | combine(applications | default({}, true), recursive=True) }}"
|
||||
|
||||
- name: Merge domain definitions
|
||||
- name: Merge domain definitions for all domains
|
||||
set_fact:
|
||||
domains: "{{ defaults_domains | combine(domains | default({}, true), recursive=True) }}"
|
||||
|
||||
- name: Merge redirect domain definitions into dictionary
|
||||
set_fact:
|
||||
combined_mapping: >-
|
||||
{{
|
||||
(defaults_redirect_domain_mappings | items2dict(key_name='source', value_name='target'))
|
||||
| combine(
|
||||
(redirect_domain_mappings | default([]) | items2dict(key_name='source', value_name='target')),
|
||||
recursive=True
|
||||
)
|
||||
domains: >-
|
||||
{{
|
||||
defaults_applications |
|
||||
canonical_domains_map(primary_domain) |
|
||||
combine(domains | default({}, true), recursive=True)
|
||||
}}
|
||||
|
||||
- name: Transform combined mapping to list with source and target keys
|
||||
set_fact:
|
||||
redirect_domain_mappings: "{{ redirect_domain_mappings | default([]) + [ {'source': item.key, 'target': item.value} ] }}"
|
||||
loop: "{{ combined_mapping | dict2items }}"
|
||||
|
||||
# @todo implement
|
||||
# - name: Ensure features.integrated is set based on group membership
|
||||
# set_fact:
|
||||
# applications: "{{ applications | combine({ item.key: updated_app }, recursive=True) }}"
|
||||
# vars:
|
||||
# original_app: "{{ applications[item.key] | default({}) }}"
|
||||
# original_features: "{{ original_app.features | default({}) }}"
|
||||
# needs_integration: original_features.integrated is not defined
|
||||
# updated_features: >-
|
||||
# {{ original_features | combine({'integrated': (item.key in group_names)}) if needs_integration else original_features }}
|
||||
# updated_app: >-
|
||||
# {{ original_app | combine({'features': updated_features}) }}
|
||||
# loop: "{{ applications | dict2items }}"
|
||||
|
||||
- name: Merge networks definitions
|
||||
set_fact:
|
||||
networks: "{{ defaults_networks | combine(networks | default({}, true), recursive=True) }}"
|
||||
@ -63,34 +74,6 @@
|
||||
set_fact:
|
||||
service_provider: "{{ defaults_service_provider | combine(service_provider | default({}, true), recursive=True) }}"
|
||||
|
||||
- name: Build base_sld_domains (sld.tld) in one go
|
||||
set_fact:
|
||||
base_sld_domains: >-
|
||||
{{ domains
|
||||
| generate_base_sld_domains(redirect_domain_mappings)
|
||||
}}
|
||||
|
||||
- name: Set all domains incl. www redirect if enabled
|
||||
set_fact:
|
||||
all_domains: >-
|
||||
{{ domains
|
||||
| generate_all_domains(
|
||||
('www_redirect' in group_names)
|
||||
)
|
||||
}}
|
||||
|
||||
- name: "Merged Variables"
|
||||
# Add new merged variables here
|
||||
debug:
|
||||
msg:
|
||||
domains: "{{ domains }}"
|
||||
applications: "{{ applications }}"
|
||||
oidc: "{{ oidc }}"
|
||||
service_provider: "{{ service_provider }}"
|
||||
users: "{{ users }}"
|
||||
all_domains: "{{ all_domains }}"
|
||||
when: enable_debug | bool
|
||||
|
||||
- name: init root user
|
||||
include_role:
|
||||
name: user-root
|
||||
|
@ -32,7 +32,7 @@
|
||||
include_role:
|
||||
name: nginx-redirect-domains
|
||||
vars:
|
||||
domain_mappings: "{{redirect_domain_mappings}}"
|
||||
domain_mappings: "{{current_play_redirect_domain_mappings}}"
|
||||
|
||||
- name: setup www redirect
|
||||
when: ("www_redirect" in group_names)
|
||||
|
47
tests/integration/test_yaml_syntax.py
Normal file
47
tests/integration/test_yaml_syntax.py
Normal file
@ -0,0 +1,47 @@
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
import yaml
|
||||
|
||||
class TestYamlSyntax(unittest.TestCase):
|
||||
def test_all_yml_files_are_valid_yaml(self):
|
||||
"""
|
||||
Walk the entire repository, find all *.yml files and try to parse them
|
||||
with yaml.safe_load(). Fail the test if any file contains invalid YAML.
|
||||
"""
|
||||
repo_root = os.path.abspath(
|
||||
os.path.join(os.path.dirname(__file__), '..', '..')
|
||||
)
|
||||
|
||||
invalid = []
|
||||
|
||||
for dirpath, dirnames, filenames in os.walk(repo_root):
|
||||
# skip hidden directories (like .git, .venv, etc.)
|
||||
dirnames[:] = [d for d in dirnames if not d.startswith('.')]
|
||||
for fname in filenames:
|
||||
if not fname.endswith('.yml'):
|
||||
continue
|
||||
full = os.path.join(dirpath, fname)
|
||||
# skip any large auto‐generated files if needed:
|
||||
# if 'some/path/to/skip' in full: continue
|
||||
|
||||
try:
|
||||
with open(full, 'r') as f:
|
||||
yaml.safe_load(f)
|
||||
except yaml.YAMLError as e:
|
||||
invalid.append((full, str(e)))
|
||||
except Exception as e:
|
||||
invalid.append((full, f"Unexpected error: {e}"))
|
||||
|
||||
if invalid:
|
||||
msg_lines = [
|
||||
f"{path}: {err.splitlines()[0]}" # just the first line of the error
|
||||
for path, err in invalid
|
||||
]
|
||||
self.fail(
|
||||
"Found invalid YAML in the following files:\n" +
|
||||
"\n".join(msg_lines)
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
83
tests/unit/test_applications_if_group_and_deps.py
Normal file
83
tests/unit/test_applications_if_group_and_deps.py
Normal file
@ -0,0 +1,83 @@
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
from unittest.mock import patch, mock_open
|
||||
from ansible.errors import AnsibleFilterError
|
||||
|
||||
# ensure filter_plugins is on the path
|
||||
dir_path = os.path.abspath(
|
||||
os.path.join(os.path.dirname(__file__), '../../filter_plugins')
|
||||
)
|
||||
sys.path.insert(0, dir_path)
|
||||
|
||||
from applications_if_group_and_deps import FilterModule
|
||||
|
||||
class TestApplicationsIfGroupAndDeps(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.filter = FilterModule()
|
||||
# minimal applications dict
|
||||
self.apps = {
|
||||
'app1': {'foo': 'bar'},
|
||||
'app2': {'baz': 'qux'},
|
||||
'roleA': {'some': 'cfg'},
|
||||
}
|
||||
|
||||
def test_invalid_inputs(self):
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
self.filter.applications_if_group_and_deps('not a dict', [])
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
self.filter.applications_if_group_and_deps({}, 'not a list')
|
||||
|
||||
def test_direct_inclusion(self):
|
||||
# if an app key is directly in group_names it should be returned
|
||||
groups = ['app1', 'unrelated']
|
||||
result = self.filter.applications_if_group_and_deps(self.apps, groups)
|
||||
self.assertEqual(set(result.keys()), {'app1'})
|
||||
|
||||
@patch('applications_if_group_and_deps.yaml.safe_load')
|
||||
@patch('applications_if_group_and_deps.open', new_callable=mock_open)
|
||||
@patch('applications_if_group_and_deps.os.path.isfile')
|
||||
def test_indirect_inclusion_via_dependencies(self, mock_isfile, mock_file, mock_yaml):
|
||||
"""
|
||||
Simulate that group 'groupX' has a dependency on 'roleA', and that
|
||||
roleA's vars/main.yml contains application_id: 'roleA'.
|
||||
Then passing group_names=['groupX'] should include 'roleA'.
|
||||
"""
|
||||
# pretend both meta/main.yml and vars/main.yml exist
|
||||
mock_isfile.return_value = True
|
||||
|
||||
# safe_load() calls:
|
||||
# 1) groupX/meta/main.yml → dependencies ['roleA']
|
||||
# 2) roleA/meta/main.yml → dependencies []
|
||||
# 3) roleA/vars/main.yml → application_id 'roleA'
|
||||
mock_yaml.side_effect = [
|
||||
{'dependencies': ['roleA']},
|
||||
{'dependencies': []},
|
||||
{'application_id': 'roleA'}
|
||||
]
|
||||
|
||||
result = self.filter.applications_if_group_and_deps(self.apps, ['groupX'])
|
||||
self.assertEqual(set(result.keys()), {'roleA'})
|
||||
|
||||
@patch('applications_if_group_and_deps.yaml.safe_load')
|
||||
@patch('applications_if_group_and_deps.open', new_callable=mock_open)
|
||||
@patch('applications_if_group_and_deps.os.path.isfile')
|
||||
def test_no_vars_file(self, mock_isfile, mock_file, mock_yaml):
|
||||
"""
|
||||
If a meta/main.yml dependency exists but vars/main.yml is missing,
|
||||
that role won't contribute an application_id, so nothing is returned.
|
||||
"""
|
||||
# meta exists, vars does not
|
||||
def isfile_side(path):
|
||||
return path.endswith('meta/main.yml')
|
||||
mock_isfile.side_effect = isfile_side
|
||||
|
||||
# meta declares dependency
|
||||
mock_yaml.return_value = {'dependencies': ['roleA']}
|
||||
|
||||
result = self.filter.applications_if_group_and_deps(self.apps, ['groupX'])
|
||||
self.assertEqual(result, {})
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -175,7 +175,7 @@ class TestCspFilters(unittest.TestCase):
|
||||
# Ensure feature enabled and domain set
|
||||
self.apps['app1']['features']['portfolio_iframe'] = True
|
||||
# simulate a subdomain for the application
|
||||
self.domains['app1'] = 'sub.domain-example.com'
|
||||
self.domains['portfolio'] = ['domain-example.com']
|
||||
|
||||
header = self.filter.build_csp_header(self.apps, 'app1', self.domains, web_protocol='https')
|
||||
# Expect '*.domain-example.com' in the frame-ancestors directive
|
||||
|
@ -5,44 +5,66 @@ import os
|
||||
# Ensure filter_plugins directory is on the path
|
||||
sys.path.insert(
|
||||
0,
|
||||
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
|
||||
os.path.abspath(os.path.join(os.path.dirname(__file__), '../filter_plugins'))
|
||||
)
|
||||
|
||||
from generate_base_sld_domains import FilterModule
|
||||
from ansible.errors import AnsibleFilterError
|
||||
|
||||
class TestGenerateBaseSldDomains(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.filter = FilterModule().generate_base_sld_domains
|
||||
|
||||
def test_simple_string_and_redirect(self):
|
||||
domains = {'app': 'sub.example.com'}
|
||||
redirects = [{'source': 'alias.example.com'}]
|
||||
result = self.filter(domains, redirects)
|
||||
def test_simple_list(self):
|
||||
domains = [
|
||||
'sub.example.com',
|
||||
'alias.example.com',
|
||||
'example.com'
|
||||
]
|
||||
result = self.filter(domains)
|
||||
self.assertEqual(result, ['example.com'])
|
||||
|
||||
def test_without_redirect_mappings(self):
|
||||
domains = {
|
||||
'a': 'a.co',
|
||||
'b': ['b.co', 'sub.c.co'],
|
||||
'c': {'x': 'x.co'}
|
||||
}
|
||||
result = self.filter(domains, None)
|
||||
self.assertEqual(result, ['a.co', 'b.co', 'c.co', 'x.co'])
|
||||
def test_mixed_tlds_and_subdomains(self):
|
||||
domains = [
|
||||
'a.co', 'b.co', 'sub.b.co', 'x.co', 'www.x.co'
|
||||
]
|
||||
result = self.filter(domains)
|
||||
self.assertEqual(result, ['a.co', 'b.co', 'x.co'])
|
||||
|
||||
def test_redirect_list_sources(self):
|
||||
domains = {'app': 'app.domain.org'}
|
||||
redirects = [{'source': ['alias.domain.org', 'deep.sub.example.net']}]
|
||||
result = self.filter(domains, redirects)
|
||||
self.assertEqual(result, ['domain.org', 'example.net'])
|
||||
def test_invalid_non_string_raise(self):
|
||||
for bad in [42, None]:
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
self.filter([bad])
|
||||
|
||||
def test_duplicate_entries_and_sorting(self):
|
||||
domains = {
|
||||
'x': ['one.com', 'sub.one.com'],
|
||||
'y': 'two.com',
|
||||
'z': {'k': 'one.com'}
|
||||
}
|
||||
redirects = [{'source': 'deep.two.com'}]
|
||||
result = self.filter(domains, redirects)
|
||||
def test_localhost_allowed(self):
|
||||
domains = ['localhost']
|
||||
result = self.filter(domains)
|
||||
self.assertEqual(result, ['localhost'])
|
||||
|
||||
def test_ip_raises(self):
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
self.filter(['127.0.0.1'])
|
||||
|
||||
def test_nested_subdomains(self):
|
||||
domains = ['sub.sub2.one']
|
||||
result = self.filter(domains)
|
||||
self.assertEqual(result, ['sub2.one'])
|
||||
|
||||
def test_deeply_nested_subdomains(self):
|
||||
domains = ['sub3.sub2.sub1.one']
|
||||
result = self.filter(domains)
|
||||
self.assertEqual(result, ['sub1.one'])
|
||||
|
||||
def test_empty_and_malformed_raise(self):
|
||||
for bad in ['', '.', '...']:
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
self.filter([bad])
|
||||
|
||||
def test_sorting_and_duplicates(self):
|
||||
domains = [
|
||||
'one.com', 'sub.one.com', 'two.com', 'deep.two.com', 'one.com'
|
||||
]
|
||||
result = self.filter(domains)
|
||||
self.assertEqual(result, ['one.com', 'two.com'])
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -30,9 +30,7 @@ class TestDomainMappings(unittest.TestCase):
|
||||
def test_empty_domains_cfg(self):
|
||||
apps = {'app1': {'domains': {}}}
|
||||
default = 'app1.example.com'
|
||||
expected = [
|
||||
{'source': default, 'target': default}
|
||||
]
|
||||
expected = []
|
||||
result = self.filter.domain_mappings(apps, self.primary)
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
@ -45,7 +43,6 @@ class TestDomainMappings(unittest.TestCase):
|
||||
default = 'app1.example.com'
|
||||
expected = [
|
||||
{'source': 'alias.com', 'target': default},
|
||||
{'source': default, 'target': default},
|
||||
]
|
||||
result = self.filter.domain_mappings(apps, self.primary)
|
||||
# order not important
|
||||
@ -84,10 +81,7 @@ class TestDomainMappings(unittest.TestCase):
|
||||
'app2': {'domains': {'canonical': ['c2.com']}},
|
||||
}
|
||||
expected = [
|
||||
# app1
|
||||
{'source': 'a1.com', 'target': 'app1.example.com'},
|
||||
{'source': 'app1.example.com', 'target': 'app1.example.com'},
|
||||
# app2
|
||||
{'source': 'app2.example.com', 'target': 'c2.com'},
|
||||
]
|
||||
result = self.filter.domain_mappings(apps, self.primary)
|
||||
|
@ -1,51 +0,0 @@
|
||||
import unittest
|
||||
|
||||
from filter_plugins.group_domain_filters import FilterModule
|
||||
|
||||
class TestAddDomainIfGroup(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.filter = FilterModule().filters()["add_domain_if_group"]
|
||||
|
||||
def test_add_string_value(self):
|
||||
result = self.filter({}, "akaunting", "accounting.example.org", ["akaunting"])
|
||||
self.assertEqual(result, {"akaunting": "accounting.example.org"})
|
||||
|
||||
def test_add_list_value(self):
|
||||
result = self.filter({}, "mastodon", ["microblog.example.org"], ["mastodon"])
|
||||
self.assertEqual(result, {"mastodon": ["microblog.example.org"]})
|
||||
|
||||
def test_add_dict_value(self):
|
||||
result = self.filter({}, "bluesky", {"web": "bskyweb.example.org", "api": "bluesky.example.org"}, ["bluesky"])
|
||||
self.assertEqual(result, {"bluesky": {"web": "bskyweb.example.org", "api": "bluesky.example.org"}})
|
||||
|
||||
def test_ignore_if_not_in_group(self):
|
||||
result = self.filter({}, "akaunting", "accounting.example.org", ["wordpress"])
|
||||
self.assertEqual(result, {})
|
||||
|
||||
def test_merge_with_existing(self):
|
||||
initial = {"wordpress": ["blog.example.org"]}
|
||||
result = self.filter(initial, "akaunting", "accounting.example.org", ["akaunting"])
|
||||
self.assertEqual(result, {
|
||||
"wordpress": ["blog.example.org"],
|
||||
"akaunting": "accounting.example.org"
|
||||
})
|
||||
|
||||
def test_dict_is_not_mutated(self):
|
||||
base = {"keycloak": "auth.example.org"}
|
||||
copy = dict(base) # make a copy for comparison
|
||||
_ = self.filter(base, "akaunting", "accounting.example.org", ["akaunting"])
|
||||
self.assertEqual(base, copy) # original must stay unchanged
|
||||
|
||||
def test_multiple_adds_accumulate(self):
|
||||
result = {}
|
||||
result = self.filter(result, "akaunting", "accounting.example.org", ["akaunting", "wordpress"])
|
||||
result = self.filter(result, "wordpress", ["blog.example.org"], ["akaunting", "wordpress"])
|
||||
result = self.filter(result, "bluesky", {"web": "bskyweb.example.org", "api": "bluesky.example.org"}, ["bluesky"])
|
||||
self.assertEqual(result, {
|
||||
"akaunting": "accounting.example.org",
|
||||
"wordpress": ["blog.example.org"],
|
||||
"bluesky": {"web": "bskyweb.example.org", "api": "bluesky.example.org"},
|
||||
})
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
@ -1,79 +0,0 @@
|
||||
import os
|
||||
import tempfile
|
||||
import shutil
|
||||
import yaml
|
||||
import unittest
|
||||
|
||||
# Import the filter module
|
||||
import filter_plugins.group_domain_filters as gdf_module
|
||||
|
||||
class TestAddDomainIfGroupRecursive(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Create a temporary project structure
|
||||
self.tempdir = tempfile.mkdtemp()
|
||||
fp_dir = os.path.join(self.tempdir, 'filter_plugins')
|
||||
roles_dir = os.path.join(self.tempdir, 'roles')
|
||||
os.makedirs(fp_dir, exist_ok=True)
|
||||
os.makedirs(roles_dir, exist_ok=True)
|
||||
# Point module __file__ so plugin_dir resolves correctly
|
||||
gdf_module.__file__ = os.path.join(fp_dir, 'group_domain_filters.py')
|
||||
self.roles_dir = roles_dir
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tempdir)
|
||||
|
||||
def write_role(self, role_name, dependencies, application_id):
|
||||
"""
|
||||
Helper: write a role directory with meta/main.yml and vars/main.yml
|
||||
"""
|
||||
meta_dir = os.path.join(self.roles_dir, role_name, 'meta')
|
||||
vars_dir = os.path.join(self.roles_dir, role_name, 'vars')
|
||||
os.makedirs(meta_dir, exist_ok=True)
|
||||
os.makedirs(vars_dir, exist_ok=True)
|
||||
# Write meta/main.yml
|
||||
with open(os.path.join(meta_dir, 'main.yml'), 'w') as f:
|
||||
yaml.safe_dump({'dependencies': dependencies}, f)
|
||||
# Write vars/main.yml
|
||||
with open(os.path.join(vars_dir, 'main.yml'), 'w') as f:
|
||||
yaml.safe_dump({'application_id': application_id}, f)
|
||||
|
||||
def test_direct_application_id_in_group_names(self):
|
||||
# If domain_key (application_id) is directly in group_names
|
||||
result = gdf_module.FilterModule.add_domain_if_group({}, 'app1', 'domain1', ['app1'])
|
||||
self.assertEqual(result, {'app1': 'domain1'})
|
||||
|
||||
def test_indirect_dependency_application_id(self):
|
||||
# roleA depends on roleB; roleB has application_id 'appB'
|
||||
self.write_role('roleA', ['roleB'], 'appA')
|
||||
self.write_role('roleB', [], 'appB')
|
||||
# group_names includes roleA, so appB should be reachable
|
||||
result = gdf_module.FilterModule.add_domain_if_group({}, 'appB', 'domainB', ['roleA'])
|
||||
self.assertEqual(result, {'appB': 'domainB'})
|
||||
|
||||
def test_multi_level_dependency_application_id(self):
|
||||
# roleX -> roleY -> roleZ; roleZ id is 'appZ'
|
||||
self.write_role('roleX', ['roleY'], 'appX')
|
||||
self.write_role('roleY', ['roleZ'], 'appY')
|
||||
self.write_role('roleZ', [], 'appZ')
|
||||
# Starting from roleX, appZ reachable
|
||||
result = gdf_module.FilterModule.add_domain_if_group({}, 'appZ', 'domainZ', ['roleX'])
|
||||
self.assertEqual(result, {'appZ': 'domainZ'})
|
||||
|
||||
def test_domain_key_for_parent_role(self):
|
||||
# roleParent has app 'appP', and depends on roleChild('appC')
|
||||
self.write_role('roleParent', ['roleChild'], 'appP')
|
||||
self.write_role('roleChild', [], 'appC')
|
||||
# Even appP reachable via deps of roleParent (including itself)
|
||||
result = gdf_module.FilterModule.add_domain_if_group({}, 'appP', 'domainP', ['roleParent'])
|
||||
self.assertEqual(result, {'appP': 'domainP'})
|
||||
|
||||
def test_no_inclusion_for_unrelated(self):
|
||||
# Unrelated roles
|
||||
self.write_role('roleC', ['roleD'], 'appC')
|
||||
self.write_role('roleD', [], 'appD')
|
||||
# group_names does not include 'roleC' or 'roleD'
|
||||
result = gdf_module.FilterModule.add_domain_if_group({}, 'appC', 'domainC', ['otherRole'])
|
||||
self.assertEqual(result, {})
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
118
tests/unit/test_load_configuration.py
Normal file
118
tests/unit/test_load_configuration.py
Normal file
@ -0,0 +1,118 @@
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
from unittest.mock import patch, mock_open
|
||||
from ansible.errors import AnsibleFilterError
|
||||
|
||||
# make sure our plugin is on PYTHONPATH
|
||||
root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
|
||||
sys.path.insert(0, root)
|
||||
|
||||
import load_configuration
|
||||
from load_configuration import FilterModule, _cfg_cache
|
||||
|
||||
class TestLoadConfigurationFilter(unittest.TestCase):
|
||||
def setUp(self):
|
||||
_cfg_cache.clear()
|
||||
self.f = FilterModule().filters()['load_configuration']
|
||||
self.app = 'html_server'
|
||||
self.nested_cfg = {
|
||||
'html_server': {
|
||||
'features': {'matomo': True},
|
||||
'domains': {'canonical': ['html.example.com']}
|
||||
}
|
||||
}
|
||||
self.flat_cfg = {
|
||||
'features': {'matomo': False},
|
||||
'domains': {'canonical': ['flat.example.com']}
|
||||
}
|
||||
|
||||
def test_invalid_key(self):
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
self.f(self.app, None)
|
||||
|
||||
@patch('load_configuration.os.path.isdir', return_value=False)
|
||||
def test_no_roles_dir(self, _):
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
self.f(self.app, 'features.matomo')
|
||||
|
||||
@patch('load_configuration.os.listdir', return_value=['r1'])
|
||||
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||
@patch('load_configuration.os.path.exists', return_value=False)
|
||||
def test_no_matching_role(self, *_):
|
||||
self.assertIsNone(self.f(self.app, 'features.matomo'))
|
||||
|
||||
@patch('load_configuration.os.listdir', return_value=['r1'])
|
||||
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||
@patch('load_configuration.os.path.exists')
|
||||
@patch('load_configuration.open', new_callable=mock_open)
|
||||
@patch('load_configuration.yaml.safe_load')
|
||||
def test_primary_missing_conf(self, mock_yaml, mock_file, mock_exists, *_):
|
||||
mock_exists.side_effect = lambda p: p.endswith('vars/main.yml')
|
||||
mock_yaml.return_value = {'application_id': self.app}
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
self.f(self.app, 'features.matomo')
|
||||
|
||||
@patch('load_configuration.os.listdir', return_value=['r1'])
|
||||
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||
@patch('load_configuration.os.path.exists')
|
||||
@patch('load_configuration.open', new_callable=mock_open)
|
||||
@patch('load_configuration.yaml.safe_load')
|
||||
def test_primary_and_cache(self, mock_yaml, mock_file, mock_exists, *_):
|
||||
mock_exists.side_effect = lambda p: p.endswith('vars/main.yml') or p.endswith('vars/configuration.yml')
|
||||
mock_yaml.side_effect = [
|
||||
{'application_id': self.app}, # main.yml
|
||||
self.nested_cfg # configuration.yml
|
||||
]
|
||||
# first load
|
||||
self.assertTrue(self.f(self.app, 'features.matomo'))
|
||||
self.assertIn(self.app, _cfg_cache)
|
||||
mock_yaml.reset_mock()
|
||||
# from cache
|
||||
self.assertEqual(self.f(self.app, 'domains.canonical'),
|
||||
['html.example.com'])
|
||||
mock_yaml.assert_not_called()
|
||||
|
||||
@patch('load_configuration.os.listdir', return_value=['r1'])
|
||||
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||
@patch('load_configuration.os.path.exists', return_value=True)
|
||||
@patch('load_configuration.open', mock_open(read_data="html_server: {}"))
|
||||
@patch('load_configuration.yaml.safe_load', return_value={'html_server': {}})
|
||||
def test_key_not_found_after_load(self, *_):
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
self.f(self.app, 'does.not.exist')
|
||||
|
||||
@patch('load_configuration.os.listdir', return_value=['r2'])
|
||||
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||
@patch('load_configuration.os.path.exists')
|
||||
@patch('load_configuration.open', new_callable=mock_open)
|
||||
@patch('load_configuration.yaml.safe_load')
|
||||
def test_fallback_nested(self, mock_yaml, mock_file, mock_exists, *_):
|
||||
mock_exists.side_effect = lambda p: p.endswith('vars/configuration.yml')
|
||||
mock_yaml.return_value = self.nested_cfg
|
||||
# nested fallback must work
|
||||
self.assertTrue(self.f(self.app, 'features.matomo'))
|
||||
self.assertEqual(self.f(self.app, 'domains.canonical'),
|
||||
['html.example.com'])
|
||||
|
||||
@patch('load_configuration.os.listdir', return_value=['r4'])
|
||||
@patch('load_configuration.os.path.isdir', return_value=True)
|
||||
@patch('load_configuration.os.path.exists')
|
||||
@patch('load_configuration.open', new_callable=mock_open)
|
||||
@patch('load_configuration.yaml.safe_load')
|
||||
def test_fallback_with_indexed_key(self, mock_yaml, mock_file, mock_exists, *_):
|
||||
# Testing with an indexed key like domains.canonical[0]
|
||||
mock_exists.side_effect = lambda p: p.endswith('vars/configuration.yml')
|
||||
mock_yaml.return_value = {
|
||||
'file-server': {
|
||||
'domains': {
|
||||
'canonical': ['files.example.com', 'extra.example.com']
|
||||
}
|
||||
}
|
||||
}
|
||||
# should get the first element of the canonical domains list
|
||||
self.assertEqual(self.f('file-server', 'domains.canonical[0]'),
|
||||
'files.example.com')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
45
tests/unit/test_merge_mapping.py
Normal file
45
tests/unit/test_merge_mapping.py
Normal file
@ -0,0 +1,45 @@
|
||||
import unittest
|
||||
from filter_plugins.merge_mapping import merge_mapping
|
||||
from ansible.errors import AnsibleFilterError
|
||||
|
||||
class TestMergeMappingFilter(unittest.TestCase):
|
||||
def test_basic_merge_overwrites_and_adds(self):
|
||||
list1 = [
|
||||
{'source': 'a', 'target': 1},
|
||||
{'source': 'b', 'target': 2},
|
||||
]
|
||||
list2 = [
|
||||
{'source': 'b', 'target': 3},
|
||||
{'source': 'c', 'target': 4},
|
||||
]
|
||||
result = merge_mapping(list1, list2, 'source')
|
||||
result_dict = {item['source']: item['target'] for item in result}
|
||||
self.assertEqual(result_dict, {'a': 1, 'b': 3, 'c': 4})
|
||||
|
||||
def test_merge_preserves_and_overwrites_fields(self):
|
||||
list1 = [{'source': 'x', 'value': 100, 'flag': True}]
|
||||
list2 = [{'source': 'x', 'value': 200, 'note': 'updated'}]
|
||||
result = merge_mapping(list1, list2, 'source')
|
||||
self.assertEqual(len(result), 1)
|
||||
merged = result[0]
|
||||
self.assertEqual(merged['value'], 200)
|
||||
self.assertTrue(merged['flag'])
|
||||
self.assertEqual(merged['note'], 'updated')
|
||||
|
||||
def test_empty_lists_return_empty(self):
|
||||
self.assertEqual(merge_mapping([], [], 'source'), [])
|
||||
|
||||
def test_missing_key_raises_error(self):
|
||||
list1 = [{'target': 'no_source'}]
|
||||
list2 = []
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
merge_mapping(list1, list2, 'source')
|
||||
|
||||
def test_non_list_inputs_raise_error(self):
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
merge_mapping("not a list", [], 'source')
|
||||
with self.assertRaises(AnsibleFilterError):
|
||||
merge_mapping([], "not a list", 'source')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -1,57 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
sys.path.insert(
|
||||
0,
|
||||
os.path.abspath(
|
||||
os.path.join(os.path.dirname(__file__), "../../")
|
||||
),
|
||||
)
|
||||
|
||||
from filter_plugins.redirect_filters import FilterModule
|
||||
|
||||
|
||||
class TestAddRedirectIfGroup(unittest.TestCase):
|
||||
"""Unit-tests for the add_redirect_if_group filter."""
|
||||
|
||||
def setUp(self):
|
||||
# Obtain the callable once for reuse
|
||||
self.add_redirect = FilterModule().filters()["add_redirect_if_group"]
|
||||
|
||||
def test_appends_redirect_when_group_present(self):
|
||||
original = [{"source": "a", "target": "b"}]
|
||||
result = self.add_redirect(
|
||||
original,
|
||||
group="lam",
|
||||
source="ldap.example.com",
|
||||
target="lam.example.com",
|
||||
group_names=["lam", "other"],
|
||||
)
|
||||
|
||||
# Original list must stay unchanged
|
||||
self.assertEqual(len(original), 1)
|
||||
# Result list must contain the extra entry
|
||||
self.assertEqual(len(result), 2)
|
||||
self.assertIn(
|
||||
{"source": "ldap.example.com", "target": "lam.example.com"}, result
|
||||
)
|
||||
|
||||
def test_keeps_list_unchanged_when_group_absent(self):
|
||||
original = [{"source": "a", "target": "b"}]
|
||||
result = self.add_redirect(
|
||||
original,
|
||||
group="lam",
|
||||
source="ldap.example.com",
|
||||
target="lam.example.com",
|
||||
group_names=["unrelated"],
|
||||
)
|
||||
|
||||
# No new entries
|
||||
self.assertEqual(result, original)
|
||||
# But ensure a new list object was returned (no in-place mutation)
|
||||
self.assertIsNot(result, original)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
Loading…
x
Reference in New Issue
Block a user