diff --git a/cli/build/defaults/applications.py b/cli/build/defaults/applications.py index 704a26e7..d9c12d4c 100644 --- a/cli/build/defaults/applications.py +++ b/cli/build/defaults/applications.py @@ -1,106 +1,109 @@ #!/usr/bin/env python3 - import argparse -import os import yaml import sys +import time from pathlib import Path -plugin_path = Path(__file__).resolve().parent / ".." / ".." / ".." /"lookup_plugins" +# Ensure project root on PYTHONPATH so utils is importable +repo_root = Path(__file__).resolve().parent.parent.parent.parent +sys.path.insert(0, str(repo_root)) + +# Add lookup_plugins for application_gid +plugin_path = repo_root / "lookup_plugins" sys.path.insert(0, str(plugin_path)) +from utils.dict_renderer import DictRenderer from application_gid import LookupModule -def load_yaml_file(path): - """Load a YAML file if it exists, otherwise return an empty dict.""" +def load_yaml_file(path: Path) -> dict: if not path.exists(): return {} with path.open("r", encoding="utf-8") as f: return yaml.safe_load(f) or {} +class DefaultsGenerator: + def __init__(self, roles_dir: Path, output_file: Path, verbose: bool, timeout: float): + self.roles_dir = roles_dir + self.output_file = output_file + self.verbose = verbose + self.renderer = DictRenderer(verbose=verbose, timeout=timeout) + self.gid_lookup = LookupModule() -def main(): - parser = argparse.ArgumentParser( - description="Generate defaults_applications YAML from docker roles and include users meta data for each role." - ) - parser.add_argument( - "--roles-dir", - help="Path to the roles directory (default: roles)" - ) - parser.add_argument( - "--output-file", - help="Path to output YAML file" - ) + def log(self, message: str): + if self.verbose: + print(f"[DefaultsGenerator] {message}") + + def run(self): + result = {"defaults_applications": {}} + + for role_dir in sorted(self.roles_dir.iterdir()): + role_name = role_dir.name + vars_main = role_dir / "vars" / "main.yml" + config_file = role_dir / "config" / "main.yml" + + if not vars_main.exists(): + self.log(f"Skipping {role_name}: vars/main.yml missing") + continue + + vars_data = load_yaml_file(vars_main) + application_id = vars_data.get("application_id") + if not application_id: + self.log(f"Skipping {role_name}: application_id not defined") + continue + + if not config_file.exists(): + self.log(f"Skipping {role_name}: config/main.yml missing") + continue + + config_data = load_yaml_file(config_file) + if config_data: + try: + gid_number = self.gid_lookup.run([application_id], roles_dir=str(self.roles_dir))[0] + except Exception as e: + print(f"Warning: failed to determine gid for '{application_id}': {e}", file=sys.stderr) + sys.exit(1) + + config_data["group_id"] = gid_number + result["defaults_applications"][application_id] = config_data + + # Inject users mapping as Jinja2 references + users_meta = load_yaml_file(role_dir / "users" / "main.yml") + users_data = users_meta.get("users", {}) + transformed = {user: f"{{{{ users[\"{user}\"] }}}}" for user in users_data} + if transformed: + result["defaults_applications"][application_id]["users"] = transformed + + # Render placeholders in entire result context + self.log("Starting placeholder rendering...") + try: + result = self.renderer.render(result) + except Exception as e: + print(f"Error during rendering: {e}", file=sys.stderr) + sys.exit(1) + + # Write output + self.output_file.parent.mkdir(parents=True, exist_ok=True) + with self.output_file.open("w", encoding="utf-8") as f: + yaml.dump(result, f, sort_keys=False) + + # Print location of generated file (absolute if not under cwd) + try: + rel = self.output_file.relative_to(Path.cwd()) + except ValueError: + rel = self.output_file + print(f"✅ Generated: {rel}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Generate defaults_applications YAML...") + parser.add_argument("--roles-dir", default="roles", help="Path to the roles directory") + parser.add_argument("--output-file", required=True, help="Path to output YAML file") + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + parser.add_argument("--timeout", type=float, default=10.0, help="Timeout for rendering") args = parser.parse_args() cwd = Path.cwd() roles_dir = (cwd / args.roles_dir).resolve() output_file = (cwd / args.output_file).resolve() - # Ensure output directory exists - output_file.parent.mkdir(parents=True, exist_ok=True) - # Initialize result structure - result = {"defaults_applications": {}} - - gid_lookup = LookupModule() - # Process each role for application configs - for role_dir in sorted(roles_dir.iterdir()): - role_name = role_dir.name - vars_main = role_dir / "vars" / "main.yml" - config_file = role_dir / "config" / "main.yml" - - if not vars_main.exists(): - print(f"[!] Skipping {role_name}: vars/main.yml missing") - continue - - vars_data = load_yaml_file(vars_main) - try: - application_id = vars_data.get("application_id") - except Exception as e: - print( - f"Warning: failed to read application_id from {vars_main}\nException: {e}", - file=sys.stderr - ) - sys.exit(1) - - if not application_id: - print(f"[!] Skipping {role_name}: application_id not defined in vars/main.yml") - continue - - if not config_file.exists(): - print(f"[!] Skipping {role_name}: config/main.yml missing") - continue - - config_data = load_yaml_file(config_file) - if config_data: - try: - gid_number = gid_lookup.run([application_id], roles_dir=str(roles_dir))[0] - except Exception as e: - print(f"Warning: failed to determine gid for '{application_id}': {e}", file=sys.stderr) - sys.exit(1) - config_data["group_id"] = gid_number - result["defaults_applications"][application_id] = config_data - users_meta_file = role_dir / "users" / "main.yml" - transformed_users = {} - if users_meta_file.exists(): - users_meta = load_yaml_file(users_meta_file) - users_data = users_meta.get("users", {}) - for user, role_user_attrs in users_data.items(): - transformed_users[user] = f"{{{{ users[\"{user}\"] }}}}" - - # Attach transformed users under each application - if transformed_users: - result["defaults_applications"][application_id]["users"] = transformed_users - - # Write out result YAML - with output_file.open("w", encoding="utf-8") as f: - yaml.dump(result, f, sort_keys=False) - - try: - print(f"✅ Generated: {output_file.relative_to(cwd)}") - except ValueError: - print(f"✅ Generated: {output_file}") - - -if __name__ == "__main__": - main() + DefaultsGenerator(roles_dir, output_file, args.verbose, args.timeout).run() \ No newline at end of file diff --git a/roles/web-svc-asset/config/main.yml b/roles/web-svc-asset/config/main.yml index 914b3185..8798c9b3 100644 --- a/roles/web-svc-asset/config/main.yml +++ b/roles/web-svc-asset/config/main.yml @@ -1,2 +1,2 @@ source_directory: "{{ playbook_dir }}/assets" -url: "{{ web_protocol ~ '://' ~ 'files.' ~ primary_domain ~ '/assets' }}" \ No newline at end of file +url: "{{ web_protocol }}://<< defaults_applications['web-svc-file']domains.canonical[0] >>/assets" \ No newline at end of file diff --git a/templates/roles/web-app/config/README.md.j2 b/templates/roles/web-app/config/README.md.j2 index 16b6728c..2ea691fb 100644 --- a/templates/roles/web-app/config/README.md.j2 +++ b/templates/roles/web-app/config/README.md.j2 @@ -9,3 +9,15 @@ applications: variable_b: {} # Merges with the existing content variable_c: [] # Replaces the default value (use caution with domains) ``` + +## Placeholder Logic with `<< >>` + +You can reference values from the generated `defaults_applications` dictionary at build time by embedding `<< ... >>` placeholders inside your template. For example: + +```yaml +url: "{{ web_protocol }}://<< defaults_applications.web-svc-file.domains.canonical[0] >>/assets" +``` + +- The `<< ... >>` placeholders are resolved by the [`DictRenderer`](../../../utils/dict_renderer.py) helper class. +- The CLI uses the [`DefaultsGenerator`](../../../cli/build/defaults/applications.py) class to merge all role configurations into a single YAML and then calls the renderer to substitute each `<< ... >>` occurrence. +- Use the `--verbose` flag on the CLI script to log every replacement step, and rely on the built‑in timeout (default: 10 seconds) to prevent infinite loops. diff --git a/tests/unit/utils/__init__.py b/tests/unit/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/utils/test_dict_renderer.py b/tests/unit/utils/test_dict_renderer.py new file mode 100644 index 00000000..5072ea05 --- /dev/null +++ b/tests/unit/utils/test_dict_renderer.py @@ -0,0 +1,86 @@ +import unittest +from utils.dict_renderer import DictRenderer + +class TestDictRenderer(unittest.TestCase): + def setUp(self): + # Timeout is small for tests, verbose off + self.renderer = DictRenderer(verbose=False, timeout=1.0) + + def test_simple_replacement(self): + data = {"foo": "bar", "val": "<>"} + rendered = self.renderer.render(data) + self.assertEqual(rendered["val"], "bar") + + def test_nested_replacement(self): + data = {"parent": {"child": "value"}, "ref": "<>"} + rendered = self.renderer.render(data) + self.assertEqual(rendered["ref"], "value") + + def test_list_index(self): + data = {"lst": [10, 20, 30], "second": "<>"} + rendered = self.renderer.render(data) + self.assertEqual(rendered["second"], "20") + + def test_multi_pass(self): + data = {"a": "<>", "b": "<>", "c": "final"} + rendered = self.renderer.render(data) + self.assertEqual(rendered["a"], "final") + + def test_unresolved_raises(self): + data = {"a": "<>"} + with self.assertRaises(ValueError) as cm: + self.renderer.render(data) + self.assertIn("missing", str(cm.exception)) + + def test_leave_curly(self): + data = {"tmpl": "{{ not touched }}"} + rendered = self.renderer.render(data) + self.assertEqual(rendered["tmpl"], "{{ not touched }}") + + def test_mixed_braces(self): + data = {"foo": "bar", "tmpl": "{{ <> }}"} + rendered = self.renderer.render(data) + self.assertEqual(rendered["tmpl"], "{{ bar }}") + + def test_single_quoted_key(self): + # ['foo-bar'] should resolve the key 'foo-bar' + data = { + "foo-bar": {"val": "xyz"}, + "result": "<<['foo-bar'].val>>" + } + rendered = self.renderer.render(data) + self.assertEqual(rendered["result"], "xyz") + + def test_double_quoted_key(self): + # ["foo-bar"] should also resolve the key 'foo-bar' + data = { + "foo-bar": {"val": 123}, + "result": '<<["foo-bar"].val>>' + } + rendered = self.renderer.render(data) + self.assertEqual(rendered["result"], "123") + + def test_mixed_bracket_and_dot_with_index(self): + # Combine quoted key, dot access and numeric index + data = { + "web-svc-file": { + "domains": { + "canonical": ["file.example.com"] + } + }, + "url": '<<[\'web-svc-file\'].domains.canonical[0]>>' + } + rendered = self.renderer.render(data) + self.assertEqual(rendered["url"], "file.example.com") + + def test_double_quoted_key_with_list_index(self): + # Double-quoted key and list index together + data = { + "my-list": [ "a", "b", "c" ], + "pick": '<<["my-list"][2]>>' + } + rendered = self.renderer.render(data) + self.assertEqual(rendered["pick"], "c") + +if __name__ == "__main__": + unittest.main() diff --git a/utils/dict_renderer.py b/utils/dict_renderer.py new file mode 100644 index 00000000..f2869267 --- /dev/null +++ b/utils/dict_renderer.py @@ -0,0 +1,119 @@ +import re +import time +from typing import Any, Dict, Union, List, Set + +class DictRenderer: + """ + Resolves placeholders in the form << path >> within nested dictionaries, + supporting hyphens, numeric list indexing, and quoted keys via ['key'] or ["key"]. + """ + # Match << path >> where path contains no whitespace or closing > + PATTERN = re.compile(r"<<\s*(?P[^\s>]+)\s*>>") + # Tokenizes a path into unquoted keys, single-quoted, double-quoted keys, or numeric indices + TOKEN_REGEX = re.compile( + r"(?P[\w\-]+)" + r"|\['(?P[^']+)'\]" + r"|\[\"(?P[^\"]+)\"\]" + r"|\[(?P\d+)\]" + ) + + def __init__(self, verbose: bool = False, timeout: float = 10.0): + self.verbose = verbose + self.timeout = timeout + + def render(self, data: Union[Dict[str, Any], List[Any]]) -> Union[Dict[str, Any], List[Any]]: + start = time.monotonic() + self.root = data + rendered = data + pass_num = 0 + + while True: + pass_num += 1 + if self.verbose: + print(f"[DictRenderer] Pass {pass_num} starting...") + rendered, changed = self._render_pass(rendered) + if not changed: + if self.verbose: + print(f"[DictRenderer] No more placeholders after pass {pass_num}.") + break + if time.monotonic() - start > self.timeout: + raise TimeoutError(f"Rendering exceeded timeout of {self.timeout} seconds") + + # After all passes, raise error on unresolved placeholders + unresolved = self.find_unresolved(rendered) + if unresolved: + raise ValueError(f"Unresolved placeholders: {', '.join(sorted(unresolved))}") + + return rendered + + def _render_pass(self, obj: Any) -> (Any, bool): + if isinstance(obj, dict): + new = {} + changed = False + for k, v in obj.items(): + nv, ch = self._render_pass(v) + new[k] = nv + changed = changed or ch + return new, changed + if isinstance(obj, list): + new_list = [] + changed = False + for item in obj: + ni, ch = self._render_pass(item) + new_list.append(ni) + changed = changed or ch + return new_list, changed + if isinstance(obj, str): + def repl(m): + path = m.group('path') + val = self._lookup(path) + if val is not None: + if self.verbose: + print(f"[DictRenderer] Resolving <<{path}>> -> {val}") + return str(val) + return m.group(0) + new_str = self.PATTERN.sub(repl, obj) + return new_str, new_str != obj + return obj, False + + def _lookup(self, path: str) -> Any: + current = self.root + for m in self.TOKEN_REGEX.finditer(path): + if m.group('key') is not None: + if isinstance(current, dict): + current = current.get(m.group('key')) + else: + return None + elif m.group('qkey') is not None: + if isinstance(current, dict): + current = current.get(m.group('qkey')) + else: + return None + elif m.group('dkey') is not None: + if isinstance(current, dict): + current = current.get(m.group('dkey')) + else: + return None + elif m.group('idx') is not None: + idx = int(m.group('idx')) + if isinstance(current, list) and 0 <= idx < len(current): + current = current[idx] + else: + return None + if current is None: + return None + return current + + def find_unresolved(self, data: Any) -> Set[str]: + """Return all paths of unresolved << placeholders in data.""" + unresolved: Set[str] = set() + if isinstance(data, dict): + for v in data.values(): + unresolved |= self.find_unresolved(v) + elif isinstance(data, list): + for item in data: + unresolved |= self.find_unresolved(item) + elif isinstance(data, str): + for m in self.PATTERN.finditer(data): + unresolved.add(m.group('path')) + return unresolved