Refine role dependency graph/tree builders and tests

- Refactor cli/build/graph.py to use cached metadata and dependency indices
  for faster graph generation and cleaner separation of concerns
- Refactor cli/build/tree.py to delegate per-role processing to process_role()
  and support parallel execution via ProcessPoolExecutor
- Add unit tests for graph helper functions and build_mappings()
  under tests/unit/cli/build/test_graph.py
- Add unit tests for find_roles() and process_role() behaviour
  under tests/unit/cli/build/test_tree.py
- Remove the old include_role dependency integration test which relied on the
  previous tree.json dependencies bucket

For details see ChatGPT conversation: https://chatgpt.com/share/6926b805-28a0-800f-a075-e5250aab5c4a
This commit is contained in:
2025-11-26 09:20:45 +01:00
parent aca2da885d
commit 9c65bd4839
5 changed files with 760 additions and 283 deletions

View File

@@ -6,168 +6,347 @@ import json
import re
from typing import List, Dict, Any, Set
from module_utils.role_dependency_resolver import RoleDependencyResolver
# Regex used to ignore Jinja expressions inside include/import statements
JINJA_PATTERN = re.compile(r'{{.*}}')
ALL_DEP_TYPES = ['run_after', 'dependencies', 'include_tasks', 'import_tasks', 'include_role', 'import_role']
ALL_DIRECTIONS = ['to', 'from']
ALL_KEYS = [f"{dep}_{dir}" for dep in ALL_DEP_TYPES for dir in ALL_DIRECTIONS]
# All dependency types the graph builder supports
ALL_DEP_TYPES = [
"run_after",
"dependencies",
"include_tasks",
"import_tasks",
"include_role",
"import_role",
]
# Graph directions: outgoing edges ("to") vs incoming edges ("from")
ALL_DIRECTIONS = ["to", "from"]
# Combined keys: e.g. "include_role_to", "dependencies_from", etc.
ALL_KEYS = [f"{dep}_{direction}" for dep in ALL_DEP_TYPES for direction in ALL_DIRECTIONS]
# ------------------------------------------------------------
# Helpers for locating meta and task files
# ------------------------------------------------------------
def find_role_meta(roles_dir: str, role: str) -> str:
path = os.path.join(roles_dir, role, 'meta', 'main.yml')
"""Return path to meta/main.yml of a role or raise FileNotFoundError."""
path = os.path.join(roles_dir, role, "meta", "main.yml")
if not os.path.isfile(path):
raise FileNotFoundError(f"Metadata not found for role: {role}")
return path
def find_role_tasks(roles_dir: str, role: str) -> str:
path = os.path.join(roles_dir, role, 'tasks', 'main.yml')
"""Return path to tasks/main.yml of a role or raise FileNotFoundError."""
path = os.path.join(roles_dir, role, "tasks", "main.yml")
if not os.path.isfile(path):
raise FileNotFoundError(f"Tasks not found for role: {role}")
return path
# ------------------------------------------------------------
# Parsers for meta and tasks
# ------------------------------------------------------------
def load_meta(path: str) -> Dict[str, Any]:
with open(path, 'r') as f:
"""
Load metadata from meta/main.yml.
Returns a dict with:
- galaxy_info
- run_after
- dependencies
"""
with open(path, "r") as f:
data = yaml.safe_load(f) or {}
galaxy_info = data.get('galaxy_info', {}) or {}
galaxy_info = data.get("galaxy_info", {}) or {}
return {
'galaxy_info': galaxy_info,
'run_after': galaxy_info.get('run_after', []) or [],
'dependencies': data.get('dependencies', []) or []
"galaxy_info": galaxy_info,
"run_after": galaxy_info.get("run_after", []) or [],
"dependencies": data.get("dependencies", []) or [],
}
def load_tasks(path: str, dep_type: str) -> List[str]:
with open(path, 'r') as f:
"""
Parse include_tasks/import_tasks from tasks/main.yml.
Only accepts simple, non-Jinja names.
"""
with open(path, "r") as f:
data = yaml.safe_load(f) or []
included_roles = []
roles: List[str] = []
for task in data:
if not isinstance(task, dict):
continue
if dep_type in task:
entry = task[dep_type]
if isinstance(entry, dict):
entry = entry.get('name', '')
if entry and not JINJA_PATTERN.search(entry):
included_roles.append(entry)
entry = entry.get("name", "")
if isinstance(entry, str) and entry and not JINJA_PATTERN.search(entry):
roles.append(entry)
return included_roles
return roles
# ------------------------------------------------------------
# Graph builder using precomputed caches (fast)
# ------------------------------------------------------------
def build_single_graph(
start_role: str,
dep_type: str,
direction: str,
roles_dir: str,
max_depth: int
max_depth: int,
caches: Dict[str, Any],
) -> Dict[str, Any]:
"""
Build a graph (nodes + links) for one role, one dep_type, one direction.
Uses only precomputed in-memory caches, no filesystem access.
caches structure:
caches["meta"][role] -> meta information
caches["deps"][dep_type][role] -> outgoing targets
caches["rev"][dep_type][target] -> set of source roles
"""
nodes: Dict[str, Dict[str, Any]] = {}
links: List[Dict[str, str]] = []
meta_cache = caches["meta"]
deps_cache = caches["deps"]
rev_cache = caches["rev"]
# --------------------------------------------------------
# Ensure a role exists as a node
# --------------------------------------------------------
def ensure_node(role: str):
if role in nodes:
return
# Try retrieving cached meta; fallback: lazy load
meta = meta_cache.get(role)
if meta is None:
try:
meta = load_meta(find_role_meta(roles_dir, role))
meta_cache[role] = meta
except FileNotFoundError:
meta = {"galaxy_info": {}}
galaxy_info = meta.get("galaxy_info", {}) or {}
node = {
"id": role,
**galaxy_info,
"doc_url": f"https://docs.infinito.nexus/roles/{role}/README.html",
"source_url": f"https://github.com/kevinveenbirkenbach/infinito-nexus/tree/master/roles/{role}",
}
nodes[role] = node
# --------------------------------------------------------
# Outgoing edges: role -> targets
# --------------------------------------------------------
def outgoing(role: str) -> List[str]:
return deps_cache.get(dep_type, {}).get(role, []) or []
# --------------------------------------------------------
# Incoming edges: sources -> role
# --------------------------------------------------------
def incoming(role: str) -> Set[str]:
return rev_cache.get(dep_type, {}).get(role, set())
# --------------------------------------------------------
# DFS traversal
# --------------------------------------------------------
def traverse(role: str, depth: int, path: Set[str]):
if role not in nodes:
meta = load_meta(find_role_meta(roles_dir, role))
node = {'id': role}
node.update(meta['galaxy_info'])
node['doc_url'] = f"https://docs.infinito.nexus/roles/{role}/README.html"
node['source_url'] = f"https://s.infinito.nexus/code/tree/master/roles/{role}"
nodes[role] = node
ensure_node(role)
if max_depth > 0 and depth >= max_depth:
return
neighbors = []
if dep_type in ['run_after', 'dependencies']:
meta = load_meta(find_role_meta(roles_dir, role))
neighbors = meta.get(dep_type, [])
else:
try:
neighbors = load_tasks(find_role_tasks(roles_dir, role), dep_type)
except FileNotFoundError:
neighbors = []
if direction == "to":
for tgt in outgoing(role):
ensure_node(tgt)
links.append({"source": role, "target": tgt, "type": dep_type})
if tgt not in path:
traverse(tgt, depth + 1, path | {tgt})
if direction == 'to':
for tgt in neighbors:
links.append({'source': role, 'target': tgt, 'type': dep_type})
if tgt in path:
continue
traverse(tgt, depth + 1, path | {tgt})
else: # direction == "from"
for src in incoming(role):
ensure_node(src)
links.append({"source": src, "target": role, "type": dep_type})
if src not in path:
traverse(src, depth + 1, path | {src})
else: # direction == 'from'
for other in os.listdir(roles_dir):
try:
other_neighbors = []
if dep_type in ['run_after', 'dependencies']:
meta_o = load_meta(find_role_meta(roles_dir, other))
other_neighbors = meta_o.get(dep_type, [])
else:
other_neighbors = load_tasks(find_role_tasks(roles_dir, other), dep_type)
traverse(start_role, 0, {start_role})
if role in other_neighbors:
links.append({'source': other, 'target': role, 'type': dep_type})
if other in path:
continue
traverse(other, depth + 1, path | {other})
return {"nodes": list(nodes.values()), "links": links}
except FileNotFoundError:
continue
traverse(start_role, depth=0, path={start_role})
return {'nodes': list(nodes.values()), 'links': links}
# ------------------------------------------------------------
# Build all graph variants for one role
# ------------------------------------------------------------
def build_mappings(
start_role: str,
roles_dir: str,
max_depth: int
) -> Dict[str, Any]:
"""
Build all 12 graph variants (6 dep types × 2 directions).
Accelerated version:
- One-time scan of all metadata
- One-time scan of all include_role/import_role
- One-time scan of include_tasks/import_tasks
- Build reverse-index tables
- Then generate all graphs purely from memory
"""
result: Dict[str, Any] = {}
for key in ALL_KEYS:
dep_type, direction = key.rsplit('_', 1)
roles = [
r for r in os.listdir(roles_dir)
if os.path.isdir(os.path.join(roles_dir, r))
]
# Pre-caches
meta_cache: Dict[str, Dict[str, Any]] = {}
deps_cache: Dict[str, Dict[str, List[str]]] = {dep: {} for dep in ALL_DEP_TYPES}
rev_cache: Dict[str, Dict[str, Set[str]]] = {dep: {} for dep in ALL_DEP_TYPES}
resolver = RoleDependencyResolver(roles_dir)
# --------------------------------------------------------
# Step 1: Preload meta-based deps (run_after, dependencies)
# --------------------------------------------------------
for role in roles:
try:
result[key] = build_single_graph(start_role, dep_type, direction, roles_dir, max_depth)
meta = load_meta(find_role_meta(roles_dir, role))
except FileNotFoundError:
continue
meta_cache[role] = meta
for dep_key in ["run_after", "dependencies"]:
values = meta.get(dep_key, []) or []
if isinstance(values, list) and values:
deps_cache[dep_key][role] = values
for tgt in values:
if isinstance(tgt, str) and tgt.strip():
rev_cache[dep_key].setdefault(tgt.strip(), set()).add(role)
# --------------------------------------------------------
# Step 2: Preload include_role/import_role (resolver)
# --------------------------------------------------------
for role in roles:
role_path = os.path.join(roles_dir, role)
inc, imp = resolver._scan_tasks(role_path)
if inc:
inc_list = sorted(inc)
deps_cache["include_role"][role] = inc_list
for tgt in inc_list:
rev_cache["include_role"].setdefault(tgt, set()).add(role)
if imp:
imp_list = sorted(imp)
deps_cache["import_role"][role] = imp_list
for tgt in imp_list:
rev_cache["import_role"].setdefault(tgt, set()).add(role)
# --------------------------------------------------------
# Step 3: Preload include_tasks/import_tasks
# --------------------------------------------------------
for role in roles:
try:
tasks_path = find_role_tasks(roles_dir, role)
except FileNotFoundError:
continue
for dep_key in ["include_tasks", "import_tasks"]:
values = load_tasks(tasks_path, dep_key)
if values:
deps_cache[dep_key][role] = values
for tgt in values:
rev_cache[dep_key].setdefault(tgt, set()).add(role)
caches = {
"meta": meta_cache,
"deps": deps_cache,
"rev": rev_cache,
}
# --------------------------------------------------------
# Step 4: Build all graphs from caches
# --------------------------------------------------------
for key in ALL_KEYS:
dep_type, direction = key.rsplit("_", 1)
try:
result[key] = build_single_graph(
start_role=start_role,
dep_type=dep_type,
direction=direction,
roles_dir=roles_dir,
max_depth=max_depth,
caches=caches,
)
except Exception:
result[key] = {'nodes': [], 'links': []}
result[key] = {"nodes": [], "links": []}
return result
# ------------------------------------------------------------
# Output helper
# ------------------------------------------------------------
def output_graph(graph_data: Any, fmt: str, start: str, key: str):
base = f"{start}_{key}"
if fmt == 'console':
if fmt == "console":
print(f"--- {base} ---")
print(yaml.safe_dump(graph_data, sort_keys=False))
elif fmt in ('yaml', 'json'):
else:
path = f"{base}.{fmt}"
with open(path, 'w') as f:
if fmt == 'yaml':
with open(path, "w") as f:
if fmt == "yaml":
yaml.safe_dump(graph_data, f, sort_keys=False)
else:
json.dump(graph_data, f, indent=2)
print(f"Wrote {path}")
else:
raise ValueError(f"Unknown format: {fmt}")
# ------------------------------------------------------------
# CLI entrypoint
# ------------------------------------------------------------
def main():
script_dir = os.path.dirname(os.path.abspath(__file__))
default_roles_dir = os.path.abspath(os.path.join(script_dir, '..', '..', 'roles'))
default_roles_dir = os.path.abspath(os.path.join(script_dir, "..", "..", "roles"))
parser = argparse.ArgumentParser(description="Generate dependency graphs")
parser.add_argument('-r', '--role', required=True, help="Starting role name")
parser.add_argument('-D', '--depth', type=int, default=0, help="Max recursion depth")
parser.add_argument('-o', '--output', choices=['yaml', 'json', 'console'], default='console')
parser.add_argument('--roles-dir', default=default_roles_dir, help="Roles directory")
parser.add_argument("-r", "--role", required=True, help="Starting role name")
parser.add_argument("-D", "--depth", type=int, default=0, help="Max recursion depth")
parser.add_argument("-o", "--output", choices=["yaml", "json", "console"], default="console")
parser.add_argument("--roles-dir", default=default_roles_dir, help="Roles directory")
args = parser.parse_args()
graphs = build_mappings(args.role, args.roles_dir, args.depth)
for key in ALL_KEYS:
graph_data = graphs.get(key, {'nodes': [], 'links': []})
graph_data = graphs.get(key, {"nodes": [], "links": []})
output_graph(graph_data, args.output, args.role, key)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View File

