From 5470be50a993c26ffb6b8b93be71b9ab041714fe Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Tue, 20 May 2025 13:59:09 +0200 Subject: [PATCH] Optimized database encryption --- cli/utils/manager/inventory.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/cli/utils/manager/inventory.py b/cli/utils/manager/inventory.py index 0bd2f54b..c286fed3 100644 --- a/cli/utils/manager/inventory.py +++ b/cli/utils/manager/inventory.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Dict from utils.handler.yaml import YamlHandler from utils.handler.vault import VaultHandler, VaultScalar +import string class InventoryManager: def __init__(self, role_path: Path, inventory_path: Path, vault_pw: str, overrides: Dict[str, str]): @@ -35,17 +36,15 @@ class InventoryManager: target = apps.setdefault(self.app_id, {}) # Load the data from vars/main.yml - vars_file = self.role_path / "vars" / "main.yml" + vars_file = self.role_path / "vars" / "configuration.yml" data = YamlHandler.load_yaml(vars_file) # Check if 'central-database' is enabled in the features section of data if "features" in data and \ - "central-database" in data["features"] and \ + "central_database" in data["features"] and \ data["features"]["central_database"]: - # Add 'database_password' to credentials if 'central-database' is True - target.setdefault("credentials", {})["database_password"] = { - "value": self.generate_value("alphanumeric") # Generate the password value - } + # Add 'central_database' value (password) to credentials + target.setdefault("credentials", {})["central_database"] = self.generate_value("alphanumeric") # Apply recursion only for the `credentials` section self.recurse_credentials(self.schema, target) @@ -55,7 +54,7 @@ class InventoryManager: """Recursively process only the 'credentials' section and generate values.""" for key, meta in branch.items(): full_key = f"{prefix}.{key}" if prefix else key - + # Only process 'credentials' section for encryption if prefix == "credentials" and isinstance(meta, dict) and all(k in meta for k in ("description", "algorithm", "validation")): alg = meta["algorithm"] @@ -67,27 +66,34 @@ class InventoryManager: plain = self.overrides[full_key] else: plain = self.overrides.get(full_key, self.generate_value(alg)) - - # Check if the value already starts with '$ANSIBLE_VAULT', indicating it's already vaulted + + # Check if the value is already vaulted or if it's a dictionary existing_value = dest.get(key) - if existing_value and existing_value.lstrip().startswith("$ANSIBLE_VAULT"): - # Skip vaulting if the value is already vaulted + + # If existing_value is a dictionary, print a warning and skip encryption + if isinstance(existing_value, dict): + print(f"Skipping encryption for '{key}', as it is a dictionary.") + continue + + # Check if the value is a VaultScalar and already vaulted + if existing_value and isinstance(existing_value, VaultScalar): print(f"Skipping encryption for '{key}', as it is already vaulted.") continue - + # Encrypt only if it's not already vaulted snippet = self.vault_handler.encrypt_string(plain, key) lines = snippet.splitlines() indent = len(lines[1]) - len(lines[1].lstrip()) body = "\n".join(line[indent:] for line in lines[1:]) dest[key] = VaultScalar(body) - + elif isinstance(meta, dict): sub = dest.setdefault(key, {}) self.recurse_credentials(meta, sub, full_key) else: dest[key] = meta + def generate_secure_alphanumeric(self, length: int) -> str: """Generate a cryptographically secure random alphanumeric string of the given length.""" characters = string.ascii_letters + string.digits # a-zA-Z0-9