Optimized get paths and applications generation

This commit is contained in:
Kevin Veen-Birkenbach 2025-07-15 22:48:59 +02:00
parent 54490faca7
commit c8054ffbc3
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E
2 changed files with 77 additions and 34 deletions

View File

@ -1,6 +1,6 @@
# filter_plugins/get_app_conf.py import os
import re import re
import yaml
from ansible.errors import AnsibleFilterError from ansible.errors import AnsibleFilterError
class AppConfigKeyError(AnsibleFilterError, ValueError): class AppConfigKeyError(AnsibleFilterError, ValueError):
@ -10,8 +10,36 @@ class AppConfigKeyError(AnsibleFilterError, ValueError):
""" """
pass pass
class ConfigEntryNotSetError(AppConfigKeyError):
"""
Raised when a config entry is defined in schema but not set in application.
"""
pass
def get_app_conf(applications, application_id, config_path, strict=True, default=None): def get_app_conf(applications, application_id, config_path, strict=True, default=None):
# Path to the schema file for this application
schema_path = os.path.join('roles', application_id, 'schema', 'main.yml')
def schema_defines(path):
if not os.path.isfile(schema_path):
return False
with open(schema_path) as f:
schema = yaml.safe_load(f) or {}
node = schema
for part in path.split('.'):
key_match = re.match(r"^([a-zA-Z0-9_-]+)", part)
if not key_match:
return False
k = key_match.group(1)
if isinstance(node, dict) and k in node:
node = node[k]
else:
return False
return True
def access(obj, key, path_trace): def access(obj, key, path_trace):
# Match either 'key' or 'key[index]'
m = re.match(r"^([a-zA-Z0-9_-]+)(?:\[(\d+)\])?$", key) m = re.match(r"^([a-zA-Z0-9_-]+)(?:\[(\d+)\])?$", key)
if not m: if not m:
raise AppConfigKeyError( raise AppConfigKeyError(
@ -21,53 +49,66 @@ def get_app_conf(applications, application_id, config_path, strict=True, default
f"config_path: {config_path}" f"config_path: {config_path}"
) )
k, idx = m.group(1), m.group(2) k, idx = m.group(1), m.group(2)
# Access dict key
if isinstance(obj, dict): if isinstance(obj, dict):
if k not in obj: if k not in obj:
if strict: # Non-strict mode: always return default on missing key
raise AppConfigKeyError( if not strict:
f"Key '{k}' not found in dict at '{key}'\n" return default if default is not None else False
f"Full path so far: {'.'.join(path_trace)}\n" # Schema-defined but unset: strict raises ConfigEntryNotSetError
f"Current object: {repr(obj)}\n" trace_path = '.'.join(path_trace[1:])
f"application_id: {application_id}\n" if schema_defines(trace_path):
f"config_path: {config_path}" raise ConfigEntryNotSetError(
f"Config entry '{trace_path}' is defined in schema at '{schema_path}' but not set in application '{application_id}'."
) )
return default if default is not None else False # Generic missing-key error
obj = obj[k]
else:
if strict:
raise AppConfigKeyError( raise AppConfigKeyError(
f"Expected dict for '{k}', got {type(obj).__name__} at '{key}'\n" f"Key '{k}' not found in dict at '{key}'\n"
f"Full path so far: {'.'.join(path_trace)}\n" f"Full path so far: {'.'.join(path_trace)}\n"
f"Current object: {repr(obj)}\n" f"Current object: {repr(obj)}\n"
f"application_id: {application_id}\n" f"application_id: {application_id}\n"
f"config_path: {config_path}" f"config_path: {config_path}"
) )
return default if default is not None else False obj = obj[k]
else:
if not strict:
return default if default is not None else False
raise AppConfigKeyError(
f"Expected dict for '{k}', got {type(obj).__name__} at '{key}'\n"
f"Full path so far: {'.'.join(path_trace)}\n"
f"Current object: {repr(obj)}\n"
f"application_id: {application_id}\n"
f"config_path: {config_path}"
)
# If index was provided, access list element
if idx is not None: if idx is not None:
if not isinstance(obj, list): if not isinstance(obj, list):
if strict: if not strict:
raise AppConfigKeyError( return default if default is not None else False
f"Expected list for '{k}[{idx}]', got {type(obj).__name__}\n" raise AppConfigKeyError(
f"Full path so far: {'.'.join(path_trace)}\n" f"Expected list for '{k}[{idx}]', got {type(obj).__name__}\n"
f"Current object: {repr(obj)}\n" f"Full path so far: {'.'.join(path_trace)}\n"
f"application_id: {application_id}\n" f"Current object: {repr(obj)}\n"
f"config_path: {config_path}" f"application_id: {application_id}\n"
) f"config_path: {config_path}"
return default if default is not None else False )
i = int(idx) i = int(idx)
if i >= len(obj): if i >= len(obj):
if strict: if not strict:
raise AppConfigKeyError( return default if default is not None else False
f"Index {i} out of range for list at '{k}'\n" raise AppConfigKeyError(
f"Full path so far: {'.'.join(path_trace)}\n" f"Index {i} out of range for list at '{k}'\n"
f"Current object: {repr(obj)}\n" f"Full path so far: {'.'.join(path_trace)}\n"
f"application_id: {application_id}\n" f"Current object: {repr(obj)}\n"
f"config_path: {config_path}" f"application_id: {application_id}\n"
) f"config_path: {config_path}"
return default if default is not None else False )
obj = obj[i] obj = obj[i]
return obj return obj
# Begin traversal
path_trace = [f"applications[{repr(application_id)}]"] path_trace = [f"applications[{repr(application_id)}]"]
try: try:
obj = applications[application_id] obj = applications[application_id]
@ -79,7 +120,7 @@ def get_app_conf(applications, application_id, config_path, strict=True, default
f"config_path: {config_path}" f"config_path: {config_path}"
) )
for part in config_path.split("."): for part in config_path.split('.'):
path_trace.append(part) path_trace.append(part)
obj = access(obj, part, path_trace) obj = access(obj, part, path_trace)
if obj is False and not strict: if obj is False and not strict:

View File

@ -9,6 +9,8 @@ import subprocess
class TestGenerateDefaultApplications(unittest.TestCase): class TestGenerateDefaultApplications(unittest.TestCase):
def setUp(self): def setUp(self):
# Path to the generator script under test
self.script_path = Path(__file__).resolve().parents[5] / "cli" / "build" / "defaults" / "applications.py"
# Create temp role structure # Create temp role structure
self.temp_dir = Path(tempfile.mkdtemp()) self.temp_dir = Path(tempfile.mkdtemp())
self.roles_dir = self.temp_dir / "roles" self.roles_dir = self.temp_dir / "roles"