mirror of
				https://github.com/kevinveenbirkenbach/computer-playbook.git
				synced 2025-11-04 04:08:15 +00:00 
			
		
		
		
	Compare commits
	
		
			3 Commits
		
	
	
		
			1d52fcec75
			...
			82f442f40e
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 82f442f40e | |||
| 9764941c7e | |||
| 9d9f11cb3d | 
							
								
								
									
										2
									
								
								cli/TODO.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								cli/TODO.md
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,2 @@
 | 
			
		||||
# Todo
 | 
			
		||||
- Test this script. It's just a draft. Checkout https://chatgpt.com/c/681d9e2b-7b28-800f-aef8-4f1427e9021d
 | 
			
		||||
							
								
								
									
										153
									
								
								cli/generate_vaulted_credentials.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										153
									
								
								cli/generate_vaulted_credentials.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,153 @@
 | 
			
		||||
import yaml
 | 
			
		||||
import argparse
 | 
			
		||||
import secrets
 | 
			
		||||
import hashlib
 | 
			
		||||
import bcrypt
 | 
			
		||||
import subprocess
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
def prompt(text, default=None):
 | 
			
		||||
    """Prompt the user for input, with optional default value."""
 | 
			
		||||
    prompt_text = f"[?] {text}" + (f" [{default}]" if default else "") + ": "
 | 
			
		||||
    response = input(prompt_text)
 | 
			
		||||
    return response.strip() or default
 | 
			
		||||
 | 
			
		||||
def generate_value(algorithm):
 | 
			
		||||
    """Generate a value based on the provided algorithm."""
 | 
			
		||||
    if algorithm == "random_hex":
 | 
			
		||||
        return secrets.token_hex(64)
 | 
			
		||||
    elif algorithm == "sha256":
 | 
			
		||||
        return hashlib.sha256(secrets.token_bytes(32)).hexdigest()
 | 
			
		||||
    elif algorithm == "sha1":
 | 
			
		||||
        return hashlib.sha1(secrets.token_bytes(20)).hexdigest()
 | 
			
		||||
    elif algorithm == "bcrypt":
 | 
			
		||||
        password = secrets.token_urlsafe(16).encode()
 | 
			
		||||
        return bcrypt.hashpw(password, bcrypt.gensalt()).decode()
 | 
			
		||||
    elif algorithm == "plain":
 | 
			
		||||
        return secrets.token_urlsafe(32)
 | 
			
		||||
    else:
 | 
			
		||||
        return "undefined"
 | 
			
		||||
 | 
			
		||||
def encrypt_with_vault(value, name, vault_password_file=None, ask_vault_pass=False):
 | 
			
		||||
    """Encrypt the given string using Ansible Vault."""
 | 
			
		||||
    cmd = ["ansible-vault", "encrypt_string", value, f"--name={name}"]
 | 
			
		||||
    if vault_password_file:
 | 
			
		||||
        cmd += ["--vault-password-file", vault_password_file]
 | 
			
		||||
    elif ask_vault_pass:
 | 
			
		||||
        cmd += ["--ask-vault-pass"]
 | 
			
		||||
    else:
 | 
			
		||||
        raise RuntimeError("You must provide --vault-password-file or use --ask-vault-pass.")
 | 
			
		||||
    
 | 
			
		||||
    result = subprocess.run(cmd, capture_output=True, text=True)
 | 
			
		||||
    if result.returncode != 0:
 | 
			
		||||
        raise RuntimeError(f"Vault encryption failed:\n{result.stderr}")
 | 
			
		||||
    return result.stdout.strip()
 | 
			
		||||
 | 
			
		||||
def load_yaml_file(path):
 | 
			
		||||
    """Load a YAML file or return an empty dict if not found."""
 | 
			
		||||
    if path.exists():
 | 
			
		||||
        with open(path, "r") as f:
 | 
			
		||||
            return yaml.safe_load(f) or {}
 | 
			
		||||
    return {}
 | 
			
		||||
 | 
			
		||||
