Solved linebreak bug

This commit is contained in:
Kevin Veen-Birkenbach 2025-05-15 16:35:57 +02:00
parent 9eca1958ec
commit db095b77dc
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E

View File

@ -8,24 +8,25 @@ from pathlib import Path
import yaml import yaml
from yaml.loader import SafeLoader from yaml.loader import SafeLoader
from yaml.dumper import SafeDumper
# ───────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────────────────
# Let PyYAML treat !vault blocks as ordinary multiline strings # On load: treat any !vault tag as plain text
def _vault_constructor(loader, node): def _vault_constructor(loader, node):
return node.value return node.value
SafeLoader.add_constructor('!vault', _vault_constructor) SafeLoader.add_constructor('!vault', _vault_constructor)
# VaultScalar so PyYAML emits a !vault literal block on output # A str subclass so PyYAML emits !vault literal blocks on dump
class VaultScalar(str): class VaultScalar(str):
pass pass
def _vault_representer(dumper, data): def _vault_representer(dumper, data):
return dumper.represent_scalar('!vault', data, style='|') return dumper.represent_scalar('!vault', data, style='|')
yaml.SafeDumper.add_representer(VaultScalar, _vault_representer) SafeDumper.add_representer(VaultScalar, _vault_representer)
# ───────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────────────────
def generate_value(algorithm): def generate_value(algorithm: str) -> str:
if algorithm == "random_hex": if algorithm == "random_hex":
return secrets.token_hex(64) return secrets.token_hex(64)
if algorithm == "sha256": if algorithm == "sha256":
@ -39,23 +40,29 @@ def generate_value(algorithm):
return secrets.token_urlsafe(32) return secrets.token_urlsafe(32)
return "undefined" return "undefined"
def decrypt_inventory(path: Path, vault_password_file: str): def wrap_existing_vaults(node):
""" """
Try `ansible-vault view`; on "not vaulted" fallback to yaml.safe_load Recursively walk the data and wrap any str that begins with
(with !vault constructor). '$ANSIBLE_VAULT' in a VaultScalar so it dumps as a literal.
""" """
proc = subprocess.run( if isinstance(node, dict):
["ansible-vault", "view", str(path), "--vault-password-file", vault_password_file], return {k: wrap_existing_vaults(v) for k, v in node.items()}
capture_output=True, text=True if isinstance(node, list):
) return [wrap_existing_vaults(v) for v in node]
if proc.returncode == 0: if isinstance(node, str) and node.lstrip().startswith("$ANSIBLE_VAULT"):
return yaml.safe_load(proc.stdout) or {} return VaultScalar(node)
# fallback if not vaulted return node
if "not a vault encrypted file" in proc.stderr.lower():
return yaml.safe_load(path.read_text()) or {}
raise RuntimeError(f"ansible-vault view failed:\n{proc.stderr}")
def encrypt_with_vault(value: str, name: str, vault_password_file: str): def load_yaml_plain(path: Path) -> dict:
"""
Load any YAML (vaulted or not) via SafeLoader + our !vault constructor,
then wrap existing vaultblocks for correct literal dumping.
"""
text = path.read_text()
data = yaml.load(text, Loader=SafeLoader) or {}
return wrap_existing_vaults(data)
def encrypt_with_vault(value: str, name: str, vault_password_file: str) -> str:
cmd = [ cmd = [
"ansible-vault", "encrypt_string", "ansible-vault", "encrypt_string",
value, f"--name={name}", value, f"--name={name}",
@ -66,15 +73,7 @@ def encrypt_with_vault(value: str, name: str, vault_password_file: str):
raise RuntimeError(f"ansible-vault encrypt_string failed:\n{proc.stderr}") raise RuntimeError(f"ansible-vault encrypt_string failed:\n{proc.stderr}")
return proc.stdout return proc.stdout
def load_yaml(path: Path): def parse_overrides(pairs: list[str]) -> dict:
if not path.exists():
return {}
return yaml.safe_load(path.read_text()) or {}
def save_yaml(path: Path, data):
path.write_text(yaml.dump(data, sort_keys=False, Dumper=yaml.SafeDumper))
def parse_overrides(pairs):
out = {} out = {}
for p in pairs: for p in pairs:
if "=" in p: if "=" in p:
@ -82,11 +81,9 @@ def parse_overrides(pairs):
out[k.strip()] = v.strip() out[k.strip()] = v.strip()
return out return out
def load_application_id(role_path: Path): def load_application_id(role_path: Path) -> str:
vars_file = role_path / "vars" / "main.yml" vars_file = role_path / "vars" / "main.yml"
if not vars_file.exists(): data = load_yaml_plain(vars_file)
raise FileNotFoundError(f"{vars_file} not found")
data = load_yaml(vars_file)
app_id = data.get("application_id") app_id = data.get("application_id")
if not app_id: if not app_id:
raise KeyError(f"'application_id' missing in {vars_file}") raise KeyError(f"'application_id' missing in {vars_file}")
@ -96,15 +93,15 @@ def apply_schema(schema: dict,
inventory: dict, inventory: dict,
app_id: str, app_id: str,
overrides: dict, overrides: dict,
vault_pw: str): vault_pw: str) -> dict:
apps = inventory.setdefault("applications", {}) apps = inventory.setdefault("applications", {})
target = apps.setdefault(app_id, {}) target = apps.setdefault(app_id, {})
def recurse(branch, dest, prefix=""): def recurse(branch: dict, dest: dict, prefix: str = ""):
for key, meta in branch.items(): for key, meta in branch.items():
full = f"{prefix}.{key}" if prefix else key full = f"{prefix}.{key}" if prefix else key
# leaf spec # leaf node
if isinstance(meta, dict) and all(k in meta for k in ("description","algorithm","validation")): if isinstance(meta, dict) and all(k in meta for k in ("description","algorithm","validation")):
plain = overrides.get(full, generate_value(meta["algorithm"])) plain = overrides.get(full, generate_value(meta["algorithm"]))
snippet = encrypt_with_vault(plain, key, vault_pw) snippet = encrypt_with_vault(plain, key, vault_pw)
@ -113,12 +110,12 @@ def apply_schema(schema: dict,
body = "\n".join(line[indent:] for line in lines[1:]) body = "\n".join(line[indent:] for line in lines[1:])
dest[key] = VaultScalar(body) dest[key] = VaultScalar(body)
# nested # nested mapping
elif isinstance(meta, dict): elif isinstance(meta, dict):
sub = dest.setdefault(key, {}) sub = dest.setdefault(key, {})
recurse(meta, sub, full) recurse(meta, sub, full)
# passthrough # literal passthrough
else: else:
dest[key] = meta dest[key] = meta
@ -141,27 +138,23 @@ def encrypt_leaves(branch: dict, vault_pw: str):
def encrypt_credentials_branch(node, vault_pw: str): def encrypt_credentials_branch(node, vault_pw: str):
if isinstance(node, dict): if isinstance(node, dict):
for k, v in node.items(): for key, val in node.items():
if k == "credentials" and isinstance(v, dict): if key == "credentials" and isinstance(val, dict):
encrypt_leaves(v, vault_pw) encrypt_leaves(val, vault_pw)
else: else:
encrypt_credentials_branch(v, vault_pw) encrypt_credentials_branch(val, vault_pw)
elif isinstance(node, list): elif isinstance(node, list):
for item in node: for item in node:
encrypt_credentials_branch(item, vault_pw) encrypt_credentials_branch(item, vault_pw)
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Selectively vault credentials + become-password in an inventory file" description="Selectively vault credentials + become-password in your inventory."
) )
parser.add_argument("--role-path", required=True, parser.add_argument("--role-path", required=True, help="Path to your role")
help="Path to your Ansible role") parser.add_argument("--inventory-file", required=True, help="host_vars file to update")
parser.add_argument("--inventory-file", required=True, parser.add_argument("--vault-password-file", required=True, help="Vault password file")
help="Vaulted host_vars file to update") parser.add_argument("--set", nargs="*", default=[], help="Override values key.subkey=VALUE")
parser.add_argument("--vault-password-file", required=True,
help="Ansible Vault password file")
parser.add_argument("--set", nargs="*", default=[],
help="Override values as key.subkey=VALUE")
args = parser.parse_args() args = parser.parse_args()
role_path = Path(args.role_path) role_path = Path(args.role_path)
@ -169,19 +162,18 @@ def main():
vault_pw = args.vault_password_file vault_pw = args.vault_password_file
overrides = parse_overrides(args.set) overrides = parse_overrides(args.set)
# 1) Load & wrap existing vault blocks
inventory = load_yaml_plain(inv_file)
# 2) Merge in any schema-driven credentials
schema = load_yaml_plain(role_path / "meta" / "schema.yml")
app_id = load_application_id(role_path) app_id = load_application_id(role_path)
# 1) decrypt-or-plain-load
inventory = decrypt_inventory(inv_file, vault_pw)
# 2) merge schema-driven credentials
schema = load_yaml(role_path / "meta" / "schema.yml")
inventory = apply_schema(schema, inventory, app_id, overrides, vault_pw) inventory = apply_schema(schema, inventory, app_id, overrides, vault_pw)
# 3) vault leaves under credentials: # 3) Vault any leaves under 'credentials:' mappings
encrypt_credentials_branch(inventory, vault_pw) encrypt_credentials_branch(inventory, vault_pw)
# 4) vault top-level become password # 4) Vault top-level ansible_become_password if present
if "ansible_become_password" in inventory: if "ansible_become_password" in inventory:
val = str(inventory["ansible_become_password"]) val = str(inventory["ansible_become_password"])
if not val.lstrip().startswith("$ANSIBLE_VAULT"): if not val.lstrip().startswith("$ANSIBLE_VAULT"):
@ -191,8 +183,10 @@ def main():
body = "\n".join(line[indent:] for line in lines[1:]) body = "\n".join(line[indent:] for line in lines[1:])
inventory["ansible_become_password"] = VaultScalar(body) inventory["ansible_become_password"] = VaultScalar(body)
# 5) write back YAML # 5) Overwrite file with proper !vault | blocks only where needed
save_yaml(inv_file, inventory) with open(inv_file, "w") as f:
yaml.dump(inventory, f, sort_keys=False, Dumper=SafeDumper)
print(f"✅ Inventory selectively vaulted → {inv_file}") print(f"✅ Inventory selectively vaulted → {inv_file}")
if __name__ == "__main__": if __name__ == "__main__":