Optimized credential logics

This commit is contained in:
Kevin Veen-Birkenbach 2025-05-20 13:39:43 +02:00
parent 331da375b7
commit 5ad8a7e857
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E
4 changed files with 101 additions and 25 deletions

View File

@ -9,6 +9,7 @@ from utils.handler.vault import VaultHandler, VaultScalar
from utils.handler.yaml import YamlHandler
from yaml.dumper import SafeDumper
def ask_for_confirmation(key: str) -> bool:
"""Prompt the user for confirmation to overwrite an existing value."""
confirmation = input(f"Are you sure you want to overwrite the value for '{key}'? (y/n): ").strip().lower()
@ -40,8 +41,19 @@ def main():
# 1) Apply schema and update inventory
updated_inventory = manager.apply_schema()
# 2) Vault any leaves under 'credentials:' mappings
manager.vault_handler.encrypt_leaves(updated_inventory, args.vault_password_file)
# 2) Apply vault encryption ONLY to 'credentials' fields (we no longer apply it globally)
credentials = updated_inventory.get("applications", {}).get(manager.app_id, {}).get("credentials", {})
for key, value in credentials.items():
if not value.lstrip().startswith("$ANSIBLE_VAULT"): # Only apply encryption if the value is not already vaulted
if key in credentials and not args.force:
if not ask_for_confirmation(key): # Ask for confirmation before overwriting
print(f"Skipping overwrite of '{key}'.")
continue
encrypted_value = manager.vault_handler.encrypt_string(value, key)
lines = encrypted_value.splitlines()
indent = len(lines[1]) - len(lines[1].lstrip())
body = "\n".join(line[indent:] for line in lines[1:])
credentials[key] = VaultScalar(body) # Store encrypted value as VaultScalar
# 3) Vault top-level ansible_become_password if present
if "ansible_become_password" in updated_inventory:
@ -53,19 +65,12 @@ def main():
body = "\n".join(line[indent:] for line in lines[1:])
updated_inventory["ansible_become_password"] = VaultScalar(body)
# 4) Ask for confirmation before overwriting existing values
if not args.force:
for key in updated_inventory.get("applications", {}).get(manager.app_id, {}).get("credentials", {}).keys():
if key in updated_inventory["applications"][manager.app_id]["credentials"]:
if not ask_for_confirmation(key):
print(f"Skipping overwrite of '{key}'.")
continue
# 5) Save the updated inventory to file
# 4) Save the updated inventory to file
with open(args.inventory_file, "w", encoding="utf-8") as f:
yaml.dump(updated_inventory, f, sort_keys=False, Dumper=SafeDumper)
print(f"✅ Inventory selectively vaulted → {args.inventory_file}")
if __name__ == "__main__":
main()

66
cli/encrypt_inventory.py Normal file
View File

@ -0,0 +1,66 @@
import argparse
import subprocess
import sys
from pathlib import Path
import yaml
from typing import Dict, Any
from utils.handler.vault import VaultHandler, VaultScalar
from utils.handler.yaml import YamlHandler
from yaml.dumper import SafeDumper
def ask_for_confirmation(key: str) -> bool:
"""Prompt the user for confirmation to overwrite an existing value."""
confirmation = input(f"Do you want to encrypt the value for '{key}'? (y/n): ").strip().lower()
return confirmation == 'y'
def encrypt_recursively(data: Any, vault_handler: VaultHandler, ask_confirmation: bool = True, prefix: str = "") -> Any:
"""Recursively encrypt values in the data."""
if isinstance(data, dict):
for key, value in data.items():
new_prefix = f"{prefix}.{key}" if prefix else key
data[key] = encrypt_recursively(value, vault_handler, ask_confirmation, new_prefix)
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = encrypt_recursively(item, vault_handler, ask_confirmation, prefix)
elif isinstance(data, str):
# Only encrypt if it's not already vaulted
if not data.lstrip().startswith("$ANSIBLE_VAULT"):
if ask_confirmation:
# Ask for confirmation before encrypting if not `--all`
if not ask_for_confirmation(prefix):
print(f"Skipping encryption for '{prefix}'.")
return data
encrypted_value = vault_handler.encrypt_string(data, prefix)
lines = encrypted_value.splitlines()
indent = len(lines[1]) - len(lines[1].lstrip())
body = "\n".join(line[indent:] for line in lines[1:])
return VaultScalar(body) # Store encrypted value as VaultScalar
return data
def main():
parser = argparse.ArgumentParser(
description="Encrypt all fields, ask for confirmation unless --all is specified."
)
parser.add_argument("--inventory-file", required=True, help="Host vars file to update")
parser.add_argument("--vault-password-file", required=True, help="Vault password file")
parser.add_argument("--all", action="store_true", help="Encrypt all fields without confirmation")
args = parser.parse_args()
# Initialize the VaultHandler and load the inventory
vault_handler = VaultHandler(vault_password_file=args.vault_password_file)
updated_inventory = YamlHandler.load_yaml(Path(args.inventory_file))
# 1) Encrypt all fields recursively
updated_inventory = encrypt_recursively(updated_inventory, vault_handler, ask_confirmation=not args.all)
# 2) Save the updated inventory to file
with open(args.inventory_file, "w", encoding="utf-8") as f:
yaml.dump(updated_inventory, f, sort_keys=False, Dumper=SafeDumper)
print(f"✅ Inventory selectively vaulted → {args.inventory_file}")
if __name__ == "__main__":
main()

