Added recursion test for group_vars

This commit is contained in:
Kevin Veen-Birkenbach 2025-07-16 22:31:48 +02:00
parent f4d1f2a303
commit f263992393
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E

View File

@ -0,0 +1,124 @@
import re
import unittest
import yaml
from pathlib import Path
from collections import defaultdict
# Directory containing group_vars/all/*.yml
GROUPVARS_DIR = Path(__file__).resolve().parents[3] / "group_vars" / "all"
JINJA_RE = re.compile(r"{{\s*([^}]+)\s*}}")
# Matches variables like foo.bar, foo["bar"], foo['bar']
VAR_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(?:\.(?:[A-Za-z_][A-Za-z0-9_]*|\[\"[^\"]+\"\]))*")
def load_all_yaml():
"""
Load and merge all YAML files under GROUPVARS_DIR, stripping 'defaults_' or 'default_' prefixes.
"""
result = {}
for yaml_path in GROUPVARS_DIR.glob("*.yml"):
with open(yaml_path, encoding="utf-8") as fh:
data = yaml.safe_load(fh) or {}
for k, v in data.items():
base = k
for p in ("defaults_", "default_"):
if base.startswith(p):
base = base[len(p):]
if base in result and isinstance(result[base], dict) and isinstance(v, dict):
result[base].update(v)
else:
result[base] = v
return result
def find_jinja_refs(val):
"""
Find all unconditional Jinja variable paths inside {{}} (including bracket-notation).
Skip any expression containing ' if ' and ' else '.
"""
refs = []
if not isinstance(val, str):
return refs
for inner in JINJA_RE.findall(val):
expr = inner.strip()
if " if " in expr and " else " in expr:
continue
for m in VAR_PATTERN.finditer(expr):
var = m.group(0)
# normalize bracket notation foo["bar"] -> foo.bar
var = re.sub(r"\[\"([^\"]+)\"\]", r".\1", var)
var = re.sub(r"\['([^']+)'\]", r".\1", var)
refs.append(var)
return refs
def build_edges(vars_dict):
"""
Walk the variables dict, return list of (source_key, referenced_var) edges.
"""
edges = []
def walk(node, path):
if isinstance(node, dict):
for k, v in node.items():
walk(v, path + [k])
elif isinstance(node, list):
for i, e in enumerate(node):
walk(e, path + [f"[{i}]"])
else:
full_key = ".".join(path)
for ref in find_jinja_refs(node):
edges.append((full_key, ref))
walk(vars_dict, [])
return edges
class TestNoJinjaReferenceCycles(unittest.TestCase):
def test_users_applications_cycle(self):
all_vars = load_all_yaml()
edges = build_edges(all_vars)
user_to_app = any(
src.startswith("users.") and ref == "applications"
for src, ref in edges
)
app_to_user = any(
src.startswith("applications.") and ref.startswith("users.")
for src, ref in edges
)
if user_to_app and app_to_user:
self.fail(
"❌ Indirect Jinja-cycle detected:\n"
" a) a `users.*` key references `applications`\n"
" b) an `applications.*` key references `users.*`\n"
"→ Combined this forms a cycle users → applications → users"
)
def test_no_unconditional_recursive_loops(self):
all_vars = load_all_yaml()
edges = build_edges(all_vars)
graph = defaultdict(set)
for src, ref in edges:
graph[src].add(ref)
def dfs(node, visited, stack):
if node in stack:
return stack[stack.index(node):] + [node]
if node in visited:
return None
visited.add(node)
stack.append(node)
for nxt in graph.get(node, []):
cycle = dfs(nxt, visited, stack)
if cycle:
return cycle
stack.pop()
return None
for node in list(graph):
cycle = dfs(node, set(), [])
if cycle:
self.fail("❌ Jinja recursion cycle detected:\n " + " -> ".join(cycle))
if __name__ == "__main__":
unittest.main()