From b867a5247170bfd77c0523fd789865f7b3117f5a Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Wed, 20 Aug 2025 02:42:07 +0200 Subject: [PATCH] Refactor and extend role dependency resolution: - Introduced module_utils/role_dependency_resolver.py with full support for include_role, import_role, meta dependencies, and run_after. - Refactored cli/build/tree.py to use RoleDependencyResolver (added toggles for include/import/dependencies/run_after). - Extended filter_plugins/canonical_domains_map.py with optional 'recursive' mode (ignores run_after by design). - Updated roles/web-app-nextcloud to properly include Collabora dependency. - Added comprehensive unittests under tests/unit/module_utils for RoleDependencyResolver. Ref: https://chatgpt.com/share/68a519c8-8e54-800f-83c0-be38546620d9 --- cli/build/tree.py | 204 +++-------- filter_plugins/canonical_domains_map.py | 110 +++--- module_utils/role_dependency_resolver.py | 296 ++++++++++++++++ roles/web-app-nextcloud/config/main.yml | 12 +- roles/web-app-nextcloud/tasks/main.yml | 14 +- .../test_role_dependency_resolver.py | 329 ++++++++++++++++++ 6 files changed, 745 insertions(+), 220 deletions(-) create mode 100644 module_utils/role_dependency_resolver.py create mode 100644 tests/unit/module_utils/test_role_dependency_resolver.py diff --git a/cli/build/tree.py b/cli/build/tree.py index ca684fb6..7db6a6b0 100644 --- a/cli/build/tree.py +++ b/cli/build/tree.py @@ -2,174 +2,45 @@ import os import argparse import json -import fnmatch -import re from typing import Dict, Any -import yaml - from cli.build.graph import build_mappings, output_graph +from module_utils.role_dependency_resolver import RoleDependencyResolver def find_roles(roles_dir: str): - """Yield (role_name, role_path) for every subfolder in roles_dir.""" for entry in os.listdir(roles_dir): path = os.path.join(roles_dir, entry) if os.path.isdir(path): yield entry, path -def _is_pure_jinja_var(s: str) -> bool: - """Check if string is exactly a single {{ var }} expression.""" - return bool(re.fullmatch(r"\s*\{\{\s*[^}]+\s*\}\}\s*", s)) - - -def _jinja_to_glob(s: str) -> str: - """Convert Jinja placeholders {{ ... }} into * for fnmatch.""" - pattern = re.sub(r"\{\{[^}]+\}\}", "*", s) - pattern = re.sub(r"\*{2,}", "*", pattern) - return pattern.strip() - - -def _list_role_dirs(roles_dir: str) -> list[str]: - """Return a list of role directory names inside roles_dir.""" - return [ - d for d in os.listdir(roles_dir) - if os.path.isdir(os.path.join(roles_dir, d)) - ] - - -def find_include_role_dependencies(role_path: str, roles_dir: str) -> set[str]: - """ - Scan all tasks/*.yml(.yaml) files of a role and collect include_role dependencies. - - Rules: - - loop/with_items with literal strings -> add those as roles - - name contains jinja AND surrounding literals -> convert to glob and match existing roles - - name is a pure jinja variable only -> ignore - - name is a pure literal -> add as-is - """ - deps: set[str] = set() - tasks_dir = os.path.join(role_path, "tasks") - if not os.path.isdir(tasks_dir): - return deps - - candidates = [] - for root, _, files in os.walk(tasks_dir): - for f in files: - if f.endswith(".yml") or f.endswith(".yaml"): - candidates.append(os.path.join(root, f)) - - all_roles = _list_role_dirs(roles_dir) - - def add_literal_loop_items(loop_val): - if isinstance(loop_val, list): - for item in loop_val: - if isinstance(item, str) and item.strip(): - deps.add(item.strip()) - - for file_path in candidates: - try: - with open(file_path, "r", encoding="utf-8") as f: - docs = list(yaml.safe_load_all(f)) - except Exception: - # Be tolerant to any parsing issues; skip unreadable files - continue - - for doc in docs: - if not isinstance(doc, list): - continue - for task in doc: - if not isinstance(task, dict): - continue - if "include_role" not in task: - continue - inc = task.get("include_role") - if not isinstance(inc, dict): - continue - name = inc.get("name") - if not isinstance(name, str) or not name.strip(): - continue - - # 1) Handle loop/with_items - loop_val = task.get("loop", task.get("with_items")) - if loop_val is not None: - add_literal_loop_items(loop_val) - # still check name for surrounding literals - if not _is_pure_jinja_var(name): - pattern = ( - _jinja_to_glob(name) - if ("{{" in name and "}}" in name) - else name - ) - if "*" in pattern: - for r in all_roles: - if fnmatch.fnmatch(r, pattern): - deps.add(r) - continue - - # 2) No loop: evaluate name - if "{{" in name and "}}" in name: - if _is_pure_jinja_var(name): - continue # ignore pure variable - pattern = _jinja_to_glob(name) - if "*" in pattern: - for r in all_roles: - if fnmatch.fnmatch(r, pattern): - deps.add(r) - continue - else: - deps.add(pattern) - else: - # pure literal - deps.add(name.strip()) - - return deps - - def main(): - # default roles dir is ../../roles relative to this script script_dir = os.path.dirname(os.path.abspath(__file__)) - default_roles_dir = os.path.abspath( - os.path.join(script_dir, "..", "..", "roles") - ) + default_roles_dir = os.path.abspath(os.path.join(script_dir, "..", "..", "roles")) parser = argparse.ArgumentParser( description="Generate all graphs for each role and write meta/tree.json" ) - parser.add_argument( - "-d", "--role_dir", - default=default_roles_dir, - help=f"Path to roles directory (default: {default_roles_dir})" - ) - parser.add_argument( - "-D", "--depth", - type=int, - default=0, - help="Max recursion depth (>0) or <=0 to stop on cycle" - ) - parser.add_argument( - "-o", "--output", - choices=["yaml", "json", "console"], - default="json", - help="Output format" - ) - parser.add_argument( - "-p", "--preview", - action="store_true", - help="Preview graphs to console instead of writing files" - ) - parser.add_argument( - "-s", "--shadow-folder", - type=str, - default=None, - help="If set, writes tree.json to this shadow folder instead of the role's actual meta/ folder" - ) - parser.add_argument( - "-v", "--verbose", - action="store_true", - help="Enable verbose logging" - ) + parser.add_argument("-d", "--role_dir", default=default_roles_dir, + help=f"Path to roles directory (default: {default_roles_dir})") + parser.add_argument("-D", "--depth", type=int, default=0, + help="Max recursion depth (>0) or <=0 to stop on cycle") + parser.add_argument("-o", "--output", choices=["yaml", "json", "console"], + default="json", help="Output format") + parser.add_argument("-p", "--preview", action="store_true", + help="Preview graphs to console instead of writing files") + parser.add_argument("-s", "--shadow-folder", type=str, default=None, + help="If set, writes tree.json to this shadow folder instead of the role's actual meta/ folder") + parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose logging") + + # Toggles + parser.add_argument("--no-include-role", action="store_true", help="Do not scan include_role") + parser.add_argument("--no-import-role", action="store_true", help="Do not scan import_role") + parser.add_argument("--no-dependencies", action="store_true", help="Do not read meta/main.yml dependencies") + parser.add_argument("--no-run-after", action="store_true", + help="Do not read galaxy_info.run_after from meta/main.yml") + args = parser.parse_args() if args.verbose: @@ -179,6 +50,8 @@ def main(): print(f"Preview mode: {args.preview}") print(f"Shadow folder: {args.shadow_folder}") + resolver = RoleDependencyResolver(args.role_dir) + for role_name, role_path in find_roles(args.role_dir): if args.verbose: print(f"Processing role: {role_name}") @@ -189,13 +62,26 @@ def main(): max_depth=args.depth ) - # add include_role dependencies from tasks - include_deps = find_include_role_dependencies(role_path, args.role_dir) - if include_deps: + # Direct deps (depth=1) – getrennt erfasst für buckets + inc_roles, imp_roles = resolver._scan_tasks(role_path) + meta_deps = resolver._extract_meta_dependencies(role_path) + run_after = set() + if not args.no_run_after: + run_after = resolver._extract_meta_run_after(role_path) + + if any([not args.no_include_role and inc_roles, + not args.no_import_role and imp_roles, + not args.no_dependencies and meta_deps, + not args.no_run_after and run_after]): deps_root = graphs.setdefault("dependencies", {}) - inc_list = set(deps_root.get("include_role", [])) - inc_list.update(include_deps) - deps_root["include_role"] = sorted(inc_list) + if not args.no_include_role and inc_roles: + deps_root["include_role"] = sorted(inc_roles) + if not args.no_import_role and imp_roles: + deps_root["import_role"] = sorted(imp_roles) + if not args.no_dependencies and meta_deps: + deps_root["dependencies"] = sorted(meta_deps) + if not args.no_run_after and run_after: + deps_root["run_after"] = sorted(run_after) graphs["dependencies"] = deps_root if args.preview: @@ -205,13 +91,11 @@ def main(): output_graph(data, "console", role_name, key) else: if args.shadow_folder: - tree_file = os.path.join( - args.shadow_folder, role_name, "meta", "tree.json" - ) + tree_file = os.path.join(args.shadow_folder, role_name, "meta", "tree.json") else: tree_file = os.path.join(role_path, "meta", "tree.json") os.makedirs(os.path.dirname(tree_file), exist_ok=True) - with open(tree_file, "w") as f: + with open(tree_file, "w", encoding="utf-8") as f: json.dump(graphs, f, indent=2) print(f"Wrote {tree_file}") diff --git a/filter_plugins/canonical_domains_map.py b/filter_plugins/canonical_domains_map.py index b7e31287..c57e169c 100644 --- a/filter_plugins/canonical_domains_map.py +++ b/filter_plugins/canonical_domains_map.py @@ -4,45 +4,78 @@ import os sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from module_utils.entity_name_utils import get_entity_name +from module_utils.role_dependency_resolver import RoleDependencyResolver + class FilterModule(object): def filters(self): return {'canonical_domains_map': self.canonical_domains_map} - def canonical_domains_map(self, apps, PRIMARY_DOMAIN): + def canonical_domains_map( + self, + apps, + PRIMARY_DOMAIN, + *, + recursive: bool = False, + roles_base_dir: str | None = None, + ): """ - Maps applications to their canonical domains, checking for conflicts - and ensuring all domains are valid and unique across applications. + Build { app_id: [canonical domains...] }. + + Rekursiv werden nur include_role, import_role und meta/main.yml:dependencies verfolgt. + 'run_after' wird hier absichtlich ignoriert. """ + if not isinstance(apps, dict): + raise AnsibleFilterError(f"'apps' must be a dict, got {type(apps).__name__}") + + app_keys = set(apps.keys()) + + if recursive: + roles_base_dir = roles_base_dir or os.path.join(os.getcwd(), "roles") + if not os.path.isdir(roles_base_dir): + raise AnsibleFilterError( + f"roles_base_dir '{roles_base_dir}' not found or not a directory." + ) + + resolver = RoleDependencyResolver(roles_base_dir) + # WICHTIG: resolve_run_after=False (hart) + discovered_roles = resolver.resolve_transitively( + start_roles=app_keys, + resolve_include_role=True, + resolve_import_role=True, + resolve_dependencies=True, + resolve_run_after=False, + max_depth=None, + ) + target_apps = discovered_roles & app_keys + else: + target_apps = app_keys + result = {} seen_domains = {} - for app_id, cfg in apps.items(): - if app_id.startswith(( - "web-", - "svc-db-" # Database services can also be exposed to the internet. It is just listening to the port, but the domain is used for port mapping - )): - if not isinstance(cfg, dict): - raise AnsibleFilterError( - f"Invalid configuration for application '{app_id}': " - f"expected a dict, got {cfg!r}" + for app_id in sorted(target_apps): + cfg = apps.get(app_id) + if cfg is None: + continue + if not str(app_id).startswith(("web-", "svc-db-")): + continue + if not isinstance(cfg, dict): + raise AnsibleFilterError( + f"Invalid configuration for application '{app_id}': expected dict, got {cfg!r}" ) - - domains_cfg = cfg.get('server',{}).get('domains',{}) - if not domains_cfg or 'canonical' not in domains_cfg: - self._add_default_domain(app_id, PRIMARY_DOMAIN, seen_domains, result) - continue - canonical_domains = domains_cfg['canonical'] - self._process_canonical_domains(app_id, canonical_domains, seen_domains, result) + domains_cfg = cfg.get('server', {}).get('domains', {}) + if not domains_cfg or 'canonical' not in domains_cfg: + self._add_default_domain(app_id, PRIMARY_DOMAIN, seen_domains, result) + continue + + canonical_domains = domains_cfg['canonical'] + self._process_canonical_domains(app_id, canonical_domains, seen_domains, result) return result def _add_default_domain(self, app_id, PRIMARY_DOMAIN, seen_domains, result): - """ - Add the default domain for an application if no canonical domains are defined. - Ensures the domain is unique across applications. - """ entity_name = get_entity_name(app_id) default_domain = f"{entity_name}.{PRIMARY_DOMAIN}" if default_domain in seen_domains: @@ -54,40 +87,21 @@ class FilterModule(object): result[app_id] = [default_domain] def _process_canonical_domains(self, app_id, canonical_domains, seen_domains, result): - """ - Process the canonical domains for an application, handling both lists and dicts, - and ensuring each domain is unique. - """ if isinstance(canonical_domains, dict): - self._process_canonical_domains_dict(app_id, canonical_domains, seen_domains, result) + for _, domain in canonical_domains.items(): + self._validate_and_check_domain(app_id, domain, seen_domains) + result[app_id] = canonical_domains.copy() elif isinstance(canonical_domains, list): - self._process_canonical_domains_list(app_id, canonical_domains, seen_domains, result) + for domain in canonical_domains: + self._validate_and_check_domain(app_id, domain, seen_domains) + result[app_id] = list(canonical_domains) else: raise AnsibleFilterError( f"Unexpected type for 'server.domains.canonical' in application '{app_id}': " f"{type(canonical_domains).__name__}" ) - def _process_canonical_domains_dict(self, app_id, domains_dict, seen_domains, result): - """ - Process a dictionary of canonical domains for an application. - """ - for name, domain in domains_dict.items(): - self._validate_and_check_domain(app_id, domain, seen_domains) - result[app_id] = domains_dict.copy() - - def _process_canonical_domains_list(self, app_id, domains_list, seen_domains, result): - """ - Process a list of canonical domains for an application. - """ - for domain in domains_list: - self._validate_and_check_domain(app_id, domain, seen_domains) - result[app_id] = list(domains_list) - def _validate_and_check_domain(self, app_id, domain, seen_domains): - """ - Validate the domain and check if it has already been assigned to another application. - """ if not isinstance(domain, str) or not domain.strip(): raise AnsibleFilterError( f"Invalid domain entry in 'canonical' for application '{app_id}': {domain!r}" diff --git a/module_utils/role_dependency_resolver.py b/module_utils/role_dependency_resolver.py new file mode 100644 index 00000000..cadb7c5a --- /dev/null +++ b/module_utils/role_dependency_resolver.py @@ -0,0 +1,296 @@ +import os +import fnmatch +import re +from typing import Dict, Set, Iterable, Tuple, Optional + +import yaml + + +class RoleDependencyResolver: + _RE_PURE_JINJA = re.compile(r"\s*\{\{\s*[^}]+\s*\}\}\s*$") + + def __init__(self, roles_dir: str): + self.roles_dir = roles_dir + + # -------------------------- public API -------------------------- + + def resolve_transitively( + self, + start_roles: Iterable[str], + *, + resolve_include_role: bool = True, + resolve_import_role: bool = True, + resolve_dependencies: bool = True, + resolve_run_after: bool = False, + max_depth: Optional[int] = None, + ) -> Set[str]: + to_visit = list(dict.fromkeys(start_roles)) + visited: Set[str] = set() + depth: Dict[str, int] = {} + + for r in to_visit: + depth[r] = 0 + + while to_visit: + role = to_visit.pop() + cur_d = depth.get(role, 0) + if role in visited: + continue + visited.add(role) + + if max_depth is not None and cur_d >= max_depth: + continue + + for dep in self.get_role_dependencies( + role, + resolve_include_role=resolve_include_role, + resolve_import_role=resolve_import_role, + resolve_dependencies=resolve_dependencies, + resolve_run_after=resolve_run_after, + ): + if dep not in visited: + to_visit.append(dep) + depth[dep] = cur_d + 1 + + return visited + + def get_role_dependencies( + self, + role_name: str, + *, + resolve_include_role: bool = True, + resolve_import_role: bool = True, + resolve_dependencies: bool = True, + resolve_run_after: bool = False, + ) -> Set[str]: + role_path = os.path.join(self.roles_dir, role_name) + if not os.path.isdir(role_path): + return set() + + deps: Set[str] = set() + + if resolve_include_role or resolve_import_role: + includes, imports = self._scan_tasks(role_path) + if resolve_include_role: + deps |= includes + if resolve_import_role: + deps |= imports + + if resolve_dependencies: + deps |= self._extract_meta_dependencies(role_path) + + if resolve_run_after: + deps |= self._extract_meta_run_after(role_path) + + return deps + + # -------------------------- scanning helpers -------------------------- + + def _scan_tasks(self, role_path: str) -> Tuple[Set[str], Set[str]]: + tasks_dir = os.path.join(role_path, "tasks") + include_roles: Set[str] = set() + import_roles: Set[str] = set() + + if not os.path.isdir(tasks_dir): + return include_roles, import_roles + + all_roles = self._list_role_dirs(self.roles_dir) + + candidates = [] + for root, _, files in os.walk(tasks_dir): + for f in files: + if f.endswith(".yml") or f.endswith(".yaml"): + candidates.append(os.path.join(root, f)) + + for file_path in candidates: + try: + with open(file_path, "r", encoding="utf-8") as f: + docs = list(yaml.safe_load_all(f)) + except Exception: + inc, imp = self._tolerant_scan_file(file_path, all_roles) + include_roles |= inc + import_roles |= imp + continue + + for doc in docs or []: + if not isinstance(doc, list): + continue + for task in doc: + if not isinstance(task, dict): + continue + if "include_role" in task: + include_roles |= self._extract_from_task(task, "include_role", all_roles) + if "import_role" in task: + import_roles |= self._extract_from_task(task, "import_role", all_roles) + + return include_roles, import_roles + + def _extract_from_task(self, task: dict, key: str, all_roles: Iterable[str]) -> Set[str]: + roles: Set[str] = set() + spec = task.get(key) + if not isinstance(spec, dict): + return roles + + name = spec.get("name") + loop_val = self._collect_loop_values(task) + + if loop_val is not None: + for item in self._iter_flat(loop_val): + cand = self._role_from_loop_item(item, name_template=name) + if cand: + roles.add(cand) + + if isinstance(name, str) and name.strip() and not self._is_pure_jinja_var(name): + pattern = self._jinja_to_glob(name) if ("{{" in name and "}}" in name) else name + self._match_glob_into(pattern, all_roles, roles) + return roles + + if isinstance(name, str) and name.strip(): + if "{{" in name and "}}" in name: + if self._is_pure_jinja_var(name): + return roles + pattern = self._jinja_to_glob(name) + self._match_glob_into(pattern, all_roles, roles) + else: + roles.add(name.strip()) + + return roles + + def _collect_loop_values(self, task: dict): + for k in ("loop", "with_items", "with_list", "with_flattened"): + if k in task: + return task[k] + return None + + def _iter_flat(self, value): + if isinstance(value, list): + for v in value: + if isinstance(v, list): + for x in v: + yield x + else: + yield v + + def _role_from_loop_item(self, item, name_template=None) -> Optional[str]: + tmpl = (name_template or "").strip() if isinstance(name_template, str) else "" + + if isinstance(item, str): + if tmpl in ("{{ item }}", "{{item}}") or not tmpl or "item" in tmpl: + return item.strip() + return None + + if isinstance(item, dict): + for k in ("role", "name"): + v = item.get(k) + if isinstance(v, str) and v.strip(): + if tmpl in (f"{{{{ item.{k} }}}}", f"{{{{item.{k}}}}}") or not tmpl or "item" in tmpl: + return v.strip() + return None + + def _match_glob_into(self, pattern: str, all_roles: Iterable[str], out: Set[str]): + if "*" in pattern or "?" in pattern or "[" in pattern: + for r in all_roles: + if fnmatch.fnmatch(r, pattern): + out.add(r) + else: + out.add(pattern) + + def test_jinja_mixed_name_glob_matching(self): + """ + include_role: + name: "prefix-{{ item }}-suffix" + loop: [x, y] + Existing roles: prefix-x-suffix, prefix-y-suffix, prefix-z-suffix + + Expectation: + - NO raw loop items ('x', 'y') end up as roles + - Glob matching resolves to all three concrete roles + """ + make_role(self.roles_dir, "A") + for rn in ["prefix-x-suffix", "prefix-y-suffix", "prefix-z-suffix"]: + make_role(self.roles_dir, rn) + + write( + os.path.join(self.roles_dir, "A", "tasks", "main.yml"), + """ + - name: jinja-mixed glob + include_role: + name: "prefix-{{ item }}-suffix" + loop: + - x + - y + """ + ) + + r = RoleDependencyResolver(self.roles_dir) + deps = r.get_role_dependencies("A") + + # ensure no raw loop items leak into the results + self.assertNotIn("x", deps) + self.assertNotIn("y", deps) + + # only the resolved role names should be present + self.assertEqual( + deps, + {"prefix-x-suffix", "prefix-y-suffix", "prefix-z-suffix"}, + ) + + + # -------------------------- meta helpers -------------------------- + + def _extract_meta_dependencies(self, role_path: str) -> Set[str]: + deps: Set[str] = set() + meta_main = os.path.join(role_path, "meta", "main.yml") + if not os.path.isfile(meta_main): + return deps + try: + with open(meta_main, "r", encoding="utf-8") as f: + meta = yaml.safe_load(f) or {} + raw_deps = meta.get("dependencies", []) + if isinstance(raw_deps, list): + for item in raw_deps: + if isinstance(item, str): + deps.add(item.strip()) + elif isinstance(item, dict): + r = item.get("role") + if isinstance(r, str) and r.strip(): + deps.add(r.strip()) + except Exception: + pass + return deps + + def _extract_meta_run_after(self, role_path: str) -> Set[str]: + deps: Set[str] = set() + meta_main = os.path.join(role_path, "meta", "main.yml") + if not os.path.isfile(meta_main): + return deps + try: + with open(meta_main, "r", encoding="utf-8") as f: + meta = yaml.safe_load(f) or {} + galaxy_info = meta.get("galaxy_info", {}) + run_after = galaxy_info.get("run_after", []) + if isinstance(run_after, list): + for item in run_after: + if isinstance(item, str) and item.strip(): + deps.add(item.strip()) + except Exception: + pass + return deps + + # -------------------------- small utils -------------------------- + + def _list_role_dirs(self, roles_dir: str) -> list[str]: + return [ + d for d in os.listdir(roles_dir) + if os.path.isdir(os.path.join(roles_dir, d)) + ] + + @classmethod + def _is_pure_jinja_var(cls, s: str) -> bool: + return bool(cls._RE_PURE_JINJA.fullmatch(s or "")) + + @staticmethod + def _jinja_to_glob(s: str) -> str: + pattern = re.sub(r"\{\{[^}]+\}\}", "*", s or "") + pattern = re.sub(r"\*{2,}", "*", pattern) + return pattern.strip() diff --git a/roles/web-app-nextcloud/config/main.yml b/roles/web-app-nextcloud/config/main.yml index d948c739..66578739 100644 --- a/roles/web-app-nextcloud/config/main.yml +++ b/roles/web-app-nextcloud/config/main.yml @@ -9,14 +9,14 @@ server: whitelist: font-src: - "data:" - #frame-src: - # - "" + frame-src: + - "{{ WEB_PROTOCOL }}://collabora.{{ PRIMARY_DOMAIN }}" + - "{{ WEB_PROTOCOL }}://collabora.{{ PRIMARY_DOMAIN }}" + - "wss://collabora.{{ PRIMARY_DOMAIN }}" domains: canonical: - "cloud.{{ PRIMARY_DOMAIN }}" # talk: "talk.{{ PRIMARY_DOMAIN }}" @todo needs to be activated - helpers: - collabora: "{{ WEB_PROTOCOL ~ '://' ~ applications | get_app_conf('web-svc-collabora','server.domains.canonical[0]',False,'<< defaults_applications[web-svc-collabora].server.domains.canonical[0]>>') }}" docker: volumes: data: nextcloud_data @@ -54,11 +54,11 @@ oidc: features: matomo: true css: false - desktop: true + desktop: true ldap: true oidc: true central_database: true - logout: true + logout: true default_quota: '1000000000' # Quota to assign if no quota is specified in the OIDC response (bytes) legacy_login_mask: enabled: False # If true, then legacy login mask is shown. Otherwise just SSO diff --git a/roles/web-app-nextcloud/tasks/main.yml b/roles/web-app-nextcloud/tasks/main.yml index 41caed74..bc76edb4 100644 --- a/roles/web-app-nextcloud/tasks/main.yml +++ b/roles/web-app-nextcloud/tasks/main.yml @@ -1,10 +1,12 @@ --- -#- name: "Install Collabora Dependency" -# include_role: -# name: web-svc-collabora -# vars: -# flush_handlers: true -# when: NEXTCLOUD_COLLABORA_ENABLED +- name: "Install Collabora Dependency" + include_role: + name: web-svc-collabora + vars: + flush_handlers: true + when: + - run_once_web_svc_collabora is not defined + - NEXTCLOUD_COLLABORA_ENABLED - name: "include role for {{ application_id }} to receive certs & do modification routines" include_role: diff --git a/tests/unit/module_utils/test_role_dependency_resolver.py b/tests/unit/module_utils/test_role_dependency_resolver.py new file mode 100644 index 00000000..e9259bcc --- /dev/null +++ b/tests/unit/module_utils/test_role_dependency_resolver.py @@ -0,0 +1,329 @@ +import os +import sys +import shutil +import tempfile +import unittest +from textwrap import dedent + +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) +if PROJECT_ROOT not in sys.path: + sys.path.insert(0, PROJECT_ROOT) + +from module_utils.role_dependency_resolver import RoleDependencyResolver # noqa: E402 + + +def write(path: str, content: str): + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + f.write(dedent(content).lstrip()) + + +def make_role(roles_dir: str, name: str): + path = os.path.join(roles_dir, name) + os.makedirs(path, exist_ok=True) + os.makedirs(os.path.join(path, "tasks"), exist_ok=True) + os.makedirs(os.path.join(path, "meta"), exist_ok=True) + return path + + +class TestRoleDependencyResolver(unittest.TestCase): + def setUp(self): + self.roles_dir = tempfile.mkdtemp(prefix="roles_") + + def tearDown(self): + shutil.rmtree(self.roles_dir, ignore_errors=True) + + # ----------------------------- TESTS ----------------------------- + + def test_include_and_import_literal(self): + """ + A/tasks/main.yml: + - include_role: { name: B } + - import_role: { name: C } + Expect: deps = {B, C} + """ + make_role(self.roles_dir, "A") + make_role(self.roles_dir, "B") + make_role(self.roles_dir, "C") + + write( + os.path.join(self.roles_dir, "A", "tasks", "main.yml"), + """ + - name: include B + include_role: + name: B + + - name: import C + import_role: + name: C + """ + ) + + r = RoleDependencyResolver(self.roles_dir) + deps = r.get_role_dependencies("A") + self.assertEqual(deps, {"B", "C"}) + + def test_loop_with_string_items_and_dict_items(self): + """ + A/tasks/main.yml uses loop with strings and dicts. + Expect: {D, E, F, G} + """ + make_role(self.roles_dir, "A") + for rn in ["D", "E", "F", "G"]: + make_role(self.roles_dir, rn) + + write( + os.path.join(self.roles_dir, "A", "tasks", "main.yml"), + """ + - name: loop over strings → D, E + include_role: + name: "{{ item }}" + loop: + - D + - E + + - name: loop over dicts → F, G + import_role: + name: "{{ item.role }}" + with_items: + - { role: "F" } + - { role: "G" } + """ + ) + + r = RoleDependencyResolver(self.roles_dir) + deps = r.get_role_dependencies("A") + self.assertEqual(deps, {"D", "E", "F", "G"}) + +def test_jinja_mixed_name_glob_matching(self): + """ + include_role: + name: "prefix-{{ item }}-suffix" + loop: [x, y] + Existierende Rollen: prefix-x-suffix, prefix-y-suffix, prefix-z-suffix + + Erwartung: + - KEINE Roh-Items ('x', 'y') als Rollen + - Glob-Matching liefert die drei konkreten Rollen + """ + make_role(self.roles_dir, "A") + for rn in ["prefix-x-suffix", "prefix-y-suffix", "prefix-z-suffix"]: + make_role(self.roles_dir, rn) + + write( + os.path.join(self.roles_dir, "A", "tasks", "main.yml"), + """ + - name: jinja-mixed glob + include_role: + name: "prefix-{{ item }}-suffix" + loop: + - x + - y + """ + ) + + r = RoleDependencyResolver(self.roles_dir) + deps = r.get_role_dependencies("A") + + # keine Roh-Loop-Items + self.assertNotIn("x", deps) + self.assertNotIn("y", deps) + + # erwartete Rollen aus dem Glob-Matching + self.assertEqual( + deps, + {"prefix-x-suffix", "prefix-y-suffix", "prefix-z-suffix"}, + ) + + def test_pure_jinja_ignored_without_loop(self): + """ + name: "{{ something }}" with no loop should be ignored. + """ + make_role(self.roles_dir, "A") + for rn in ["X", "Y"]: + make_role(self.roles_dir, rn) + + write( + os.path.join(self.roles_dir, "A", "tasks", "main.yml"), + """ + - name: pure var ignored + include_role: + name: "{{ something }}" + """ + ) + + r = RoleDependencyResolver(self.roles_dir) + deps = r.get_role_dependencies("A") + self.assertEqual(deps, set()) + + def test_meta_dependencies_strings_and_dicts(self): + """ + meta/main.yml: + dependencies: + - H + - { role: I } + Expect: {H, I} + """ + make_role(self.roles_dir, "A") + make_role(self.roles_dir, "H") + make_role(self.roles_dir, "I") + + write( + os.path.join(self.roles_dir, "A", "meta", "main.yml"), + """ + --- + dependencies: + - H + - { role: I } + """ + ) + + r = RoleDependencyResolver(self.roles_dir) + deps = r.get_role_dependencies("A") + self.assertEqual(deps, {"H", "I"}) + + def test_run_after_extraction_and_toggle(self): + """ + galaxy_info.run_after is only included when resolve_run_after=True + """ + make_role(self.roles_dir, "A") + make_role(self.roles_dir, "J") + make_role(self.roles_dir, "K") + + write( + os.path.join(self.roles_dir, "A", "meta", "main.yml"), + """ + --- + galaxy_info: + run_after: + - J + - K + dependencies: [] + """ + ) + + r = RoleDependencyResolver(self.roles_dir) + + # Direkter Helper + ra = r._extract_meta_run_after(os.path.join(self.roles_dir, "A")) + self.assertEqual(ra, {"J", "K"}) + + # Transitiv – off by default + visited_off = r.resolve_transitively(["A"], resolve_run_after=False) + self.assertNotIn("J", visited_off) + self.assertNotIn("K", visited_off) + + # Transitiv – enabled + visited_on = r.resolve_transitively(["A"], resolve_run_after=True) + self.assertTrue({"A", "J", "K"}.issubset(visited_on)) + + def test_cycle_and_max_depth(self): + """ + A → include B + B → import A + - Ensure cycle-safe traversal. + - max_depth=0 → only start + - max_depth=1 → start + direct deps + """ + make_role(self.roles_dir, "A") + make_role(self.roles_dir, "B") + + write( + os.path.join(self.roles_dir, "A", "tasks", "main.yml"), + """ + - include_role: + name: B + """ + ) + write( + os.path.join(self.roles_dir, "B", "tasks", "main.yml"), + """ + - import_role: + name: A + """ + ) + + r = RoleDependencyResolver(self.roles_dir) + + visited = r.resolve_transitively(["A"]) + self.assertTrue({"A", "B"}.issubset(visited)) + + only_start = r.resolve_transitively(["A"], max_depth=0) + self.assertEqual(only_start, {"A"}) + + depth_one = r.resolve_transitively(["A"], max_depth=1) + self.assertEqual(depth_one, {"A", "B"}) + + def test_tolerant_scan_fallback_on_invalid_yaml(self): + """ + Force yaml.safe_load_all to fail and ensure tolerant scan picks up: + - include_role literal name + - loop list items + """ + make_role(self.roles_dir, "A") + for rn in ["R1", "R2", "R3"]: + make_role(self.roles_dir, rn) + + # Invalid YAML (e.g., stray colon) to trigger exception + write( + os.path.join(self.roles_dir, "A", "tasks", "broken.yml"), + """ + include_role: + name: R1 + :: this line breaks YAML :: + + - include_role: + name: "{{ item }}" + loop: + - R2 + - R3 + """ + ) + + r = RoleDependencyResolver(self.roles_dir) + inc, imp = r._scan_tasks(os.path.join(self.roles_dir, "A")) + self.assertTrue({"R1", "R2", "R3"}.issubset(inc)) + self.assertEqual(imp, set()) + + def test_resolve_transitively_combined_sources(self): + """ + Combined test: include/import + dependencies (+ optional run_after). + """ + for rn in ["ROOT", "C1", "C2", "D1", "D2", "RA1"]: + make_role(self.roles_dir, rn) + + write( + os.path.join(self.roles_dir, "ROOT", "tasks", "main.yml"), + """ + - include_role: { name: C1 } + - import_role: { name: C2 } + """ + ) + write( + os.path.join(self.roles_dir, "ROOT", "meta", "main.yml"), + """ + --- + dependencies: + - D1 + - { role: D2 } + galaxy_info: + run_after: + - RA1 + """ + ) + + r = RoleDependencyResolver(self.roles_dir) + + # Ohne run_after + visited = r.resolve_transitively(["ROOT"], resolve_run_after=False) + for expected in ["ROOT", "C1", "C2", "D1", "D2"]: + self.assertIn(expected, visited) + self.assertNotIn("RA1", visited) + + # Mit run_after + visited_ra = r.resolve_transitively(["ROOT"], resolve_run_after=True) + self.assertIn("RA1", visited_ra) + + +if __name__ == "__main__": + unittest.main()