Added --set for plain

This commit is contained in:
Kevin Veen-Birkenbach 2025-05-15 17:13:19 +02:00
parent 778c4803ed
commit 77e32cc5a6
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E

View File

@ -4,6 +4,7 @@ import secrets
import hashlib import hashlib
import bcrypt import bcrypt
import subprocess import subprocess
import sys
from pathlib import Path from pathlib import Path
import yaml import yaml
@ -36,14 +37,13 @@ def generate_value(algorithm: str) -> str:
if algorithm == "bcrypt": if algorithm == "bcrypt":
pw = secrets.token_urlsafe(16).encode() pw = secrets.token_urlsafe(16).encode()
return bcrypt.hashpw(pw, bcrypt.gensalt()).decode() return bcrypt.hashpw(pw, bcrypt.gensalt()).decode()
if algorithm == "plain": # we should never auto-generate for "plain"
return secrets.token_urlsafe(32)
return "undefined" return "undefined"
def wrap_existing_vaults(node): def wrap_existing_vaults(node):
""" """
Recursively walk the data and wrap any str that begins with Recursively wrap any str that begins with '$ANSIBLE_VAULT'
'$ANSIBLE_VAULT' in a VaultScalar so it dumps as a literal. in a VaultScalar so it dumps as a literal block.
""" """
if isinstance(node, dict): if isinstance(node, dict):
return {k: wrap_existing_vaults(v) for k, v in node.items()} return {k: wrap_existing_vaults(v) for k, v in node.items()}
@ -86,7 +86,8 @@ def load_application_id(role_path: Path) -> str:
data = load_yaml_plain(vars_file) data = load_yaml_plain(vars_file)
app_id = data.get("application_id") app_id = data.get("application_id")
if not app_id: if not app_id:
raise KeyError(f"'application_id' missing in {vars_file}") print(f"ERROR: 'application_id' missing in {vars_file}", file=sys.stderr)
sys.exit(1)
return app_id return app_id
def apply_schema(schema: dict, def apply_schema(schema: dict,
@ -99,11 +100,21 @@ def apply_schema(schema: dict,
def recurse(branch: dict, dest: dict, prefix: str = ""): def recurse(branch: dict, dest: dict, prefix: str = ""):
for key, meta in branch.items(): for key, meta in branch.items():
full = f"{prefix}.{key}" if prefix else key full_key = f"{prefix}.{key}" if prefix else key
# leaf node # leaf node spec
if isinstance(meta, dict) and all(k in meta for k in ("description","algorithm","validation")): if isinstance(meta, dict) and all(k in meta for k in ("description","algorithm","validation")):
plain = overrides.get(full, generate_value(meta["algorithm"])) alg = meta["algorithm"]
if alg == "plain":
# must be supplied via --set
if full_key not in overrides:
print(f"ERROR: Plain algorithm for '{full_key}' requires override via --set {full_key}=<value>", file=sys.stderr)
sys.exit(1)
plain = overrides[full_key]
else:
# generate or override
plain = overrides.get(full_key, generate_value(alg))
snippet = encrypt_with_vault(plain, key, vault_pw) snippet = encrypt_with_vault(plain, key, vault_pw)
lines = snippet.splitlines() lines = snippet.splitlines()
indent = len(lines[1]) - len(lines[1].lstrip()) indent = len(lines[1]) - len(lines[1].lstrip())
@ -113,7 +124,7 @@ def apply_schema(schema: dict,
# nested mapping # nested mapping
elif isinstance(meta, dict): elif isinstance(meta, dict):
sub = dest.setdefault(key, {}) sub = dest.setdefault(key, {})
recurse(meta, sub, full) recurse(meta, sub, full_key)
# literal passthrough # literal passthrough
else: else:
@ -128,6 +139,7 @@ def encrypt_leaves(branch: dict, vault_pw: str):
encrypt_leaves(v, vault_pw) encrypt_leaves(v, vault_pw)
else: else:
plain = str(v) plain = str(v)
# skip if already vaulted
if plain.lstrip().startswith("$ANSIBLE_VAULT"): if plain.lstrip().startswith("$ANSIBLE_VAULT"):
continue continue
snippet = encrypt_with_vault(plain, k, vault_pw) snippet = encrypt_with_vault(plain, k, vault_pw)
@ -153,7 +165,7 @@ def main():
) )
parser.add_argument("--role-path", required=True, help="Path to your role") parser.add_argument("--role-path", required=True, help="Path to your role")
parser.add_argument("--inventory-file", required=True, help="host_vars file to update") parser.add_argument("--inventory-file", required=True, help="host_vars file to update")
parser.add_argument("--vault-password-file", required=True, help="Vault password file") parser.add_argument("--vault-password-file",required=True, help="Vault password file")
parser.add_argument("--set", nargs="*", default=[], help="Override values key.subkey=VALUE") parser.add_argument("--set", nargs="*", default=[], help="Override values key.subkey=VALUE")
args = parser.parse_args() args = parser.parse_args()
@ -162,10 +174,10 @@ def main():
vault_pw = args.vault_password_file vault_pw = args.vault_password_file
overrides = parse_overrides(args.set) overrides = parse_overrides(args.set)
# 1) Load & wrap existing vault blocks # 1) Load & wrap any existing vault blocks
inventory = load_yaml_plain(inv_file) inventory = load_yaml_plain(inv_file)
# 2) Merge in any schema-driven credentials # 2) Merge schema-driven credentials (plain ones must be overridden)
schema = load_yaml_plain(role_path / "meta" / "schema.yml") schema = load_yaml_plain(role_path / "meta" / "schema.yml")
app_id = load_application_id(role_path) app_id = load_application_id(role_path)
inventory = apply_schema(schema, inventory, app_id, overrides, vault_pw) inventory = apply_schema(schema, inventory, app_id, overrides, vault_pw)
@ -183,8 +195,8 @@ def main():
body = "\n".join(line[indent:] for line in lines[1:]) body = "\n".join(line[indent:] for line in lines[1:])
inventory["ansible_become_password"] = VaultScalar(body) inventory["ansible_become_password"] = VaultScalar(body)
# 5) Overwrite file with proper !vault | blocks only where needed # 5) Overwrite file with proper !vault literal blocks only where needed
with open(inv_file, "w") as f: with open(inv_file, "w", encoding="utf-8") as f:
yaml.dump(inventory, f, sort_keys=False, Dumper=SafeDumper) yaml.dump(inventory, f, sort_keys=False, Dumper=SafeDumper)
print(f"✅ Inventory selectively vaulted → {inv_file}") print(f"✅ Inventory selectively vaulted → {inv_file}")