Finished Graph and Tree implementation

This commit is contained in:
Kevin Veen-Birkenbach 2025-07-11 14:33:27 +02:00
parent 6780950257
commit 25cee9a4c7
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E
5 changed files with 175 additions and 116 deletions

View File

@ -3,10 +3,16 @@ import os
import argparse
import yaml
import json
from collections import deque
import re
from typing import List, Dict, Any, Set
JINJA_PATTERN = re.compile(r'{{.*}}')
ALL_DEP_TYPES = ['run_after', 'dependencies', 'include_tasks', 'import_tasks', 'include_role', 'import_role']
ALL_DIRECTIONS = ['to', 'from']
ALL_KEYS = [f"{dep}_{dir}" for dep in ALL_DEP_TYPES for dir in ALL_DIRECTIONS]
def find_role_meta(roles_dir: str, role: str) -> str:
path = os.path.join(roles_dir, role, 'meta', 'main.yml')
if not os.path.isfile(path):
@ -14,10 +20,14 @@ def find_role_meta(roles_dir: str, role: str) -> str:
return path
def find_role_tasks(roles_dir: str, role: str) -> str:
path = os.path.join(roles_dir, role, 'tasks', 'main.yml')
if not os.path.isfile(path):
raise FileNotFoundError(f"Tasks not found for role: {role}")
return path
def load_meta(path: str) -> Dict[str, Any]:
"""
Load meta/main.yml return galaxy_info + run_after + dependencies
"""
with open(path, 'r') as f:
data = yaml.safe_load(f) or {}
@ -28,6 +38,24 @@ def load_meta(path: str) -> Dict[str, Any]:
'dependencies': data.get('dependencies', []) or []
}
def load_tasks(path: str, dep_type: str) -> List[str]:
with open(path, 'r') as f:
data = yaml.safe_load(f) or []
included_roles = []
for task in data:
if dep_type in task:
entry = task[dep_type]
if isinstance(entry, dict):
entry = entry.get('name', '')
if entry and not JINJA_PATTERN.search(entry):
included_roles.append(entry)
return included_roles
def build_single_graph(
start_role: str,
dep_type: str,
@ -35,71 +63,73 @@ def build_single_graph(
roles_dir: str,
max_depth: int
) -> Dict[str, Any]:
"""
Build one graph for one dependency type and direction:
- 'to': follow edges sourcetarget
- 'from': reverse edges (find roles listing this role)
- max_depth > 0: limit hops to max_depth
- max_depth 0: stop when youd revisit a node already on the path
"""
nodes: Dict[str, Dict[str, Any]] = {}
links: List[Dict[str, str]] = []
def traverse(role: str, depth: int, path: Set[str]):
# Register node once
if role not in nodes:
meta = load_meta(find_role_meta(roles_dir, role))
node = {'id': role}
node.update(meta['galaxy_info'])
node['doc_url'] = f"https://docs.cymais.cloud/roles/{role}/README.html"
node['source_url'] = (
f"https://github.com/kevinveenbirkenbach/cymais/tree/master/roles/{role}"
)
node['source_url'] = f"https://github.com/kevinveenbirkenbach/cymais/tree/master/roles/{role}"
nodes[role] = node
# Depth guard
if max_depth > 0 and depth >= max_depth:
return
# Determine neighbors according to direction
neighbors = []
if dep_type in ['run_after', 'dependencies']:
meta = load_meta(find_role_meta(roles_dir, role))
neighbors = meta.get(dep_type, [])
else:
try:
neighbors = load_tasks(find_role_tasks(roles_dir, role), dep_type)
except FileNotFoundError:
neighbors = []
if direction == 'to':
neighbors = load_meta(find_role_meta(roles_dir, role)).get(dep_type, [])
for tgt in neighbors:
links.append({'source': role, 'target': tgt, 'type': dep_type})
# General cycle check
if tgt in path:
continue
traverse(tgt, depth + 1, path | {tgt})
else: # direction == 'from'
# Find all roles that list this role in their dep_type
for other in os.listdir(roles_dir):
try:
meta_o = load_meta(find_role_meta(roles_dir, other))
other_neighbors = []
if dep_type in ['run_after', 'dependencies']:
meta_o = load_meta(find_role_meta(roles_dir, other))
other_neighbors = meta_o.get(dep_type, [])
else:
other_neighbors = load_tasks(find_role_tasks(roles_dir, other), dep_type)
if role in other_neighbors:
links.append({'source': other, 'target': role, 'type': dep_type})
if other in path:
continue
traverse(other, depth + 1, path | {other})
except FileNotFoundError:
continue
if role in meta_o.get(dep_type, []):
links.append({'source': other, 'target': role, 'type': dep_type})
if other in path:
continue
traverse(other, depth + 1, path | {other})
# Kick off recursion
traverse(start_role, depth=0, path={start_role})
return {'nodes': list(nodes.values()), 'links': links}
def build_mappings(
start_role: str,
mappings: List[Dict[str, str]],
roles_dir: str,
max_depth: int
) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for mapping in mappings:
for dep_type, direction in mapping.items():
key = f"{dep_type}_{direction}"
result[key] = build_single_graph(
start_role, dep_type, direction, roles_dir, max_depth)
for key in ALL_KEYS:
dep_type, direction = key.rsplit('_', 1)
try:
result[key] = build_single_graph(start_role, dep_type, direction, roles_dir, max_depth)
except Exception:
result[key] = {'nodes': [], 'links': []}
return result
@ -124,64 +154,20 @@ def main():
script_dir = os.path.dirname(os.path.abspath(__file__))
default_roles_dir = os.path.abspath(os.path.join(script_dir, '..', '..', 'roles'))
parser = argparse.ArgumentParser(
description="Generate graphs based on dependency mappings"
)
parser.add_argument(
'-r', '--role',
required=True,
help="Starting role name"
)
parser.add_argument(
'-m', '--mapping',
nargs='+',
default=[
'run_after:to',
'run_after:from',
'dependencies:to',
'dependencies:from'
],
help="Mapping entries as type:direction (default all 4 combos)"
)
parser.add_argument(
'-D', '--depth',
type=int,
default=0,
help="Max recursion depth (>0) or <=0 to stop on cycle"
)
parser.add_argument(
'-o', '--output',
choices=['yaml', 'json', 'console'],
default='console',
help="Output format"
)
parser.add_argument(
'--roles-dir',
default=default_roles_dir,
help="Roles directory"
)
parser = argparse.ArgumentParser(description="Generate dependency graphs")
parser.add_argument('-r', '--role', required=True, help="Starting role name")
parser.add_argument('-D', '--depth', type=int, default=0, help="Max recursion depth")
parser.add_argument('-o', '--output', choices=['yaml', 'json', 'console'], default='console')
parser.add_argument('--roles-dir', default=default_roles_dir, help="Roles directory")
args = parser.parse_args()
mappings: List[Dict[str, str]] = []
for entry in args.mapping:
if ':' not in entry:
parser.error(f"Invalid mapping '{entry}', must be type:direction")
dep_type, direction = entry.split(':', 1)
if dep_type not in ('run_after', 'dependencies'):
parser.error(f"Unknown dependency type '{dep_type}'")
if direction not in ('to', 'from'):
parser.error(f"Unknown direction '{direction}'")
mappings.append({dep_type: direction})
graphs = build_mappings(args.role, args.roles_dir, args.depth)
graphs = build_mappings(
start_role=args.role,
mappings=mappings,
roles_dir=args.roles_dir,
max_depth=args.depth
)
for key, graph_data in graphs.items():
for key in ALL_KEYS:
graph_data = graphs.get(key, {'nodes': [], 'links': []})
output_graph(graph_data, args.output, args.role, key)
if __name__ == '__main__':
main()

View File

@ -2,7 +2,7 @@
import os
import argparse
import json
from typing import List, Dict, Any
from typing import Dict, Any
from cli.generate.graph import build_mappings, output_graph
@ -21,22 +21,13 @@ def main():
default_roles_dir = os.path.abspath(os.path.join(script_dir, '..', '..', 'roles'))
parser = argparse.ArgumentParser(
description="Generate mappings-based graphs for each role and write tree.json"
description="Generate all graphs for each role and write meta/tree.json"
)
parser.add_argument(
'-d', '--role_dir',
default=default_roles_dir,
help=f"Path to roles directory (default: {default_roles_dir})"
)
parser.add_argument(
'-m', '--mapping',
nargs='+',
default=[
'run_after:to', 'run_after:from',
'dependencies:to', 'dependencies:from'
],
help="Mapping entries as type:direction (default all 4 combos)"
)
parser.add_argument(
'-D', '--depth',
type=int,
@ -61,17 +52,8 @@ def main():
)
args = parser.parse_args()
# parse mappings
mappings: List[Dict[str, str]] = []
for entry in args.mapping:
if ':' not in entry:
parser.error(f"Invalid mapping '{entry}', must be type:direction")
dep_type, direction = entry.split(':', 1)
mappings.append({dep_type: direction})
if args.verbose:
print(f"Roles directory: {args.role_dir}")
print(f"Mappings: {mappings}")
print(f"Max depth: {args.depth}")
print(f"Output format: {args.output}")
print(f"Preview mode: {args.preview}")
@ -80,15 +62,12 @@ def main():
if args.verbose:
print(f"Processing role: {role_name}")
# Build graphs for each mapping
graphs = build_mappings(
graphs: Dict[str, Any] = build_mappings(
start_role=role_name,
mappings=mappings,
roles_dir=args.role_dir,
max_depth=args.depth
)
# Prepare output file or previews
if args.preview:
for key, data in graphs.items():
if args.verbose:
@ -97,13 +76,10 @@ def main():
else:
tree_file = os.path.join(role_path, 'meta', 'tree.json')
os.makedirs(os.path.dirname(tree_file), exist_ok=True)
# write combined JSON
with open(tree_file, 'w') as f:
json.dump(graphs, f, indent=2)
if args.verbose:
print(f"Wrote {tree_file}")
else:
print(f"Wrote {tree_file}")
print(f"Wrote {tree_file}")
if __name__ == '__main__':
main()

View File

View File

@ -0,0 +1,56 @@
import unittest
import tempfile
import shutil
import os
import yaml
from cli.generate import graph
class TestGraphLogic(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.role_name = "role_a"
self.role_path = os.path.join(self.temp_dir, self.role_name)
os.makedirs(os.path.join(self.role_path, "meta"))
os.makedirs(os.path.join(self.role_path, "tasks"))
# Write meta/main.yml
with open(os.path.join(self.role_path, "meta", "main.yml"), 'w') as f:
yaml.dump({
"galaxy_info": {
"author": "tester",
"run_after": []
},
"dependencies": []
}, f)
# Write tasks/main.yml
with open(os.path.join(self.role_path, "tasks", "main.yml"), 'w') as f:
yaml.dump([
{"include_role": "some_other_role"},
{"import_role": {"name": "another_role"}}
], f)
def tearDown(self):
shutil.rmtree(self.temp_dir)
def test_load_meta_returns_dict(self):
meta_path = graph.find_role_meta(self.temp_dir, self.role_name)
meta = graph.load_meta(meta_path)
self.assertIsInstance(meta, dict)
self.assertIn('galaxy_info', meta)
def test_load_tasks_include_role(self):
task_path = graph.find_role_tasks(self.temp_dir, self.role_name)
includes = graph.load_tasks(task_path, 'include_role')
self.assertIn("some_other_role", includes)
def test_build_mappings_structure(self):
result = graph.build_mappings(self.role_name, self.temp_dir, max_depth=1)
self.assertIsInstance(result, dict)
for key in graph.ALL_KEYS:
self.assertIn(key, result)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,41 @@
import unittest
import tempfile
import shutil
import os
import json
from cli.generate import tree
class TestTreeMain(unittest.TestCase):
def setUp(self):
# Create a temporary roles directory with a fake role
self.temp_dir = tempfile.mkdtemp()
self.role_name = "testrole"
self.role_path = os.path.join(self.temp_dir, self.role_name)
os.makedirs(os.path.join(self.role_path, "meta"))
meta_path = os.path.join(self.role_path, "meta", "main.yml")
with open(meta_path, 'w') as f:
f.write("galaxy_info:\n author: test\n run_after: []\ndependencies: []\n")
def tearDown(self):
shutil.rmtree(self.temp_dir)
def test_find_roles(self):
roles = list(tree.find_roles(self.temp_dir))
self.assertEqual(len(roles), 1)
self.assertEqual(roles[0][0], self.role_name)
def test_main_execution_does_not_raise(self):
# Mocking sys.argv and running main should not raise
import sys
old_argv = sys.argv
sys.argv = ['tree.py', '-d', self.temp_dir, '-p']
try:
tree.main()
finally:
sys.argv = old_argv
if __name__ == '__main__':
unittest.main()