# Mirror of https://github.com/kevinveenbirkenbach/computer-playbook.git
# Synced 2025-09-24 11:06:24 +02:00 - Python, 323 lines, 12 KiB
import os
|
|
import glob
|
|
import re
|
|
import unittest
|
|
import yaml
|
|
from typing import Any, Dict, Iterable, List, Set, Tuple, Optional
|
|
|
|
|
|
# ---------- YAML helpers ----------
|
|
|
|
def load_yaml_documents(path: str) -> List[Any]:
    """
    Load every YAML document contained in a file.

    The file is opened as UTF-8 and parsed with ``yaml.safe_load_all``.
    Documents that parse to ``None`` (empty documents) are dropped.

    Args:
        path: Path of the YAML file to read.

    Returns:
        The non-``None`` documents, in file order.

    Raises:
        AssertionError: If the YAML parser reports an error. The message names
            the offending file and the original ``yaml.YAMLError`` is attached
            as the explicit cause.
    """
    with open(path, "r", encoding="utf-8") as stream:
        # Only the parse call is guarded; safe_load_all is lazy, so it must be
        # fully consumed inside the try for parse errors to surface here.
        try:
            documents = list(yaml.safe_load_all(stream))
        except yaml.YAMLError as exc:
            raise AssertionError(f"YAML parsing error in {path}: {exc}") from exc
    return [doc for doc in documents if doc is not None]
|
|
|
|
|
|
def _iter_task_like_entries(node: Any) -> Iterable[Dict[str, Any]]:
|
|
"""
|
|
Recursively yield task/handler-like dict entries from a YAML node.
|
|
Handles top-level lists and dict-wrapped lists, and also drills into
|
|
Ansible blocks ('block', 'rescue', 'always') or any list of dicts.
|
|
"""
|
|
if isinstance(node, list):
|
|
for item in node:
|
|
yield from _iter_task_like_entries(item)
|
|
elif isinstance(node, dict):
|
|
# Consider any dict as a potential task/handler entry.
|
|
yield node
|
|
# Recurse into list-of-dicts values (blocks, etc.)
|
|
for v in node.values():
|
|
if isinstance(v, list) and any(isinstance(x, dict) for x in v):
|
|
yield from _iter_task_like_entries(v)
|
|
|
|
|
|
def iter_task_like_entries(docs: List[Any]) -> Iterable[Dict[str, Any]]:
    """Yield the task/handler-like dicts found in each YAML document, in order."""
    for document in docs:
        for entry in _iter_task_like_entries(document):
            yield entry
|
|
|
|
|
|
def as_str_list(val: Any) -> List[str]:
    """
    Coerce a YAML scalar-or-list value into a list of strings.

    ``None`` becomes an empty list, a string becomes a one-element list, a list
    has each element passed through ``str()``, and any other value is wrapped
    as ``[str(val)]``.
    """
    if isinstance(val, list):
        return [str(element) for element in val]
    if isinstance(val, str):
        return [val]
    return [] if val is None else [str(val)]
|
|
|
|
|
|
# ---------- Notify extraction helpers ----------
|
|
|
|
# Matches a quoted literal ('...' or "..."): group 1 is the opening quote
# character and group 2 is the non-greedy text up to the next matching quote.
# Used to pull literal names out of strings containing Jinja expressions.
_QUOTED_RE = re.compile(r"""(['"])(.+?)\1""")
# Matches a single "{{ ... }}" Jinja placeholder (non-greedy; "." does not
# cross newlines because no DOTALL flag is set).
# NOTE(review): not referenced by the code visible in this section - confirm
# whether it is still needed.
_JINJA_TOKEN_RE = re.compile(r"\{\{.*?\}\}")
|
|
|
|
|
|
def _jinja_mixed_to_regex(value: str) -> Optional[re.Pattern]:
|
|
"""
|
|
Turn a string that mixes plain text with Jinja placeholders into a ^...$ regex.
|
|
Example: 'Import {{ folder }} LDIF files' -> r'^Import .+ LDIF files$'
|
|
Returns None if there is no Jinja placeholder.
|
|
"""
|
|
s = value.strip()
|
|
if "{{" not in s or "}}" not in s:
|
|
return None
|
|
parts = re.split(r"(\{\{.*?\}\})", s)
|
|
regex_str = "^" + "".join(
|
|
(".+" if p.startswith("{{") and p.endswith("}}") else re.escape(p))
|
|
for p in parts
|
|
) + "$"
|
|
return re.compile(regex_str)
|
|
|
|
|
|
def _expand_dynamic_notify(value: str) -> List[str]:
|
|
"""
|
|
If 'value' is a Jinja expression like:
|
|
"{{ 'reload system daemon' if cond else 'refresh systemctl service' }}"
|
|
then extract all quoted literals as potential targets.
|
|
Always include the raw value too (in case it is already a plain name).
|
|
"""
|
|
results = []
|
|
s = value.strip()
|
|
if s:
|
|
results.append(s)
|
|
if "{{" in s and "}}" in s:
|
|
for m in _QUOTED_RE.finditer(s):
|
|
literal = m.group(2).strip()
|
|
if literal:
|
|
results.append(literal)
|
|
return results
|
|
|
|
|
|
# ---------- Extraction from handlers/tasks ----------
|
|
|
|
def collect_handler_groups(handler_file: str) -> List[Set[str]]:
    """
    Collect, per handler entry in a handlers file, the set of names it answers to.

    Each group holds the entry's stripped 'name' (when it is a string) plus all
    stripped 'listen' aliases (string or list). Empty strings are discarded and
    entries that end up with no alias are skipped. A handler counts as covered
    when ANY alias of its group is notified.
    """
    alias_groups: List[Set[str]] = []

    for entry in iter_task_like_entries(load_yaml_documents(handler_file)):
        candidates: List[str] = []

        # Primary name: only considered when it is an actual string.
        primary = entry.get("name")
        if isinstance(primary, str):
            candidates.append(primary)

        # 'listen' may be a single string or a list of aliases.
        if "listen" in entry:
            candidates.extend(as_str_list(entry["listen"]))

        aliases = {candidate.strip() for candidate in candidates} - {""}
        if aliases:
            alias_groups.append(aliases)

    return alias_groups
