From c8054ffbc36613fb79cfa927efe8cc50f842e93b Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Tue, 15 Jul 2025 22:48:59 +0200 Subject: [PATCH] Optimized get paths and applications generation --- filter_plugins/get_app_conf.py | 109 ++++++++++++------ .../generate/defaults/test_applications.py | 2 + 2 files changed, 77 insertions(+), 34 deletions(-) diff --git a/filter_plugins/get_app_conf.py b/filter_plugins/get_app_conf.py index 140acde6..adfa04c6 100644 --- a/filter_plugins/get_app_conf.py +++ b/filter_plugins/get_app_conf.py @@ -1,6 +1,6 @@ -# filter_plugins/get_app_conf.py - +import os import re +import yaml from ansible.errors import AnsibleFilterError class AppConfigKeyError(AnsibleFilterError, ValueError): @@ -10,8 +10,36 @@ class AppConfigKeyError(AnsibleFilterError, ValueError): """ pass +class ConfigEntryNotSetError(AppConfigKeyError): + """ + Raised when a config entry is defined in schema but not set in application. + """ + pass + + def get_app_conf(applications, application_id, config_path, strict=True, default=None): + # Path to the schema file for this application + schema_path = os.path.join('roles', application_id, 'schema', 'main.yml') + + def schema_defines(path): + if not os.path.isfile(schema_path): + return False + with open(schema_path) as f: + schema = yaml.safe_load(f) or {} + node = schema + for part in path.split('.'): + key_match = re.match(r"^([a-zA-Z0-9_-]+)", part) + if not key_match: + return False + k = key_match.group(1) + if isinstance(node, dict) and k in node: + node = node[k] + else: + return False + return True + def access(obj, key, path_trace): + # Match either 'key' or 'key[index]' m = re.match(r"^([a-zA-Z0-9_-]+)(?:\[(\d+)\])?$", key) if not m: raise AppConfigKeyError( @@ -21,53 +49,66 @@ def get_app_conf(applications, application_id, config_path, strict=True, default f"config_path: {config_path}" ) k, idx = m.group(1), m.group(2) + + # Access dict key if isinstance(obj, dict): if k not in obj: - if strict: - raise AppConfigKeyError( - f"Key '{k}' not found in dict at '{key}'\n" - f"Full path so far: {'.'.join(path_trace)}\n" - f"Current object: {repr(obj)}\n" - f"application_id: {application_id}\n" - f"config_path: {config_path}" + # Non-strict mode: always return default on missing key + if not strict: + return default if default is not None else False + # Schema-defined but unset: strict raises ConfigEntryNotSetError + trace_path = '.'.join(path_trace[1:]) + if schema_defines(trace_path): + raise ConfigEntryNotSetError( + f"Config entry '{trace_path}' is defined in schema at '{schema_path}' but not set in application '{application_id}'." ) - return default if default is not None else False - obj = obj[k] - else: - if strict: + # Generic missing-key error raise AppConfigKeyError( - f"Expected dict for '{k}', got {type(obj).__name__} at '{key}'\n" + f"Key '{k}' not found in dict at '{key}'\n" f"Full path so far: {'.'.join(path_trace)}\n" f"Current object: {repr(obj)}\n" f"application_id: {application_id}\n" f"config_path: {config_path}" ) - return default if default is not None else False + obj = obj[k] + else: + if not strict: + return default if default is not None else False + raise AppConfigKeyError( + f"Expected dict for '{k}', got {type(obj).__name__} at '{key}'\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + + # If index was provided, access list element if idx is not None: if not isinstance(obj, list): - if strict: - raise AppConfigKeyError( - f"Expected list for '{k}[{idx}]', got {type(obj).__name__}\n" - f"Full path so far: {'.'.join(path_trace)}\n" - f"Current object: {repr(obj)}\n" - f"application_id: {application_id}\n" - f"config_path: {config_path}" - ) - return default if default is not None else False + if not strict: + return default if default is not None else False + raise AppConfigKeyError( + f"Expected list for '{k}[{idx}]', got {type(obj).__name__}\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) i = int(idx) if i >= len(obj): - if strict: - raise AppConfigKeyError( - f"Index {i} out of range for list at '{k}'\n" - f"Full path so far: {'.'.join(path_trace)}\n" - f"Current object: {repr(obj)}\n" - f"application_id: {application_id}\n" - f"config_path: {config_path}" - ) - return default if default is not None else False + if not strict: + return default if default is not None else False + raise AppConfigKeyError( + f"Index {i} out of range for list at '{k}'\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) obj = obj[i] return obj + # Begin traversal path_trace = [f"applications[{repr(application_id)}]"] try: obj = applications[application_id] @@ -79,7 +120,7 @@ def get_app_conf(applications, application_id, config_path, strict=True, default f"config_path: {config_path}" ) - for part in config_path.split("."): + for part in config_path.split('.'): path_trace.append(part) obj = access(obj, part, path_trace) if obj is False and not strict: diff --git a/tests/unit/cli/generate/defaults/test_applications.py b/tests/unit/cli/generate/defaults/test_applications.py index 3418ec0f..7528fbb9 100644 --- a/tests/unit/cli/generate/defaults/test_applications.py +++ b/tests/unit/cli/generate/defaults/test_applications.py @@ -9,6 +9,8 @@ import subprocess class TestGenerateDefaultApplications(unittest.TestCase): def setUp(self): + # Path to the generator script under test + self.script_path = Path(__file__).resolve().parents[5] / "cli" / "build" / "defaults" / "applications.py" # Create temp role structure self.temp_dir = Path(tempfile.mkdtemp()) self.roles_dir = self.temp_dir / "roles"