Mirror of https://github.com/kevinveenbirkenbach/computer-playbook.git (synced 2025-07-17 22:14:25 +02:00)

Commit f07557c322: Implemented dict renderer to resolve assets
Parent: 4f5afa1220
Changed: the defaults generator CLI (linked from the documentation below as `cli/build/defaults/applications.py`), @@ -1,106 +1,109 @@

The script is refactored from a single procedural `main()` into a `DefaultsGenerator` class. The unused `import os` is dropped, `load_yaml_file` gains type hints, the `try/except` around reading `application_id` is removed, the `[!] Skipping ...` prints become verbose-only `self.log()` calls, argument parsing moves under the `__main__` guard and gains `--verbose` and `--timeout` flags, and the merged result is now passed through the new `DictRenderer` before being written. The new version of the file:

```python
#!/usr/bin/env python3

import argparse
import yaml
import sys
import time
from pathlib import Path

# Ensure project root on PYTHONPATH so utils is importable
repo_root = Path(__file__).resolve().parent.parent.parent.parent
sys.path.insert(0, str(repo_root))

# Add lookup_plugins for application_gid
plugin_path = repo_root / "lookup_plugins"
sys.path.insert(0, str(plugin_path))

from utils.dict_renderer import DictRenderer
from application_gid import LookupModule


def load_yaml_file(path: Path) -> dict:
    if not path.exists():
        return {}
    with path.open("r", encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


class DefaultsGenerator:
    def __init__(self, roles_dir: Path, output_file: Path, verbose: bool, timeout: float):
        self.roles_dir = roles_dir
        self.output_file = output_file
        self.verbose = verbose
        self.renderer = DictRenderer(verbose=verbose, timeout=timeout)
        self.gid_lookup = LookupModule()

    def log(self, message: str):
        if self.verbose:
            print(f"[DefaultsGenerator] {message}")

    def run(self):
        result = {"defaults_applications": {}}

        for role_dir in sorted(self.roles_dir.iterdir()):
            role_name = role_dir.name
            vars_main = role_dir / "vars" / "main.yml"
            config_file = role_dir / "config" / "main.yml"

            if not vars_main.exists():
                self.log(f"Skipping {role_name}: vars/main.yml missing")
                continue

            vars_data = load_yaml_file(vars_main)
            application_id = vars_data.get("application_id")
            if not application_id:
                self.log(f"Skipping {role_name}: application_id not defined")
                continue

            if not config_file.exists():
                self.log(f"Skipping {role_name}: config/main.yml missing")
                continue

            config_data = load_yaml_file(config_file)
            if config_data:
                try:
                    gid_number = self.gid_lookup.run([application_id], roles_dir=str(self.roles_dir))[0]
                except Exception as e:
                    print(f"Warning: failed to determine gid for '{application_id}': {e}", file=sys.stderr)
                    sys.exit(1)

                config_data["group_id"] = gid_number
                result["defaults_applications"][application_id] = config_data

                # Inject users mapping as Jinja2 references
                users_meta = load_yaml_file(role_dir / "users" / "main.yml")
                users_data = users_meta.get("users", {})
                transformed = {user: f"{{{{ users[\"{user}\"] }}}}" for user in users_data}
                if transformed:
                    result["defaults_applications"][application_id]["users"] = transformed

        # Render placeholders in entire result context
        self.log("Starting placeholder rendering...")
        try:
            result = self.renderer.render(result)
        except Exception as e:
            print(f"Error during rendering: {e}", file=sys.stderr)
            sys.exit(1)

        # Write output
        self.output_file.parent.mkdir(parents=True, exist_ok=True)
        with self.output_file.open("w", encoding="utf-8") as f:
            yaml.dump(result, f, sort_keys=False)

        # Print location of generated file (absolute if not under cwd)
        try:
            rel = self.output_file.relative_to(Path.cwd())
        except ValueError:
            rel = self.output_file
        print(f"✅ Generated: {rel}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Generate defaults_applications YAML...")
    parser.add_argument("--roles-dir", default="roles", help="Path to the roles directory")
    parser.add_argument("--output-file", required=True, help="Path to output YAML file")
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
    parser.add_argument("--timeout", type=float, default=10.0, help="Timeout for rendering")

    args = parser.parse_args()
    cwd = Path.cwd()
    roles_dir = (cwd / args.roles_dir).resolve()
    output_file = (cwd / args.output_file).resolve()

    DefaultsGenerator(roles_dir, output_file, args.verbose, args.timeout).run()
```
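For orientation, a sketch of the intermediate dictionary that `run()` assembles for a single role and then hands to the renderer; the application id `web-app-example`, the gid `1234`, and the user `admin` are illustrative assumptions, not data from the repository:

```python
# Hypothetical shape of the structure DefaultsGenerator.run() builds before rendering.
result = {
    "defaults_applications": {
        "web-app-example": {                             # assumed application_id from vars/main.yml
            "group_id": 1234,                            # injected via the application_gid lookup
            "users": {"admin": '{{ users["admin"] }}'},  # users rewritten as Jinja2 references
            # any << ... >> placeholders from config/main.yml are still unresolved at this point
        }
    }
}
```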
Changed: the assets configuration, @@ -1,2 +1,2 @@

```diff
 source_directory: "{{ playbook_dir }}/assets"
-url: "{{ web_protocol ~ '://' ~ 'files.' ~ primary_domain ~ '/assets' }}"
+url: "{{ web_protocol }}://<< defaults_applications['web-svc-file']domains.canonical[0] >>/assets"
```

The hard-coded `files.` subdomain is replaced by a build-time `<< ... >>` placeholder that resolves to the canonical domain of the `web-svc-file` application.
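A minimal sketch of how this placeholder is resolved at build time by the renderer added in this commit (assuming the repository root is on `PYTHONPATH`; `files.example.com` is a made-up domain):

```python
from utils.dict_renderer import DictRenderer

data = {
    "defaults_applications": {
        "web-svc-file": {"domains": {"canonical": ["files.example.com"]}}  # assumed example domain
    },
    "url": "{{ web_protocol }}://<< defaults_applications['web-svc-file']domains.canonical[0] >>/assets",
}

rendered = DictRenderer(verbose=True).render(data)
print(rendered["url"])
# {{ web_protocol }}://files.example.com/assets
# The Jinja2 expression {{ web_protocol }} is left untouched for Ansible to resolve later.
```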
Changed: the applications defaults documentation, @@ -9,3 +9,15 @@ applications:

The hunk keeps the closing lines of the existing example as context:

```yaml
  variable_b: {} # Merges with the existing content
  variable_c: [] # Replaces the default value (use caution with domains)
```

and appends the following section:

## Placeholder Logic with `<< >>`

You can reference values from the generated `defaults_applications` dictionary at build time by embedding `<< ... >>` placeholders inside your template. For example:

```yaml
url: "{{ web_protocol }}://<< defaults_applications.web-svc-file.domains.canonical[0] >>/assets"
```

- The `<< ... >>` placeholders are resolved by the [`DictRenderer`](../../../utils/dict_renderer.py) helper class.
- The CLI uses the [`DefaultsGenerator`](../../../cli/build/defaults/applications.py) class to merge all role configurations into a single YAML and then calls the renderer to substitute each `<< ... >>` occurrence.
- Use the `--verbose` flag on the CLI script to log every replacement step, and rely on the built-in timeout (default: 10 seconds) to prevent infinite loops.
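A small sketch of the failure mode behind the last bullet, based on the `DictRenderer` added in this commit (the placeholder path is made up): an unresolved placeholder survives every pass and is then reported, while the timeout bounds how long the renderer keeps re-passing over the data.

```python
from utils.dict_renderer import DictRenderer

renderer = DictRenderer(verbose=True, timeout=10.0)

# A placeholder that points at nothing is left untouched by every pass and then reported.
try:
    renderer.render({"broken": "<<does.not.exist>>"})
except ValueError as exc:
    print(exc)  # Unresolved placeholders: does.not.exist
```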
New file: tests/unit/utils/__init__.py (0 lines)

New file: tests/unit/utils/test_dict_renderer.py (86 lines), @@ -0,0 +1,86 @@

```python
import unittest
from utils.dict_renderer import DictRenderer


class TestDictRenderer(unittest.TestCase):
    def setUp(self):
        # Timeout is small for tests, verbose off
        self.renderer = DictRenderer(verbose=False, timeout=1.0)

    def test_simple_replacement(self):
        data = {"foo": "bar", "val": "<<foo>>"}
        rendered = self.renderer.render(data)
        self.assertEqual(rendered["val"], "bar")

    def test_nested_replacement(self):
        data = {"parent": {"child": "value"}, "ref": "<<parent.child>>"}
        rendered = self.renderer.render(data)
        self.assertEqual(rendered["ref"], "value")

    def test_list_index(self):
        data = {"lst": [10, 20, 30], "second": "<<lst[1]>>"}
        rendered = self.renderer.render(data)
        self.assertEqual(rendered["second"], "20")

    def test_multi_pass(self):
        data = {"a": "<<b>>", "b": "<<c>>", "c": "final"}
        rendered = self.renderer.render(data)
        self.assertEqual(rendered["a"], "final")

    def test_unresolved_raises(self):
        data = {"a": "<<missing>>"}
        with self.assertRaises(ValueError) as cm:
            self.renderer.render(data)
        self.assertIn("missing", str(cm.exception))

    def test_leave_curly(self):
        data = {"tmpl": "{{ not touched }}"}
        rendered = self.renderer.render(data)
        self.assertEqual(rendered["tmpl"], "{{ not touched }}")

    def test_mixed_braces(self):
        data = {"foo": "bar", "tmpl": "{{ <<foo>> }}"}
        rendered = self.renderer.render(data)
        self.assertEqual(rendered["tmpl"], "{{ bar }}")

    def test_single_quoted_key(self):
        # ['foo-bar'] should resolve the key 'foo-bar'
        data = {
            "foo-bar": {"val": "xyz"},
            "result": "<<['foo-bar'].val>>"
        }
        rendered = self.renderer.render(data)
        self.assertEqual(rendered["result"], "xyz")

    def test_double_quoted_key(self):
        # ["foo-bar"] should also resolve the key 'foo-bar'
        data = {
            "foo-bar": {"val": 123},
            "result": '<<["foo-bar"].val>>'
        }
        rendered = self.renderer.render(data)
        self.assertEqual(rendered["result"], "123")

    def test_mixed_bracket_and_dot_with_index(self):
        # Combine quoted key, dot access and numeric index
        data = {
            "web-svc-file": {
                "domains": {
                    "canonical": ["file.example.com"]
                }
            },
            "url": '<<[\'web-svc-file\'].domains.canonical[0]>>'
        }
        rendered = self.renderer.render(data)
        self.assertEqual(rendered["url"], "file.example.com")

    def test_double_quoted_key_with_list_index(self):
        # Double-quoted key and list index together
        data = {
            "my-list": ["a", "b", "c"],
            "pick": '<<["my-list"][2]>>'
        }
        rendered = self.renderer.render(data)
        self.assertEqual(rendered["pick"], "c")


if __name__ == "__main__":
    unittest.main()
```
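One detail these tests rely on: substitution always happens inside a string, so resolved values are stringified (hence the expected "20" and "123" above), while values that are not referenced from a string keep their original type. A tiny sketch:

```python
from utils.dict_renderer import DictRenderer

rendered = DictRenderer().render({"n": 20, "s": "port=<<n>>"})
print(rendered["n"], rendered["s"])  # 20 port=20  (the int stays an int, the substitution is text)
```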
New file: utils/dict_renderer.py (119 lines), @@ -0,0 +1,119 @@

```python
import re
import time
from typing import Any, Dict, Union, List, Set


class DictRenderer:
    """
    Resolves placeholders in the form << path >> within nested dictionaries,
    supporting hyphens, numeric list indexing, and quoted keys via ['key'] or ["key"].
    """
    # Match << path >> where path contains no whitespace or closing >
    PATTERN = re.compile(r"<<\s*(?P<path>[^\s>]+)\s*>>")
    # Tokenizes a path into unquoted keys, single-quoted, double-quoted keys, or numeric indices
    TOKEN_REGEX = re.compile(
        r"(?P<key>[\w\-]+)"
        r"|\['(?P<qkey>[^']+)'\]"
        r"|\[\"(?P<dkey>[^\"]+)\"\]"
        r"|\[(?P<idx>\d+)\]"
    )

    def __init__(self, verbose: bool = False, timeout: float = 10.0):
        self.verbose = verbose
        self.timeout = timeout

    def render(self, data: Union[Dict[str, Any], List[Any]]) -> Union[Dict[str, Any], List[Any]]:
        start = time.monotonic()
        self.root = data
        rendered = data
        pass_num = 0

        while True:
            pass_num += 1
            if self.verbose:
                print(f"[DictRenderer] Pass {pass_num} starting...")
            rendered, changed = self._render_pass(rendered)
            if not changed:
                if self.verbose:
                    print(f"[DictRenderer] No more placeholders after pass {pass_num}.")
                break
            if time.monotonic() - start > self.timeout:
                raise TimeoutError(f"Rendering exceeded timeout of {self.timeout} seconds")

        # After all passes, raise error on unresolved placeholders
        unresolved = self.find_unresolved(rendered)
        if unresolved:
            raise ValueError(f"Unresolved placeholders: {', '.join(sorted(unresolved))}")

        return rendered

    def _render_pass(self, obj: Any) -> (Any, bool):
        if isinstance(obj, dict):
            new = {}
            changed = False
            for k, v in obj.items():
                nv, ch = self._render_pass(v)
                new[k] = nv
                changed = changed or ch
            return new, changed
        if isinstance(obj, list):
            new_list = []
            changed = False
            for item in obj:
                ni, ch = self._render_pass(item)
                new_list.append(ni)
                changed = changed or ch
            return new_list, changed
        if isinstance(obj, str):
            def repl(m):
                path = m.group('path')
                val = self._lookup(path)
                if val is not None:
                    if self.verbose:
                        print(f"[DictRenderer] Resolving <<{path}>> -> {val}")
                    return str(val)
                return m.group(0)
            new_str = self.PATTERN.sub(repl, obj)
            return new_str, new_str != obj
        return obj, False

    def _lookup(self, path: str) -> Any:
        current = self.root
        for m in self.TOKEN_REGEX.finditer(path):
            if m.group('key') is not None:
                if isinstance(current, dict):
                    current = current.get(m.group('key'))
                else:
                    return None
            elif m.group('qkey') is not None:
                if isinstance(current, dict):
                    current = current.get(m.group('qkey'))
                else:
                    return None
            elif m.group('dkey') is not None:
                if isinstance(current, dict):
                    current = current.get(m.group('dkey'))
                else:
                    return None
            elif m.group('idx') is not None:
                idx = int(m.group('idx'))
                if isinstance(current, list) and 0 <= idx < len(current):
                    current = current[idx]
                else:
                    return None
            if current is None:
                return None
        return current

    def find_unresolved(self, data: Any) -> Set[str]:
        """Return all paths of unresolved << placeholders in data."""
        unresolved: Set[str] = set()
        if isinstance(data, dict):
            for v in data.values():
                unresolved |= self.find_unresolved(v)
        elif isinstance(data, list):
            for item in data:
                unresolved |= self.find_unresolved(item)
        elif isinstance(data, str):
            for m in self.PATTERN.finditer(data):
                unresolved.add(m.group('path'))
        return unresolved
```
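For quick orientation, a standalone usage sketch (the domain values are made-up examples): multi-pass rendering lets one placeholder reference a value that itself contains a placeholder.

```python
from utils.dict_renderer import DictRenderer

renderer = DictRenderer(verbose=False, timeout=5.0)
data = {
    "primary_domain": "example.com",                  # assumed example value
    "hosts": {"files": "files.<<primary_domain>>"},   # resolved in the first pass
    "asset_url": "https://<<hosts.files>>/assets",    # fully resolved in the second pass
}
print(renderer.render(data)["asset_url"])  # https://files.example.com/assets
```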