import secrets import hashlib import bcrypt from pathlib import Path from typing import Dict from utils.handler.yaml import YamlHandler from utils.handler.vault import VaultHandler, VaultScalar import string import sys import base64 class InventoryManager: def __init__(self, role_path: Path, inventory_path: Path, vault_pw: str, overrides: Dict[str, str]): """Initialize the Inventory Manager.""" self.role_path = role_path self.inventory_path = inventory_path self.vault_pw = vault_pw self.overrides = overrides self.inventory = YamlHandler.load_yaml(inventory_path) self.schema = YamlHandler.load_yaml(role_path / "meta" / "schema.yml") self.app_id = self.load_application_id(role_path) self.vault_handler = VaultHandler(vault_pw) def load_application_id(self, role_path: Path) -> str: """Load the application ID from the role's vars/main.yml file.""" vars_file = role_path / "vars" / "main.yml" data = YamlHandler.load_yaml(vars_file) app_id = data.get("application_id") if not app_id: print(f"ERROR: 'application_id' missing in {vars_file}", file=sys.stderr) sys.exit(1) return app_id def apply_schema(self) -> Dict: """Apply the schema and return the updated inventory.""" apps = self.inventory.setdefault("applications", {}) target = apps.setdefault(self.app_id, {}) # Load the data from vars/main.yml vars_file = self.role_path / "vars" / "configuration.yml" data = YamlHandler.load_yaml(vars_file) # Check if 'central-database' is enabled in the features section of data if "features" in data: if "central_database" in data["features"] and \ data["features"]["central_database"]: # Add 'central_database' value (password) to credentials target.setdefault("credentials", {})["database_password"] = self.generate_value("alphanumeric") if "oauth2" in data["features"] and \ data["features"]["oauth2"]: target.setdefault("credentials", {})["oauth2_proxy_cookie_secret"] = self.generate_value("random_hex_16") # Apply recursion only for the `credentials` section self.recurse_credentials(self.schema, target) return self.inventory def recurse_credentials(self, branch: dict, dest: dict, prefix: str = ""): """Recursively process only the 'credentials' section and generate values.""" for key, meta in branch.items(): full_key = f"{prefix}.{key}" if prefix else key # Only process 'credentials' section for encryption if prefix == "credentials" and isinstance(meta, dict) and all(k in meta for k in ("description", "algorithm", "validation")): alg = meta["algorithm"] if alg == "plain": # Must be supplied via --set if full_key not in self.overrides: print(f"ERROR: Plain algorithm for '{full_key}' requires override via --set {full_key}=", file=sys.stderr) sys.exit(1) plain = self.overrides[full_key] else: plain = self.overrides.get(full_key, self.generate_value(alg)) # Check if the value is already vaulted or if it's a dictionary existing_value = dest.get(key) # If existing_value is a dictionary, print a warning and skip encryption if isinstance(existing_value, dict): print(f"Skipping encryption for '{key}', as it is a dictionary.") continue # Check if the value is a VaultScalar and already vaulted if existing_value and isinstance(existing_value, VaultScalar): print(f"Skipping encryption for '{key}', as it is already vaulted.") continue # Encrypt only if it's not already vaulted snippet = self.vault_handler.encrypt_string(plain, key) lines = snippet.splitlines() indent = len(lines[1]) - len(lines[1].lstrip()) body = "\n".join(line[indent:] for line in lines[1:]) dest[key] = VaultScalar(body) elif isinstance(meta, dict): sub = dest.setdefault(key, {}) self.recurse_credentials(meta, sub, full_key) else: dest[key] = meta def generate_secure_alphanumeric(self, length: int) -> str: """Generate a cryptographically secure random alphanumeric string of the given length.""" characters = string.ascii_letters + string.digits # a-zA-Z0-9 return ''.join(secrets.choice(characters) for _ in range(length)) def generate_value(self, algorithm: str) -> str: """ Generate a random secret value according to the specified algorithm. Supported algorithms: • "random_hex" – Returns a 64-byte (512-bit) secure random string, encoded as 128 hexadecimal characters. – Use when you need maximum entropy in a hex-only format. • "sha256" – Generates 32 random bytes, hashes them with SHA-256, and returns a 64-character hex digest. – Good for when you want a fixed-length (256-bit) hash output. • "sha1" – Generates 20 random bytes, hashes them with SHA-1, and returns a 40-character hex digest. – Only use in legacy contexts; SHA-1 is considered weaker than SHA-256. • "bcrypt" – Creates a random 16-byte URL-safe password, then applies a bcrypt hash. – Suitable for storing user-style passwords where bcrypt verification is needed. • "alphanumeric" – Produces a 64-character string drawn from [A–Z, a–z, 0–9]. – Offers ≈380 bits of entropy; human-friendly charset. • "base64_prefixed_32" – Generates 32 random bytes, encodes them in Base64, and prefixes the result with "base64:". – Useful when downstream systems expect a Base64 format. • "random_hex_16" – Returns 16 random bytes (128 bits) encoded as 32 hexadecimal characters. – Handy for shorter tokens or salts. Returns: A securely generated string according to the chosen algorithm. """ if algorithm == "random_hex": return secrets.token_hex(64) if algorithm == "sha256": return hashlib.sha256(secrets.token_bytes(32)).hexdigest() if algorithm == "sha1": return hashlib.sha1(secrets.token_bytes(20)).hexdigest() if algorithm == "bcrypt": # Generate a random password and hash it with bcrypt pw = secrets.token_urlsafe(16).encode() raw_hash = bcrypt.hashpw(pw, bcrypt.gensalt()).decode() # Replace every '$' with a random lowercase alphanumeric character alnum = string.digits + string.ascii_lowercase escaped = "".join(secrets.choice(alnum) if ch == '$' else ch for ch in raw_hash) return escaped if algorithm == "alphanumeric": return self.generate_secure_alphanumeric(64) if algorithm == "base64_prefixed_32": return "base64:" + base64.b64encode(secrets.token_bytes(32)).decode() if algorithm == "random_hex_16": # 16 Bytes → 32 Hex-Characters return secrets.token_hex(16) return "undefined"