mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-08-26 21:45:20 +02:00
- Introduced module_utils/role_dependency_resolver.py with full support for include_role, import_role, meta dependencies, and run_after. - Refactored cli/build/tree.py to use RoleDependencyResolver (added toggles for include/import/dependencies/run_after). - Extended filter_plugins/canonical_domains_map.py with optional 'recursive' mode (ignores run_after by design). - Updated roles/web-app-nextcloud to properly include Collabora dependency. - Added comprehensive unittests under tests/unit/module_utils for RoleDependencyResolver. Ref: https://chatgpt.com/share/68a519c8-8e54-800f-83c0-be38546620d9
297 lines
9.9 KiB
Python
297 lines
9.9 KiB
Python
import os
|
|
import fnmatch
|
|
import re
|
|
from typing import Dict, Set, Iterable, Tuple, Optional
|
|
|
|
import yaml
|
|
|
|
|
|
class RoleDependencyResolver:
|
|
_RE_PURE_JINJA = re.compile(r"\s*\{\{\s*[^}]+\s*\}\}\s*$")
|
|
|
|
def __init__(self, roles_dir: str):
|
|
self.roles_dir = roles_dir
|
|
|
|
# -------------------------- public API --------------------------
|
|
|
|
def resolve_transitively(
|
|
self,
|
|
start_roles: Iterable[str],
|
|
*,
|
|
resolve_include_role: bool = True,
|
|
resolve_import_role: bool = True,
|
|
resolve_dependencies: bool = True,
|
|
resolve_run_after: bool = False,
|
|
max_depth: Optional[int] = None,
|
|
) -> Set[str]:
|
|
to_visit = list(dict.fromkeys(start_roles))
|
|
visited: Set[str] = set()
|
|
depth: Dict[str, int] = {}
|
|
|
|
for r in to_visit:
|
|
depth[r] = 0
|
|
|
|
while to_visit:
|
|
role = to_visit.pop()
|
|
cur_d = depth.get(role, 0)
|
|
if role in visited:
|
|
continue
|
|
visited.add(role)
|
|
|
|
if max_depth is not None and cur_d >= max_depth:
|
|
continue
|
|
|
|
for dep in self.get_role_dependencies(
|
|
role,
|
|
resolve_include_role=resolve_include_role,
|
|
resolve_import_role=resolve_import_role,
|
|
resolve_dependencies=resolve_dependencies,
|
|
resolve_run_after=resolve_run_after,
|
|
):
|
|
if dep not in visited:
|
|
to_visit.append(dep)
|
|
depth[dep] = cur_d + 1
|
|
|
|
return visited
|
|
|
|
def get_role_dependencies(
|
|
self,
|
|
role_name: str,
|
|
*,
|
|
resolve_include_role: bool = True,
|
|
resolve_import_role: bool = True,
|
|
resolve_dependencies: bool = True,
|
|
resolve_run_after: bool = False,
|
|
) -> Set[str]:
|
|
role_path = os.path.join(self.roles_dir, role_name)
|
|
if not os.path.isdir(role_path):
|
|
return set()
|
|
|
|
deps: Set[str] = set()
|
|
|
|
if resolve_include_role or resolve_import_role:
|
|
includes, imports = self._scan_tasks(role_path)
|
|
if resolve_include_role:
|
|
deps |= includes
|
|
if resolve_import_role:
|
|
deps |= imports
|
|
|
|
if resolve_dependencies:
|
|
deps |= self._extract_meta_dependencies(role_path)
|
|
|
|
if resolve_run_after:
|
|
deps |= self._extract_meta_run_after(role_path)
|
|
|
|
return deps
|
|
|
|
# -------------------------- scanning helpers --------------------------
|
|
|
|
def _scan_tasks(self, role_path: str) -> Tuple[Set[str], Set[str]]:
|
|
tasks_dir = os.path.join(role_path, "tasks")
|
|
include_roles: Set[str] = set()
|
|
import_roles: Set[str] = set()
|
|
|
|
if not os.path.isdir(tasks_dir):
|
|
return include_roles, import_roles
|
|
|
|
all_roles = self._list_role_dirs(self.roles_dir)
|
|
|
|
candidates = []
|
|
for root, _, files in os.walk(tasks_dir):
|
|
for f in files:
|
|
if f.endswith(".yml") or f.endswith(".yaml"):
|
|
candidates.append(os.path.join(root, f))
|
|
|
|
for file_path in candidates:
|
|
try:
|
|
with open(file_path, "r", encoding="utf-8") as f:
|
|
docs = list(yaml.safe_load_all(f))
|
|
except Exception:
|
|
inc, imp = self._tolerant_scan_file(file_path, all_roles)
|
|
include_roles |= inc
|
|
import_roles |= imp
|
|
continue
|
|
|
|
for doc in docs or []:
|
|
if not isinstance(doc, list):
|
|
continue
|
|
for task in doc:
|
|
if not isinstance(task, dict):
|
|
continue
|
|
if "include_role" in task:
|
|
include_roles |= self._extract_from_task(task, "include_role", all_roles)
|
|
if "import_role" in task:
|
|
import_roles |= self._extract_from_task(task, "import_role", all_roles)
|
|
|
|
return include_roles, import_roles
|
|
|
|
def _extract_from_task(self, task: dict, key: str, all_roles: Iterable[str]) -> Set[str]:
|
|
roles: Set[str] = set()
|
|
spec = task.get(key)
|
|
if not isinstance(spec, dict):
|
|
return roles
|
|
|
|
name = spec.get("name")
|
|
loop_val = self._collect_loop_values(task)
|
|
|
|
if loop_val is not None:
|
|
for item in self._iter_flat(loop_val):
|
|
cand = self._role_from_loop_item(item, name_template=name)
|
|
if cand:
|
|
roles.add(cand)
|
|
|
|
if isinstance(name, str) and name.strip() and not self._is_pure_jinja_var(name):
|
|
pattern = self._jinja_to_glob(name) if ("{{" in name and "}}" in name) else name
|
|
self._match_glob_into(pattern, all_roles, roles)
|
|
return roles
|
|
|
|
if isinstance(name, str) and name.strip():
|
|
if "{{" in name and "}}" in name:
|
|
if self._is_pure_jinja_var(name):
|
|
return roles
|
|
pattern = self._jinja_to_glob(name)
|
|
self._match_glob_into(pattern, all_roles, roles)
|
|
else:
|
|
roles.add(name.strip())
|
|
|
|
return roles
|
|
|
|
def _collect_loop_values(self, task: dict):
|
|
for k in ("loop", "with_items", "with_list", "with_flattened"):
|
|
if k in task:
|
|
return task[k]
|
|
return None
|
|
|
|
def _iter_flat(self, value):
|
|
if isinstance(value, list):
|
|
for v in value:
|
|
if isinstance(v, list):
|
|
for x in v:
|
|
yield x
|
|
else:
|
|
yield v
|
|
|
|
def _role_from_loop_item(self, item, name_template=None) -> Optional[str]:
|
|
tmpl = (name_template or "").strip() if isinstance(name_template, str) else ""
|
|
|
|
if isinstance(item, str):
|
|
if tmpl in ("{{ item }}", "{{item}}") or not tmpl or "item" in tmpl:
|
|
return item.strip()
|
|
return None
|
|
|
|
if isinstance(item, dict):
|
|
for k in ("role", "name"):
|
|
v = item.get(k)
|
|
if isinstance(v, str) and v.strip():
|
|
if tmpl in (f"{{{{ item.{k} }}}}", f"{{{{item.{k}}}}}") or not tmpl or "item" in tmpl:
|
|
return v.strip()
|
|
return None
|
|
|
|
def _match_glob_into(self, pattern: str, all_roles: Iterable[str], out: Set[str]):
|
|
if "*" in pattern or "?" in pattern or "[" in pattern:
|
|
for r in all_roles:
|
|
if fnmatch.fnmatch(r, pattern):
|
|
out.add(r)
|
|
else:
|
|
out.add(pattern)
|
|
|
|
def test_jinja_mixed_name_glob_matching(self):
|
|
"""
|
|
include_role:
|
|
name: "prefix-{{ item }}-suffix"
|
|
loop: [x, y]
|
|
Existing roles: prefix-x-suffix, prefix-y-suffix, prefix-z-suffix
|
|
|
|
Expectation:
|
|
- NO raw loop items ('x', 'y') end up as roles
|
|
- Glob matching resolves to all three concrete roles
|
|
"""
|
|
make_role(self.roles_dir, "A")
|
|
for rn in ["prefix-x-suffix", "prefix-y-suffix", "prefix-z-suffix"]:
|
|
make_role(self.roles_dir, rn)
|
|
|
|
write(
|
|
os.path.join(self.roles_dir, "A", "tasks", "main.yml"),
|
|
"""
|
|
- name: jinja-mixed glob
|
|
include_role:
|
|
name: "prefix-{{ item }}-suffix"
|
|
loop:
|
|
- x
|
|
- y
|
|
"""
|
|
)
|
|
|
|
r = RoleDependencyResolver(self.roles_dir)
|
|
deps = r.get_role_dependencies("A")
|
|
|
|
# ensure no raw loop items leak into the results
|
|
self.assertNotIn("x", deps)
|
|
self.assertNotIn("y", deps)
|
|
|
|
# only the resolved role names should be present
|
|
self.assertEqual(
|
|
deps,
|
|
{"prefix-x-suffix", "prefix-y-suffix", "prefix-z-suffix"},
|
|
)
|
|
|
|
|
|
# -------------------------- meta helpers --------------------------
|
|
|
|
def _extract_meta_dependencies(self, role_path: str) -> Set[str]:
|
|
deps: Set[str] = set()
|
|
meta_main = os.path.join(role_path, "meta", "main.yml")
|
|
if not os.path.isfile(meta_main):
|
|
return deps
|
|
try:
|
|
with open(meta_main, "r", encoding="utf-8") as f:
|
|
meta = yaml.safe_load(f) or {}
|
|
raw_deps = meta.get("dependencies", [])
|
|
if isinstance(raw_deps, list):
|
|
for item in raw_deps:
|
|
if isinstance(item, str):
|
|
deps.add(item.strip())
|
|
elif isinstance(item, dict):
|
|
r = item.get("role")
|
|
if isinstance(r, str) and r.strip():
|
|
deps.add(r.strip())
|
|
except Exception:
|
|
pass
|
|
return deps
|
|
|
|
def _extract_meta_run_after(self, role_path: str) -> Set[str]:
|
|
deps: Set[str] = set()
|
|
meta_main = os.path.join(role_path, "meta", "main.yml")
|
|
if not os.path.isfile(meta_main):
|
|
return deps
|
|
try:
|
|
with open(meta_main, "r", encoding="utf-8") as f:
|
|
meta = yaml.safe_load(f) or {}
|
|
galaxy_info = meta.get("galaxy_info", {})
|
|
run_after = galaxy_info.get("run_after", [])
|
|
if isinstance(run_after, list):
|
|
for item in run_after:
|
|
if isinstance(item, str) and item.strip():
|
|
deps.add(item.strip())
|
|
except Exception:
|
|
pass
|
|
return deps
|
|
|
|
# -------------------------- small utils --------------------------
|
|
|
|
def _list_role_dirs(self, roles_dir: str) -> list[str]:
|
|
return [
|
|
d for d in os.listdir(roles_dir)
|
|
if os.path.isdir(os.path.join(roles_dir, d))
|
|
]
|
|
|
|
@classmethod
|
|
def _is_pure_jinja_var(cls, s: str) -> bool:
|
|
return bool(cls._RE_PURE_JINJA.fullmatch(s or ""))
|
|
|
|
@staticmethod
|
|
def _jinja_to_glob(s: str) -> str:
|
|
pattern = re.sub(r"\{\{[^}]+\}\}", "*", s or "")
|
|
pattern = re.sub(r"\*{2,}", "*", pattern)
|
|
return pattern.strip()
|