@@ -2,19 +2,76 @@
import os
import argparse
import json
from typing import Dict, Any
from typing import Dict, Any, Optional, Iterable, Tuple
from concurrent.futures import ProcessPoolExecutor, as_completed
from cli.build.graph import build_mappings, output_graph
from module_utils.role_dependency_resolver import RoleDependencyResolver
def find_roles(roles_dir: str):
def find_roles(roles_dir: str) -> Iterable[Tuple[str, str]]:
"""
Yield (role_name, role_path) for all roles in the given roles_dir.
"""
for entry in os.listdir(roles_dir):
path = os.path.join(roles_dir, entry)
if os.path.isdir(path):
yield entry, path
def process_role(
role_name: str,
roles_dir: str,
depth: int,
shadow_folder: Optional[str],
output: str,
preview: bool,
verbose: bool,
no_include_role: bool, # currently unused, kept for CLI compatibility
no_import_role: bool, # currently unused, kept for CLI compatibility
no_dependencies: bool, # currently unused, kept for CLI compatibility
no_run_after: bool, # currently unused, kept for CLI compatibility
) -> None:
"""
Worker function: build graphs and (optionally) write meta/tree.json for a single role.
Note:
This version no longer adds a custom top-level "dependencies" bucket.
Only the graphs returned by build_mappings() are written.
"""
role_path = os.path.join(roles_dir, role_name)
if verbose:
print(f"[worker] Processing role: {role_name}")
# Build the full graph structure (all dep types / directions) for this role
graphs: Dict[str, Any] = build_mappings(
start_role=role_name,
roles_dir=roles_dir,
max_depth=depth,
)
# Preview mode: dump graphs to console instead of writing tree.json
if preview:
for key, data in graphs.items():
if verbose:
print(f"[worker] Previewing graph '{key}' for role '{role_name}'")
# In preview mode we always output as console
output_graph(data, "console", role_name, key)
return
# Non-preview: write meta/tree.json for this role
if shadow_folder:
tree_file = os.path.join(shadow_folder, role_name, "meta", "tree.json")
else:
tree_file = os.path.join(role_path, "meta", "tree.json")
os.makedirs(os.path.dirname(tree_file), exist_ok=True)
with open(tree_file, "w", encoding="utf-8") as f:
json.dump(graphs, f, indent=2)
print(f"Wrote {tree_file}")
def main():
script_dir = os.path.dirname(os.path.abspath(__file__))
default_roles_dir = os.path.abspath(os.path.join(script_dir, "..", "..", "roles"))
@@ -22,24 +79,67 @@ def main():
parser = argparse.ArgumentParser(
description="Generate all graphs for each role and write meta/tree.json"
)
parser.add_argument("-d", "--role_dir", default=default_roles_dir,
help=f"Path to roles directory (default: {default_roles_dir})")
parser.add_argument("-D", "--depth", type=int, default=0,
help="Max recursion depth (>0) or <=0 to stop on cycle")
parser.add_argument("-o", "--output", choices=["yaml", "json", "console"],
default="json", help="Output format")
parser.add_argument("-p", "--preview", action="store_true",
help="Preview graphs to console instead of writing files")
parser.add_argument("-s", "--shadow-folder", type=str, default=None,
help="If set, writes tree.json to this shadow folder instead of the role's actual meta/ folder")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose logging")
parser.add_argument(
"-d",
"--role_dir",
default=default_roles_dir,
help=f"Path to roles directory (default: {default_roles_dir})",
)
parser.add_argument(
"-D",
"--depth",
type=int,
default=0,
help="Max recursion depth (>0) or <=0 to stop on cycle",
)
parser.add_argument(
"-o",
"--output",
choices=["yaml", "json", "console"],
default="json",
help="Output format for preview mode",
)
parser.add_argument(
"-p",
"--preview",
action="store_true",
help="Preview graphs to console instead of writing files",
)
parser.add_argument(
"-s",
"--shadow-folder",
type=str,
default=None,
help="If set, writes tree.json to this shadow folder instead of the role's actual meta/ folder",
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Enable verbose logging",
)
# Toggles
parser.add_argument("--no-include-role", action="store_true", help="Do not scan include_role")
parser.add_argument("--no-import-role", action="store_true", help="Do not scan import_role")
parser.add_argument("--no-dependencies", action="store_true", help="Do not read meta/main.yml dependencies")
parser.add_argument("--no-run-after", action="store_true",
help="Do not read galaxy_info.run_after from meta/main.yml")
# Toggles (kept for CLI compatibility, currently only meaningful for future extensions)
parser.add_argument(
"--no-include-role",
action="store_true",
help="Reserved: do not include include_role in custom dependency bucket",
)
parser.add_argument(
"--no-import-role",
action="store_true",
help="Reserved: do not include import_role in custom dependency bucket",
)
parser.add_argument(
"--no-dependencies",
action="store_true",
help="Reserved: do not include meta dependencies in custom dependency bucket",
)
parser.add_argument(
"--no-run-after",
action="store_true",
help="Reserved: do not include run_after in custom dependency bucket",
)
args = parser.parse_args()
@@ -50,54 +150,53 @@ def main():
print(f"Preview mode: {args.preview}")
print(f"Shadow folder: {args.shadow_folder}")
resolver = RoleDependencyResolver(args.role_dir)
roles = [role_name for role_name, _ in find_roles(args.role_dir)]
for role_name, role_path in find_roles(args.role_dir):
if args.verbose:
print(f"Processing role: {role_name}")
# For preview, run sequentially to avoid completely interleaved output.
if args.preview:
for role_name in roles:
process_role(
role_name=role_name,
roles_dir=args.role_dir,
depth=args.depth,
shadow_folder=args.shadow_folder,
output=args.output,
preview=True,
verbose=args.verbose,
no_include_role=args.no_include_role,
no_import_role=args.no_import_role,
no_dependencies=args.no_dependencies,
no_run_after=args.no_run_after,
)
return
graphs: Dict[str, Any] = build_mappings(
start_role=role_name,
roles_dir=args.role_dir,
max_depth=args.depth
)
# Non-preview: roles are processed in parallel
with ProcessPoolExecutor() as executor:
futures = {
executor.submit(
process_role,
role_name,
args.role_dir,
args.depth,
args.shadow_folder,
args.output,
False, # preview=False in parallel mode
args.verbose,
args.no_include_role,
args.no_import_role,
args.no_dependencies,
args.no_run_after,
): role_name
for role_name in roles
}
# Direct deps (depth=1) getrennt erfasst für buckets
inc_roles, imp_roles = resolver._scan_tasks(role_path)
meta_deps = resolver._extract_meta_dependencies(role_path)
run_after = set()
if not args.no_run_after:
run_after = resolver._extract_meta_run_after(role_path)
if any([not args.no_include_role and inc_roles,
not args.no_import_role and imp_roles,
not args.no_dependencies and meta_deps,
not args.no_run_after and run_after]):
deps_root = graphs.setdefault("dependencies", {})
if not args.no_include_role and inc_roles:
deps_root["include_role"] = sorted(inc_roles)
if not args.no_import_role and imp_roles:
deps_root["import_role"] = sorted(imp_roles)
if not args.no_dependencies and meta_deps:
deps_root["dependencies"] = sorted(meta_deps)
if not args.no_run_after and run_after:
deps_root["run_after"] = sorted(run_after)
graphs["dependencies"] = deps_root
if args.preview:
for key, data in graphs.items():
if args.verbose:
print(f"Previewing graph '{key}' for role '{role_name}'")
output_graph(data, "console", role_name, key)
else:
if args.shadow_folder:
tree_file = os.path.join(args.shadow_folder, role_name, "meta", "tree.json")
else:
tree_file = os.path.join(role_path, "meta", "tree.json")
os.makedirs(os.path.dirname(tree_file), exist_ok=True)
with open(tree_file, "w", encoding="utf-8") as f:
json.dump(graphs, f, indent=2)
print(f"Wrote {tree_file}")
for future in as_completed(futures):
role_name = futures[future]
try:
future.result()
except Exception as exc:
# Do not crash the whole run; report the failing role instead.
print(f"[ERROR] Role '{role_name}' failed: {exc}")
if __name__ == "__main__":

