#!/usr/bin/env python3 import argparse import secrets import hashlib import bcrypt import subprocess import sys from pathlib import Path import yaml from yaml.loader import SafeLoader from yaml.dumper import SafeDumper # ───────────────────────────────────────────────────────────────────────────── # On load: treat any !vault tag as plain text def _vault_constructor(loader, node): return node.value SafeLoader.add_constructor('!vault', _vault_constructor) # A str subclass so PyYAML emits !vault literal blocks on dump class VaultScalar(str): pass def _vault_representer(dumper, data): return dumper.represent_scalar('!vault', data, style='|') SafeDumper.add_representer(VaultScalar, _vault_representer) # ───────────────────────────────────────────────────────────────────────────── def generate_value(algorithm: str) -> str: if algorithm == "random_hex": return secrets.token_hex(64) if algorithm == "sha256": return hashlib.sha256(secrets.token_bytes(32)).hexdigest() if algorithm == "sha1": return hashlib.sha1(secrets.token_bytes(20)).hexdigest() if algorithm == "bcrypt": pw = secrets.token_urlsafe(16).encode() return bcrypt.hashpw(pw, bcrypt.gensalt()).decode() # we should never auto-generate for "plain" return "undefined" def wrap_existing_vaults(node): """ Recursively wrap any str that begins with '$ANSIBLE_VAULT' in a VaultScalar so it dumps as a literal block. """ if isinstance(node, dict): return {k: wrap_existing_vaults(v) for k, v in node.items()} if isinstance(node, list): return [wrap_existing_vaults(v) for v in node] if isinstance(node, str) and node.lstrip().startswith("$ANSIBLE_VAULT"): return VaultScalar(node) return node def load_yaml_plain(path: Path) -> dict: """ Load any YAML (vaulted or not) via SafeLoader + our !vault constructor, then wrap existing vault‐blocks for correct literal dumping. """ text = path.read_text() data = yaml.load(text, Loader=SafeLoader) or {} return wrap_existing_vaults(data) def encrypt_with_vault(value: str, name: str, vault_password_file: str) -> str: cmd = [ "ansible-vault", "encrypt_string", value, f"--name={name}", "--vault-password-file", vault_password_file ] proc = subprocess.run(cmd, capture_output=True, text=True) if proc.returncode != 0: raise RuntimeError(f"ansible-vault encrypt_string failed:\n{proc.stderr}") return proc.stdout def parse_overrides(pairs: list[str]) -> dict: out = {} for p in pairs: if "=" in p: k, v = p.split("=", 1) out[k.strip()] = v.strip() return out def load_application_id(role_path: Path) -> str: vars_file = role_path / "vars" / "main.yml" data = load_yaml_plain(vars_file) app_id = data.get("application_id") if not app_id: print(f"ERROR: 'application_id' missing in {vars_file}", file=sys.stderr) sys.exit(1) return app_id def apply_schema(schema: dict, inventory: dict, app_id: str, overrides: dict, vault_pw: str) -> dict: apps = inventory.setdefault("applications", {}) target = apps.setdefault(app_id, {}) def recurse(branch: dict, dest: dict, prefix: str = ""): for key, meta in branch.items(): full_key = f"{prefix}.{key}" if prefix else key # leaf node spec if isinstance(meta, dict) and all(k in meta for k in ("description","algorithm","validation")): alg = meta["algorithm"] if alg == "plain": # must be supplied via --set if full_key not in overrides: print(f"ERROR: Plain algorithm for '{full_key}' requires override via --set {full_key}=", file=sys.stderr) sys.exit(1) plain = overrides[full_key] else: # generate or override plain = overrides.get(full_key, generate_value(alg)) snippet = encrypt_with_vault(plain, key, vault_pw) lines = snippet.splitlines() indent = len(lines[1]) - len(lines[1].lstrip()) body = "\n".join(line[indent:] for line in lines[1:]) dest[key] = VaultScalar(body) # nested mapping elif isinstance(meta, dict): sub = dest.setdefault(key, {}) recurse(meta, sub, full_key) # literal passthrough else: dest[key] = meta recurse(schema, target) return inventory def encrypt_leaves(branch: dict, vault_pw: str): for k, v in list(branch.items()): if isinstance(v, dict): encrypt_leaves(v, vault_pw) else: plain = str(v) # skip if already vaulted if plain.lstrip().startswith("$ANSIBLE_VAULT"): continue snippet = encrypt_with_vault(plain, k, vault_pw) lines = snippet.splitlines() indent = len(lines[1]) - len(lines[1].lstrip()) body = "\n".join(line[indent:] for line in lines[1:]) branch[k] = VaultScalar(body) def encrypt_credentials_branch(node, vault_pw: str): if isinstance(node, dict): for key, val in node.items(): if key == "credentials" and isinstance(val, dict): encrypt_leaves(val, vault_pw) else: encrypt_credentials_branch(val, vault_pw) elif isinstance(node, list): for item in node: encrypt_credentials_branch(item, vault_pw) def main(): parser = argparse.ArgumentParser( description="Selectively vault credentials + become-password in your inventory." ) parser.add_argument("--role-path", required=True, help="Path to your role") parser.add_argument("--inventory-file", required=True, help="host_vars file to update") parser.add_argument("--vault-password-file",required=True, help="Vault password file") parser.add_argument("--set", nargs="*", default=[], help="Override values key.subkey=VALUE") args = parser.parse_args() role_path = Path(args.role_path) inv_file = Path(args.inventory_file) vault_pw = args.vault_password_file overrides = parse_overrides(args.set) # 1) Load & wrap any existing vault blocks inventory = load_yaml_plain(inv_file) # 2) Merge schema-driven credentials (plain ones must be overridden) schema = load_yaml_plain(role_path / "meta" / "schema.yml") app_id = load_application_id(role_path) inventory = apply_schema(schema, inventory, app_id, overrides, vault_pw) # 3) Vault any leaves under 'credentials:' mappings encrypt_credentials_branch(inventory, vault_pw) # 4) Vault top-level ansible_become_password if present if "ansible_become_password" in inventory: val = str(inventory["ansible_become_password"]) if not val.lstrip().startswith("$ANSIBLE_VAULT"): snippet = encrypt_with_vault(val, "ansible_become_password", vault_pw) lines = snippet.splitlines() indent = len(lines[1]) - len(lines[1].lstrip()) body = "\n".join(line[indent:] for line in lines[1:]) inventory["ansible_become_password"] = VaultScalar(body) # 5) Overwrite file with proper !vault literal blocks only where needed with open(inv_file, "w", encoding="utf-8") as f: yaml.dump(inventory, f, sort_keys=False, Dumper=SafeDumper) print(f"✅ Inventory selectively vaulted → {inv_file}") if __name__ == "__main__": main()