diff --git a/tests/integration/test_get_app_conf_paths.py b/tests/integration/test_get_app_conf_paths.py index 1c878343..d742279a 100644 --- a/tests/integration/test_get_app_conf_paths.py +++ b/tests/integration/test_get_app_conf_paths.py @@ -4,160 +4,152 @@ import unittest from pathlib import Path import yaml # requires PyYAML -# Import the get_role filter directly from filter_plugins.get_role import get_role class TestGetAppConfPaths(unittest.TestCase): @classmethod def setUpClass(cls): - # Locate project root and load YAML configs + # Setup paths root = Path(__file__).resolve().parents[2] cls.root = root - cls.app_config_path = root / "group_vars" / "all" / "04_applications.yml" - with cls.app_config_path.open(encoding="utf-8") as f: - app_cfg = yaml.safe_load(f) - cls.defaults_app = app_cfg.get("defaults_applications", {}) + cls.app_config_path = root / 'group_vars' / 'all' / '04_applications.yml' + cls.user_config_path = root / 'group_vars' / 'all' / '03_users.yml' - cls.user_config_path = root / "group_vars" / "all" / "03_users.yml" - with cls.user_config_path.open(encoding="utf-8") as f: - user_cfg = yaml.safe_load(f) - cls.defaults_users = user_cfg.get("default_users", {}) + # Load application defaults + with cls.app_config_path.open(encoding='utf-8') as f: + app_cfg = yaml.safe_load(f) or {} + cls.defaults_app = app_cfg.get('defaults_applications', {}) - # Regex to match get_app_conf(application_id, 'some.path', ...) - cls.pattern = re.compile( - r"get_app_conf\(\s*([^\),]+)\s*,\s*['\"]([^'\"]+)['\"]\s*,\s*[^\)]*\)" - ) + # Load default users + with cls.user_config_path.open(encoding='utf-8') as f: + user_cfg = yaml.safe_load(f) or {} + cls.defaults_users = user_cfg.get('default_users', {}) - # Store occurrences: literal_ids -> {path: [(file, line), ...]}, variable paths likewise - cls.literal_paths_by_id = {} - cls.variable_paths = {} + # Preload role schemas: map application_id -> schema dict + cls.role_schemas = {} + cls.role_for_app = {} + roles_path = str(root / 'roles') + for app_id in cls.defaults_app: + try: + role = get_role(app_id, roles_path) + cls.role_for_app[app_id] = role + schema_file = root / 'roles' / role / 'schema' / 'main.yml' + with schema_file.open(encoding='utf-8') as sf: + schema = yaml.safe_load(sf) or {} + cls.role_schemas[app_id] = schema + except Exception: + # skip apps without schema or role + continue + + # Regex to find get_app_conf calls + cls.pattern = re.compile(r"get_app_conf\(\s*([^\),]+)\s*,\s*['\"]([^'\"]+)['\"]") + + # Scan files once + cls.literal_paths = {} # app_id -> {path: [(file,line)...]} + cls.variable_paths = {} # path -> [(file,line)...] - # Scan all files except tests/ for dirpath, dirs, files in os.walk(root): - if "tests" in Path(dirpath).parts: + # skip descending into the tests directory + if 'tests' in dirs: + dirs.remove('tests') + if 'tests' in Path(dirpath).parts: continue for fname in files: file_path = Path(dirpath) / fname try: - text = file_path.read_text(encoding="utf-8") - except (UnicodeDecodeError, PermissionError): + text = file_path.read_text(encoding='utf-8') + except Exception: continue for m in cls.pattern.finditer(text): - lineno = text.count("\n", 0, m.start()) + 1 - app_arg = m.group(1).strip() - path_arg = m.group(2).strip() - if (app_arg.startswith("'") and app_arg.endswith("'")) or ( - app_arg.startswith('"') and app_arg.endswith('"') - ): + lineno = text.count('\n', 0, m.start()) + 1 + app_arg, path_arg = m.group(1).strip(), m.group(2).strip() + if (app_arg.startswith("'") and app_arg.endswith("'")) or (app_arg.startswith('"') and app_arg.endswith('"')): app_id = app_arg.strip("'\"") - cls.literal_paths_by_id.setdefault(app_id, {}).setdefault(path_arg, []).append( - (file_path, lineno) - ) + cls.literal_paths.setdefault(app_id, {}).setdefault(path_arg, []).append((file_path, lineno)) else: - cls.variable_paths.setdefault(path_arg, []).append( - (file_path, lineno) - ) + cls.variable_paths.setdefault(path_arg, []).append((file_path, lineno)) - def _assert_nested_key(self, mapping, dotted_path, context): - keys = dotted_path.split('.') - current = mapping - for key in keys: - self.assertIsInstance(current, dict, - f"Expected dict at '{'.'.join(keys[:keys.index(key)])}' in {context}") - self.assertIn(key, current, - f"Missing '{key}' in path '{dotted_path}' under {context}") - current = current[key] - - def _validate_credentials(self, application_id, key, occs): - # Delegate to get_role filter to find role name - role = get_role(application_id, roles_path=str(self.root / 'roles')) - schema_file = self.root / 'roles' / role / 'schema' / 'main.yml' - self.assertTrue(schema_file.is_file(), f"Schema file not found: {schema_file}") - with schema_file.open(encoding="utf-8") as sf: - schema = yaml.safe_load(sf) or {} - creds = schema.get('credentials') - self.assertIsInstance(creds, dict, - f"'credentials' missing or not dict in {schema_file}") - self.assertIn(key, creds, - f"Missing credential '{key}' in {schema_file} for role '{role}'") - - def _validate_path(self, app_id, dotted_path, occs): - entry_app = self.defaults_app.get(app_id, {}) - try: - self._assert_nested_key(entry_app, dotted_path, app_id) - return - except AssertionError: - # users.* fallback - if dotted_path.startswith('users.'): - subpath = dotted_path.split('.', 1)[1] - try: - self._assert_nested_key(self.defaults_users, subpath, 'default_users') - return - except AssertionError: - pass - # credentials.* fallback via get_role - if dotted_path.startswith('credentials.'): - key = dotted_path.split('.', 1)[1] - self._validate_credentials(app_id, key, occs) - return - # images.* only require top-level images dict - if dotted_path.startswith('images.'): - if any('images' in cfg and isinstance(cfg['images'], dict) - for cfg in self.defaults_app.values()): - return - file_path, lineno = occs[0] - self.fail(f"'{dotted_path}' not found for '{app_id}'; called at {file_path}:{lineno}") + def assertNested(self, mapping, dotted, context): + keys = dotted.split('.') + cur = mapping + for k in keys: + assert isinstance(cur, dict), f"{context}: expected dict at {k}" + assert k in cur, f"{context}: missing '{k}' in '{dotted}'" + cur = cur[k] def test_literal_paths(self): - for app_id, paths in sorted(self.literal_paths_by_id.items()): - with self.subTest(app_id=app_id): - self.assertIn(app_id, self.defaults_app, - f"App '{app_id}' not in defaults_applications") - for dotted_path, occs in sorted(paths.items()): - with self.subTest(path=dotted_path): - self._validate_path(app_id, dotted_path, occs) + # Check each literal path exists + for app_id, paths in self.literal_paths.items(): + with self.subTest(app=app_id): + self.assertIn(app_id, self.defaults_app, f"App '{app_id}' missing in defaults_applications") + for dotted, occs in paths.items(): + with self.subTest(path=dotted): + self._validate(app_id, dotted, occs) def test_variable_paths(self): + # dynamic paths: must exist somewhere if not self.variable_paths: - self.skipTest("No dynamic calls found.") - for dotted_path, occs in sorted(self.variable_paths.items()): - with self.subTest(path=dotted_path): - valid = False - # credentials.* dynamic - if dotted_path.startswith('credentials.'): - key = dotted_path.split('.',1)[1] - for aid in self.defaults_app: - try: - self._validate_credentials(aid, key, occs) - valid = True; break - except AssertionError: - pass - if valid: continue - # images.* dynamic - if dotted_path.startswith('images.'): - if any('images' in cfg and isinstance(cfg['images'], dict) - for cfg in self.defaults_app.values()): + self.skipTest('No dynamic get_app_conf calls') + for dotted, occs in self.variable_paths.items(): + with self.subTest(path=dotted): + found = False + # credentials.*: check preloaded schemas + if dotted.startswith('credentials.'): + key = dotted.split('.',1)[1] + for aid, schema in self.role_schemas.items(): + creds = schema.get('credentials', {}) + if isinstance(creds, dict) and key in creds: + found = True; break + if found: continue + # images.*: any images dict + if dotted.startswith('images.'): + if any(isinstance(cfg.get('images'), dict) for cfg in self.defaults_app.values()): continue - # check app defaults + # users.*: default_users fallback + if dotted.startswith('users.'): + sub = dotted.split('.',1)[1] + if sub in self.defaults_users: + continue + # application defaults for aid, cfg in self.defaults_app.items(): try: - self._assert_nested_key(cfg, dotted_path, aid) - valid = True; break + self.assertNested(cfg, dotted, aid) + found = True; break except AssertionError: pass - # users.* fallback - if not valid and dotted_path.startswith('users.'): - subpath = dotted_path.split('.',1)[1] - try: - self._assert_nested_key(self.defaults_users, subpath, 'default_users') - valid = True - except AssertionError: - pass - if not valid: + if not found: file_path, lineno = occs[0] - self.fail(f"No entry for '{dotted_path}'; called at {file_path}:{lineno}") + self.fail(f"No entry for '{dotted}'; called at {file_path}:{lineno}") + + def _validate(self, app_id, dotted, occs): + # try app defaults + cfg = self.defaults_app.get(app_id, {}) + try: + self.assertNested(cfg, dotted, app_id) + return + except AssertionError: + pass + # users.* fallback + if dotted.startswith('users.'): + sub = dotted.split('.',1)[1] + if sub in self.defaults_users: + return + # credentials.* fallback + if dotted.startswith('credentials.'): + key = dotted.split('.',1)[1] + schema = self.role_schemas.get(app_id, {}) + creds = schema.get('credentials', {}) + self.assertIn(key, creds, f"Credential '{key}' missing for app '{app_id}'") + return + # images.* fallback + if dotted.startswith('images.'): + if isinstance(cfg.get('images'), dict): + return + # final fail + file_path, lineno = occs[0] + self.fail(f"'{dotted}' not found for '{app_id}'; called at {file_path}:{lineno}") -if __name__ == "__main__": +if __name__ == '__main__': unittest.main()