diff --git a/filter_plugins/get_app_conf.py b/filter_plugins/get_app_conf.py index cd55d5e9..9cbd5c3c 100644 --- a/filter_plugins/get_app_conf.py +++ b/filter_plugins/get_app_conf.py @@ -1,151 +1,6 @@ -import os -import re -import yaml -from ansible.errors import AnsibleFilterError -from collections.abc import Mapping - -from ansible.errors import AnsibleUndefinedVariable -try: - from ansible.utils.unsafe_proxy import AnsibleUndefined -except ImportError: - class AnsibleUndefined: pass - -class AppConfigKeyError(AnsibleFilterError, ValueError): - """ - Raised when a required application config key is missing (strict mode). - Compatible with Ansible error handling and Python ValueError. - """ - pass - -class ConfigEntryNotSetError(AppConfigKeyError): - """ - Raised when a config entry is defined in schema but not set in application. - """ - pass - - -def get_app_conf(applications, application_id, config_path, strict=True, default=None): - # Path to the schema file for this application - schema_path = os.path.join('roles', application_id, 'schema', 'main.yml') - - def schema_defines(path): - if not os.path.isfile(schema_path): - return False - with open(schema_path) as f: - schema = yaml.safe_load(f) or {} - node = schema - for part in path.split('.'): - key_match = re.match(r"^([a-zA-Z0-9_-]+)", part) - if not key_match: - return False - k = key_match.group(1) - if isinstance(node, dict) and k in node: - node = node[k] - else: - return False - return True - - def access(obj, key, path_trace): - # Match either 'key' or 'key[index]' - m = re.match(r"^([a-zA-Z0-9_-]+)(?:\[(\d+)\])?$", key) - if not m: - raise AppConfigKeyError( - f"Invalid key format in config_path: '{key}'\n" - f"Full path so far: {'.'.join(path_trace)}\n" - f"application_id: {application_id}\n" - f"config_path: {config_path}" - ) - k, idx = m.group(1), m.group(2) - - if (hasattr(obj, '__class__') and obj.__class__.__name__ == 'AnsibleUndefined') \ - or isinstance(obj, AnsibleUndefinedVariable): - if not strict: - return default if default is not None else False - raise AppConfigKeyError( - f"Key '{k}' is undefined at '{'.'.join(path_trace)}'\n" - f" actual type: {type(obj).__name__}\n" - f" repr(obj): {obj!r}\n" - f" repr(applications): {applications!r}\n" - f"application_id: {application_id}\n" - f"config_path: {config_path}" - ) - - # Access dict key - if isinstance(obj, Mapping): - if k not in obj: - # Non-strict mode: always return default on missing key - if not strict: - return default if default is not None else False - # Schema-defined but unset: strict raises ConfigEntryNotSetError - trace_path = '.'.join(path_trace[1:]) - if schema_defines(trace_path): - raise ConfigEntryNotSetError( - f"Config entry '{trace_path}' is defined in schema at '{schema_path}' but not set in application '{application_id}'." - ) - # Generic missing-key error - raise AppConfigKeyError( - f"Key '{k}' not found in dict at '{key}'\n" - f"Full path so far: {'.'.join(path_trace)}\n" - f"Current object: {repr(obj)}\n" - f"application_id: {application_id}\n" - f"config_path: {config_path}" - ) - obj = obj[k] - else: - if not strict: - return default if default is not None else False - raise AppConfigKeyError( - f"Expected dict for '{k}', got {type(obj).__name__} at '{key}'\n" - f"Full path so far: {'.'.join(path_trace)}\n" - f"Current object: {repr(obj)}\n" - f"application_id: {application_id}\n" - f"config_path: {config_path}" - ) - - # If index was provided, access list element - if idx is not None: - if not isinstance(obj, list): - if not strict: - return default if default is not None else False - raise AppConfigKeyError( - f"Expected list for '{k}[{idx}]', got {type(obj).__name__}\n" - f"Full path so far: {'.'.join(path_trace)}\n" - f"Current object: {repr(obj)}\n" - f"application_id: {application_id}\n" - f"config_path: {config_path}" - ) - i = int(idx) - if i >= len(obj): - if not strict: - return default if default is not None else False - raise AppConfigKeyError( - f"Index {i} out of range for list at '{k}'\n" - f"Full path so far: {'.'.join(path_trace)}\n" - f"Current object: {repr(obj)}\n" - f"application_id: {application_id}\n" - f"config_path: {config_path}" - ) - obj = obj[i] - return obj - - # Begin traversal - path_trace = [f"applications[{repr(application_id)}]"] - try: - obj = applications[application_id] - except KeyError: - raise AppConfigKeyError( - f"Application ID '{application_id}' not found in applications dict.\n" - f"path_trace: {path_trace}\n" - f"applications keys: {list(applications.keys())}\n" - f"config_path: {config_path}" - ) - - for part in config_path.split('.'): - path_trace.append(part) - obj = access(obj, part, path_trace) - if obj is False and not strict: - return default if default is not None else False - return obj +import sys, os +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +from module_utils.config_utils import get_app_conf, AppConfigKeyError,ConfigEntryNotSetError class FilterModule(object): ''' CyMaIS application config extraction filters ''' diff --git a/module_utils/config_utils.py b/module_utils/config_utils.py new file mode 100644 index 00000000..c47147df --- /dev/null +++ b/module_utils/config_utils.py @@ -0,0 +1,149 @@ +import os +import re +import yaml +from ansible.errors import AnsibleFilterError +from collections.abc import Mapping + +from ansible.errors import AnsibleUndefinedVariable +try: + from ansible.utils.unsafe_proxy import AnsibleUndefined +except ImportError: + class AnsibleUndefined: pass + +class AppConfigKeyError(AnsibleFilterError, ValueError): + """ + Raised when a required application config key is missing (strict mode). + Compatible with Ansible error handling and Python ValueError. + """ + pass + +class ConfigEntryNotSetError(AppConfigKeyError): + """ + Raised when a config entry is defined in schema but not set in application. + """ + pass + + +def get_app_conf(applications, application_id, config_path, strict=True, default=None): + # Path to the schema file for this application + schema_path = os.path.join('roles', application_id, 'schema', 'main.yml') + + def schema_defines(path): + if not os.path.isfile(schema_path): + return False + with open(schema_path) as f: + schema = yaml.safe_load(f) or {} + node = schema + for part in path.split('.'): + key_match = re.match(r"^([a-zA-Z0-9_-]+)", part) + if not key_match: + return False + k = key_match.group(1) + if isinstance(node, dict) and k in node: + node = node[k] + else: + return False + return True + + def access(obj, key, path_trace): + # Match either 'key' or 'key[index]' + m = re.match(r"^([a-zA-Z0-9_-]+)(?:\[(\d+)\])?$", key) + if not m: + raise AppConfigKeyError( + f"Invalid key format in config_path: '{key}'\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + k, idx = m.group(1), m.group(2) + + if (hasattr(obj, '__class__') and obj.__class__.__name__ == 'AnsibleUndefined') \ + or isinstance(obj, AnsibleUndefinedVariable): + if not strict: + return default if default is not None else False + raise AppConfigKeyError( + f"Key '{k}' is undefined at '{'.'.join(path_trace)}'\n" + f" actual type: {type(obj).__name__}\n" + f" repr(obj): {obj!r}\n" + f" repr(applications): {applications!r}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + + # Access dict key + if isinstance(obj, Mapping): + if k not in obj: + # Non-strict mode: always return default on missing key + if not strict: + return default if default is not None else False + # Schema-defined but unset: strict raises ConfigEntryNotSetError + trace_path = '.'.join(path_trace[1:]) + if schema_defines(trace_path): + raise ConfigEntryNotSetError( + f"Config entry '{trace_path}' is defined in schema at '{schema_path}' but not set in application '{application_id}'." + ) + # Generic missing-key error + raise AppConfigKeyError( + f"Key '{k}' not found in dict at '{key}'\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + obj = obj[k] + else: + if not strict: + return default if default is not None else False + raise AppConfigKeyError( + f"Expected dict for '{k}', got {type(obj).__name__} at '{key}'\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + + # If index was provided, access list element + if idx is not None: + if not isinstance(obj, list): + if not strict: + return default if default is not None else False + raise AppConfigKeyError( + f"Expected list for '{k}[{idx}]', got {type(obj).__name__}\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + i = int(idx) + if i >= len(obj): + if not strict: + return default if default is not None else False + raise AppConfigKeyError( + f"Index {i} out of range for list at '{k}'\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + obj = obj[i] + return obj + + # Begin traversal + path_trace = [f"applications[{repr(application_id)}]"] + try: + obj = applications[application_id] + except KeyError: + raise AppConfigKeyError( + f"Application ID '{application_id}' not found in applications dict.\n" + f"path_trace: {path_trace}\n" + f"applications keys: {list(applications.keys())}\n" + f"config_path: {config_path}" + ) + + for part in config_path.split('.'): + path_trace.append(part) + obj = access(obj, part, path_trace) + if obj is False and not strict: + return default if default is not None else False + return obj +