From 9eca1958ec90320f71078ddd5f6b3a368a6f0e24 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Thu, 15 May 2025 16:27:02 +0200 Subject: [PATCH] Updated Vault Creation script --- cli/generate_vaulted_credentials.py | 274 ++++++++++++++++------------ 1 file changed, 160 insertions(+), 114 deletions(-) diff --git a/cli/generate_vaulted_credentials.py b/cli/generate_vaulted_credentials.py index 1da633f8..90d855b1 100644 --- a/cli/generate_vaulted_credentials.py +++ b/cli/generate_vaulted_credentials.py @@ -1,4 +1,4 @@ -import yaml +#!/usr/bin/env python3 import argparse import secrets import hashlib @@ -6,148 +6,194 @@ import bcrypt import subprocess from pathlib import Path -def prompt(text, default=None): - """Prompt the user for input, with optional default value.""" - prompt_text = f"[?] {text}" + (f" [{default}]" if default else "") + ": " - response = input(prompt_text) - return response.strip() or default +import yaml +from yaml.loader import SafeLoader + +# ───────────────────────────────────────────────────────────────────────────── +# Let PyYAML treat !vault blocks as ordinary multiline strings +def _vault_constructor(loader, node): + return node.value +SafeLoader.add_constructor('!vault', _vault_constructor) + +# VaultScalar so PyYAML emits a !vault literal block on output +class VaultScalar(str): + pass + +def _vault_representer(dumper, data): + return dumper.represent_scalar('!vault', data, style='|') + +yaml.SafeDumper.add_representer(VaultScalar, _vault_representer) +# ───────────────────────────────────────────────────────────────────────────── def generate_value(algorithm): - """Generate a value based on the provided algorithm.""" if algorithm == "random_hex": return secrets.token_hex(64) - elif algorithm == "sha256": + if algorithm == "sha256": return hashlib.sha256(secrets.token_bytes(32)).hexdigest() - elif algorithm == "sha1": + if algorithm == "sha1": return hashlib.sha1(secrets.token_bytes(20)).hexdigest() - elif algorithm == "bcrypt": - password = secrets.token_urlsafe(16).encode() - return bcrypt.hashpw(password, bcrypt.gensalt()).decode() - elif algorithm == "plain": + if algorithm == "bcrypt": + pw = secrets.token_urlsafe(16).encode() + return bcrypt.hashpw(pw, bcrypt.gensalt()).decode() + if algorithm == "plain": return secrets.token_urlsafe(32) - else: - return "undefined" + return "undefined" -def encrypt_with_vault(value, name, vault_password_file=None, ask_vault_pass=False): - """Encrypt the given string using Ansible Vault.""" - cmd = ["ansible-vault", "encrypt_string", value, f"--name={name}"] - if vault_password_file: - cmd += ["--vault-password-file", vault_password_file] - elif ask_vault_pass: - cmd += ["--ask-vault-pass"] - else: - raise RuntimeError("You must provide --vault-password-file or use --ask-vault-pass.") - - result = subprocess.run(cmd, capture_output=True, text=True) - if result.returncode != 0: - raise RuntimeError(f"Vault encryption failed:\n{result.stderr}") - return result.stdout.strip() +def decrypt_inventory(path: Path, vault_password_file: str): + """ + Try `ansible-vault view`; on "not vaulted" fallback to yaml.safe_load + (with !vault constructor). + """ + proc = subprocess.run( + ["ansible-vault", "view", str(path), "--vault-password-file", vault_password_file], + capture_output=True, text=True + ) + if proc.returncode == 0: + return yaml.safe_load(proc.stdout) or {} + # fallback if not vaulted + if "not a vault encrypted file" in proc.stderr.lower(): + return yaml.safe_load(path.read_text()) or {} + raise RuntimeError(f"ansible-vault view failed:\n{proc.stderr}") -def load_yaml_file(path): - """Load a YAML file or return an empty dict if not found.""" - if path.exists(): - with open(path, "r") as f: - return yaml.safe_load(f) or {} - return {} +def encrypt_with_vault(value: str, name: str, vault_password_file: str): + cmd = [ + "ansible-vault", "encrypt_string", + value, f"--name={name}", + "--vault-password-file", vault_password_file + ] + proc = subprocess.run(cmd, capture_output=True, text=True) + if proc.returncode != 0: + raise RuntimeError(f"ansible-vault encrypt_string failed:\n{proc.stderr}") + return proc.stdout -def save_yaml_file(path, data): - """Save a dictionary to a YAML file.""" - with open(path, "w") as f: - yaml.dump(data, f, sort_keys=False) +def load_yaml(path: Path): + if not path.exists(): + return {} + return yaml.safe_load(path.read_text()) or {} + +def save_yaml(path: Path, data): + path.write_text(yaml.dump(data, sort_keys=False, Dumper=yaml.SafeDumper)) def parse_overrides(pairs): - """Parse key=value overrides into a dictionary.""" - result = {} - for pair in pairs: - if "=" not in pair: - continue - k, v = pair.split("=", 1) - result[k.strip()] = v.strip() - return result + out = {} + for p in pairs: + if "=" in p: + k, v = p.split("=", 1) + out[k.strip()] = v.strip() + return out -def load_application_id_from_vars(role_path): - """Read application_id from role's vars/main.yml""" - vars_file = Path(role_path) / "vars" / "main.yml" +def load_application_id(role_path: Path): + vars_file = role_path / "vars" / "main.yml" if not vars_file.exists(): - raise FileNotFoundError(f"{vars_file} not found.") - vars_data = load_yaml_file(vars_file) - app_id = vars_data.get("application_id") + raise FileNotFoundError(f"{vars_file} not found") + data = load_yaml(vars_file) + app_id = data.get("application_id") if not app_id: - raise KeyError(f"'application_id' not found in {vars_file}") + raise KeyError(f"'application_id' missing in {vars_file}") return app_id -def apply_schema_to_inventory(schema, inventory_data, app_id, overrides, vault_password_file, ask_vault_pass): - """Merge schema into inventory under applications.{app_id}, encrypting all values.""" - inventory_data.setdefault("applications", {}) - applications = inventory_data["applications"] +def apply_schema(schema: dict, + inventory: dict, + app_id: str, + overrides: dict, + vault_pw: str): + apps = inventory.setdefault("applications", {}) + target = apps.setdefault(app_id, {}) - applications.setdefault(app_id, {}) - - def process_branch(branch, target, path_prefix=""): + def recurse(branch, dest, prefix=""): for key, meta in branch.items(): - full_key_path = f"{path_prefix}.{key}" if path_prefix else key - if isinstance(meta, dict) and all(k in meta for k in ["description", "algorithm", "validation"]): - if key in target: - overwrite = prompt(f"Key '{full_key_path}' already exists. Overwrite?", "n").lower() == "y" - if not overwrite: - continue - plain_value = overrides.get(full_key_path, generate_value(meta["algorithm"])) - vaulted_value = encrypt_with_vault(plain_value, key, vault_password_file, ask_vault_pass) - target[key] = yaml.load(vaulted_value, Loader=yaml.SafeLoader) - elif isinstance(meta, dict): - target.setdefault(key, {}) - process_branch(meta, target[key], full_key_path) - else: - target[key] = meta + full = f"{prefix}.{key}" if prefix else key - process_branch(schema, applications[app_id]) - return inventory_data + # leaf spec + if isinstance(meta, dict) and all(k in meta for k in ("description","algorithm","validation")): + plain = overrides.get(full, generate_value(meta["algorithm"])) + snippet = encrypt_with_vault(plain, key, vault_pw) + lines = snippet.splitlines() + indent = len(lines[1]) - len(lines[1].lstrip()) + body = "\n".join(line[indent:] for line in lines[1:]) + dest[key] = VaultScalar(body) + + # nested + elif isinstance(meta, dict): + sub = dest.setdefault(key, {}) + recurse(meta, sub, full) + + # passthrough + else: + dest[key] = meta + + recurse(schema, target) + return inventory + +def encrypt_leaves(branch: dict, vault_pw: str): + for k, v in list(branch.items()): + if isinstance(v, dict): + encrypt_leaves(v, vault_pw) + else: + plain = str(v) + if plain.lstrip().startswith("$ANSIBLE_VAULT"): + continue + snippet = encrypt_with_vault(plain, k, vault_pw) + lines = snippet.splitlines() + indent = len(lines[1]) - len(lines[1].lstrip()) + body = "\n".join(line[indent:] for line in lines[1:]) + branch[k] = VaultScalar(body) + +def encrypt_credentials_branch(node, vault_pw: str): + if isinstance(node, dict): + for k, v in node.items(): + if k == "credentials" and isinstance(v, dict): + encrypt_leaves(v, vault_pw) + else: + encrypt_credentials_branch(v, vault_pw) + elif isinstance(node, list): + for item in node: + encrypt_credentials_branch(item, vault_pw) def main(): - parser = argparse.ArgumentParser(description="Generate Vault-encrypted credentials from schema and write to inventory.") - parser.add_argument("--role-path", help="Path to the Ansible role") - parser.add_argument("--inventory-file", help="Path to the inventory file to update") - parser.add_argument("--vault-password-file", help="Path to Ansible Vault password file") - parser.add_argument("--ask-vault-pass", action="store_true", help="Prompt for vault password") - parser.add_argument("--set", nargs="*", default=[], help="Override values as key=value") + parser = argparse.ArgumentParser( + description="Selectively vault credentials + become-password in an inventory file" + ) + parser.add_argument("--role-path", required=True, + help="Path to your Ansible role") + parser.add_argument("--inventory-file", required=True, + help="Vaulted host_vars file to update") + parser.add_argument("--vault-password-file", required=True, + help="Ansible Vault password file") + parser.add_argument("--set", nargs="*", default=[], + help="Override values as key.subkey=VALUE") args = parser.parse_args() - # Prompt for missing values - role_path = Path(args.role_path or prompt("Path to Ansible role", "./roles/docker-")) - inventory_file = Path(args.inventory_file or prompt("Path to inventory file", "./host_vars/localhost.yml")) + role_path = Path(args.role_path) + inv_file = Path(args.inventory_file) + vault_pw = args.vault_password_file + overrides = parse_overrides(args.set) - # Determine application_id - app_id = load_application_id_from_vars(role_path) + app_id = load_application_id(role_path) - # Vault method - if not args.vault_password_file and not args.ask_vault_pass: - print("[?] No Vault password method provided.") - print(" 1) Provide path to --vault-password-file") - print(" 2) Use interactive prompt (--ask-vault-pass)") - choice = prompt("Select method", "1") - if choice == "1": - args.vault_password_file = prompt("Vault password file", "~/.vault_pass.txt").replace("~", str(Path.home())) - else: - args.ask_vault_pass = True + # 1) decrypt-or-plain-load + inventory = decrypt_inventory(inv_file, vault_pw) - # Load files - schema_path = role_path / "meta" / "schema.yml" - schema_data = load_yaml_file(schema_path) - inventory_data = load_yaml_file(inventory_file) - overrides = parse_overrides(args.set) + # 2) merge schema-driven credentials + schema = load_yaml(role_path / "meta" / "schema.yml") + inventory = apply_schema(schema, inventory, app_id, overrides, vault_pw) - # Apply schema and save - updated = apply_schema_to_inventory( - schema=schema_data, - inventory_data=inventory_data, - app_id=app_id, - overrides=overrides, - vault_password_file=args.vault_password_file, - ask_vault_pass=args.ask_vault_pass - ) + # 3) vault leaves under credentials: + encrypt_credentials_branch(inventory, vault_pw) - save_yaml_file(inventory_file, updated) - print(f"\n✅ Inventory file updated at: {inventory_file}") + # 4) vault top-level become password + if "ansible_become_password" in inventory: + val = str(inventory["ansible_become_password"]) + if not val.lstrip().startswith("$ANSIBLE_VAULT"): + snippet = encrypt_with_vault(val, "ansible_become_password", vault_pw) + lines = snippet.splitlines() + indent = len(lines[1]) - len(lines[1].lstrip()) + body = "\n".join(line[indent:] for line in lines[1:]) + inventory["ansible_become_password"] = VaultScalar(body) + + # 5) write back YAML + save_yaml(inv_file, inventory) + print(f"✅ Inventory selectively vaulted → {inv_file}") if __name__ == "__main__": main()