From 882cf47c20a86b25e85dcd7b76a1ee0b0b3f7aca Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Sun, 13 Jul 2025 16:58:33 +0200 Subject: [PATCH] Added credentials testing --- tests/integration/test_get_app_conf_paths.py | 120 ++++++++++--------- 1 file changed, 63 insertions(+), 57 deletions(-) diff --git a/tests/integration/test_get_app_conf_paths.py b/tests/integration/test_get_app_conf_paths.py index fc839208..1c878343 100644 --- a/tests/integration/test_get_app_conf_paths.py +++ b/tests/integration/test_get_app_conf_paths.py @@ -4,6 +4,8 @@ import unittest from pathlib import Path import yaml # requires PyYAML +# Import the get_role filter directly +from filter_plugins.get_role import get_role class TestGetAppConfPaths(unittest.TestCase): @@ -11,6 +13,7 @@ class TestGetAppConfPaths(unittest.TestCase): def setUpClass(cls): # Locate project root and load YAML configs root = Path(__file__).resolve().parents[2] + cls.root = root cls.app_config_path = root / "group_vars" / "all" / "04_applications.yml" with cls.app_config_path.open(encoding="utf-8") as f: app_cfg = yaml.safe_load(f) @@ -22,7 +25,7 @@ class TestGetAppConfPaths(unittest.TestCase): cls.defaults_users = user_cfg.get("default_users", {}) # Regex to match get_app_conf(application_id, 'some.path', ...) - pattern = re.compile( + cls.pattern = re.compile( r"get_app_conf\(\s*([^\),]+)\s*,\s*['\"]([^'\"]+)['\"]\s*,\s*[^\)]*\)" ) @@ -30,7 +33,7 @@ class TestGetAppConfPaths(unittest.TestCase): cls.literal_paths_by_id = {} cls.variable_paths = {} - # Recursively search all files (excluding tests/ directories) + # Scan all files except tests/ for dirpath, dirs, files in os.walk(root): if "tests" in Path(dirpath).parts: continue @@ -40,7 +43,7 @@ class TestGetAppConfPaths(unittest.TestCase): text = file_path.read_text(encoding="utf-8") except (UnicodeDecodeError, PermissionError): continue - for m in pattern.finditer(text): + for m in cls.pattern.finditer(text): lineno = text.count("\n", 0, m.start()) + 1 app_arg = m.group(1).strip() path_arg = m.group(2).strip() @@ -57,34 +60,35 @@ class TestGetAppConfPaths(unittest.TestCase): ) def _assert_nested_key(self, mapping, dotted_path, context): - """ - Helper: assert that mapping contains the nested dict path defined by dotted_path. - """ keys = dotted_path.split('.') current = mapping for key in keys: - self.assertIsInstance( - current, dict, - f"Expected a dict at '{'.'.join(keys[:keys.index(key)])}' in {context}" - ) - self.assertIn( - key, current, - f"Missing key '{key}' in path '{dotted_path}' under {context}" - ) + self.assertIsInstance(current, dict, + f"Expected dict at '{'.'.join(keys[:keys.index(key)])}' in {context}") + self.assertIn(key, current, + f"Missing '{key}' in path '{dotted_path}' under {context}") current = current[key] + def _validate_credentials(self, application_id, key, occs): + # Delegate to get_role filter to find role name + role = get_role(application_id, roles_path=str(self.root / 'roles')) + schema_file = self.root / 'roles' / role / 'schema' / 'main.yml' + self.assertTrue(schema_file.is_file(), f"Schema file not found: {schema_file}") + with schema_file.open(encoding="utf-8") as sf: + schema = yaml.safe_load(sf) or {} + creds = schema.get('credentials') + self.assertIsInstance(creds, dict, + f"'credentials' missing or not dict in {schema_file}") + self.assertIn(key, creds, + f"Missing credential '{key}' in {schema_file} for role '{role}'") + def _validate_path(self, app_id, dotted_path, occs): - """ - Validate that dotted_path exists under defaults_applications, - or if path starts with 'users.' and not found there, under default_users. - """ entry_app = self.defaults_app.get(app_id, {}) try: - # First, try in application defaults self._assert_nested_key(entry_app, dotted_path, app_id) return except AssertionError: - # Only if path begins with users., check default_users next + # users.* fallback if dotted_path.startswith('users.'): subpath = dotted_path.split('.', 1)[1] try: @@ -92,54 +96,59 @@ class TestGetAppConfPaths(unittest.TestCase): return except AssertionError: pass - # If still not found, fail with original context + # credentials.* fallback via get_role + if dotted_path.startswith('credentials.'): + key = dotted_path.split('.', 1)[1] + self._validate_credentials(app_id, key, occs) + return + # images.* only require top-level images dict + if dotted_path.startswith('images.'): + if any('images' in cfg and isinstance(cfg['images'], dict) + for cfg in self.defaults_app.values()): + return file_path, lineno = occs[0] - self.fail( - f"'{dotted_path}' not found for application '{app_id}' nor in default_users; " - f"called at {file_path}:{lineno}" - ) + self.fail(f"'{dotted_path}' not found for '{app_id}'; called at {file_path}:{lineno}") - def test_literal_ids_have_all_requested_paths(self): - """Each literal application_id used must have entries for each requested path.""" + def test_literal_paths(self): for app_id, paths in sorted(self.literal_paths_by_id.items()): - with self.subTest(application_id=app_id): - self.assertIn( - app_id, self.defaults_app, - f"Application '{app_id}' not found under defaults_applications in {self.app_config_path}" - ) + with self.subTest(app_id=app_id): + self.assertIn(app_id, self.defaults_app, + f"App '{app_id}' not in defaults_applications") for dotted_path, occs in sorted(paths.items()): with self.subTest(path=dotted_path): self._validate_path(app_id, dotted_path, occs) - def test_variable_ids_require_some_app_for_each_path(self): - """ - If dynamic application_id is used, at least one application or default_users - must define each dynamic path. - """ + def test_variable_paths(self): if not self.variable_paths: - self.skipTest("No dynamic get_app_conf calls found.") + self.skipTest("No dynamic calls found.") for dotted_path, occs in sorted(self.variable_paths.items()): - with self.subTest(dynamic_path=dotted_path): + with self.subTest(path=dotted_path): valid = False - # Special: images.* dynamic paths only require 'images' to exist + # credentials.* dynamic + if dotted_path.startswith('credentials.'): + key = dotted_path.split('.',1)[1] + for aid in self.defaults_app: + try: + self._validate_credentials(aid, key, occs) + valid = True; break + except AssertionError: + pass + if valid: continue + # images.* dynamic if dotted_path.startswith('images.'): - for aid, cfg in self.defaults_app.items(): - if 'images' in cfg and isinstance(cfg['images'], dict): - valid = True - break - if valid: - continue # skip deeper check for this path - # First, check each application default for full path + if any('images' in cfg and isinstance(cfg['images'], dict) + for cfg in self.defaults_app.values()): + continue + # check app defaults for aid, cfg in self.defaults_app.items(): try: self._assert_nested_key(cfg, dotted_path, aid) - valid = True - break + valid = True; break except AssertionError: - continue - # If not found in any application, and path startswith 'users.', try default_users + pass + # users.* fallback if not valid and dotted_path.startswith('users.'): - subpath = dotted_path.split('.', 1)[1] + subpath = dotted_path.split('.',1)[1] try: self._assert_nested_key(self.defaults_users, subpath, 'default_users') valid = True @@ -147,11 +156,8 @@ class TestGetAppConfPaths(unittest.TestCase): pass if not valid: file_path, lineno = occs[0] - self.fail( - f"No entry defines '{dotted_path}' in defaults_applications or default_users; " - f"called at {file_path}:{lineno}" - ) + self.fail(f"No entry for '{dotted_path}'; called at {file_path}:{lineno}") if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main()