def save_yaml_file(path, data):
 | 
			
		||||
    """Save a dictionary to a YAML file."""
 | 
			
		||||
    with open(path, "w") as f:
 | 
			
		||||
        yaml.dump(data, f, sort_keys=False)
 | 
			
		||||
 | 
			
		||||
def parse_overrides(pairs):
 | 
			
		||||
    """Parse key=value overrides into a dictionary."""
 | 
			
		||||
    result = {}
 | 
			
		||||
    for pair in pairs:
 | 
			
		||||
        if "=" not in pair:
 | 
			
		||||
            continue
 | 
			
		||||
        k, v = pair.split("=", 1)
 | 
			
		||||
        result[k.strip()] = v.strip()
 | 
			
		||||
    return result
 | 
			
		||||
 | 
			
		||||
def load_application_id_from_vars(role_path):
 | 
			
		||||
    """Read application_id from role's vars/main.yml"""
 | 
			
		||||
    vars_file = Path(role_path) / "vars" / "main.yml"
 | 
			
		||||
    if not vars_file.exists():
 | 
			
		||||
        raise FileNotFoundError(f"{vars_file} not found.")
 | 
			
		||||
    vars_data = load_yaml_file(vars_file)
 | 
			
		||||
    app_id = vars_data.get("application_id")
 | 
			
		||||
    if not app_id:
 | 
			
		||||
        raise KeyError(f"'application_id' not found in {vars_file}")
 | 
			
		||||
    return app_id
 | 
			
		||||
 | 
			
		||||
def apply_schema_to_inventory(schema, inventory_data, app_id, overrides, vault_password_file, ask_vault_pass):
 | 
			
		||||
    """Merge schema into inventory under applications.{app_id}, encrypting all values."""
 | 
			
		||||
    inventory_data.setdefault("applications", {})
 | 
			
		||||
    applications = inventory_data["applications"]
 | 
			
		||||
 | 
			
		||||
    applications.setdefault(app_id, {})
 | 
			
		||||
 | 
			
		||||
    def process_branch(branch, target, path_prefix=""):
 | 
			
		||||
        for key, meta in branch.items():
 | 
			
		||||
            full_key_path = f"{path_prefix}.{key}" if path_prefix else key
 | 
			
		||||
            if isinstance(meta, dict) and all(k in meta for k in ["description", "algorithm", "validation"]):
 | 
			
		||||
                if key in target:
 | 
			
		||||
                    overwrite = prompt(f"Key '{full_key_path}' already exists. Overwrite?", "n").lower() == "y"
 | 
			
		||||
                    if not overwrite:
 | 
			
		||||
                        continue
 | 
			
		||||
                plain_value = overrides.get(full_key_path, generate_value(meta["algorithm"]))
 | 
			
		||||
                vaulted_value = encrypt_with_vault(plain_value, key, vault_password_file, ask_vault_pass)
 | 
			
		||||
                target[key] = yaml.load(vaulted_value, Loader=yaml.SafeLoader)
 | 
			
		||||
            elif isinstance(meta, dict):
 | 
			
		||||
                target.setdefault(key, {})
 | 
			
		||||
                process_branch(meta, target[key], full_key_path)
 | 
			
		||||
            else:
 | 
			
		||||
                target[key] = meta
 | 
			
		||||
 | 
			
		||||
    process_branch(schema, applications[app_id])
 | 
			
		||||
    return inventory_data
 | 
			
		||||
 | 
			
		||||