View File

@@ -0,0 +1,233 @@
import os
import json
import shutil
import tempfile
import unittest
from io import StringIO
from contextlib import redirect_stdout
from cli.build.graph import (
load_meta,
load_tasks,
build_mappings,
output_graph,
ALL_KEYS,
)
class TestGraphHelpers(unittest.TestCase):
def setUp(self) -> None:
self.tmpdir = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(self.tmpdir, ignore_errors=True))
def _write_file(self, rel_path: str, content: str) -> str:
path = os.path.join(self.tmpdir, rel_path)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
return path
def test_load_meta_parses_run_after_and_dependencies(self):
meta_path = self._write_file(
"roles/role_a/meta/main.yml",
"""
galaxy_info:
author: Test Author
run_after:
- role_b
- role_c
dependencies:
- role_d
- role_e
""",
)
meta = load_meta(meta_path)
self.assertIn("galaxy_info", meta)
self.assertEqual(meta["galaxy_info"]["author"], "Test Author")
self.assertEqual(meta["run_after"], ["role_b", "role_c"])
self.assertEqual(meta["dependencies"], ["role_d", "role_e"])
def test_load_tasks_filters_out_jinja_and_reads_names(self):
tasks_path = self._write_file(
"roles/role_a/tasks/main.yml",
"""
- name: include plain file
include_tasks: "subtasks.yml"
- name: include with dict
include_tasks:
name: "other.yml"
- name: include jinja, should be ignored
include_tasks: "{{ dynamic_file }}"
- name: import plain file
import_tasks: "legacy.yml"
- name: import with dict
import_tasks:
name: "more.yml"
- name: import jinja, should be ignored
import_tasks: "{{ legacy_file }}"
""",
)
include_files = load_tasks(tasks_path, "include_tasks")
import_files = load_tasks(tasks_path, "import_tasks")
self.assertEqual(sorted(include_files), ["other.yml", "subtasks.yml"])
self.assertEqual(sorted(import_files), ["legacy.yml", "more.yml"])
class TestBuildMappings(unittest.TestCase):
def setUp(self) -> None:
self.roles_dir = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(self.roles_dir, ignore_errors=True))
def _write_file(self, rel_path: str, content: str) -> str:
path = os.path.join(self.roles_dir, rel_path)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
return path
def _create_minimal_role(self, name: str, with_meta: bool = False) -> None:
os.makedirs(os.path.join(self.roles_dir, name), exist_ok=True)
if with_meta:
self._write_file(
f"{name}/meta/main.yml",
"""
galaxy_info:
author: Minimal
""",
)
def test_build_mappings_collects_all_dependency_types(self):
# Create roles directory structure
self._create_minimal_role("role_b")
self._create_minimal_role("role_c")
self._create_minimal_role("role_d")
self._create_minimal_role("role_e")
# Role A with meta (run_after + dependencies)
self._write_file(
"role_a/meta/main.yml",
"""
galaxy_info:
author: Role A Author
run_after:
- role_b
dependencies:
- role_c
""",
)
# Role A tasks with include_role, import_role, include_tasks, import_tasks
self._write_file(
"role_a/tasks/main.yml",
"""
- name: use docker style role
include_role:
name: role_d
- name: use import role
import_role:
name: role_e
- name: include static tasks file
include_tasks: "subtasks.yml"
- name: import static tasks file
import_tasks:
name: "legacy.yml"
""",
)
# Dummy tasks/meta for other roles not required, but create dirs so they
# are recognized as roles.
self._create_minimal_role("role_a") # dirs already exist but harmless
graphs = build_mappings("role_a", self.roles_dir, max_depth=2)
# Ensure we got all expected graph keys
for key in ALL_KEYS:
self.assertIn(key, graphs, msg=f"Missing graph key {key!r} in result")
# Helper to find links in a graph
def links_of(key: str):
return graphs[key]["links"]
# run_after_to: role_a -> role_b
run_after_links = links_of("run_after_to")
self.assertIn(
{"source": "role_a", "target": "role_b", "type": "run_after"},
run_after_links,
)
# dependencies_to: role_a -> role_c
dep_links = links_of("dependencies_to")
self.assertIn(
{"source": "role_a", "target": "role_c", "type": "dependencies"},
dep_links,
)
# include_role_to: role_a -> role_d
inc_role_links = links_of("include_role_to")
self.assertIn(
{"source": "role_a", "target": "role_d", "type": "include_role"},
inc_role_links,
)
# import_role_to: role_a -> role_e
imp_role_links = links_of("import_role_to")
self.assertIn(
{"source": "role_a", "target": "role_e", "type": "import_role"},
imp_role_links,
)
# include_tasks_to: role_a -> "subtasks.yml"
inc_tasks_links = links_of("include_tasks_to")
self.assertIn(
{"source": "role_a", "target": "subtasks.yml", "type": "include_tasks"},
inc_tasks_links,
)
# import_tasks_to: role_a -> "legacy.yml"
imp_tasks_links = links_of("import_tasks_to")
self.assertIn(
{"source": "role_a", "target": "legacy.yml", "type": "import_tasks"},
imp_tasks_links,
)
def test_output_graph_console_prints_header_and_yaml(self):
graph_data = {"nodes": [{"id": "role_a"}], "links": []}
buf = StringIO()
with redirect_stdout(buf):
output_graph(graph_data, "console", "role_a", "include_role_to")
out = buf.getvalue()
self.assertIn("--- role_a_include_role_to ---", out)
self.assertIn("nodes:", out)
self.assertIn("role_a", out)
def test_output_graph_writes_json_file(self):
graph_data = {"nodes": [{"id": "role_a"}], "links": []}
# Use current working directory; file is small and cleaned manually.
fname = "role_a_include_role_to.json"
try:
output_graph(graph_data, "json", "role_a", "include_role_to")
self.assertTrue(os.path.exists(fname))
with open(fname, "r", encoding="utf-8") as f:
loaded = json.load(f)
self.assertEqual(graph_data, loaded)
finally:
if os.path.exists(fname):
os.remove(fname)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,109 @@
import json
import os
import shutil
import tempfile
import unittest
from io import StringIO
from contextlib import redirect_stdout
from unittest.mock import patch
from cli.build import tree as tree_module
class TestFindRoles(unittest.TestCase):
def setUp(self) -> None:
self.roles_dir = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(self.roles_dir, ignore_errors=True))
def test_find_roles_returns_only_directories(self):
# Create some role directories and a non-directory entry
os.makedirs(os.path.join(self.roles_dir, "role_a"))
os.makedirs(os.path.join(self.roles_dir, "role_b"))
with open(os.path.join(self.roles_dir, "not_a_role.txt"), "w", encoding="utf-8") as f:
f.write("dummy")
roles = dict(tree_module.find_roles(self.roles_dir))
self.assertEqual(set(roles.keys()), {"role_a", "role_b"})
self.assertTrue(all(os.path.isdir(path) for path in roles.values()))
class TestProcessRole(unittest.TestCase):
def setUp(self) -> None:
# We use a temporary "roles" directory and a separate shadow folder.
self.roles_dir = tempfile.mkdtemp()
self.shadow_dir = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(self.roles_dir, ignore_errors=True))
self.addCleanup(lambda: shutil.rmtree(self.shadow_dir, ignore_errors=True))
# Create a minimal role directory
os.makedirs(os.path.join(self.roles_dir, "myrole"), exist_ok=True)
def test_process_role_writes_tree_json_and_does_not_mutate_graphs(self):
graphs = {
"include_role_to": {"nodes": [{"id": "myrole"}], "links": []},
"custom_key": {"value": 42}, # sentinel to ensure we do not modify the dict
}
with patch.object(tree_module, "build_mappings", return_value=graphs) as mocked_build:
tree_module.process_role(
role_name="myrole",
roles_dir=self.roles_dir,
depth=0,
shadow_folder=self.shadow_dir,
output="json",
preview=False,
verbose=False,
no_include_role=False,
no_import_role=False,
no_dependencies=False,
no_run_after=False,
)
mocked_build.assert_called_once()
tree_file = os.path.join(self.shadow_dir, "myrole", "meta", "tree.json")
self.assertTrue(os.path.exists(tree_file), "tree.json was not written")
with open(tree_file, "r", encoding="utf-8") as f:
written_graphs = json.load(f)
# The written file must be exactly what build_mappings returned
self.assertEqual(graphs, written_graphs)
# Especially: no extra top-level "dependencies" block is added
self.assertNotIn("dependencies", written_graphs)
def test_process_role_preview_calls_output_graph_and_does_not_write_file(self):
graphs = {
"graph_a": {"nodes": [{"id": "myrole"}], "links": []},
"graph_b": {"nodes": [], "links": []},
}
with patch.object(tree_module, "build_mappings", return_value=graphs), patch.object(
tree_module, "output_graph"
) as mocked_output:
buf = StringIO()
with redirect_stdout(buf):
tree_module.process_role(
role_name="myrole",
roles_dir=self.roles_dir,
depth=0,
shadow_folder=self.shadow_dir,
output="json",
preview=True,
verbose=True,
no_include_role=False,
no_import_role=False,
no_dependencies=False,
no_run_after=False,
)
# output_graph must be called once per graph entry
self.assertEqual(mocked_output.call_count, len(graphs))
# In preview mode, no tree.json should be written
tree_file = os.path.join(self.shadow_dir, "myrole", "meta", "tree.json")
self.assertFalse(os.path.exists(tree_file))
if __name__ == "__main__":
unittest.main()