View File

@ -47,15 +47,17 @@ class InventoryManager:
"value": self.generate_value("alphanumeric") # Generate the password value
}
self.recurse(self.schema, target)
# Apply recursion only for the `credentials` section
self.recurse_credentials(self.schema, target)
return self.inventory
def recurse(self, branch: dict, dest: dict, prefix: str = ""):
"""Recursively process the schema and generate values."""
def recurse_credentials(self, branch: dict, dest: dict, prefix: str = ""):
"""Recursively process only the 'credentials' section and generate values."""
for key, meta in branch.items():
full_key = f"{prefix}.{key}" if prefix else key
if isinstance(meta, dict) and all(k in meta for k in ("description", "algorithm", "validation")):
# Only process 'credentials' section for encryption
if prefix == "credentials" and isinstance(meta, dict) and all(k in meta for k in ("description", "algorithm", "validation")):
alg = meta["algorithm"]
if alg == "plain":
# Must be supplied via --set
@ -66,6 +68,14 @@ class InventoryManager:
else:
plain = self.overrides.get(full_key, self.generate_value(alg))
# Check if the value already starts with '$ANSIBLE_VAULT', indicating it's already vaulted
existing_value = dest.get(key)
if existing_value and existing_value.lstrip().startswith("$ANSIBLE_VAULT"):
# Skip vaulting if the value is already vaulted
print(f"Skipping encryption for '{key}', as it is already vaulted.")
continue
# Encrypt only if it's not already vaulted
snippet = self.vault_handler.encrypt_string(plain, key)
lines = snippet.splitlines()
indent = len(lines[1]) - len(lines[1].lstrip())
@ -74,11 +84,11 @@ class InventoryManager:
elif isinstance(meta, dict):
sub = dest.setdefault(key, {})
self.recurse(meta, sub, full_key)
self.recurse_credentials(meta, sub, full_key)
else:
dest[key] = meta
def generate_secure_alphanumeric(length: int) -> str:
def generate_secure_alphanumeric(self, length: int) -> str:
"""Generate a cryptographically secure random alphanumeric string of the given length."""
characters = string.ascii_letters + string.digits # a-zA-Z0-9
return ''.join(secrets.choice(characters) for _ in range(length))
@ -95,5 +105,5 @@ class InventoryManager:
pw = secrets.token_urlsafe(16).encode()
return bcrypt.hashpw(pw, bcrypt.gensalt()).decode()
if algorithm == "alphanumeric":
return generate_secure_alphanumeric(64)
return "undefined"
return self.generate_secure_alphanumeric(64)
return "undefined"

View File

@ -1,9 +1,4 @@
credentials:
database_password:
description: "Password for the PeerTube PostgreSQL database"
algorithm: "bcrypt"
validation: "^\\$2[aby]\\$.{56}$"
secret:
description: "PeerTube secret used for session signing and CSRF protection"
algorithm: "sha256"