def main():
 | 
			
		||||
    parser = argparse.ArgumentParser(description="Generate Vault-encrypted credentials from schema and write to inventory.")
 | 
			
		||||
    parser.add_argument("--role-path", help="Path to the Ansible role")
 | 
			
		||||
    parser.add_argument("--inventory-file", help="Path to the inventory file to update")
 | 
			
		||||
    parser.add_argument("--vault-password-file", help="Path to Ansible Vault password file")
 | 
			
		||||
    parser.add_argument("--ask-vault-pass", action="store_true", help="Prompt for vault password")
 | 
			
		||||
    parser.add_argument("--set", nargs="*", default=[], help="Override values as key=value")
 | 
			
		||||
    args = parser.parse_args()
 | 
			
		||||
 | 
			
		||||
    # Prompt for missing values
 | 
			
		||||
    role_path = Path(args.role_path or prompt("Path to Ansible role", "./roles/docker-<app>"))
 | 
			
		||||
    inventory_file = Path(args.inventory_file or prompt("Path to inventory file", "./host_vars/localhost.yml"))
 | 
			
		||||
 | 
			
		||||
    # Determine application_id
 | 
			
		||||
    app_id = load_application_id_from_vars(role_path)
 | 
			
		||||
 | 
			
		||||
    # Vault method
 | 
			
		||||
    if not args.vault_password_file and not args.ask_vault_pass:
 | 
			
		||||
        print("[?] No Vault password method provided.")
 | 
			
		||||
        print("    1) Provide path to --vault-password-file")
 | 
			
		||||
        print("    2) Use interactive prompt (--ask-vault-pass)")
 | 
			
		||||
        choice = prompt("Select method", "1")
 | 
			
		||||
        if choice == "1":
 | 
			
		||||
            args.vault_password_file = prompt("Vault password file", "~/.vault_pass.txt").replace("~", str(Path.home()))
 | 
			
		||||
        else:
 | 
			
		||||
            args.ask_vault_pass = True
 | 
			
		||||
 | 
			
		||||
    # Load files
 | 
			
		||||
    schema_path = role_path / "meta" / "schema.yml"
 | 
			
		||||
    schema_data = load_yaml_file(schema_path)
 | 
			
		||||
    inventory_data = load_yaml_file(inventory_file)
 | 
			
		||||
    overrides = parse_overrides(args.set)
 | 
			
		||||
 | 
			
		||||
    # Apply schema and save
 | 
			
		||||
    updated = apply_schema_to_inventory(
 | 
			
		||||
        schema=schema_data,
 | 
			
		||||
        inventory_data=inventory_data,
 | 
			
		||||
        app_id=app_id,
 | 
			
		||||
        overrides=overrides,
 | 
			
		||||
        vault_password_file=args.vault_password_file,
 | 
			
		||||
        ask_vault_pass=args.ask_vault_pass
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    save_yaml_file(inventory_file, updated)
 | 
			
		||||
    print(f"\n✅ Inventory file updated at: {inventory_file}")
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    main()
 | 
			
		||||
							
								
								
									
										39
									
								
								tests/integration/test_application_id_consistency.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								tests/integration/test_application_id_consistency.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,39 @@
 | 
			
		||||
import os
 | 
			
		||||
import yaml
 | 
			
		||||
import unittest
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
ROLES_DIR = Path(__file__).resolve().parent.parent.parent / "roles"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestApplicationIdConsistency(unittest.TestCase):
 | 
			
		||||
    def test_application_id_matches_docker_prefix(self):
 | 
			
		||||
        failed_roles = []
 | 
			
		||||
 | 
			
		||||
        for role_path in ROLES_DIR.iterdir():
 | 
			
		||||
            if role_path.is_dir() and role_path.name.startswith("docker-"):
 | 
			
		||||
                expected_id = role_path.name.replace("docker-", "", 1)
 | 
			
		||||
                vars_file = role_path / "vars" / "main.yml"
 | 
			
		||||
 | 
			
		||||
                if not vars_file.exists():
 | 
			
		||||
                    failed_roles.append((role_path.name, "vars/main.yml missing"))
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                with open(vars_file, "r") as f:
 | 
			
		||||
                    try:
 | 
			
		||||
                        vars_data = yaml.safe_load(f) or {}
 | 
			
		||||
                    except yaml.YAMLError as e:
 | 
			
		||||
                        failed_roles.append((role_path.name, f"YAML error: {e}"))
 | 
			
		||||
                        continue
 | 
			
		||||
 | 
			
		||||
                actual_id = vars_data.get("application_id")
 | 
			
		||||
                if actual_id != expected_id:
 | 
			
		||||
                    failed_roles.append((role_path.name, f"application_id is '{actual_id}', expected '{expected_id}'"))
 | 
			
		||||
 | 
			
		||||
        if failed_roles:
 | 
			
		||||
            msg = "\n".join([f"{role}: {reason}" for role, reason in failed_roles])
 | 
			
		||||
            self.fail(f"The following roles have mismatching or missing application_id:\n{msg}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    unittest.main()
 | 
			
		||||
							
								
								
									
										133
									
								
								tests/unit/test_generate_vaulted_credentials.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										133
									
								
								tests/unit/test_generate_vaulted_credentials.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,133 @@
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
import tempfile
 | 
			
		||||
import unittest
 | 
			
		||||
import shutil
 | 
			
		||||
import yaml
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from unittest.mock import patch
 | 
			
		||||
 | 
			
		||||
# Ensure cli directory is importable
 | 
			
		||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../cli")))
 | 
			
		||||
 | 
			
		||||
import generate_vaulted_credentials as gvc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestGenerateVaultedCredentials(unittest.TestCase):
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        # Create temporary directory structure for a fake role and inventory
 | 
			
		||||
        self.temp_dir = tempfile.mkdtemp()
 | 
			
		||||
        self.role_path = Path(self.temp_dir) / "roles" / "docker-demoapp"
 | 
			
		||||
        self.meta_path = self.role_path / "meta"
 | 
			
		||||
        self.meta_path.mkdir(parents=True)
 | 
			
		||||
 | 
			
		||||
        # Define schema with no "applications" root (direct app-specific structure)
 | 
			
		||||
        self.schema = {
 | 
			
		||||
            "credentials": {
 | 
			
		||||
                "shared_secret": {
 | 
			
		||||
                    "description": "A shared secret",
 | 
			
		||||
                    "algorithm": "sha256",
 | 
			
		||||
                    "validation": "^[a-f0-9]{64}$"
 | 
			
		||||
                },
 | 
			
		||||
                "postgresql_secret": {
 | 
			
		||||
                    "description": "Postgres password",
 | 
			
		||||
                    "algorithm": "bcrypt",
 | 
			
		||||
                    "validation": "^\\$2[aby]\\$.{56}$"
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        with open(self.meta_path / "schema.yml", "w") as f:
 | 
			
		||||
            yaml.dump(self.schema, f)
 | 
			
		||||
 | 
			
		||||
        # Create an empty inventory file
 | 
			
		||||
        self.inventory_path = Path(self.temp_dir) / "host_vars" / "testhost.yml"
 | 
			
		||||
        self.inventory_path.parent.mkdir(parents=True)
 | 
			
		||||
        with open(self.inventory_path, "w") as f:
 | 
			
		||||
            f.write("")
 | 
			
		||||
 | 
			
		||||
        self.vault_mock = "$ANSIBLE_VAULT;1.1;AES256\nmockedvaultdata=="
 | 
			
		||||
 | 
			
		||||
    def tearDown(self):
 | 
			
		||||
        shutil.rmtree(self.temp_dir)
 | 
			
		||||
 | 
			
		||||
    def test_apply_schema_creates_vaulted_credentials(self):
 | 
			
		||||
        schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml")
 | 
			
		||||
        inventory_data = gvc.load_yaml_file(self.inventory_path)
 | 
			
		||||
 | 
			
		||||
        with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt:
 | 
			
		||||
            mock_encrypt.return_value = self.vault_mock
 | 
			
		||||
            updated = gvc.apply_schema_to_inventory(
 | 
			
		||||
                schema=schema_data,
 | 
			
		||||
                inventory_data=inventory_data,
 | 
			
		||||
                app_id="demoapp",
 | 
			
		||||
                overrides={},
 | 
			
		||||
                vault_password_file="dummy",
 | 
			
		||||
                ask_vault_pass=False
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # Expect credentials to be written under applications.demoapp
 | 
			
		||||
        self.assertIn("applications", updated)
 | 
			
		||||
        self.assertIn("demoapp", updated["applications"])
 | 
			
		||||
        creds = updated["applications"]["demoapp"]["credentials"]
 | 
			
		||||
        self.assertIn("shared_secret", creds)
 | 
			
		||||
        self.assertIn("postgresql_secret", creds)
 | 
			
		||||
 | 
			
		||||
        for key in creds:
 | 
			
		||||
            self.assertTrue(str(creds[key]).startswith("!vault") or "$ANSIBLE_VAULT" in str(creds[key]))
 | 
			
		||||
 | 
			
		||||
    def test_existing_key_prompts_before_overwriting(self):
 | 
			
		||||
        # Pre-populate the inventory with one value
 | 
			
		||||
        pre_existing = {
 | 
			
		||||
            "applications": {
 | 
			
		||||
                "demoapp": {
 | 
			
		||||
                    "credentials": {
 | 
			
		||||
                        "shared_secret": "unchanged"
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        gvc.save_yaml_file(self.inventory_path, pre_existing)
 | 
			
		||||
 | 
			
		||||
        schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml")
 | 
			
		||||
        inventory_data = gvc.load_yaml_file(self.inventory_path)
 | 
			
		||||
 | 
			
		||||
        with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt, \
 | 
			
		||||
             patch("builtins.input", return_value="n"):
 | 
			
		||||
            mock_encrypt.return_value = self.vault_mock
 | 
			
		||||
            updated = gvc.apply_schema_to_inventory(
 | 
			
		||||
                schema=schema_data,
 | 
			
		||||
                inventory_data=inventory_data,
 | 
			
		||||
                app_id="demoapp",
 | 
			
		||||
                overrides={},
 | 
			
		||||
                vault_password_file="dummy",
 | 
			
		||||
                ask_vault_pass=False
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # Value should remain unchanged
 | 
			
		||||
        self.assertEqual(updated["applications"]["demoapp"]["credentials"]["shared_secret"], "unchanged")
 | 
			
		||||
 | 
			
		||||
    def test_set_override_applies_correctly(self):
 | 
			
		||||
        schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml")
 | 
			
		||||
        inventory_data = gvc.load_yaml_file(self.inventory_path)
 | 
			
		||||
 | 
			
		||||
        override_value = "custom-override-value"
 | 
			
		||||
        override_key = "credentials.shared_secret"
 | 
			
		||||
 | 
			
		||||
        # Patch vault encryption to just return the plaintext prefixed as mock
 | 
			
		||||
        with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt:
 | 
			
		||||
            mock_encrypt.side_effect = lambda val, name, **kwargs: f"$ANSIBLE_VAULT;1.1;AES256\n{val}"
 | 
			
		||||
            updated = gvc.apply_schema_to_inventory(
 | 
			
		||||
                schema=schema_data,
 | 
			
		||||
                inventory_data=inventory_data,
 | 
			
		||||
                app_id="demoapp",
 | 
			
		||||
                overrides={override_key: override_value},
 | 
			
		||||
                vault_password_file="dummy",
 | 
			
		||||
                ask_vault_pass=False
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        actual = updated["applications"]["demoapp"]["credentials"]["shared_secret"]
 | 
			
		||||
        self.assertIn(override_value, str(actual), "The override value was not used during encryption.")
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    unittest.main()
 | 
			
		||||
		Reference in New Issue
	
	Block a user