From db095b77dc22b314bad4b7c3f09ced99a8f8b7e1 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Thu, 15 May 2025 16:35:57 +0200 Subject: [PATCH] Solved linebreak bug --- cli/generate_vaulted_credentials.py | 112 +++++++++++++--------------- 1 file changed, 53 insertions(+), 59 deletions(-) diff --git a/cli/generate_vaulted_credentials.py b/cli/generate_vaulted_credentials.py index 90d855b1..6f8fbabf 100644 --- a/cli/generate_vaulted_credentials.py +++ b/cli/generate_vaulted_credentials.py @@ -8,24 +8,25 @@ from pathlib import Path import yaml from yaml.loader import SafeLoader +from yaml.dumper import SafeDumper # ───────────────────────────────────────────────────────────────────────────── -# Let PyYAML treat !vault blocks as ordinary multiline strings +# On load: treat any !vault tag as plain text def _vault_constructor(loader, node): return node.value SafeLoader.add_constructor('!vault', _vault_constructor) -# VaultScalar so PyYAML emits a !vault literal block on output +# A str subclass so PyYAML emits !vault literal blocks on dump class VaultScalar(str): pass def _vault_representer(dumper, data): return dumper.represent_scalar('!vault', data, style='|') -yaml.SafeDumper.add_representer(VaultScalar, _vault_representer) +SafeDumper.add_representer(VaultScalar, _vault_representer) # ───────────────────────────────────────────────────────────────────────────── -def generate_value(algorithm): +def generate_value(algorithm: str) -> str: if algorithm == "random_hex": return secrets.token_hex(64) if algorithm == "sha256": @@ -39,23 +40,29 @@ def generate_value(algorithm): return secrets.token_urlsafe(32) return "undefined" -def decrypt_inventory(path: Path, vault_password_file: str): +def wrap_existing_vaults(node): """ - Try `ansible-vault view`; on "not vaulted" fallback to yaml.safe_load - (with !vault constructor). + Recursively walk the data and wrap any str that begins with + '$ANSIBLE_VAULT' in a VaultScalar so it dumps as a literal. """ - proc = subprocess.run( - ["ansible-vault", "view", str(path), "--vault-password-file", vault_password_file], - capture_output=True, text=True - ) - if proc.returncode == 0: - return yaml.safe_load(proc.stdout) or {} - # fallback if not vaulted - if "not a vault encrypted file" in proc.stderr.lower(): - return yaml.safe_load(path.read_text()) or {} - raise RuntimeError(f"ansible-vault view failed:\n{proc.stderr}") + if isinstance(node, dict): + return {k: wrap_existing_vaults(v) for k, v in node.items()} + if isinstance(node, list): + return [wrap_existing_vaults(v) for v in node] + if isinstance(node, str) and node.lstrip().startswith("$ANSIBLE_VAULT"): + return VaultScalar(node) + return node -def encrypt_with_vault(value: str, name: str, vault_password_file: str): +def load_yaml_plain(path: Path) -> dict: + """ + Load any YAML (vaulted or not) via SafeLoader + our !vault constructor, + then wrap existing vault‐blocks for correct literal dumping. + """ + text = path.read_text() + data = yaml.load(text, Loader=SafeLoader) or {} + return wrap_existing_vaults(data) + +def encrypt_with_vault(value: str, name: str, vault_password_file: str) -> str: cmd = [ "ansible-vault", "encrypt_string", value, f"--name={name}", @@ -66,15 +73,7 @@ def encrypt_with_vault(value: str, name: str, vault_password_file: str): raise RuntimeError(f"ansible-vault encrypt_string failed:\n{proc.stderr}") return proc.stdout -def load_yaml(path: Path): - if not path.exists(): - return {} - return yaml.safe_load(path.read_text()) or {} - -def save_yaml(path: Path, data): - path.write_text(yaml.dump(data, sort_keys=False, Dumper=yaml.SafeDumper)) - -def parse_overrides(pairs): +def parse_overrides(pairs: list[str]) -> dict: out = {} for p in pairs: if "=" in p: @@ -82,11 +81,9 @@ def parse_overrides(pairs): out[k.strip()] = v.strip() return out -def load_application_id(role_path: Path): +def load_application_id(role_path: Path) -> str: vars_file = role_path / "vars" / "main.yml" - if not vars_file.exists(): - raise FileNotFoundError(f"{vars_file} not found") - data = load_yaml(vars_file) + data = load_yaml_plain(vars_file) app_id = data.get("application_id") if not app_id: raise KeyError(f"'application_id' missing in {vars_file}") @@ -96,15 +93,15 @@ def apply_schema(schema: dict, inventory: dict, app_id: str, overrides: dict, - vault_pw: str): + vault_pw: str) -> dict: apps = inventory.setdefault("applications", {}) target = apps.setdefault(app_id, {}) - def recurse(branch, dest, prefix=""): + def recurse(branch: dict, dest: dict, prefix: str = ""): for key, meta in branch.items(): full = f"{prefix}.{key}" if prefix else key - # leaf spec + # leaf node if isinstance(meta, dict) and all(k in meta for k in ("description","algorithm","validation")): plain = overrides.get(full, generate_value(meta["algorithm"])) snippet = encrypt_with_vault(plain, key, vault_pw) @@ -113,12 +110,12 @@ def apply_schema(schema: dict, body = "\n".join(line[indent:] for line in lines[1:]) dest[key] = VaultScalar(body) - # nested + # nested mapping elif isinstance(meta, dict): sub = dest.setdefault(key, {}) recurse(meta, sub, full) - # passthrough + # literal passthrough else: dest[key] = meta @@ -141,27 +138,23 @@ def encrypt_leaves(branch: dict, vault_pw: str): def encrypt_credentials_branch(node, vault_pw: str): if isinstance(node, dict): - for k, v in node.items(): - if k == "credentials" and isinstance(v, dict): - encrypt_leaves(v, vault_pw) + for key, val in node.items(): + if key == "credentials" and isinstance(val, dict): + encrypt_leaves(val, vault_pw) else: - encrypt_credentials_branch(v, vault_pw) + encrypt_credentials_branch(val, vault_pw) elif isinstance(node, list): for item in node: encrypt_credentials_branch(item, vault_pw) def main(): parser = argparse.ArgumentParser( - description="Selectively vault credentials + become-password in an inventory file" + description="Selectively vault credentials + become-password in your inventory." ) - parser.add_argument("--role-path", required=True, - help="Path to your Ansible role") - parser.add_argument("--inventory-file", required=True, - help="Vaulted host_vars file to update") - parser.add_argument("--vault-password-file", required=True, - help="Ansible Vault password file") - parser.add_argument("--set", nargs="*", default=[], - help="Override values as key.subkey=VALUE") + parser.add_argument("--role-path", required=True, help="Path to your role") + parser.add_argument("--inventory-file", required=True, help="host_vars file to update") + parser.add_argument("--vault-password-file", required=True, help="Vault password file") + parser.add_argument("--set", nargs="*", default=[], help="Override values key.subkey=VALUE") args = parser.parse_args() role_path = Path(args.role_path) @@ -169,19 +162,18 @@ def main(): vault_pw = args.vault_password_file overrides = parse_overrides(args.set) + # 1) Load & wrap existing vault blocks + inventory = load_yaml_plain(inv_file) + + # 2) Merge in any schema-driven credentials + schema = load_yaml_plain(role_path / "meta" / "schema.yml") app_id = load_application_id(role_path) - - # 1) decrypt-or-plain-load - inventory = decrypt_inventory(inv_file, vault_pw) - - # 2) merge schema-driven credentials - schema = load_yaml(role_path / "meta" / "schema.yml") inventory = apply_schema(schema, inventory, app_id, overrides, vault_pw) - # 3) vault leaves under credentials: + # 3) Vault any leaves under 'credentials:' mappings encrypt_credentials_branch(inventory, vault_pw) - # 4) vault top-level become password + # 4) Vault top-level ansible_become_password if present if "ansible_become_password" in inventory: val = str(inventory["ansible_become_password"]) if not val.lstrip().startswith("$ANSIBLE_VAULT"): @@ -191,8 +183,10 @@ def main(): body = "\n".join(line[indent:] for line in lines[1:]) inventory["ansible_become_password"] = VaultScalar(body) - # 5) write back YAML - save_yaml(inv_file, inventory) + # 5) Overwrite file with proper !vault | blocks only where needed + with open(inv_file, "w") as f: + yaml.dump(inventory, f, sort_keys=False, Dumper=SafeDumper) + print(f"✅ Inventory selectively vaulted → {inv_file}") if __name__ == "__main__":