From d43fdc63ea9eabd77a0c9559a58aad280533f4d1 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Fri, 11 Jul 2025 01:34:44 +0200 Subject: [PATCH] Optimized inventory validator for wrong groups --- Makefile | 2 +- cli/meta/applications.py | 49 +++++ cli/validate/inventory.py | 194 +++++++++--------- main.py | 8 +- tasks/stages/01_constructor.yml | 2 +- tests/integration/test_cli_help.py | 2 +- .../unit/cli/validate/test_inventory_hosts.py | 71 +++++++ tests/unit/filter_plugins/test_get_domain.py | 1 - utils/test_main.py | 2 - 9 files changed, 226 insertions(+), 105 deletions(-) create mode 100644 cli/meta/applications.py create mode 100644 tests/unit/cli/validate/test_inventory_hosts.py diff --git a/Makefile b/Makefile index e650da23..83d295d2 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ USERS_OUT := ./group_vars/all/03_users.yml USERS_SCRIPT := ./cli/generate/defaults/users.py INCLUDES_SCRIPT := ./cli/generate/conditional_role_include.py -INCLUDE_GROUPS := $(shell python3 main.py meta invokable_paths -s "-" | tr '\n' ' ') +INCLUDE_GROUPS := $(shell python3 main.py meta invokable_paths -s "-" --no-signal | tr '\n' ' ') # Directory where these include-files will be written INCLUDES_OUT_DIR := ./tasks/groups diff --git a/cli/meta/applications.py b/cli/meta/applications.py new file mode 100644 index 00000000..b185ef9b --- /dev/null +++ b/cli/meta/applications.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +import argparse +import glob +import os +import sys + +try: + import yaml +except ImportError: + sys.stderr.write("PyYAML is required. Install with `pip install pyyaml`.\n") + sys.exit(1) + + +def find_application_ids(): + """ + Searches all files matching roles/*/vars/main.yml for the key 'application_id' + and returns a list of all found IDs. + """ + pattern = os.path.join('roles', '*', 'vars', 'main.yml') + app_ids = [] + + for filepath in glob.glob(pattern): + try: + with open(filepath, 'r', encoding='utf-8') as f: + data = yaml.safe_load(f) + except Exception as e: + sys.stderr.write(f"Error reading {filepath}: {e}\n") + continue + + if isinstance(data, dict) and 'application_id' in data: + app_ids.append(data['application_id']) + + return sorted(set(app_ids)) + + +def main(): + parser = argparse.ArgumentParser( + description='Output a list of all application_id values defined in roles/*/vars/main.yml' + ) + # No arguments other than --help + parser.parse_args() + + ids = find_application_ids() + for app_id in ids: + print(app_id) + + +if __name__ == '__main__': + main() diff --git a/cli/validate/inventory.py b/cli/validate/inventory.py index ac53fc14..eae6b929 100644 --- a/cli/validate/inventory.py +++ b/cli/validate/inventory.py @@ -5,140 +5,144 @@ import yaml import re from pathlib import Path +# Ensure imports work when run directly +script_dir = Path(__file__).resolve().parent +repo_root = script_dir.parent.parent +sys.path.insert(0, str(repo_root)) + +from cli.meta.applications import find_application_ids def load_yaml_file(path): try: - with open(path, "r", encoding="utf-8") as f: + with open(path, 'r', encoding='utf-8') as f: content = f.read() - content = re.sub(r'(?m)^([ \t]*[^\s:]+):\s*!vault[\s\S]+?(?=^\S|\Z)', r'\1: ""\n', content) + content = re.sub(r'(?m)^([ \t]*[^\s:]+):\s*!vault[\s\S]+?(?=^\S|\Z)', r"\1: \"\"\n", content) return yaml.safe_load(content) except Exception as e: print(f"Warning: Could not parse {path}: {e}", file=sys.stderr) return None -def recursive_keys(d, prefix=""): +def recursive_keys(d, prefix=''): keys = set() if isinstance(d, dict): for k, v in d.items(): - full_key = f"{prefix}.{k}" if prefix else k - keys.add(full_key) - keys.update(recursive_keys(v, full_key)) + full = f"{prefix}.{k}" if prefix else k + keys.add(full) + keys.update(recursive_keys(v, full)) return keys -def compare_application_keys(applications, defaults, source_file): - errors = [] - for app_id, app_conf in applications.items(): +def compare_application_keys(applications, defaults, source): + errs = [] + for app_id, conf in applications.items(): if app_id not in defaults: - errors.append(f"{source_file}: Unknown application '{app_id}' (not in defaults_applications)") + errs.append(f"{source}: Unknown application '{app_id}' (not in defaults_applications)") continue - - default_conf = defaults.get(app_id, {}) - app_keys = recursive_keys(app_conf) - default_keys = recursive_keys(default_conf) - + default = defaults[app_id] + app_keys = recursive_keys(conf) + def_keys = recursive_keys(default) for key in app_keys: - if key.startswith("credentials"): - continue # explicitly ignore credentials - if key not in default_keys: - errors.append(f"{source_file}: Missing default for {app_id}: {key}") - return errors + if key.startswith('credentials'): + continue + if key not in def_keys: + errs.append(f"{source}: Missing default for {app_id}: {key}") + return errs -def compare_user_keys(users, default_users, source_file): - errors = [] - for username, user_conf in users.items(): - if username not in default_users: - print(f"Warning: {source_file}: Unknown user '{username}' (not in default_users)", file=sys.stderr) +def compare_user_keys(users, default_users, source): + errs = [] + for user, conf in users.items(): + if user not in default_users: + print(f"Warning: {source}: Unknown user '{user}' (not in default_users)", file=sys.stderr) continue - - default_conf = default_users.get(username, {}) - for key in user_conf: - if key in ("password", "credentials", "mailu_token"): - continue # ignore credentials/password - if key not in default_conf: - raise Exception(f"{source_file}: Missing default for user '{username}': key '{key}'") - return errors + def_conf = default_users[user] + for key in conf: + if key in ('password','credentials','mailu_token'): + continue + if key not in def_conf: + errs.append(f"Missing default for user '{user}': key '{key}'") + return errs -def load_inventory_files(inventory_dir): +def load_inventory_files(inv_dir): all_data = {} - inventory_path = Path(inventory_dir) - - for path in inventory_path.glob("*.yml"): - data = load_yaml_file(path) + p = Path(inv_dir) + for f in p.glob('*.yml'): + data = load_yaml_file(f) if isinstance(data, dict): - applications = data.get("applications") or data.get("defaults_applications") - if applications: - all_data[path] = applications - - for vars_folder in inventory_path.glob("*_vars"): - if vars_folder.is_dir(): - for subfile in vars_folder.rglob("*.yml"): - data = load_yaml_file(subfile) + apps = data.get('applications') or data.get('defaults_applications') + if apps: + all_data[str(f)] = apps + for d in p.glob('*_vars'): + if d.is_dir(): + for f in d.rglob('*.yml'): + data = load_yaml_file(f) if isinstance(data, dict): - applications = data.get("applications") or data.get("defaults_applications") - if applications: - all_data[subfile] = applications - + apps = data.get('applications') or data.get('defaults_applications') + if apps: + all_data[str(f)] = apps return all_data +def validate_host_keys(app_ids, inv_dir): + errs = [] + f = Path(inv_dir) / 'servers.yml' + data = load_yaml_file(f) + if isinstance(data, dict): + children = data.get('all',{}).get('children',{}) + for grp in children: + if grp not in app_ids: + errs.append(f"{f}: Invalid group '{grp}' (not in application_ids)") + return errs + + def find_single_file(pattern): - candidates = list(Path("group_vars/all").glob(pattern)) - if len(candidates) != 1: - raise RuntimeError(f"Expected exactly one {pattern} file in group_vars/all, found {len(candidates)}") - return candidates[0] + c = list(Path('group_vars/all').glob(pattern)) + if len(c)!=1: + raise RuntimeError(f"Expected exactly one {pattern} in group_vars/all, found {len(c)}") + return c[0] def main(): - parser = argparse.ArgumentParser(description="Verify application and user variable consistency with defaults.") - parser.add_argument("inventory_dir", help="Path to inventory directory (contains inventory.yml and *_vars/)") - args = parser.parse_args() - - defaults_path = find_single_file("*_applications.yml") - users_path = find_single_file("*users.yml") - - defaults_data = load_yaml_file(defaults_path) - default_users_data = load_yaml_file(users_path) - - defaults = defaults_data.get("defaults_applications", {}) if defaults_data else {} - default_users = default_users_data.get("default_users", {}) if default_users_data else {} - + p = argparse.ArgumentParser() + p.add_argument('inventory_dir') + args = p.parse_args() + # defaults + dfile = find_single_file('*_applications.yml') + ufile = find_single_file('*users.yml') + ddata = load_yaml_file(dfile) or {} + udata = load_yaml_file(ufile) or {} + defaults = ddata.get('defaults_applications',{}) + default_users = udata.get('default_users',{}) if not defaults: - print(f"Error: No 'defaults_applications' found in {defaults_path}.", file=sys.stderr) + print(f"Error: No 'defaults_applications' found in {dfile}", file=sys.stderr) sys.exit(1) if not default_users: - print(f"Error: No 'default_users' found in {users_path}.", file=sys.stderr) + print(f"Error: No 'default_users' found in {ufile}", file=sys.stderr) sys.exit(1) - - all_errors = [] - - inventory_files = load_inventory_files(args.inventory_dir) - for source_path, app_data in inventory_files.items(): - errors = compare_application_keys(app_data, defaults, str(source_path)) - all_errors.extend(errors) - - # Load all users.yml files from inventory - for path in Path(args.inventory_dir).rglob("*.yml"): - data = load_yaml_file(path) - if isinstance(data, dict) and "users" in data: - try: - compare_user_keys(data["users"], default_users, str(path)) - except Exception as e: + app_errs = [] + inv_files = load_inventory_files(args.inventory_dir) + for src, apps in inv_files.items(): + app_errs.extend(compare_application_keys(apps, defaults, src)) + user_errs = [] + for fpath in Path(args.inventory_dir).rglob('*.yml'): + data = load_yaml_file(fpath) + if isinstance(data, dict) and 'users' in data: + errs = compare_user_keys(data['users'], default_users, str(fpath)) + for e in errs: print(e, file=sys.stderr) - sys.exit(1) - - if all_errors: - print("Validation failed with the following issues:") - for err in all_errors: - print("-", err) + user_errs.extend(errs) + host_errs = validate_host_keys(find_application_ids(), args.inventory_dir) + app_errs.extend(host_errs) + if app_errs or user_errs: + if app_errs: + print('Validation failed with the following issues:') + for e in app_errs: + print(f"- {e}") sys.exit(1) - else: - print("Inventory directory is valid against defaults.") - sys.exit(0) + print('Inventory directory is valid against defaults and hosts.') + sys.exit(0) - -if __name__ == "__main__": +if __name__=='__main__': main() diff --git a/main.py b/main.py index 42ee235b..aed050d7 100755 --- a/main.py +++ b/main.py @@ -97,12 +97,12 @@ def play_start_intro(): def failure_with_warning_loop(no_signal, sound_enabled): - if sound_enabled and not no_signal: + if not no_signal: Sound.play_finished_failed_sound() print(color_text("Warning: command failed. Press Ctrl+C to stop warnings.", Fore.RED)) try: while True: - if sound_enabled: + if not no_signal: Sound.play_warning_sound() except KeyboardInterrupt: print(color_text("Warnings stopped by user.", Fore.YELLOW)) @@ -118,7 +118,7 @@ if __name__ == "__main__": # Segfault handler def segv_handler(signum, frame): - if sound_enabled and not no_signal: + if not no_signal: Sound.play_finished_failed_sound() try: while True: @@ -303,7 +303,7 @@ if __name__ == "__main__": failure_with_warning_loop(no_signal, sound_enabled) sys.exit(rc) else: - if sound_enabled and not no_signal: + if not no_signal: Sound.play_finished_successfully_sound() return True except Exception as e: diff --git a/tasks/stages/01_constructor.yml b/tasks/stages/01_constructor.yml index 5bc4a189..a09902d5 100644 --- a/tasks/stages/01_constructor.yml +++ b/tasks/stages/01_constructor.yml @@ -72,7 +72,7 @@ recursive=True )) | generate_all_domains( - ('www_redirect' in group_names) + ('redir-www' in group_names) ) }} diff --git a/tests/integration/test_cli_help.py b/tests/integration/test_cli_help.py index c6d94b7a..5040c864 100644 --- a/tests/integration/test_cli_help.py +++ b/tests/integration/test_cli_help.py @@ -34,7 +34,7 @@ class CLIHelpIntegrationTest(unittest.TestCase): segments = rel_dir.split(os.sep) + [cmd_name] with self.subTest(command=' '.join(segments)): - cmd = [self.python, self.main_py] + segments + ['--help'] + cmd = [self.python, self.main_py] + segments + ['--help', '--no-signal'] result = subprocess.run( cmd, capture_output=True, text=True ) diff --git a/tests/unit/cli/validate/test_inventory_hosts.py b/tests/unit/cli/validate/test_inventory_hosts.py new file mode 100644 index 00000000..fd3876ea --- /dev/null +++ b/tests/unit/cli/validate/test_inventory_hosts.py @@ -0,0 +1,71 @@ +import os +import re +import yaml +import tempfile +import shutil +import unittest +from unittest.mock import patch + +# Module under test +import cli.validate.inventory as inventory_mod + +class TestValidateHostKeys(unittest.TestCase): + def setUp(self): + # Create a temporary directory for servers.yml + self.test_dir = tempfile.mkdtemp() + # Patch find_application_ids to a fixed set + self.valid_ids = {'valid-service', 'another-service'} + patcher = patch.object(inventory_mod, 'find_application_ids', return_value=self.valid_ids) + self.mock_find_ids = patcher.start() + self.addCleanup(patcher.stop) + + def tearDown(self): + shutil.rmtree(self.test_dir) + + def write_servers_yaml(self, groups): + # Build the YAML structure + content = {'all': {'children': {}}} + for group, hosts in groups.items(): + # hosts is a list of hostnames + content['all']['children'][group] = {'hosts': {h: None for h in hosts}} + path = os.path.join(self.test_dir, 'servers.yml') + with open(path, 'w', encoding='utf-8') as f: + yaml.safe_dump(content, f) + return path + + def test_no_invalid_groups(self): + # All groups valid + groups = { + 'valid-service': ['host1'], + 'another-service': ['host2', 'host3'] + } + self.write_servers_yaml(groups) + errors = inventory_mod.validate_host_keys(self.valid_ids, self.test_dir) + self.assertEqual(errors, [], f"Expected no errors, got {errors}") + + def test_single_invalid_group(self): + # One invalid group + groups = { + 'valid-service': ['host1'], + 'invalid-service': ['host2'] + } + self.write_servers_yaml(groups) + errors = inventory_mod.validate_host_keys(self.valid_ids, self.test_dir) + self.assertEqual(len(errors), 1) + self.assertRegex(errors[0], r"Invalid group 'invalid-service'.*not in application_ids") + + def test_multiple_invalid_groups(self): + # Multiple invalid groups + groups = { + 'bad-one': ['h1'], + 'bad-two': ['h2'], + 'valid-service': ['h3'] + } + self.write_servers_yaml(groups) + errors = inventory_mod.validate_host_keys(self.valid_ids, self.test_dir) + self.assertEqual(len(errors), 2) + found = {re.search(r"Invalid group '(.+?)'", e).group(1) for e in errors} + self.assertSetEqual(found, {'bad-one', 'bad-two'}) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit/filter_plugins/test_get_domain.py b/tests/unit/filter_plugins/test_get_domain.py index dbd11078..26e9ce5b 100644 --- a/tests/unit/filter_plugins/test_get_domain.py +++ b/tests/unit/filter_plugins/test_get_domain.py @@ -1,4 +1,3 @@ -# tests/unit/test_get_domain.py import unittest import sys import os diff --git a/utils/test_main.py b/utils/test_main.py index e40333ec..48db9489 100644 --- a/utils/test_main.py +++ b/utils/test_main.py @@ -1,5 +1,3 @@ -# tests/unit/test_main.py - import os import sys import stat