diff --git a/cli/deploy.py b/cli/deploy.py index 5e6d040a..9e535cb4 100644 --- a/cli/deploy.py +++ b/cli/deploy.py @@ -6,7 +6,6 @@ import os import datetime import sys - def run_ansible_playbook( inventory, modes, @@ -81,6 +80,24 @@ def run_ansible_playbook( duration = end_time - start_time print(f"⏱️ Total execution time: {duration}\n") +def validate_application_ids(inventory, app_ids): + """ + Abort the script if any application IDs are invalid, with detailed reasons. + """ + from utils.valid_deploy_id import ValidDeployId + validator = ValidDeployId() + invalid = validator.validate(inventory, app_ids) + if invalid: + print("\n❌ Detected invalid application_id(s):\n") + for app_id, status in invalid.items(): + reasons = [] + if not status['in_roles']: + reasons.append("not defined in roles (cymais)") + if not status['in_inventory']: + reasons.append("not found in inventory file") + print(f" - {app_id}: " + ", ".join(reasons)) + sys.exit(1) + def main(): parser = argparse.ArgumentParser( @@ -150,6 +167,7 @@ def main(): ) args = parser.parse_args() + validate_application_ids(args.inventory, args.id) modes = { "mode_reset": args.reset, diff --git a/tests/unit/utils/test_valid_deploy_id.py b/tests/unit/utils/test_valid_deploy_id.py new file mode 100644 index 00000000..61fa86a6 --- /dev/null +++ b/tests/unit/utils/test_valid_deploy_id.py @@ -0,0 +1,116 @@ +# File: tests/unit/utils/test_valid_deploy_id.py +import os +import tempfile +import unittest +import yaml +from utils.valid_deploy_id import ValidDeployId + +class TestValidDeployId(unittest.TestCase): + def setUp(self): + # Create a temporary directory for roles + self.temp_dir = tempfile.TemporaryDirectory() + self.roles_dir = os.path.join(self.temp_dir.name, 'roles') + os.makedirs(self.roles_dir) + + # Create a dummy role with application_id 'app1' + role_path = os.path.join(self.roles_dir, 'role1', 'vars') + os.makedirs(role_path) + with open(os.path.join(role_path, 'main.yml'), 'w', encoding='utf-8') as f: + yaml.safe_dump({'application_id': 'app1'}, f) + + # Initialize validator with our temp roles_dir + self.validator = ValidDeployId(roles_dir=self.roles_dir) + + def tearDown(self): + self.temp_dir.cleanup() + + def _write_ini_inventory(self, content): + fd, path = tempfile.mkstemp(suffix='.ini') + os.close(fd) + with open(path, 'w', encoding='utf-8') as f: + f.write(content) + return path + + def _write_yaml_inventory(self, data): + fd, path = tempfile.mkstemp(suffix='.yml') + os.close(fd) + with open(path, 'w', encoding='utf-8') as f: + yaml.safe_dump(data, f) + return path + + def test_valid_in_roles_and_ini_inventory(self): + # Inventory contains app1 as a host + ini_content = """ + [servers] + app1,otherhost + """ + inv = self._write_ini_inventory(ini_content) + result = self.validator.validate(inv, ['app1']) + self.assertEqual(result, {}, "app1 should be valid when in roles and ini inventory") + + def test_missing_in_roles(self): + # Inventory contains app2 but roles only have app1 + ini_content = """ + [servers] + app2 + """ + inv = self._write_ini_inventory(ini_content) + result = self.validator.validate(inv, ['app2']) + # app2 not in roles, but in inventory + expected = {'app2': {'in_roles': False, 'in_inventory': True}} + self.assertEqual(result, expected) + + def test_missing_in_inventory_ini(self): + # Roles have app1 but inventory does not mention it + ini_content = """ + [servers] + otherhost + """ + inv = self._write_ini_inventory(ini_content) + result = self.validator.validate(inv, ['app1']) + expected = {'app1': {'in_roles': True, 'in_inventory': False}} + self.assertEqual(result, expected) + + def test_missing_both_ini(self): + # Neither roles nor inventory have appX + ini_content = """ + [servers] + otherhost + """ + inv = self._write_ini_inventory(ini_content) + result = self.validator.validate(inv, ['appX']) + expected = {'appX': {'in_roles': False, 'in_inventory': False}} + self.assertEqual(result, expected) + + def test_valid_in_roles_and_yaml_inventory(self): + # YAML inventory with app1 as a dict key + data = {'app1': {'hosts': ['app1']}, 'group': {'app1': {}}} + inv = self._write_yaml_inventory(data) + result = self.validator.validate(inv, ['app1']) + self.assertEqual(result, {}, "app1 should be valid in roles and yaml inventory") + + def test_missing_in_roles_yaml(self): + # YAML inventory has app2 key but roles only have app1 + data = {'app2': {}} + inv = self._write_yaml_inventory(data) + result = self.validator.validate(inv, ['app2']) + expected = {'app2': {'in_roles': False, 'in_inventory': True}} + self.assertEqual(result, expected) + + def test_missing_in_inventory_yaml(self): + # Roles have app1 but YAML inventory has no app1 + data = {'group': {'other': {}}} + inv = self._write_yaml_inventory(data) + result = self.validator.validate(inv, ['app1']) + expected = {'app1': {'in_roles': True, 'in_inventory': False}} + self.assertEqual(result, expected) + + def test_missing_both_yaml(self): + data = {} + inv = self._write_yaml_inventory(data) + result = self.validator.validate(inv, ['unknown']) + expected = {'unknown': {'in_roles': False, 'in_inventory': False}} + self.assertEqual(result, expected) + +if __name__ == '__main__': + unittest.main() diff --git a/utils/valid_deploy_id.py b/utils/valid_deploy_id.py new file mode 100644 index 00000000..51ae8e47 --- /dev/null +++ b/utils/valid_deploy_id.py @@ -0,0 +1,89 @@ +# File: utils/valid_deploy_id.py +""" +Utility for validating deployment application IDs against defined roles and inventory. +""" +import os +import yaml +import glob +import configparser + +from filter_plugins.get_all_application_ids import get_all_application_ids + +class ValidDeployId: + def __init__(self, roles_dir='roles'): + # Load all known application IDs from roles + self.valid_ids = set(get_all_application_ids(roles_dir)) + + def validate(self, inventory_path, ids): + """ + Validate a list of application IDs against both role definitions and inventory. + Returns a dict mapping invalid IDs to their presence status. + Example: + { + "app1": {"in_roles": False, "in_inventory": True}, + "app2": {"in_roles": True, "in_inventory": False} + } + """ + invalid = {} + for app_id in ids: + in_roles = app_id in self.valid_ids + in_inventory = self._exists_in_inventory(inventory_path, app_id) + if not (in_roles and in_inventory): + invalid[app_id] = { + 'in_roles': in_roles, + 'in_inventory': in_inventory + } + return invalid + + def _exists_in_inventory(self, inventory_path, app_id): + _, ext = os.path.splitext(inventory_path) + if ext in ('.yml', '.yaml'): + return self._search_yaml_keys(inventory_path, app_id) + else: + return self._search_ini_sections(inventory_path, app_id) + + def _search_ini_sections(self, inventory_path, app_id): + """ + Manually parse INI inventory for sections and host lists. + Returns True if app_id matches a section name or a host in a section. + """ + present = False + with open(inventory_path, 'r', encoding='utf-8') as f: + current_section = None + for raw in f: + line = raw.strip() + # Skip blanks and comments + if not line or line.startswith(('#', ';')): + continue + # Section header + if line.startswith('[') and line.endswith(']'): + current_section = line[1:-1].strip() + if current_section == app_id: + return True + continue + # Host or variable line under a section + if current_section: + # Split on commas or whitespace + for part in [p.strip() for p in line.replace(',', ' ').split()]: + if part == app_id: + return True + return False + + def _search_yaml_keys(self, inventory_path, app_id): + with open(inventory_path, 'r', encoding='utf-8') as f: + data = yaml.safe_load(f) + return self._find_key(data, app_id) + + def _find_key(self, node, key): # recursive search + if isinstance(node, dict): + for k, v in node.items(): + # If key matches and maps to a dict or list, consider it present + if k == key and isinstance(v, (dict, list)): + return True + if self._find_key(v, key): + return True + elif isinstance(node, list): + for item in node: + if self._find_key(item, key): + return True + return False