|
|
|
|
|
|
def _record_notify_target(raw: str, exact: Set[str], patterns: List[re.Pattern]) -> None:
    """Classify one notify/package_notify string and record it in place.

    - A string that starts with '{{' and ends with '}}' is ignored entirely.
    - A string containing both '{{' and '}}' contributes its quoted literals
      to ``exact`` (the raw mixed string itself is NOT added as an exact name).
    - Any other string is added to ``exact`` after stripping.
    - Whenever ``_jinja_mixed_to_regex`` yields a pattern for the string, it is
      appended to ``patterns``.
    """
    target = raw.strip()

    # Whole-string Jinja expressions cannot be resolved statically.
    if target.startswith("{{") and target.endswith("}}"):
        return

    if "{{" in target and "}}" in target:
        for match in _QUOTED_RE.finditer(target):
            literal = match.group(2).strip()
            if literal:
                exact.add(literal)
    else:
        exact.add(target)

    pattern = _jinja_mixed_to_regex(target)
    if pattern is not None:
        patterns.append(pattern)


def _collect_package_notify(node: Any, exact: Set[str], patterns: List[re.Pattern]) -> None:
    """Recursively record every 'package_notify' value found in dicts/lists under ``node``."""
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "package_notify":
                for item in as_str_list(value):
                    _record_notify_target(item, exact, patterns)
            else:
                _collect_package_notify(value, exact, patterns)
    elif isinstance(node, list):
        for child in node:
            _collect_package_notify(child, exact, patterns)


def collect_notify_calls_from_tasks(task_file: str) -> Tuple[Set[str], List[re.Pattern]]:
    """
    From a task file, collect all notification targets via:
      - 'notify:' (string or list) on every task-like entry,
      - any occurrence of 'package_notify:' (string or list), anywhere inside
        the entry (top-level or nested in dicts/lists).
    Entries nested inside 'block', 'rescue', 'always', etc. are visited too,
    because they are produced by ``iter_task_like_entries``.

    Items that are a single whole-string Jinja expression ('{{ ... }}') are
    skipped. Mixed strings contribute their quoted literals as exact names and
    a wildcard regex; plain strings are exact names.

    Args:
        task_file: Path of the YAML task file to scan.

    Returns:
        (exact_names, regex_patterns). ``regex_patterns`` may contain
        duplicates, since it is a plain list and the same value can be
        reached more than once.

    Raises:
        AssertionError: Propagated from ``load_yaml_documents`` on YAML errors.
    """
    notified_exact: Set[str] = set()
    notified_patterns: List[re.Pattern] = []
    docs = load_yaml_documents(task_file)

    for entry in iter_task_like_entries(docs):
        # Standard notify:
        if "notify" in entry:
            for item in as_str_list(entry["notify"]):
                _record_notify_target(item, notified_exact, notified_patterns)

        # package_notify anywhere in the task (top-level or nested)
        _collect_package_notify(entry, notified_exact, notified_patterns)

    return notified_exact, notified_patterns
