mirror of
				https://github.com/kevinveenbirkenbach/computer-playbook.git
				synced 2025-11-04 12:18:17 +00:00 
			
		
		
		
	Added validation for deploy application ids
This commit is contained in:
		@@ -6,7 +6,6 @@ import os
 | 
				
			|||||||
import datetime
 | 
					import datetime
 | 
				
			||||||
import sys
 | 
					import sys
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
def run_ansible_playbook(
 | 
					def run_ansible_playbook(
 | 
				
			||||||
    inventory,
 | 
					    inventory,
 | 
				
			||||||
    modes,
 | 
					    modes,
 | 
				
			||||||
@@ -81,6 +80,24 @@ def run_ansible_playbook(
 | 
				
			|||||||
    duration = end_time - start_time
 | 
					    duration = end_time - start_time
 | 
				
			||||||
    print(f"⏱️ Total execution time: {duration}\n")
 | 
					    print(f"⏱️ Total execution time: {duration}\n")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def validate_application_ids(inventory, app_ids):
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    Abort the script if any application IDs are invalid, with detailed reasons.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    from utils.valid_deploy_id import ValidDeployId
 | 
				
			||||||
 | 
					    validator = ValidDeployId()
 | 
				
			||||||
 | 
					    invalid = validator.validate(inventory, app_ids)
 | 
				
			||||||
 | 
					    if invalid:
 | 
				
			||||||
 | 
					        print("\n❌ Detected invalid application_id(s):\n")
 | 
				
			||||||
 | 
					        for app_id, status in invalid.items():
 | 
				
			||||||
 | 
					            reasons = []
 | 
				
			||||||
 | 
					            if not status['in_roles']:
 | 
				
			||||||
 | 
					                reasons.append("not defined in roles (cymais)")
 | 
				
			||||||
 | 
					            if not status['in_inventory']:
 | 
				
			||||||
 | 
					                reasons.append("not found in inventory file")
 | 
				
			||||||
 | 
					            print(f"  - {app_id}: " + ", ".join(reasons))
 | 
				
			||||||
 | 
					        sys.exit(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def main():
 | 
					def main():
 | 
				
			||||||
    parser = argparse.ArgumentParser(
 | 
					    parser = argparse.ArgumentParser(
 | 
				
			||||||
@@ -150,6 +167,7 @@ def main():
 | 
				
			|||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    args = parser.parse_args()
 | 
					    args = parser.parse_args()
 | 
				
			||||||
 | 
					    validate_application_ids(args.inventory, args.id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    modes = {
 | 
					    modes = {
 | 
				
			||||||
        "mode_reset": args.reset,
 | 
					        "mode_reset": args.reset,
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										116
									
								
								tests/unit/utils/test_valid_deploy_id.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										116
									
								
								tests/unit/utils/test_valid_deploy_id.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,116 @@
 | 
				
			|||||||
 | 
					# File: tests/unit/utils/test_valid_deploy_id.py
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					import tempfile
 | 
				
			||||||
 | 
					import unittest
 | 
				
			||||||
 | 
					import yaml
 | 
				
			||||||
 | 
					from utils.valid_deploy_id import ValidDeployId
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class TestValidDeployId(unittest.TestCase):
 | 
				
			||||||
 | 
					    def setUp(self):
 | 
				
			||||||
 | 
					        # Create a temporary directory for roles
 | 
				
			||||||
 | 
					        self.temp_dir = tempfile.TemporaryDirectory()
 | 
				
			||||||
 | 
					        self.roles_dir = os.path.join(self.temp_dir.name, 'roles')
 | 
				
			||||||
 | 
					        os.makedirs(self.roles_dir)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Create a dummy role with application_id 'app1'
 | 
				
			||||||
 | 
					        role_path = os.path.join(self.roles_dir, 'role1', 'vars')
 | 
				
			||||||
 | 
					        os.makedirs(role_path)
 | 
				
			||||||
 | 
					        with open(os.path.join(role_path, 'main.yml'), 'w', encoding='utf-8') as f:
 | 
				
			||||||
 | 
					            yaml.safe_dump({'application_id': 'app1'}, f)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Initialize validator with our temp roles_dir
 | 
				
			||||||
 | 
					        self.validator = ValidDeployId(roles_dir=self.roles_dir)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def tearDown(self):
 | 
				
			||||||
 | 
					        self.temp_dir.cleanup()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _write_ini_inventory(self, content):
 | 
				
			||||||
 | 
					        fd, path = tempfile.mkstemp(suffix='.ini')
 | 
				
			||||||
 | 
					        os.close(fd)
 | 
				
			||||||
 | 
					        with open(path, 'w', encoding='utf-8') as f:
 | 
				
			||||||
 | 
					            f.write(content)
 | 
				
			||||||
 | 
					        return path
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _write_yaml_inventory(self, data):
 | 
				
			||||||
 | 
					        fd, path = tempfile.mkstemp(suffix='.yml')
 | 
				
			||||||
 | 
					        os.close(fd)
 | 
				
			||||||
 | 
					        with open(path, 'w', encoding='utf-8') as f:
 | 
				
			||||||
 | 
					            yaml.safe_dump(data, f)
 | 
				
			||||||
 | 
					        return path
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_valid_in_roles_and_ini_inventory(self):
 | 
				
			||||||
 | 
					        # Inventory contains app1 as a host
 | 
				
			||||||
 | 
					        ini_content = """
 | 
				
			||||||
 | 
					        [servers]
 | 
				
			||||||
 | 
					        app1,otherhost
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        inv = self._write_ini_inventory(ini_content)
 | 
				
			||||||
 | 
					        result = self.validator.validate(inv, ['app1'])
 | 
				
			||||||
 | 
					        self.assertEqual(result, {}, "app1 should be valid when in roles and ini inventory")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_missing_in_roles(self):
 | 
				
			||||||
 | 
					        # Inventory contains app2 but roles only have app1
 | 
				
			||||||
 | 
					        ini_content = """
 | 
				
			||||||
 | 
					        [servers]
 | 
				
			||||||
 | 
					        app2
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        inv = self._write_ini_inventory(ini_content)
 | 
				
			||||||
 | 
					        result = self.validator.validate(inv, ['app2'])
 | 
				
			||||||
 | 
					        # app2 not in roles, but in inventory
 | 
				
			||||||
 | 
					        expected = {'app2': {'in_roles': False, 'in_inventory': True}}
 | 
				
			||||||
 | 
					        self.assertEqual(result, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_missing_in_inventory_ini(self):
 | 
				
			||||||
 | 
					        # Roles have app1 but inventory does not mention it
 | 
				
			||||||
 | 
					        ini_content = """
 | 
				
			||||||
 | 
					        [servers]
 | 
				
			||||||
 | 
					        otherhost
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        inv = self._write_ini_inventory(ini_content)
 | 
				
			||||||
 | 
					        result = self.validator.validate(inv, ['app1'])
 | 
				
			||||||
 | 
					        expected = {'app1': {'in_roles': True, 'in_inventory': False}}
 | 
				
			||||||
 | 
					        self.assertEqual(result, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_missing_both_ini(self):
 | 
				
			||||||
 | 
					        # Neither roles nor inventory have appX
 | 
				
			||||||
 | 
					        ini_content = """
 | 
				
			||||||
 | 
					        [servers]
 | 
				
			||||||
 | 
					        otherhost
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        inv = self._write_ini_inventory(ini_content)
 | 
				
			||||||
 | 
					        result = self.validator.validate(inv, ['appX'])
 | 
				
			||||||
 | 
					        expected = {'appX': {'in_roles': False, 'in_inventory': False}}
 | 
				
			||||||
 | 
					        self.assertEqual(result, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_valid_in_roles_and_yaml_inventory(self):
 | 
				
			||||||
 | 
					        # YAML inventory with app1 as a dict key
 | 
				
			||||||
 | 
					        data = {'app1': {'hosts': ['app1']}, 'group': {'app1': {}}}
 | 
				
			||||||
 | 
					        inv = self._write_yaml_inventory(data)
 | 
				
			||||||
 | 
					        result = self.validator.validate(inv, ['app1'])
 | 
				
			||||||
 | 
					        self.assertEqual(result, {}, "app1 should be valid in roles and yaml inventory")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_missing_in_roles_yaml(self):
 | 
				
			||||||
 | 
					        # YAML inventory has app2 key but roles only have app1
 | 
				
			||||||
 | 
					        data = {'app2': {}}
 | 
				
			||||||
 | 
					        inv = self._write_yaml_inventory(data)
 | 
				
			||||||
 | 
					        result = self.validator.validate(inv, ['app2'])
 | 
				
			||||||
 | 
					        expected = {'app2': {'in_roles': False, 'in_inventory': True}}
 | 
				
			||||||
 | 
					        self.assertEqual(result, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_missing_in_inventory_yaml(self):
 | 
				
			||||||
 | 
					        # Roles have app1 but YAML inventory has no app1
 | 
				
			||||||
 | 
					        data = {'group': {'other': {}}}
 | 
				
			||||||
 | 
					        inv = self._write_yaml_inventory(data)
 | 
				
			||||||
 | 
					        result = self.validator.validate(inv, ['app1'])
 | 
				
			||||||
 | 
					        expected = {'app1': {'in_roles': True, 'in_inventory': False}}
 | 
				
			||||||
 | 
					        self.assertEqual(result, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_missing_both_yaml(self):
 | 
				
			||||||
 | 
					        data = {}
 | 
				
			||||||
 | 
					        inv = self._write_yaml_inventory(data)
 | 
				
			||||||
 | 
					        result = self.validator.validate(inv, ['unknown'])
 | 
				
			||||||
 | 
					        expected = {'unknown': {'in_roles': False, 'in_inventory': False}}
 | 
				
			||||||
 | 
					        self.assertEqual(result, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == '__main__':
 | 
				
			||||||
 | 
					    unittest.main()
 | 
				
			||||||
							
								
								
									
										89
									
								
								utils/valid_deploy_id.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										89
									
								
								utils/valid_deploy_id.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,89 @@
 | 
				
			|||||||
 | 
					# File: utils/valid_deploy_id.py
 | 
				
			||||||
 | 
					"""
 | 
				
			||||||
 | 
					Utility for validating deployment application IDs against defined roles and inventory.
 | 
				
			||||||
 | 
					"""
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					import yaml
 | 
				
			||||||
 | 
					import glob
 | 
				
			||||||
 | 
					import configparser
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from filter_plugins.get_all_application_ids import get_all_application_ids
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class ValidDeployId:
 | 
				
			||||||
 | 
					    def __init__(self, roles_dir='roles'):
 | 
				
			||||||
 | 
					        # Load all known application IDs from roles
 | 
				
			||||||
 | 
					        self.valid_ids = set(get_all_application_ids(roles_dir))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def validate(self, inventory_path, ids):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        Validate a list of application IDs against both role definitions and inventory.
 | 
				
			||||||
 | 
					        Returns a dict mapping invalid IDs to their presence status.
 | 
				
			||||||
 | 
					        Example:
 | 
				
			||||||
 | 
					          {
 | 
				
			||||||
 | 
					            "app1": {"in_roles": False, "in_inventory": True},
 | 
				
			||||||
 | 
					            "app2": {"in_roles": True, "in_inventory": False}
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        invalid = {}
 | 
				
			||||||
 | 
					        for app_id in ids:
 | 
				
			||||||
 | 
					            in_roles = app_id in self.valid_ids
 | 
				
			||||||
 | 
					            in_inventory = self._exists_in_inventory(inventory_path, app_id)
 | 
				
			||||||
 | 
					            if not (in_roles and in_inventory):
 | 
				
			||||||
 | 
					                invalid[app_id] = {
 | 
				
			||||||
 | 
					                    'in_roles': in_roles,
 | 
				
			||||||
 | 
					                    'in_inventory': in_inventory
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					        return invalid
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _exists_in_inventory(self, inventory_path, app_id):
 | 
				
			||||||
 | 
					        _, ext = os.path.splitext(inventory_path)
 | 
				
			||||||
 | 
					        if ext in ('.yml', '.yaml'):
 | 
				
			||||||
 | 
					            return self._search_yaml_keys(inventory_path, app_id)
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            return self._search_ini_sections(inventory_path, app_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _search_ini_sections(self, inventory_path, app_id):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        Manually parse INI inventory for sections and host lists.
 | 
				
			||||||
 | 
					        Returns True if app_id matches a section name or a host in a section.
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        present = False
 | 
				
			||||||
 | 
					        with open(inventory_path, 'r', encoding='utf-8') as f:
 | 
				
			||||||
 | 
					            current_section = None
 | 
				
			||||||
 | 
					            for raw in f:
 | 
				
			||||||
 | 
					                line = raw.strip()
 | 
				
			||||||
 | 
					                # Skip blanks and comments
 | 
				
			||||||
 | 
					                if not line or line.startswith(('#', ';')):
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					                # Section header
 | 
				
			||||||
 | 
					                if line.startswith('[') and line.endswith(']'):
 | 
				
			||||||
 | 
					                    current_section = line[1:-1].strip()
 | 
				
			||||||
 | 
					                    if current_section == app_id:
 | 
				
			||||||
 | 
					                        return True
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					                # Host or variable line under a section
 | 
				
			||||||
 | 
					                if current_section:
 | 
				
			||||||
 | 
					                    # Split on commas or whitespace
 | 
				
			||||||
 | 
					                    for part in [p.strip() for p in line.replace(',', ' ').split()]:
 | 
				
			||||||
 | 
					                        if part == app_id:
 | 
				
			||||||
 | 
					                            return True
 | 
				
			||||||
 | 
					        return False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _search_yaml_keys(self, inventory_path, app_id):
 | 
				
			||||||
 | 
					        with open(inventory_path, 'r', encoding='utf-8') as f:
 | 
				
			||||||
 | 
					            data = yaml.safe_load(f)
 | 
				
			||||||
 | 
					        return self._find_key(data, app_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _find_key(self, node, key):  # recursive search
 | 
				
			||||||
 | 
					        if isinstance(node, dict):
 | 
				
			||||||
 | 
					            for k, v in node.items():
 | 
				
			||||||
 | 
					                # If key matches and maps to a dict or list, consider it present
 | 
				
			||||||
 | 
					                if k == key and isinstance(v, (dict, list)):
 | 
				
			||||||
 | 
					                    return True
 | 
				
			||||||
 | 
					                if self._find_key(v, key):
 | 
				
			||||||
 | 
					                    return True
 | 
				
			||||||
 | 
					        elif isinstance(node, list):
 | 
				
			||||||
 | 
					            for item in node:
 | 
				
			||||||
 | 
					                if self._find_key(item, key):
 | 
				
			||||||
 | 
					                    return True
 | 
				
			||||||
 | 
					        return False
 | 
				
			||||||
		Reference in New Issue
	
	Block a user