2025-05-20 12:53:10 +02:00

51 lines
2.1 KiB
Python

import subprocess
from typing import Any, Dict
from yaml.loader import SafeLoader
from yaml.dumper import SafeDumper
class VaultScalar(str):
"""A subclass of str to represent vault-encrypted strings."""
pass
def _vault_constructor(loader, node):
"""Custom constructor to handle !vault tag as plain text."""
return node.value
def _vault_representer(dumper, data):
"""Custom representer to dump VaultScalar as literal blocks."""
return dumper.represent_scalar('!vault', data, style='|')
SafeLoader.add_constructor('!vault', _vault_constructor)
SafeDumper.add_representer(VaultScalar, _vault_representer)
class VaultHandler:
def __init__(self, vault_password_file: str):
self.vault_password_file = vault_password_file
def encrypt_string(self, value: str, name: str) -> str:
"""Encrypt a string using ansible-vault."""
cmd = [
"ansible-vault", "encrypt_string",
value, f"--name={name}",
"--vault-password-file", self.vault_password_file
]
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
raise RuntimeError(f"ansible-vault encrypt_string failed:\n{proc.stderr}")
return proc.stdout
def encrypt_leaves(self, branch: Dict[str, Any], vault_pw: str):
"""Recursively encrypt all leaves (plain text values) under the credentials section."""
for key, value in branch.items():
if isinstance(value, dict):
self.encrypt_leaves(value, vault_pw) # Recurse into nested dictionaries
else:
# Skip if already vaulted (i.e., starts with $ANSIBLE_VAULT)
if isinstance(value, str) and not value.lstrip().startswith("$ANSIBLE_VAULT"):
snippet = self.encrypt_string(value, key)
lines = snippet.splitlines()
indent = len(lines[1]) - len(lines[1].lstrip())
body = "\n".join(line[indent:] for line in lines[1:])
branch[key] = VaultScalar(body) # Store encrypted value as VaultScalar