|
|
|
|
|
|
# ---------- Test case ----------
|
|
|
|
class TestHandlersInvoked(unittest.TestCase):
    """
    Cross-checks handlers against notifications across all roles:
      (A) each handler found in roles/*/handlers/*.yml(.yaml) must be referenced
          by some task 'notify:' or 'package_notify:' (exact name, 'listen'
          alias, or regex derived from a Jinja-mixed string);
      (B) each notified target found in tasks must resolve to an existing
          handler alias (name or listen).
    """

    def setUp(self):
        # The repository root is two directory levels above this file.
        here = os.path.dirname(__file__)
        repo_root = os.path.abspath(os.path.join(here, "../.."))
        self.roles_dir = os.path.join(repo_root, "roles")

        # Handler files directly inside each role's handlers directory.
        self.handler_files = []
        for pattern in ("*/handlers/*.yml", "*/handlers/*.yaml"):
            self.handler_files += glob.glob(os.path.join(self.roles_dir, pattern))

        # Task files anywhere below each role's tasks directory.
        self.task_files = []
        for suffix in ("*.yml", "*.yaml"):
            self.task_files += glob.glob(
                os.path.join(self.roles_dir, "*", "tasks", "**", suffix),
                recursive=True,
            )

    def test_all_handlers_have_a_notifier_and_all_notifies_have_a_handler(self):
        # Step 1: one alias group (name + listen) per handler entry.
        handler_groups: List[Set[str]] = [
            group
            for handler_file in self.handler_files
            for group in collect_handler_groups(handler_file)
        ]

        # Union of every alias, used for the reverse check below.
        all_aliases: Set[str] = set()
        for group in handler_groups:
            all_aliases |= group

        # Step 2: everything the tasks notify (notify + package_notify).
        notified_exact: Set[str] = set()
        notified_patterns: List[re.Pattern] = []
        for task_file in self.task_files:
            exact, patterns = collect_notify_calls_from_tasks(task_file)
            notified_exact.update(exact)
            notified_patterns += patterns

        def group_is_covered(grp: Set[str]) -> bool:
            # An exact alias hit wins; otherwise any pattern matching any alias.
            if not grp.isdisjoint(notified_exact):
                return True
            return any(
                rx.match(alias)
                for alias in grp
                for rx in notified_patterns
            )

        # Step 3A: every handler group needs at least one notified alias.
        uncovered = [grp for grp in handler_groups if not group_is_covered(grp)]

        if uncovered:
            # Report each uncovered group by its alphabetically first alias.
            representatives = sorted({min(grp) for grp in uncovered})

            report = [
                "The following handlers are defined but never notified (via 'notify:' or 'package_notify:'):",
            ]
            report.extend(f" - {name}" for name in representatives)
            report.extend([
                "",
                "Notes:",
                " • A handler is considered covered if *any* of its {name + listen} aliases is notified.",
                " • We support dynamic Jinja notify expressions by extracting quoted literals",
                " and by interpreting mixed strings with Jinja placeholders as wildcard regex.",
                " • Ensure 'notify:' uses the exact handler name, one of its 'listen' aliases,",
                " or a compatible wildcard string that matches the handler via regex.",
                " • If you trigger builds via roles/vars, set 'package_notify:' to the handler name.",
            ])
            self.fail("\n".join(report))

        # Step 3B: reverse validation.
        #   - exact notified strings must equal some alias;
        #   - Jinja-mixed patterns must match at least one alias.
        missing_exacts = sorted(notified_exact - all_aliases)

        unmatched: Set[str] = set()
        for rx in notified_patterns:
            if not any(rx.match(alias) for alias in all_aliases):
                unmatched.add(rx.pattern)
        orphan_patterns = sorted(unmatched)

        if not (missing_exacts or orphan_patterns):
            return

        report = ["Some notify targets do not map to any existing handler:"]
        if missing_exacts:
            report.append(" • Missing exact handler aliases:")
            report.extend(f" - {name}" for name in missing_exacts)
        if orphan_patterns:
            report.append(" • Pattern notifications with no matching handler alias (regex):")
            report.extend(f" - {pattern}" for pattern in orphan_patterns)
        report.extend([
            "",
            "Hints:",
            " - Make sure a handler with matching 'name' or 'listen' exists.",
            " - Pure Jinja expressions like '{{ some_var }}' are ignored in this check.",
            " - Mixed strings like 'Import {{ folder }} LDIF files' are treated as regex (e.g., '^Import .+ LDIF files$').",
            " - Consider adding a 'listen:' alias to the handler if you want to keep a flexible notify pattern.",
        ])
        self.fail("\n".join(report))
|
|
|
|
|
|
# Run the test case through unittest's CLI runner when this file is executed
# directly as a script.
if __name__ == "__main__":
    unittest.main()
|