mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-05-18 02:34:38 +02:00
206 lines
7.9 KiB
Python
206 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
||
import argparse
|
||
import secrets
|
||
import hashlib
|
||
import bcrypt
|
||
import subprocess
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
import yaml
|
||
from yaml.loader import SafeLoader
|
||
from yaml.dumper import SafeDumper
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# On load: treat any !vault tag as plain text
|
||
def _vault_constructor(loader, node):
|
||
return node.value
|
||
SafeLoader.add_constructor('!vault', _vault_constructor)
|
||
|
||
# A str subclass so PyYAML emits !vault literal blocks on dump
|
||
class VaultScalar(str):
|
||
pass
|
||
|
||
def _vault_representer(dumper, data):
|
||
return dumper.represent_scalar('!vault', data, style='|')
|
||
|
||
SafeDumper.add_representer(VaultScalar, _vault_representer)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def generate_value(algorithm: str) -> str:
|
||
if algorithm == "random_hex":
|
||
return secrets.token_hex(64)
|
||
if algorithm == "sha256":
|
||
return hashlib.sha256(secrets.token_bytes(32)).hexdigest()
|
||
if algorithm == "sha1":
|
||
return hashlib.sha1(secrets.token_bytes(20)).hexdigest()
|
||
if algorithm == "bcrypt":
|
||
pw = secrets.token_urlsafe(16).encode()
|
||
return bcrypt.hashpw(pw, bcrypt.gensalt()).decode()
|
||
# we should never auto-generate for "plain"
|
||
return "undefined"
|
||
|
||
def wrap_existing_vaults(node):
|
||
"""
|
||
Recursively wrap any str that begins with '$ANSIBLE_VAULT'
|
||
in a VaultScalar so it dumps as a literal block.
|
||
"""
|
||
if isinstance(node, dict):
|
||
return {k: wrap_existing_vaults(v) for k, v in node.items()}
|
||
if isinstance(node, list):
|
||
return [wrap_existing_vaults(v) for v in node]
|
||
if isinstance(node, str) and node.lstrip().startswith("$ANSIBLE_VAULT"):
|
||
return VaultScalar(node)
|
||
return node
|
||
|
||
def load_yaml_plain(path: Path) -> dict:
|
||
"""
|
||
Load any YAML (vaulted or not) via SafeLoader + our !vault constructor,
|
||
then wrap existing vault‐blocks for correct literal dumping.
|
||
"""
|
||
text = path.read_text()
|
||
data = yaml.load(text, Loader=SafeLoader) or {}
|
||
return wrap_existing_vaults(data)
|
||
|
||
def encrypt_with_vault(value: str, name: str, vault_password_file: str) -> str:
|
||
cmd = [
|
||
"ansible-vault", "encrypt_string",
|
||
value, f"--name={name}",
|
||
"--vault-password-file", vault_password_file
|
||
]
|
||
proc = subprocess.run(cmd, capture_output=True, text=True)
|
||
if proc.returncode != 0:
|
||
raise RuntimeError(f"ansible-vault encrypt_string failed:\n{proc.stderr}")
|
||
return proc.stdout
|
||
|
||
def parse_overrides(pairs: list[str]) -> dict:
|
||
out = {}
|
||
for p in pairs:
|
||
if "=" in p:
|
||
k, v = p.split("=", 1)
|
||
out[k.strip()] = v.strip()
|
||
return out
|
||
|
||
def load_application_id(role_path: Path) -> str:
|
||
vars_file = role_path / "vars" / "main.yml"
|
||
data = load_yaml_plain(vars_file)
|
||
app_id = data.get("application_id")
|
||
if not app_id:
|
||
print(f"ERROR: 'application_id' missing in {vars_file}", file=sys.stderr)
|
||
sys.exit(1)
|
||
return app_id
|
||
|
||
def apply_schema(schema: dict,
|
||
inventory: dict,
|
||
app_id: str,
|
||
overrides: dict,
|
||
vault_pw: str) -> dict:
|
||
apps = inventory.setdefault("applications", {})
|
||
target = apps.setdefault(app_id, {})
|
||
|
||
def recurse(branch: dict, dest: dict, prefix: str = ""):
|
||
for key, meta in branch.items():
|
||
full_key = f"{prefix}.{key}" if prefix else key
|
||
|
||
# leaf node spec
|
||
if isinstance(meta, dict) and all(k in meta for k in ("description","algorithm","validation")):
|
||
alg = meta["algorithm"]
|
||
if alg == "plain":
|
||
# must be supplied via --set
|
||
if full_key not in overrides:
|
||
print(f"ERROR: Plain algorithm for '{full_key}' requires override via --set {full_key}=<value>", file=sys.stderr)
|
||
sys.exit(1)
|
||
plain = overrides[full_key]
|
||
else:
|
||
# generate or override
|
||
plain = overrides.get(full_key, generate_value(alg))
|
||
|
||
snippet = encrypt_with_vault(plain, key, vault_pw)
|
||
lines = snippet.splitlines()
|
||
indent = len(lines[1]) - len(lines[1].lstrip())
|
||
body = "\n".join(line[indent:] for line in lines[1:])
|
||
dest[key] = VaultScalar(body)
|
||
|
||
# nested mapping
|
||
elif isinstance(meta, dict):
|
||
sub = dest.setdefault(key, {})
|
||
recurse(meta, sub, full_key)
|
||
|
||
# literal passthrough
|
||
else:
|
||
dest[key] = meta
|
||
|
||
recurse(schema, target)
|
||
return inventory
|
||
|
||
def encrypt_leaves(branch: dict, vault_pw: str):
|
||
for k, v in list(branch.items()):
|
||
if isinstance(v, dict):
|
||
encrypt_leaves(v, vault_pw)
|
||
else:
|
||
plain = str(v)
|
||
# skip if already vaulted
|
||
if plain.lstrip().startswith("$ANSIBLE_VAULT"):
|
||
continue
|
||
snippet = encrypt_with_vault(plain, k, vault_pw)
|
||
lines = snippet.splitlines()
|
||
indent = len(lines[1]) - len(lines[1].lstrip())
|
||
body = "\n".join(line[indent:] for line in lines[1:])
|
||
branch[k] = VaultScalar(body)
|
||
|
||
def encrypt_credentials_branch(node, vault_pw: str):
|
||
if isinstance(node, dict):
|
||
for key, val in node.items():
|
||
if key == "credentials" and isinstance(val, dict):
|
||
encrypt_leaves(val, vault_pw)
|
||
else:
|
||
encrypt_credentials_branch(val, vault_pw)
|
||
elif isinstance(node, list):
|
||
for item in node:
|
||
encrypt_credentials_branch(item, vault_pw)
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(
|
||
description="Selectively vault credentials + become-password in your inventory."
|
||
)
|
||
parser.add_argument("--role-path", required=True, help="Path to your role")
|
||
parser.add_argument("--inventory-file", required=True, help="host_vars file to update")
|
||
parser.add_argument("--vault-password-file",required=True, help="Vault password file")
|
||
parser.add_argument("--set", nargs="*", default=[], help="Override values key.subkey=VALUE")
|
||
args = parser.parse_args()
|
||
|
||
role_path = Path(args.role_path)
|
||
inv_file = Path(args.inventory_file)
|
||
vault_pw = args.vault_password_file
|
||
overrides = parse_overrides(args.set)
|
||
|
||
# 1) Load & wrap any existing vault blocks
|
||
inventory = load_yaml_plain(inv_file)
|
||
|
||
# 2) Merge schema-driven credentials (plain ones must be overridden)
|
||
schema = load_yaml_plain(role_path / "meta" / "schema.yml")
|
||
app_id = load_application_id(role_path)
|
||
inventory = apply_schema(schema, inventory, app_id, overrides, vault_pw)
|
||
|
||
# 3) Vault any leaves under 'credentials:' mappings
|
||
encrypt_credentials_branch(inventory, vault_pw)
|
||
|
||
# 4) Vault top-level ansible_become_password if present
|
||
if "ansible_become_password" in inventory:
|
||
val = str(inventory["ansible_become_password"])
|
||
if not val.lstrip().startswith("$ANSIBLE_VAULT"):
|
||
snippet = encrypt_with_vault(val, "ansible_become_password", vault_pw)
|
||
lines = snippet.splitlines()
|
||
indent = len(lines[1]) - len(lines[1].lstrip())
|
||
body = "\n".join(line[indent:] for line in lines[1:])
|
||
inventory["ansible_become_password"] = VaultScalar(body)
|
||
|
||
# 5) Overwrite file with proper !vault literal blocks only where needed
|
||
with open(inv_file, "w", encoding="utf-8") as f:
|
||
yaml.dump(inventory, f, sort_keys=False, Dumper=SafeDumper)
|
||
|
||
print(f"✅ Inventory selectively vaulted → {inv_file}")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|