Implemented new get_app_conf function

This commit is contained in:
Kevin Veen-Birkenbach 2025-07-13 13:36:52 +02:00
parent a18e888044
commit c8669e19cf
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E
5 changed files with 227 additions and 20 deletions

View File

@ -1,9 +1,9 @@
from get_app_conf import get_app_conf
def is_feature_enabled(applications: dict, feature: str, application_id: str) -> bool:
"""
Return True if applications[application_id].features[feature] is truthy.
Wrapper for compatibility: Return True if applications[application_id].features[feature] is truthy.
"""
app = applications.get(application_id, {})
return bool(app.get('features', {}).get(feature, False))
return bool(get_app_conf(applications, application_id, f"features.{feature}", strict=False))
def get_docker_compose(path_docker_compose_instances: str, application_id: str) -> dict:
"""

View File

@ -1,15 +1,13 @@
def get_docker_image(applications, application_id, image_key:str=None):
from get_app_conf import get_app_conf
def get_docker_image(applications, application_id, image_key: str = None):
"""
Wrapper for compatibility: Compose the docker image:version string.
Raises error if value missing, like before.
"""
image_key = image_key if image_key else application_id
docker = applications.get(application_id, {}).get("docker", {})
version = docker.get("versions", {}).get(image_key)
image = docker.get("images", {}).get(image_key)
if not image:
raise ValueError(f"Missing image for {application_id}:{image_key}")
if not version:
raise ValueError(f"Missing version for {application_id}:{image_key}")
image = get_app_conf(applications, application_id, f"docker.images.{image_key}", strict=True)
version = get_app_conf(applications, application_id, f"docker.versions.{image_key}", strict=True)
return f"{image}:{version}"
class FilterModule(object):

View File

@ -0,0 +1,96 @@
# filter_plugins/get_app_conf.py
import re
from ansible.errors import AnsibleFilterError
class AppConfigKeyError(AnsibleFilterError, ValueError):
"""
Raised when a required application config key is missing (strict mode).
Compatible with Ansible error handling and Python ValueError.
"""
pass
def get_app_conf(applications, application_id, config_path, strict=True):
def access(obj, key, path_trace):
m = re.match(r"^([a-zA-Z0-9_]+)(?:\[(\d+)\])?$", key)
if not m:
raise AppConfigKeyError(
f"Invalid key format in config_path: '{key}'\n"
f"Full path so far: {'.'.join(path_trace)}\n"
f"application_id: {application_id}\n"
f"config_path: {config_path}"
)
k, idx = m.group(1), m.group(2)
if isinstance(obj, dict):
if k not in obj:
if strict:
raise AppConfigKeyError(
f"Key '{k}' not found in dict at '{key}'\n"
f"Full path so far: {'.'.join(path_trace)}\n"
f"Current object: {repr(obj)}\n"
f"application_id: {application_id}\n"
f"config_path: {config_path}"
)
return False
obj = obj[k]
else:
if strict:
raise AppConfigKeyError(
f"Expected dict for '{k}', got {type(obj).__name__} at '{key}'\n"
f"Full path so far: {'.'.join(path_trace)}\n"
f"Current object: {repr(obj)}\n"
f"application_id: {application_id}\n"
f"config_path: {config_path}"
)
return False
if idx is not None:
if not isinstance(obj, list):
if strict:
raise AppConfigKeyError(
f"Expected list for '{k}[{idx}]', got {type(obj).__name__}\n"
f"Full path so far: {'.'.join(path_trace)}\n"
f"Current object: {repr(obj)}\n"
f"application_id: {application_id}\n"
f"config_path: {config_path}"
)
return False
i = int(idx)
if i >= len(obj):
if strict:
raise AppConfigKeyError(
f"Index {i} out of range for list at '{k}'\n"
f"Full path so far: {'.'.join(path_trace)}\n"
f"Current object: {repr(obj)}\n"
f"application_id: {application_id}\n"
f"config_path: {config_path}"
)
return False
obj = obj[i]
return obj
path_trace = [f"applications[{repr(application_id)}]"]
try:
obj = applications[application_id]
except KeyError:
if strict:
raise AppConfigKeyError(
f"Application ID '{application_id}' not found in applications dict.\n"
f"path_trace: {path_trace}\n"
f"applications keys: {list(applications.keys())}\n"
f"config_path: {config_path}"
)
return False
for part in config_path.split("."):
path_trace.append(part)
obj = access(obj, part, path_trace)
if obj is False and not strict:
return False
return obj
class FilterModule(object):
''' CyMaIS application config extraction filters '''
def filters(self):
return {
'get_app_conf': get_app_conf,
}

View File

@ -5,7 +5,6 @@ import unittest
from cli.meta.applications.all import find_application_ids
# ensure project root is on PYTHONPATH so we can import the CLI code
# project root is two levels up from this file (tests/integration -> project root)
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
sys.path.insert(0, ROOT)
@ -22,8 +21,9 @@ class TestValidApplicationUsage(unittest.TestCase):
APPLICATION_ATTR_RE = re.compile(r"applications\.(?P<name>[A-Za-z_]\w*)")
APPLICATION_DOMAIN_RE = re.compile(r"get_domain\(\s*['\"](?P<name>[^'\"]+)['\"]\s*\)")
# methods and exceptions that should not be validated as application IDs
WHITELIST = {'items', 'yml', 'get'}
# default methods and exceptions that should not be validated as application IDs
DEFAULT_WHITELIST = {'items', 'yml', 'get'}
PYTHON_EXTRA_WHITELIST = {'keys'}
def test_application_references_use_valid_ids(self):
valid_apps = find_application_ids()
@ -45,14 +45,18 @@ class TestValidApplicationUsage(unittest.TestCase):
# skip files that cannot be opened
continue
# Whitelist je nach Dateityp erweitern
if filename.endswith('.py'):
whitelist = self.DEFAULT_WHITELIST | self.PYTHON_EXTRA_WHITELIST
else:
whitelist = self.DEFAULT_WHITELIST
for pattern in (
self.APPLICATION_SUBSCRIPT_RE,
self.APPLICATION_GET_RE,
self.APPLICATION_ATTR_RE,
self.APPLICATION_DOMAIN_RE,
):
for match in pattern.finditer(content):
name = match.group('name')
for match in pattern.finditer(content):
# Determine the full line containing this match
start = match.start()
@ -66,7 +70,7 @@ class TestValidApplicationUsage(unittest.TestCase):
name = match.group('name')
# skip whitelisted methods/exceptions
if name in self.WHITELIST:
if name in whitelist:
continue
# each found reference must be in valid_apps
self.assertIn(

View File

@ -0,0 +1,109 @@
import unittest
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../filter_plugins')))
from get_app_conf import get_app_conf
from ansible.errors import AnsibleFilterError
class TestGetAppConf(unittest.TestCase):
"""
Unit tests for the get_app_conf filter plugin function.
Tests both strict and non-strict mode for positive and negative cases.
"""
def setUp(self):
"""Set up a sample applications dict for all tests."""
self.applications = {
"myapp": {
"features": {
"foo": True,
"bar": False,
},
"docker": {
"images": {
"myapp": "repo/myapp"
},
"versions": {
"myapp": "1.2.3"
}
}
}
}
def test_feature_enabled_strict_true_positive(self):
"""Test feature enabled (strict, present): should return True."""
result = get_app_conf(self.applications, "myapp", "features.foo", strict=True)
self.assertTrue(result)
def test_feature_enabled_strict_true_negative(self):
"""Test feature enabled (strict, missing): should raise AnsibleFilterError."""
with self.assertRaises(AnsibleFilterError) as ctx:
get_app_conf(self.applications, "myapp", "features.baz", strict=True)
self.assertIn("features.baz", str(ctx.exception))
def test_feature_enabled_strict_false_positive(self):
"""Test feature enabled (non-strict, present): should return True."""
result = get_app_conf(self.applications, "myapp", "features.foo", strict=False)
self.assertTrue(result)
def test_feature_enabled_strict_false_negative(self):
"""Test feature enabled (non-strict, missing): should return False, not error."""
result = get_app_conf(self.applications, "myapp", "features.baz", strict=False)
self.assertFalse(result)
def test_docker_image_strict_true_positive(self):
"""Test docker image (strict, present): should return image name."""
result = get_app_conf(self.applications, "myapp", "docker.images.myapp", strict=True)
self.assertEqual(result, "repo/myapp")
def test_docker_image_strict_true_negative(self):
"""Test docker image (strict, missing): should raise AnsibleFilterError."""
with self.assertRaises(AnsibleFilterError) as ctx:
get_app_conf(self.applications, "myapp", "docker.images.unknown", strict=True)
self.assertIn("docker.images.unknown", str(ctx.exception))
def test_docker_image_strict_false_positive(self):
"""Test docker image (non-strict, present): should return image name."""
result = get_app_conf(self.applications, "myapp", "docker.images.myapp", strict=False)
self.assertEqual(result, "repo/myapp")
def test_docker_image_strict_false_negative(self):
"""Test docker image (non-strict, missing): should return False, not error."""
result = get_app_conf(self.applications, "myapp", "docker.images.unknown", strict=False)
self.assertFalse(result)
def test_list_indexing_positive(self):
"""Test access of list index, present."""
apps = {
"app": {"foo": [{"bar": "x"}, {"bar": "y"}]}
}
result = get_app_conf(apps, "app", "foo[1].bar", strict=True)
self.assertEqual(result, "y")
def test_list_indexing_negative_strict_false(self):
"""Test access of list index, missing (non-strict): should return False."""
apps = {
"app": {"foo": [{"bar": "x"}]}
}
result = get_app_conf(apps, "app", "foo[1].bar", strict=False)
self.assertFalse(result)
def test_list_indexing_negative_strict_true(self):
"""Test access of list index, missing (strict): should raise error."""
apps = {
"app": {"foo": [{"bar": "x"}]}
}
with self.assertRaises(AnsibleFilterError):
get_app_conf(apps, "app", "foo[1].bar", strict=True)
def test_application_id_not_found(self):
"""Test with unknown application_id: should raise error in strict mode."""
with self.assertRaises(AnsibleFilterError):
get_app_conf(self.applications, "unknown", "features.foo", strict=True)
# Non-strict: returns False
self.assertFalse(get_app_conf(self.applications, "unknown", "features.foo", strict=False))
if __name__ == '__main__':
unittest.main()