From c8669e19cfeb9b37eefcd03e695f451f2059ab61 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Sun, 13 Jul 2025 13:36:52 +0200 Subject: [PATCH] Implemented new get_app_conf function --- filter_plugins/configuration_filters.py | 6 +- filter_plugins/docker_image.py | 20 ++-- filter_plugins/get_app_conf.py | 96 +++++++++++++++ tests/integration/test_valid_applications.py | 16 ++- .../unit/filter_plugins/test_get_app_conf.py | 109 ++++++++++++++++++ 5 files changed, 227 insertions(+), 20 deletions(-) create mode 100644 filter_plugins/get_app_conf.py create mode 100644 tests/unit/filter_plugins/test_get_app_conf.py diff --git a/filter_plugins/configuration_filters.py b/filter_plugins/configuration_filters.py index 504750ea..6986b717 100644 --- a/filter_plugins/configuration_filters.py +++ b/filter_plugins/configuration_filters.py @@ -1,9 +1,9 @@ +from get_app_conf import get_app_conf def is_feature_enabled(applications: dict, feature: str, application_id: str) -> bool: """ - Return True if applications[application_id].features[feature] is truthy. + Wrapper for compatibility: Return True if applications[application_id].features[feature] is truthy. """ - app = applications.get(application_id, {}) - return bool(app.get('features', {}).get(feature, False)) + return bool(get_app_conf(applications, application_id, f"features.{feature}", strict=False)) def get_docker_compose(path_docker_compose_instances: str, application_id: str) -> dict: """ diff --git a/filter_plugins/docker_image.py b/filter_plugins/docker_image.py index 74ae5165..014e7327 100644 --- a/filter_plugins/docker_image.py +++ b/filter_plugins/docker_image.py @@ -1,15 +1,13 @@ -def get_docker_image(applications, application_id, image_key:str=None): +from get_app_conf import get_app_conf + +def get_docker_image(applications, application_id, image_key: str = None): + """ + Wrapper for compatibility: Compose the docker image:version string. + Raises error if value missing, like before. + """ image_key = image_key if image_key else application_id - docker = applications.get(application_id, {}).get("docker", {}) - version = docker.get("versions", {}).get(image_key) - image = docker.get("images", {}).get(image_key) - - if not image: - raise ValueError(f"Missing image for {application_id}:{image_key}") - - if not version: - raise ValueError(f"Missing version for {application_id}:{image_key}") - + image = get_app_conf(applications, application_id, f"docker.images.{image_key}", strict=True) + version = get_app_conf(applications, application_id, f"docker.versions.{image_key}", strict=True) return f"{image}:{version}" class FilterModule(object): diff --git a/filter_plugins/get_app_conf.py b/filter_plugins/get_app_conf.py new file mode 100644 index 00000000..7735510c --- /dev/null +++ b/filter_plugins/get_app_conf.py @@ -0,0 +1,96 @@ +# filter_plugins/get_app_conf.py + +import re +from ansible.errors import AnsibleFilterError + +class AppConfigKeyError(AnsibleFilterError, ValueError): + """ + Raised when a required application config key is missing (strict mode). + Compatible with Ansible error handling and Python ValueError. + """ + pass + +def get_app_conf(applications, application_id, config_path, strict=True): + def access(obj, key, path_trace): + m = re.match(r"^([a-zA-Z0-9_]+)(?:\[(\d+)\])?$", key) + if not m: + raise AppConfigKeyError( + f"Invalid key format in config_path: '{key}'\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + k, idx = m.group(1), m.group(2) + if isinstance(obj, dict): + if k not in obj: + if strict: + raise AppConfigKeyError( + f"Key '{k}' not found in dict at '{key}'\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + return False + obj = obj[k] + else: + if strict: + raise AppConfigKeyError( + f"Expected dict for '{k}', got {type(obj).__name__} at '{key}'\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + return False + if idx is not None: + if not isinstance(obj, list): + if strict: + raise AppConfigKeyError( + f"Expected list for '{k}[{idx}]', got {type(obj).__name__}\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + return False + i = int(idx) + if i >= len(obj): + if strict: + raise AppConfigKeyError( + f"Index {i} out of range for list at '{k}'\n" + f"Full path so far: {'.'.join(path_trace)}\n" + f"Current object: {repr(obj)}\n" + f"application_id: {application_id}\n" + f"config_path: {config_path}" + ) + return False + obj = obj[i] + return obj + + path_trace = [f"applications[{repr(application_id)}]"] + try: + obj = applications[application_id] + except KeyError: + if strict: + raise AppConfigKeyError( + f"Application ID '{application_id}' not found in applications dict.\n" + f"path_trace: {path_trace}\n" + f"applications keys: {list(applications.keys())}\n" + f"config_path: {config_path}" + ) + return False + + for part in config_path.split("."): + path_trace.append(part) + obj = access(obj, part, path_trace) + if obj is False and not strict: + return False + return obj + +class FilterModule(object): + ''' CyMaIS application config extraction filters ''' + def filters(self): + return { + 'get_app_conf': get_app_conf, + } diff --git a/tests/integration/test_valid_applications.py b/tests/integration/test_valid_applications.py index 4d150f0a..f5a91305 100644 --- a/tests/integration/test_valid_applications.py +++ b/tests/integration/test_valid_applications.py @@ -5,7 +5,6 @@ import unittest from cli.meta.applications.all import find_application_ids # ensure project root is on PYTHONPATH so we can import the CLI code -# project root is two levels up from this file (tests/integration -> project root) ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)) sys.path.insert(0, ROOT) @@ -22,8 +21,9 @@ class TestValidApplicationUsage(unittest.TestCase): APPLICATION_ATTR_RE = re.compile(r"applications\.(?P[A-Za-z_]\w*)") APPLICATION_DOMAIN_RE = re.compile(r"get_domain\(\s*['\"](?P[^'\"]+)['\"]\s*\)") - # methods and exceptions that should not be validated as application IDs - WHITELIST = {'items', 'yml', 'get'} + # default methods and exceptions that should not be validated as application IDs + DEFAULT_WHITELIST = {'items', 'yml', 'get'} + PYTHON_EXTRA_WHITELIST = {'keys'} def test_application_references_use_valid_ids(self): valid_apps = find_application_ids() @@ -45,14 +45,18 @@ class TestValidApplicationUsage(unittest.TestCase): # skip files that cannot be opened continue + # Whitelist je nach Dateityp erweitern + if filename.endswith('.py'): + whitelist = self.DEFAULT_WHITELIST | self.PYTHON_EXTRA_WHITELIST + else: + whitelist = self.DEFAULT_WHITELIST + for pattern in ( self.APPLICATION_SUBSCRIPT_RE, self.APPLICATION_GET_RE, self.APPLICATION_ATTR_RE, self.APPLICATION_DOMAIN_RE, ): - for match in pattern.finditer(content): - name = match.group('name') for match in pattern.finditer(content): # Determine the full line containing this match start = match.start() @@ -66,7 +70,7 @@ class TestValidApplicationUsage(unittest.TestCase): name = match.group('name') # skip whitelisted methods/exceptions - if name in self.WHITELIST: + if name in whitelist: continue # each found reference must be in valid_apps self.assertIn( diff --git a/tests/unit/filter_plugins/test_get_app_conf.py b/tests/unit/filter_plugins/test_get_app_conf.py new file mode 100644 index 00000000..3536d3c8 --- /dev/null +++ b/tests/unit/filter_plugins/test_get_app_conf.py @@ -0,0 +1,109 @@ +import unittest +import sys +import os + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../filter_plugins'))) + +from get_app_conf import get_app_conf +from ansible.errors import AnsibleFilterError + +class TestGetAppConf(unittest.TestCase): + """ + Unit tests for the get_app_conf filter plugin function. + Tests both strict and non-strict mode for positive and negative cases. + """ + + def setUp(self): + """Set up a sample applications dict for all tests.""" + self.applications = { + "myapp": { + "features": { + "foo": True, + "bar": False, + }, + "docker": { + "images": { + "myapp": "repo/myapp" + }, + "versions": { + "myapp": "1.2.3" + } + } + } + } + + def test_feature_enabled_strict_true_positive(self): + """Test feature enabled (strict, present): should return True.""" + result = get_app_conf(self.applications, "myapp", "features.foo", strict=True) + self.assertTrue(result) + + def test_feature_enabled_strict_true_negative(self): + """Test feature enabled (strict, missing): should raise AnsibleFilterError.""" + with self.assertRaises(AnsibleFilterError) as ctx: + get_app_conf(self.applications, "myapp", "features.baz", strict=True) + self.assertIn("features.baz", str(ctx.exception)) + + def test_feature_enabled_strict_false_positive(self): + """Test feature enabled (non-strict, present): should return True.""" + result = get_app_conf(self.applications, "myapp", "features.foo", strict=False) + self.assertTrue(result) + + def test_feature_enabled_strict_false_negative(self): + """Test feature enabled (non-strict, missing): should return False, not error.""" + result = get_app_conf(self.applications, "myapp", "features.baz", strict=False) + self.assertFalse(result) + + def test_docker_image_strict_true_positive(self): + """Test docker image (strict, present): should return image name.""" + result = get_app_conf(self.applications, "myapp", "docker.images.myapp", strict=True) + self.assertEqual(result, "repo/myapp") + + def test_docker_image_strict_true_negative(self): + """Test docker image (strict, missing): should raise AnsibleFilterError.""" + with self.assertRaises(AnsibleFilterError) as ctx: + get_app_conf(self.applications, "myapp", "docker.images.unknown", strict=True) + self.assertIn("docker.images.unknown", str(ctx.exception)) + + def test_docker_image_strict_false_positive(self): + """Test docker image (non-strict, present): should return image name.""" + result = get_app_conf(self.applications, "myapp", "docker.images.myapp", strict=False) + self.assertEqual(result, "repo/myapp") + + def test_docker_image_strict_false_negative(self): + """Test docker image (non-strict, missing): should return False, not error.""" + result = get_app_conf(self.applications, "myapp", "docker.images.unknown", strict=False) + self.assertFalse(result) + + def test_list_indexing_positive(self): + """Test access of list index, present.""" + apps = { + "app": {"foo": [{"bar": "x"}, {"bar": "y"}]} + } + result = get_app_conf(apps, "app", "foo[1].bar", strict=True) + self.assertEqual(result, "y") + + def test_list_indexing_negative_strict_false(self): + """Test access of list index, missing (non-strict): should return False.""" + apps = { + "app": {"foo": [{"bar": "x"}]} + } + result = get_app_conf(apps, "app", "foo[1].bar", strict=False) + self.assertFalse(result) + + def test_list_indexing_negative_strict_true(self): + """Test access of list index, missing (strict): should raise error.""" + apps = { + "app": {"foo": [{"bar": "x"}]} + } + with self.assertRaises(AnsibleFilterError): + get_app_conf(apps, "app", "foo[1].bar", strict=True) + + def test_application_id_not_found(self): + """Test with unknown application_id: should raise error in strict mode.""" + with self.assertRaises(AnsibleFilterError): + get_app_conf(self.applications, "unknown", "features.foo", strict=True) + # Non-strict: returns False + self.assertFalse(get_app_conf(self.applications, "unknown", "features.foo", strict=False)) + +if __name__ == '__main__': + unittest.main()