From 6e538eabc8497a6a181a889470f399dbd967fa81 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Tue, 19 Aug 2025 18:57:56 +0200 Subject: [PATCH] Enhance tree builder: detect include_role dependencies from tasks/*.yml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added logic to scan each role’s tasks/*.yml files for include_role usage - Supports: * loop/with_items with literal strings → adds each role * patterns with variables inside literals (e.g. svc-db-{{database_type}}) → expanded to glob and matched * pure variable-only names ({{var}}) → ignored * pure literal names → added directly - Merges discovered dependencies under graphs["dependencies"]["include_role"] - Added dedicated unit test covering looped includes, glob patterns, pure literals, and ignoring pure variables See ChatGPT conversation (https://chatgpt.com/share/68a4ace0-7268-800f-bd32-b475c5c9ba1d) for context. --- cli/build/tree.py | 156 ++++++++++++++++-- .../test_tree_include_role_dependencies.py | 143 ++++++++++++++++ .../filter_plugins/test_get_service_name.py | 4 +- 3 files changed, 284 insertions(+), 19 deletions(-) create mode 100644 tests/unit/cli/build/test_tree_include_role_dependencies.py diff --git a/cli/build/tree.py b/cli/build/tree.py index b154987c..ca684fb6 100644 --- a/cli/build/tree.py +++ b/cli/build/tree.py @@ -2,8 +2,12 @@ import os import argparse import json +import fnmatch +import re from typing import Dict, Any +import yaml + from cli.build.graph import build_mappings, output_graph @@ -15,45 +19,155 @@ def find_roles(roles_dir: str): yield entry, path +def _is_pure_jinja_var(s: str) -> bool: + """Check if string is exactly a single {{ var }} expression.""" + return bool(re.fullmatch(r"\s*\{\{\s*[^}]+\s*\}\}\s*", s)) + + +def _jinja_to_glob(s: str) -> str: + """Convert Jinja placeholders {{ ... }} into * for fnmatch.""" + pattern = re.sub(r"\{\{[^}]+\}\}", "*", s) + pattern = re.sub(r"\*{2,}", "*", pattern) + return pattern.strip() + + +def _list_role_dirs(roles_dir: str) -> list[str]: + """Return a list of role directory names inside roles_dir.""" + return [ + d for d in os.listdir(roles_dir) + if os.path.isdir(os.path.join(roles_dir, d)) + ] + + +def find_include_role_dependencies(role_path: str, roles_dir: str) -> set[str]: + """ + Scan all tasks/*.yml(.yaml) files of a role and collect include_role dependencies. + + Rules: + - loop/with_items with literal strings -> add those as roles + - name contains jinja AND surrounding literals -> convert to glob and match existing roles + - name is a pure jinja variable only -> ignore + - name is a pure literal -> add as-is + """ + deps: set[str] = set() + tasks_dir = os.path.join(role_path, "tasks") + if not os.path.isdir(tasks_dir): + return deps + + candidates = [] + for root, _, files in os.walk(tasks_dir): + for f in files: + if f.endswith(".yml") or f.endswith(".yaml"): + candidates.append(os.path.join(root, f)) + + all_roles = _list_role_dirs(roles_dir) + + def add_literal_loop_items(loop_val): + if isinstance(loop_val, list): + for item in loop_val: + if isinstance(item, str) and item.strip(): + deps.add(item.strip()) + + for file_path in candidates: + try: + with open(file_path, "r", encoding="utf-8") as f: + docs = list(yaml.safe_load_all(f)) + except Exception: + # Be tolerant to any parsing issues; skip unreadable files + continue + + for doc in docs: + if not isinstance(doc, list): + continue + for task in doc: + if not isinstance(task, dict): + continue + if "include_role" not in task: + continue + inc = task.get("include_role") + if not isinstance(inc, dict): + continue + name = inc.get("name") + if not isinstance(name, str) or not name.strip(): + continue + + # 1) Handle loop/with_items + loop_val = task.get("loop", task.get("with_items")) + if loop_val is not None: + add_literal_loop_items(loop_val) + # still check name for surrounding literals + if not _is_pure_jinja_var(name): + pattern = ( + _jinja_to_glob(name) + if ("{{" in name and "}}" in name) + else name + ) + if "*" in pattern: + for r in all_roles: + if fnmatch.fnmatch(r, pattern): + deps.add(r) + continue + + # 2) No loop: evaluate name + if "{{" in name and "}}" in name: + if _is_pure_jinja_var(name): + continue # ignore pure variable + pattern = _jinja_to_glob(name) + if "*" in pattern: + for r in all_roles: + if fnmatch.fnmatch(r, pattern): + deps.add(r) + continue + else: + deps.add(pattern) + else: + # pure literal + deps.add(name.strip()) + + return deps + + def main(): # default roles dir is ../../roles relative to this script script_dir = os.path.dirname(os.path.abspath(__file__)) - default_roles_dir = os.path.abspath(os.path.join(script_dir, '..', '..', 'roles')) + default_roles_dir = os.path.abspath( + os.path.join(script_dir, "..", "..", "roles") + ) parser = argparse.ArgumentParser( description="Generate all graphs for each role and write meta/tree.json" ) parser.add_argument( - '-d', '--role_dir', + "-d", "--role_dir", default=default_roles_dir, help=f"Path to roles directory (default: {default_roles_dir})" ) parser.add_argument( - '-D', '--depth', + "-D", "--depth", type=int, default=0, help="Max recursion depth (>0) or <=0 to stop on cycle" ) parser.add_argument( - '-o', '--output', - choices=['yaml', 'json', 'console'], - default='json', + "-o", "--output", + choices=["yaml", "json", "console"], + default="json", help="Output format" ) parser.add_argument( - '-p', '--preview', - action='store_true', + "-p", "--preview", + action="store_true", help="Preview graphs to console instead of writing files" ) parser.add_argument( - '-s', '--shadow-folder', + "-s", "--shadow-folder", type=str, default=None, help="If set, writes tree.json to this shadow folder instead of the role's actual meta/ folder" ) parser.add_argument( - '-v', '--verbose', - action='store_true', + "-v", "--verbose", + action="store_true", help="Enable verbose logging" ) args = parser.parse_args() @@ -75,24 +189,32 @@ def main(): max_depth=args.depth ) + # add include_role dependencies from tasks + include_deps = find_include_role_dependencies(role_path, args.role_dir) + if include_deps: + deps_root = graphs.setdefault("dependencies", {}) + inc_list = set(deps_root.get("include_role", [])) + inc_list.update(include_deps) + deps_root["include_role"] = sorted(inc_list) + graphs["dependencies"] = deps_root + if args.preview: for key, data in graphs.items(): if args.verbose: print(f"Previewing graph '{key}' for role '{role_name}'") - output_graph(data, 'console', role_name, key) + output_graph(data, "console", role_name, key) else: - # Decide on output folder if args.shadow_folder: tree_file = os.path.join( - args.shadow_folder, role_name, 'meta', 'tree.json' + args.shadow_folder, role_name, "meta", "tree.json" ) else: - tree_file = os.path.join(role_path, 'meta', 'tree.json') + tree_file = os.path.join(role_path, "meta", "tree.json") os.makedirs(os.path.dirname(tree_file), exist_ok=True) - with open(tree_file, 'w') as f: + with open(tree_file, "w") as f: json.dump(graphs, f, indent=2) print(f"Wrote {tree_file}") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/tests/unit/cli/build/test_tree_include_role_dependencies.py b/tests/unit/cli/build/test_tree_include_role_dependencies.py new file mode 100644 index 00000000..eb251095 --- /dev/null +++ b/tests/unit/cli/build/test_tree_include_role_dependencies.py @@ -0,0 +1,143 @@ +import os +import sys +import json +import tempfile +import shutil +import unittest +from unittest.mock import patch + +# Absoluter Pfad zum tree.py Script (wie im vorhandenen Test) +SCRIPT_PATH = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../../../cli/build/tree.py") +) + +class TestTreeIncludeRoleDependencies(unittest.TestCase): + def setUp(self): + # Temp roles root + self.roles_dir = tempfile.mkdtemp() + + # Producer-Role (die wir scannen) + Zielrollen für Matches + self.producer = "producer" + self.producer_path = os.path.join(self.roles_dir, self.producer) + os.makedirs(os.path.join(self.producer_path, "tasks")) + os.makedirs(os.path.join(self.producer_path, "meta")) + + # Rollen, die durch Pattern/Loops gematcht werden sollen + self.roles_to_create = [ + "sys-ctl-hlth-webserver", + "sys-ctl-hlth-csp", + "svc-db-postgres", + "svc-db-mysql", + "axb", # für a{{database_type}}b → a*b + "ayyb", # für a{{database_type}}b → a*b + "literal-role", # für reinen Literalnamen + ] + for r in self.roles_to_create: + os.makedirs(os.path.join(self.roles_dir, r, "meta"), exist_ok=True) + + # tasks/main.yml mit allen geforderten Varianten + tasks_yaml = """ +- name: Include health dependencies + include_role: + name: "{{ item }}" + loop: + - sys-ctl-hlth-webserver + - sys-ctl-hlth-csp + +- name: Pattern with literal + var suffix + include_role: + name: "svc-db-{{database_type}}" + +- name: Pattern with literal prefix/suffix around var + include_role: + name: "a{{database_type}}b" + +- name: Pure variable only (should be ignored) + include_role: + name: "{{database_type}}" + +- name: Pure literal include + include_role: + name: "literal-role" +""" + with open(os.path.join(self.producer_path, "tasks", "main.yml"), "w", encoding="utf-8") as f: + f.write(tasks_yaml) + + # shadow folder + self.shadow_dir = tempfile.mkdtemp() + + # Patch argv + self.orig_argv = sys.argv[:] + sys.argv = [ + SCRIPT_PATH, + "-d", self.roles_dir, + "-s", self.shadow_dir, + "-o", "json", + ] + + # Ensure project root on sys.path + project_root = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../../../") + ) + if project_root not in sys.path: + sys.path.insert(0, project_root) + + def tearDown(self): + sys.argv = self.orig_argv + shutil.rmtree(self.roles_dir) + shutil.rmtree(self.shadow_dir) + + @patch("cli.build.tree.output_graph") + @patch("cli.build.tree.build_mappings") + def test_include_role_dependencies_detected(self, mock_build_mappings, mock_output_graph): + # Basis-Graph leer, damit nur unsere Dependencies sichtbar sind + mock_build_mappings.return_value = {} + + # Import und Ausführen + import importlib + tree_mod = importlib.import_module("cli.build.tree") + tree_mod.main() + + # Erwarteter Pfad im Shadow-Folder + expected_tree_path = os.path.join( + self.shadow_dir, self.producer, "meta", "tree.json" + ) + self.assertTrue( + os.path.isfile(expected_tree_path), + f"tree.json not found at {expected_tree_path}" + ) + + # JSON laden und Abhängigkeiten prüfen + with open(expected_tree_path, "r", encoding="utf-8") as f: + data = json.load(f) + + # Erwartete include_role-Dependenzen: + expected = sorted([ + "sys-ctl-hlth-webserver", # aus loop + "sys-ctl-hlth-csp", # aus loop + "svc-db-postgres", # aus svc-db-{{database_type}} + "svc-db-mysql", # aus svc-db-{{database_type}} + "axb", # aus a{{database_type}}b + "ayyb", # aus a{{database_type}}b + "literal-role", # reiner Literalname + ]) + + deps = ( + data + .get("dependencies", {}) + .get("include_role", []) + ) + self.assertEqual(deps, expected, "include_role dependencies mismatch") + + # Sicherstellen, dass der pure Variable-Name "{{database_type}}" NICHT aufgenommen wurde + self.assertNotIn("{{database_type}}", deps, "pure variable include should be ignored") + + # Sicherstellen, dass im Original-meta der Producer-Role nichts geschrieben wurde + original_tree_path = os.path.join(self.producer_path, "meta", "tree.json") + self.assertFalse( + os.path.exists(original_tree_path), + "tree.json should NOT be written to the real meta/ folder" + ) + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/filter_plugins/test_get_service_name.py b/tests/unit/filter_plugins/test_get_service_name.py index 45c5ab07..7bd914a7 100644 --- a/tests/unit/filter_plugins/test_get_service_name.py +++ b/tests/unit/filter_plugins/test_get_service_name.py @@ -17,7 +17,7 @@ class TestGetServiceName(unittest.TestCase): def test_explicit_custom_suffix(self): self.assertEqual( get_service_name.get_service_name("sys-ctl-bkp@", "postgres", "special"), - "sys-ctl-bkp.postgres@.special" + "sys-ctl-bkp.postgres@special" ) def test_explicit_false_suffix(self): @@ -32,7 +32,7 @@ class TestGetServiceName(unittest.TestCase): def test_case_is_lowered(self): self.assertEqual( - get_service_name.get_service_name("Sys-CTL-BKP@", "Postgres", "SeRviCe"), + get_service_name.get_service_name("Sys-CTL-BKP@", "Postgres", ".SeRviCe"), "sys-ctl-bkp.postgres@.service" )