Added draft for graph generation

This commit is contained in:
Kevin Veen-Birkenbach 2025-07-11 07:40:16 +02:00
parent 3c63936970
commit 23bbe0520c
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E
4 changed files with 182 additions and 85 deletions

1
.gitignore vendored
View File

@ -3,3 +3,4 @@ site.retry
venv venv
*.log *.log
*.bak *.bak
*tree.json

View File

@ -21,6 +21,10 @@ EXTRA_USERS := $(shell \
.PHONY: build install test .PHONY: build install test
tree:
@echo Generating Tree
python3 main.py generate tree -L 1
build: build:
@echo "🔧 Generating users defaults → $(USERS_OUT)" @echo "🔧 Generating users defaults → $(USERS_OUT)"
python3 $(USERS_SCRIPT) \ python3 $(USERS_SCRIPT) \

View File

@ -4,7 +4,8 @@ import argparse
import yaml import yaml
import json import json
from collections import deque from collections import deque
from typing import List, Dict, Any from typing import List, Dict, Any, Set
def find_role_meta(roles_dir: str, role: str) -> str: def find_role_meta(roles_dir: str, role: str) -> str:
path = os.path.join(roles_dir, role, 'meta', 'main.yml') path = os.path.join(roles_dir, role, 'meta', 'main.yml')
@ -12,6 +13,7 @@ def find_role_meta(roles_dir: str, role: str) -> str:
raise FileNotFoundError(f"Metadata not found for role: {role}") raise FileNotFoundError(f"Metadata not found for role: {role}")
return path return path
def load_meta(path: str) -> Dict[str, Any]: def load_meta(path: str) -> Dict[str, Any]:
""" """
Load meta/main.yml return galaxy_info + run_after + dependencies Load meta/main.yml return galaxy_info + run_after + dependencies
@ -26,69 +28,85 @@ def load_meta(path: str) -> Dict[str, Any]:
'dependencies': data.get('dependencies', []) or [] 'dependencies': data.get('dependencies', []) or []
} }
def build_single_graph(start_role: str, recurse_on: str, roles_dir: str) -> Dict[str, Any]: def build_single_graph(
start_role: str,
dep_type: str,
direction: str,
roles_dir: str,
max_depth: int
) -> Dict[str, Any]:
""" """
Build one graph for exactly one dependency type: Build one graph for one dependency type and direction:
- includes all links for context - 'to': follow edges sourcetarget
- recurses only on `recurse_on` - 'from': reverse edges (find roles listing this role)
- max_depth > 0: limit hops to max_depth
- max_depth 0: stop when youd revisit a node already on the path
""" """
nodes: Dict[str, Dict[str, Any]] = {} nodes: Dict[str, Dict[str, Any]] = {}
links: List[Dict[str, str]] = [] links: List[Dict[str, str]] = []
visited = set()
queue = deque([start_role])
while queue: def traverse(role: str, depth: int, path: Set[str]):
role = queue.popleft() # Register node once
if role in visited: if role not in nodes:
continue
visited.add(role)
try:
meta = load_meta(find_role_meta(roles_dir, role)) meta = load_meta(find_role_meta(roles_dir, role))
except FileNotFoundError: node = {'id': role}
continue node.update(meta['galaxy_info'])
node['doc_url'] = f"https://docs.cymais.cloud/roles/{role}/README.html"
node['source_url'] = (
f"https://github.com/kevinveenbirkenbach/cymais/tree/master/roles/{role}"
)
nodes[role] = node
# register node # Depth guard
node = {'id': role} if max_depth > 0 and depth >= max_depth:
node.update(meta['galaxy_info']) return
node['doc_url'] = f"https://docs.cymais.cloud/roles/{role}/README.html"
node['source_url'] = f"https://github.com/kevinveenbirkenbach/cymais/tree/master/roles/{role}"
nodes[role] = node
# emit all links # Determine neighbors according to direction
for lt in ('run_after', 'dependencies'): if direction == 'to':
for tgt in meta[lt]: neighbors = load_meta(find_role_meta(roles_dir, role)).get(dep_type, [])
links.append({'source': role, 'target': tgt, 'type': lt}) for tgt in neighbors:
links.append({'source': role, 'target': tgt, 'type': dep_type})
# General cycle check
if tgt in path:
continue
traverse(tgt, depth + 1, path | {tgt})
# recurse only on chosen type else: # direction == 'from'
for tgt in meta[recurse_on]: # Find all roles that list this role in their dep_type
if tgt not in visited: for other in os.listdir(roles_dir):
queue.append(tgt) try:
meta_o = load_meta(find_role_meta(roles_dir, other))
except FileNotFoundError:
continue
if role in meta_o.get(dep_type, []):
links.append({'source': other, 'target': role, 'type': dep_type})
if other in path:
continue
traverse(other, depth + 1, path | {other})
# Kick off recursion
traverse(start_role, depth=0, path={start_role})
return {'nodes': list(nodes.values()), 'links': links} return {'nodes': list(nodes.values()), 'links': links}
def build_graphs(start_role: str, types: List[str], roles_dir: str) -> Dict[str, Any]: def build_mappings(
""" start_role: str,
If multiple types: return { type1: graph1, type2: graph2, ... } mappings: List[Dict[str, str]],
If single type: return that single graph dict roles_dir: str,
""" max_depth: int
if len(types) == 1: ) -> Dict[str, Any]:
return build_single_graph(start_role, types[0], roles_dir) result: Dict[str, Any] = {}
combined = {} for mapping in mappings:
for t in types: for dep_type, direction in mapping.items():
combined[t] = build_single_graph(start_role, t, roles_dir) key = f"{dep_type}_{direction}"
return combined result[key] = build_single_graph(
start_role, dep_type, direction, roles_dir, max_depth)
return result
def output_graph(graph_data: Any, fmt: str, start: str, types: List[str]):
"""
Write to file or print to console. If multiple types, join them in filename.
"""
if len(types) == 1:
base = f"{start}_{types[0]}"
else:
base = f"{start}_{'_'.join(types)}"
def output_graph(graph_data: Any, fmt: str, start: str, key: str):
base = f"{start}_{key}"
if fmt == 'console': if fmt == 'console':
print(f"--- {base} ---")
print(yaml.safe_dump(graph_data, sort_keys=False)) print(yaml.safe_dump(graph_data, sort_keys=False))
elif fmt in ('yaml', 'json'): elif fmt in ('yaml', 'json'):
path = f"{base}.{fmt}" path = f"{base}.{fmt}"
@ -97,47 +115,73 @@ def output_graph(graph_data: Any, fmt: str, start: str, types: List[str]):
yaml.safe_dump(graph_data, f, sort_keys=False) yaml.safe_dump(graph_data, f, sort_keys=False)
else: else:
json.dump(graph_data, f, indent=2) json.dump(graph_data, f, indent=2)
print(f"Wrote {fmt.upper()} to {path}") print(f"Wrote {path}")
else: else:
raise ValueError(f"Unknown format: {fmt}") raise ValueError(f"Unknown format: {fmt}")
def main(): def main():
script_dir = os.path.dirname(os.path.abspath(__file__)) script_dir = os.path.dirname(os.path.abspath(__file__))
default_roles_dir = os.path.abspath(os.path.join(script_dir, '..', '..', 'roles')) default_roles_dir = os.path.abspath(os.path.join(script_dir, '..', '..', 'roles'))
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Generate dependency graphs from Ansible roles' meta/main.yml" description="Generate graphs based on dependency mappings"
) )
parser.add_argument( parser.add_argument(
'-r', '--role', required=True, '-r', '--role',
help="Starting role name (directory under roles/)" required=True,
help="Starting role name"
) )
parser.add_argument( parser.add_argument(
'-t', '--type', '-m', '--mapping',
choices=['run_after', 'dependencies'],
default=['run_after'],
nargs='+', nargs='+',
help="Dependency type(s) to recurse on; can specify multiple" default=[
'run_after:to',
'run_after:from',
'dependencies:to',
'dependencies:from'
],
help="Mapping entries as type:direction (default all 4 combos)"
)
parser.add_argument(
'-D', '--depth',
type=int,
default=0,
help="Max recursion depth (>0) or <=0 to stop on cycle"
) )
parser.add_argument( parser.add_argument(
'-o', '--output', '-o', '--output',
choices=['yaml', 'json', 'console'], choices=['yaml', 'json', 'console'],
default='yaml', default='console',
help="Output as files (yaml/json) or print to console" help="Output format"
) )
parser.add_argument( parser.add_argument(
'--roles-dir', '--roles-dir',
default=default_roles_dir, default=default_roles_dir,
help=f"Path to the roles directory (default: {default_roles_dir})" help="Roles directory"
) )
args = parser.parse_args() args = parser.parse_args()
graph_data = build_graphs( mappings: List[Dict[str, str]] = []
for entry in args.mapping:
if ':' not in entry:
parser.error(f"Invalid mapping '{entry}', must be type:direction")
dep_type, direction = entry.split(':', 1)
if dep_type not in ('run_after', 'dependencies'):
parser.error(f"Unknown dependency type '{dep_type}'")
if direction not in ('to', 'from'):
parser.error(f"Unknown direction '{direction}'")
mappings.append({dep_type: direction})
graphs = build_mappings(
start_role=args.role, start_role=args.role,
types=args.type, mappings=mappings,
roles_dir=args.roles_dir roles_dir=args.roles_dir,
max_depth=args.depth
) )
output_graph(graph_data, args.output, args.role, args.type)
for key, graph_data in graphs.items():
output_graph(graph_data, args.output, args.role, key)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -1,10 +1,11 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os import os
import argparse import argparse
import yaml
import json import json
from typing import List, Dict, Any
from cli.generate.graph import build_mappings, output_graph
from cli.generate.graph import build_graphs, output_graph
def find_roles(roles_dir: str): def find_roles(roles_dir: str):
"""Yield (role_name, role_path) for every subfolder in roles_dir.""" """Yield (role_name, role_path) for every subfolder in roles_dir."""
@ -13,49 +14,96 @@ def find_roles(roles_dir: str):
if os.path.isdir(path): if os.path.isdir(path):
yield entry, path yield entry, path
def main(): def main():
# default roles dir is ../../roles relative to this script # default roles dir is ../../roles relative to this script
script_dir = os.path.dirname(os.path.abspath(__file__)) script_dir = os.path.dirname(os.path.abspath(__file__))
default_roles_dir = os.path.abspath(os.path.join(script_dir, '..', '..', 'roles')) default_roles_dir = os.path.abspath(os.path.join(script_dir, '..', '..', 'roles'))
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Generate a tree.json for each role, containing both run_after and dependencies" description="Generate mappings-based graphs for each role and write tree.json"
) )
parser.add_argument( parser.add_argument(
'-d','--role_dir', '-d', '--role_dir',
default=default_roles_dir, default=default_roles_dir,
help=f"Path to roles directory (default: {default_roles_dir})" help=f"Path to roles directory (default: {default_roles_dir})"
) )
parser.add_argument( parser.add_argument(
'-p','--preview', '-m', '--mapping',
nargs='+',
default=[
'run_after:to', 'run_after:from',
'dependencies:to', 'dependencies:from'
],
help="Mapping entries as type:direction (default all 4 combos)"
)
parser.add_argument(
'-D', '--depth',
type=int,
default=0,
help="Max recursion depth (>0) or <=0 to stop on cycle"
)
parser.add_argument(
'-o', '--output',
choices=['yaml', 'json', 'console'],
default='json',
help="Output format"
)
parser.add_argument(
'-p', '--preview',
action='store_true', action='store_true',
help="Preview all graphs on console instead of writing files" help="Preview graphs to console instead of writing files"
)
parser.add_argument(
'-v', '--verbose',
action='store_true',
help="Enable verbose logging"
) )
args = parser.parse_args() args = parser.parse_args()
# parse mappings
mappings: List[Dict[str, str]] = []
for entry in args.mapping:
if ':' not in entry:
parser.error(f"Invalid mapping '{entry}', must be type:direction")
dep_type, direction = entry.split(':', 1)
mappings.append({dep_type: direction})
if args.verbose:
print(f"Roles directory: {args.role_dir}")
print(f"Mappings: {mappings}")
print(f"Max depth: {args.depth}")
print(f"Output format: {args.output}")
print(f"Preview mode: {args.preview}")
for role_name, role_path in find_roles(args.role_dir): for role_name, role_path in find_roles(args.role_dir):
# Build both graphs at once if args.verbose:
graph_data = build_graphs( print(f"Processing role: {role_name}")
# Build graphs for each mapping
graphs = build_mappings(
start_role=role_name, start_role=role_name,
types=['run_after','dependencies'], mappings=mappings,
roles_dir=args.role_dir roles_dir=args.role_dir,
max_depth=args.depth
) )
# Prepare output file or previews
if args.preview: if args.preview:
# pretty-print via output_graph as YAML to console for key, data in graphs.items():
output_graph( if args.verbose:
graph_data, print(f"Previewing graph '{key}' for role '{role_name}'")
fmt='console', output_graph(data, 'console', role_name, key)
start=role_name,
types=['run_after','dependencies']
)
else: else:
# write raw JSON into roles/<role>/meta/tree.json
tree_file = os.path.join(role_path, 'meta', 'tree.json') tree_file = os.path.join(role_path, 'meta', 'tree.json')
os.makedirs(os.path.dirname(tree_file), exist_ok=True) os.makedirs(os.path.dirname(tree_file), exist_ok=True)
# write combined JSON
with open(tree_file, 'w') as f: with open(tree_file, 'w') as f:
json.dump(graph_data, f, indent=2) json.dump(graphs, f, indent=2)
print(f"Wrote {tree_file}") if args.verbose:
print(f"Wrote {tree_file}")
else:
print(f"Wrote {tree_file}")
if __name__ == '__main__': if __name__ == '__main__':
main() main()