View File

@@ -1,143 +0,0 @@
import os
import sys
import json
import tempfile
import shutil
import unittest
from unittest.mock import patch
# Absoluter Pfad zum tree.py Script (wie im vorhandenen Test)
SCRIPT_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../../../cli/build/tree.py")
)
class TestTreeIncludeRoleDependencies(unittest.TestCase):
def setUp(self):
# Temp roles root
self.roles_dir = tempfile.mkdtemp()
# Producer-Role (die wir scannen) + Zielrollen für Matches
self.producer = "producer"
self.producer_path = os.path.join(self.roles_dir, self.producer)
os.makedirs(os.path.join(self.producer_path, "tasks"))
os.makedirs(os.path.join(self.producer_path, "meta"))
# Rollen, die durch Pattern/Loops gematcht werden sollen
self.roles_to_create = [
"sys-ctl-hlth-webserver",
"sys-ctl-hlth-csp",
"svc-db-postgres",
"svc-db-mysql",
"axb", # für a{{ database_type }}b → a*b
"ayyb", # für a{{ database_type }}b → a*b
"literal-role", # für reinen Literalnamen
]
for r in self.roles_to_create:
os.makedirs(os.path.join(self.roles_dir, r, "meta"), exist_ok=True)
# tasks/main.yml mit allen geforderten Varianten
tasks_yaml = """
- name: Include health dependencies
include_role:
name: "{{ item }}"
loop:
- sys-ctl-hlth-webserver
- sys-ctl-hlth-csp
- name: Pattern with literal + var suffix
include_role:
name: "svc-db-{{ database_type }}"
- name: Pattern with literal prefix/suffix around var
include_role:
name: "a{{ database_type }}b"
- name: Pure variable only (should be ignored)
include_role:
name: "{{ database_type }}"
- name: Pure literal include
include_role:
name: "literal-role"
"""
with open(os.path.join(self.producer_path, "tasks", "main.yml"), "w", encoding="utf-8") as f:
f.write(tasks_yaml)
# shadow folder
self.shadow_dir = tempfile.mkdtemp()
# Patch argv
self.orig_argv = sys.argv[:]
sys.argv = [
SCRIPT_PATH,
"-d", self.roles_dir,
"-s", self.shadow_dir,
"-o", "json",
]
# Ensure project root on sys.path
project_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../../../")
)
if project_root not in sys.path:
sys.path.insert(0, project_root)
def tearDown(self):
sys.argv = self.orig_argv
shutil.rmtree(self.roles_dir)
shutil.rmtree(self.shadow_dir)
@patch("cli.build.tree.output_graph")
@patch("cli.build.tree.build_mappings")
def test_include_role_dependencies_detected(self, mock_build_mappings, mock_output_graph):
# Basis-Graph leer, damit nur unsere Dependencies sichtbar sind
mock_build_mappings.return_value = {}
# Import und Ausführen
import importlib
tree_mod = importlib.import_module("cli.build.tree")
tree_mod.main()
# Erwarteter Pfad im Shadow-Folder
expected_tree_path = os.path.join(
self.shadow_dir, self.producer, "meta", "tree.json"
)
self.assertTrue(
os.path.isfile(expected_tree_path),
f"tree.json not found at {expected_tree_path}"
)
# JSON laden und Abhängigkeiten prüfen
with open(expected_tree_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Erwartete include_role-Dependenzen:
expected = sorted([
"sys-ctl-hlth-webserver", # aus loop
"sys-ctl-hlth-csp", # aus loop
"svc-db-postgres", # aus svc-db-{{ database_type }}
"svc-db-mysql", # aus svc-db-{{ database_type }}
"axb", # aus a{{ database_type }}b
"ayyb", # aus a{{ database_type }}b
"literal-role", # reiner Literalname
])
deps = (
data
.get("dependencies", {})
.get("include_role", [])
)
self.assertEqual(deps, expected, "include_role dependencies mismatch")
# Sicherstellen, dass der pure Variable-Name "{{ database_type }}" NICHT aufgenommen wurde
self.assertNotIn("{{ database_type }}", deps, "pure variable include should be ignored")
# Sicherstellen, dass im Original-meta der Producer-Role nichts geschrieben wurde
original_tree_path = os.path.join(self.producer_path, "meta", "tree.json")
self.assertFalse(
os.path.exists(original_tree_path),
"tree.json should NOT be written to the real meta/ folder"
)
if __name__ == "__main__":
unittest.main()