From 5ad8a7e85706e8a2c881da91f6e1250ef9a1bf10 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Tue, 20 May 2025 13:39:43 +0200 Subject: [PATCH] Optimized credential logics --- cli/create_credentials.py | 27 ++++++----- cli/encrypt_inventory.py | 66 +++++++++++++++++++++++++++ cli/utils/manager/inventory.py | 28 ++++++++---- roles/docker-peertube/meta/schema.yml | 5 -- 4 files changed, 101 insertions(+), 25 deletions(-) create mode 100644 cli/encrypt_inventory.py diff --git a/cli/create_credentials.py b/cli/create_credentials.py index 7c895898..869d36d3 100644 --- a/cli/create_credentials.py +++ b/cli/create_credentials.py @@ -9,6 +9,7 @@ from utils.handler.vault import VaultHandler, VaultScalar from utils.handler.yaml import YamlHandler from yaml.dumper import SafeDumper + def ask_for_confirmation(key: str) -> bool: """Prompt the user for confirmation to overwrite an existing value.""" confirmation = input(f"Are you sure you want to overwrite the value for '{key}'? (y/n): ").strip().lower() @@ -40,8 +41,19 @@ def main(): # 1) Apply schema and update inventory updated_inventory = manager.apply_schema() - # 2) Vault any leaves under 'credentials:' mappings - manager.vault_handler.encrypt_leaves(updated_inventory, args.vault_password_file) + # 2) Apply vault encryption ONLY to 'credentials' fields (we no longer apply it globally) + credentials = updated_inventory.get("applications", {}).get(manager.app_id, {}).get("credentials", {}) + for key, value in credentials.items(): + if not value.lstrip().startswith("$ANSIBLE_VAULT"): # Only apply encryption if the value is not already vaulted + if key in credentials and not args.force: + if not ask_for_confirmation(key): # Ask for confirmation before overwriting + print(f"Skipping overwrite of '{key}'.") + continue + encrypted_value = manager.vault_handler.encrypt_string(value, key) + lines = encrypted_value.splitlines() + indent = len(lines[1]) - len(lines[1].lstrip()) + body = "\n".join(line[indent:] for line in lines[1:]) + credentials[key] = VaultScalar(body) # Store encrypted value as VaultScalar # 3) Vault top-level ansible_become_password if present if "ansible_become_password" in updated_inventory: @@ -53,19 +65,12 @@ def main(): body = "\n".join(line[indent:] for line in lines[1:]) updated_inventory["ansible_become_password"] = VaultScalar(body) - # 4) Ask for confirmation before overwriting existing values - if not args.force: - for key in updated_inventory.get("applications", {}).get(manager.app_id, {}).get("credentials", {}).keys(): - if key in updated_inventory["applications"][manager.app_id]["credentials"]: - if not ask_for_confirmation(key): - print(f"Skipping overwrite of '{key}'.") - continue - - # 5) Save the updated inventory to file + # 4) Save the updated inventory to file with open(args.inventory_file, "w", encoding="utf-8") as f: yaml.dump(updated_inventory, f, sort_keys=False, Dumper=SafeDumper) print(f"✅ Inventory selectively vaulted → {args.inventory_file}") + if __name__ == "__main__": main() diff --git a/cli/encrypt_inventory.py b/cli/encrypt_inventory.py new file mode 100644 index 00000000..cb4a9429 --- /dev/null +++ b/cli/encrypt_inventory.py @@ -0,0 +1,66 @@ +import argparse +import subprocess +import sys +from pathlib import Path +import yaml +from typing import Dict, Any +from utils.handler.vault import VaultHandler, VaultScalar +from utils.handler.yaml import YamlHandler +from yaml.dumper import SafeDumper + +def ask_for_confirmation(key: str) -> bool: + """Prompt the user for confirmation to overwrite an existing value.""" + confirmation = input(f"Do you want to encrypt the value for '{key}'? (y/n): ").strip().lower() + return confirmation == 'y' + + +def encrypt_recursively(data: Any, vault_handler: VaultHandler, ask_confirmation: bool = True, prefix: str = "") -> Any: + """Recursively encrypt values in the data.""" + if isinstance(data, dict): + for key, value in data.items(): + new_prefix = f"{prefix}.{key}" if prefix else key + data[key] = encrypt_recursively(value, vault_handler, ask_confirmation, new_prefix) + elif isinstance(data, list): + for i, item in enumerate(data): + data[i] = encrypt_recursively(item, vault_handler, ask_confirmation, prefix) + elif isinstance(data, str): + # Only encrypt if it's not already vaulted + if not data.lstrip().startswith("$ANSIBLE_VAULT"): + if ask_confirmation: + # Ask for confirmation before encrypting if not `--all` + if not ask_for_confirmation(prefix): + print(f"Skipping encryption for '{prefix}'.") + return data + encrypted_value = vault_handler.encrypt_string(data, prefix) + lines = encrypted_value.splitlines() + indent = len(lines[1]) - len(lines[1].lstrip()) + body = "\n".join(line[indent:] for line in lines[1:]) + return VaultScalar(body) # Store encrypted value as VaultScalar + return data + + +def main(): + parser = argparse.ArgumentParser( + description="Encrypt all fields, ask for confirmation unless --all is specified." + ) + parser.add_argument("--inventory-file", required=True, help="Host vars file to update") + parser.add_argument("--vault-password-file", required=True, help="Vault password file") + parser.add_argument("--all", action="store_true", help="Encrypt all fields without confirmation") + args = parser.parse_args() + + # Initialize the VaultHandler and load the inventory + vault_handler = VaultHandler(vault_password_file=args.vault_password_file) + updated_inventory = YamlHandler.load_yaml(Path(args.inventory_file)) + + # 1) Encrypt all fields recursively + updated_inventory = encrypt_recursively(updated_inventory, vault_handler, ask_confirmation=not args.all) + + # 2) Save the updated inventory to file + with open(args.inventory_file, "w", encoding="utf-8") as f: + yaml.dump(updated_inventory, f, sort_keys=False, Dumper=SafeDumper) + + print(f"✅ Inventory selectively vaulted → {args.inventory_file}") + + +if __name__ == "__main__": + main() diff --git a/cli/utils/manager/inventory.py b/cli/utils/manager/inventory.py index f424f605..0bd2f54b 100644 --- a/cli/utils/manager/inventory.py +++ b/cli/utils/manager/inventory.py @@ -47,15 +47,17 @@ class InventoryManager: "value": self.generate_value("alphanumeric") # Generate the password value } - self.recurse(self.schema, target) + # Apply recursion only for the `credentials` section + self.recurse_credentials(self.schema, target) return self.inventory - def recurse(self, branch: dict, dest: dict, prefix: str = ""): - """Recursively process the schema and generate values.""" + def recurse_credentials(self, branch: dict, dest: dict, prefix: str = ""): + """Recursively process only the 'credentials' section and generate values.""" for key, meta in branch.items(): full_key = f"{prefix}.{key}" if prefix else key - if isinstance(meta, dict) and all(k in meta for k in ("description", "algorithm", "validation")): + # Only process 'credentials' section for encryption + if prefix == "credentials" and isinstance(meta, dict) and all(k in meta for k in ("description", "algorithm", "validation")): alg = meta["algorithm"] if alg == "plain": # Must be supplied via --set @@ -66,6 +68,14 @@ class InventoryManager: else: plain = self.overrides.get(full_key, self.generate_value(alg)) + # Check if the value already starts with '$ANSIBLE_VAULT', indicating it's already vaulted + existing_value = dest.get(key) + if existing_value and existing_value.lstrip().startswith("$ANSIBLE_VAULT"): + # Skip vaulting if the value is already vaulted + print(f"Skipping encryption for '{key}', as it is already vaulted.") + continue + + # Encrypt only if it's not already vaulted snippet = self.vault_handler.encrypt_string(plain, key) lines = snippet.splitlines() indent = len(lines[1]) - len(lines[1].lstrip()) @@ -74,11 +84,11 @@ class InventoryManager: elif isinstance(meta, dict): sub = dest.setdefault(key, {}) - self.recurse(meta, sub, full_key) + self.recurse_credentials(meta, sub, full_key) else: dest[key] = meta - - def generate_secure_alphanumeric(length: int) -> str: + + def generate_secure_alphanumeric(self, length: int) -> str: """Generate a cryptographically secure random alphanumeric string of the given length.""" characters = string.ascii_letters + string.digits # a-zA-Z0-9 return ''.join(secrets.choice(characters) for _ in range(length)) @@ -95,5 +105,5 @@ class InventoryManager: pw = secrets.token_urlsafe(16).encode() return bcrypt.hashpw(pw, bcrypt.gensalt()).decode() if algorithm == "alphanumeric": - return generate_secure_alphanumeric(64) - return "undefined" \ No newline at end of file + return self.generate_secure_alphanumeric(64) + return "undefined" diff --git a/roles/docker-peertube/meta/schema.yml b/roles/docker-peertube/meta/schema.yml index c13f5451..464fe197 100644 --- a/roles/docker-peertube/meta/schema.yml +++ b/roles/docker-peertube/meta/schema.yml @@ -1,9 +1,4 @@ credentials: - database_password: - description: "Password for the PeerTube PostgreSQL database" - algorithm: "bcrypt" - validation: "^\\$2[aby]\\$.{56}$" - secret: description: "PeerTube secret used for session signing and CSRF protection" algorithm: "sha256"