mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-05-09 23:05:43 +02:00
Added draft for new role schema cli management tools and tests
This commit is contained in:
parent
9d9f11cb3d
commit
9764941c7e
1
cli/README.md
Normal file
1
cli/README.md
Normal file
@ -0,0 +1 @@
|
||||
Checkout https://chatgpt.com/c/681d9e2b-7b28-800f-aef8-4f1427e9021d
|
144
cli/generate_vaulted_credentials.py
Normal file
144
cli/generate_vaulted_credentials.py
Normal file
@ -0,0 +1,144 @@
|
||||
import yaml
|
||||
import argparse
|
||||
import secrets
|
||||
import hashlib
|
||||
import bcrypt
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
def prompt(text, default=None):
|
||||
"""Prompt the user for input, with optional default value."""
|
||||
prompt_text = f"[?] {text}" + (f" [{default}]" if default else "") + ": "
|
||||
response = input(prompt_text)
|
||||
return response.strip() or default
|
||||
|
||||
def generate_value(algorithm):
|
||||
"""Generate a value based on the provided algorithm."""
|
||||
if algorithm == "random_hex":
|
||||
return secrets.token_hex(64)
|
||||
elif algorithm == "sha256":
|
||||
return hashlib.sha256(secrets.token_bytes(32)).hexdigest()
|
||||
elif algorithm == "sha1":
|
||||
return hashlib.sha1(secrets.token_bytes(20)).hexdigest()
|
||||
elif algorithm == "bcrypt":
|
||||
password = secrets.token_urlsafe(16).encode()
|
||||
return bcrypt.hashpw(password, bcrypt.gensalt()).decode()
|
||||
elif algorithm == "plain":
|
||||
return secrets.token_urlsafe(32)
|
||||
else:
|
||||
return "undefined"
|
||||
|
||||
def encrypt_with_vault(value, name, vault_password_file=None, ask_vault_pass=False):
|
||||
"""Encrypt the given string using Ansible Vault."""
|
||||
cmd = ["ansible-vault", "encrypt_string", value, f"--name={name}"]
|
||||
if vault_password_file:
|
||||
cmd += ["--vault-password-file", vault_password_file]
|
||||
elif ask_vault_pass:
|
||||
cmd += ["--ask-vault-pass"]
|
||||
else:
|
||||
raise RuntimeError("You must provide --vault-password-file or use --ask-vault-pass.")
|
||||
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"Vault encryption failed:\n{result.stderr}")
|
||||
return result.stdout.strip()
|
||||
|
||||
def load_yaml_file(path):
|
||||
"""Load a YAML file or return an empty dict if not found."""
|
||||
if path.exists():
|
||||
with open(path, "r") as f:
|
||||
return yaml.safe_load(f) or {}
|
||||
return {}
|
||||
|
||||
def save_yaml_file(path, data):
|
||||
"""Save a dictionary to a YAML file."""
|
||||
with open(path, "w") as f:
|
||||
yaml.dump(data, f, sort_keys=False)
|
||||
|
||||
def parse_overrides(pairs):
|
||||
"""Parse key=value overrides into a dictionary."""
|
||||
result = {}
|
||||
for pair in pairs:
|
||||
if "=" not in pair:
|
||||
continue
|
||||
k, v = pair.split("=", 1)
|
||||
result[k.strip()] = v.strip()
|
||||
return result
|
||||
|
||||
def apply_schema_to_inventory(schema, inventory_data, app_id, overrides, vault_password_file, ask_vault_pass):
|
||||
"""Merge schema into inventory under applications.{app_id}, encrypting all values."""
|
||||
inventory_data.setdefault("applications", {})
|
||||
applications = inventory_data["applications"]
|
||||
|
||||
if "applications" not in schema or app_id not in schema["applications"]:
|
||||
raise KeyError(f"Schema must contain 'applications.{app_id}'")
|
||||
|
||||
app_schema = schema["applications"][app_id]
|
||||
applications.setdefault(app_id, {})
|
||||
|
||||
def process_branch(branch, target, path_prefix=""):
|
||||
for key, meta in branch.items():
|
||||
full_key_path = f"{path_prefix}.{key}" if path_prefix else key
|
||||
if isinstance(meta, dict) and all(k in meta for k in ["description", "algorithm", "validation"]):
|
||||
if key in target:
|
||||
overwrite = prompt(f"Key '{full_key_path}' already exists. Overwrite?", "n").lower() == "y"
|
||||
if not overwrite:
|
||||
continue
|
||||
plain_value = overrides.get(full_key_path, generate_value(meta["algorithm"]))
|
||||
vaulted_value = encrypt_with_vault(plain_value, key, vault_password_file, ask_vault_pass)
|
||||
target[key] = yaml.load(vaulted_value, Loader=yaml.SafeLoader)
|
||||
elif isinstance(meta, dict):
|
||||
target.setdefault(key, {})
|
||||
process_branch(meta, target[key], full_key_path)
|
||||
else:
|
||||
target[key] = meta
|
||||
|
||||
process_branch(app_schema, applications[app_id])
|
||||
return inventory_data
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Generate Vault-encrypted credentials from schema and write to inventory.")
|
||||
parser.add_argument("--role-path", help="Path to the Ansible role")
|
||||
parser.add_argument("--inventory-file", help="Path to the inventory file to update")
|
||||
parser.add_argument("--application-id", help="Application ID to process (e.g. bigbluebutton)")
|
||||
parser.add_argument("--vault-password-file", help="Path to Ansible Vault password file")
|
||||
parser.add_argument("--ask-vault-pass", action="store_true", help="Prompt for vault password")
|
||||
parser.add_argument("--set", nargs="*", default=[], help="Override values as key=value")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Prompt for missing values
|
||||
role_path = Path(args.role_path or prompt("Path to Ansible role", "./roles/docker-bigbluebutton"))
|
||||
inventory_file = Path(args.inventory_file or prompt("Path to inventory file", "./host_vars/localhost.yml"))
|
||||
app_id = args.application_id or prompt("Application ID", "bigbluebutton")
|
||||
|
||||
if not args.vault_password_file and not args.ask_vault_pass:
|
||||
print("[?] No Vault password method provided.")
|
||||
print(" 1) Provide path to --vault-password-file")
|
||||
print(" 2) Use interactive prompt (--ask-vault-pass)")
|
||||
choice = prompt("Select method", "1")
|
||||
if choice == "1":
|
||||
args.vault_password_file = prompt("Vault password file", "~/.vault_pass.txt").replace("~", str(Path.home()))
|
||||
else:
|
||||
args.ask_vault_pass = True
|
||||
|
||||
# Load files
|
||||
schema_path = role_path / "meta" / "schema.yml"
|
||||
schema_data = load_yaml_file(schema_path)
|
||||
inventory_data = load_yaml_file(inventory_file)
|
||||
overrides = parse_overrides(args.set)
|
||||
|
||||
# Process and save
|
||||
updated = apply_schema_to_inventory(
|
||||
schema=schema_data,
|
||||
inventory_data=inventory_data,
|
||||
app_id=app_id,
|
||||
overrides=overrides,
|
||||
vault_password_file=args.vault_password_file,
|
||||
ask_vault_pass=args.ask_vault_pass
|
||||
)
|
||||
|
||||
save_yaml_file(inventory_file, updated)
|
||||
print(f"\n✅ Inventory file updated at: {inventory_file}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
133
tests/unit/test_generate_vaulted_credentials.py
Normal file
133
tests/unit/test_generate_vaulted_credentials.py
Normal file
@ -0,0 +1,133 @@
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
import shutil
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
# Ensure cli directory is importable
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../cli")))
|
||||
|
||||
import generate_vaulted_credentials as gvc
|
||||
|
||||
|
||||
class TestGenerateVaultedCredentials(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Create temporary directory structure for a fake role and inventory
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.role_path = Path(self.temp_dir) / "roles" / "docker-demoapp"
|
||||
self.meta_path = self.role_path / "meta"
|
||||
self.meta_path.mkdir(parents=True)
|
||||
|
||||
# Define schema with no "applications" root (direct app-specific structure)
|
||||
self.schema = {
|
||||
"credentials": {
|
||||
"shared_secret": {
|
||||
"description": "A shared secret",
|
||||
"algorithm": "sha256",
|
||||
"validation": "^[a-f0-9]{64}$"
|
||||
},
|
||||
"postgresql_secret": {
|
||||
"description": "Postgres password",
|
||||
"algorithm": "bcrypt",
|
||||
"validation": "^\\$2[aby]\\$.{56}$"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
with open(self.meta_path / "schema.yml", "w") as f:
|
||||
yaml.dump(self.schema, f)
|
||||
|
||||
# Create an empty inventory file
|
||||
self.inventory_path = Path(self.temp_dir) / "host_vars" / "testhost.yml"
|
||||
self.inventory_path.parent.mkdir(parents=True)
|
||||
with open(self.inventory_path, "w") as f:
|
||||
f.write("")
|
||||
|
||||
self.vault_mock = "$ANSIBLE_VAULT;1.1;AES256\nmockedvaultdata=="
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_apply_schema_creates_vaulted_credentials(self):
|
||||
schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml")
|
||||
inventory_data = gvc.load_yaml_file(self.inventory_path)
|
||||
|
||||
with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt:
|
||||
mock_encrypt.return_value = self.vault_mock
|
||||
updated = gvc.apply_schema_to_inventory(
|
||||
schema=schema_data,
|
||||
inventory_data=inventory_data,
|
||||
app_id="demoapp",
|
||||
overrides={},
|
||||
vault_password_file="dummy",
|
||||
ask_vault_pass=False
|
||||
)
|
||||
|
||||
# Expect credentials to be written under applications.demoapp
|
||||
self.assertIn("applications", updated)
|
||||
self.assertIn("demoapp", updated["applications"])
|
||||
creds = updated["applications"]["demoapp"]["credentials"]
|
||||
self.assertIn("shared_secret", creds)
|
||||
self.assertIn("postgresql_secret", creds)
|
||||
|
||||
for key in creds:
|
||||
self.assertTrue(str(creds[key]).startswith("!vault") or "$ANSIBLE_VAULT" in str(creds[key]))
|
||||
|
||||
def test_existing_key_prompts_before_overwriting(self):
|
||||
# Pre-populate the inventory with one value
|
||||
pre_existing = {
|
||||
"applications": {
|
||||
"demoapp": {
|
||||
"credentials": {
|
||||
"shared_secret": "unchanged"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
gvc.save_yaml_file(self.inventory_path, pre_existing)
|
||||
|
||||
schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml")
|
||||
inventory_data = gvc.load_yaml_file(self.inventory_path)
|
||||
|
||||
with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt, \
|
||||
patch("builtins.input", return_value="n"):
|
||||
mock_encrypt.return_value = self.vault_mock
|
||||
updated = gvc.apply_schema_to_inventory(
|
||||
schema=schema_data,
|
||||
inventory_data=inventory_data,
|
||||
app_id="demoapp",
|
||||
overrides={},
|
||||
vault_password_file="dummy",
|
||||
ask_vault_pass=False
|
||||
)
|
||||
|
||||
# Value should remain unchanged
|
||||
self.assertEqual(updated["applications"]["demoapp"]["credentials"]["shared_secret"], "unchanged")
|
||||
|
||||
def test_set_override_applies_correctly(self):
|
||||
schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml")
|
||||
inventory_data = gvc.load_yaml_file(self.inventory_path)
|
||||
|
||||
override_value = "custom-override-value"
|
||||
override_key = "credentials.shared_secret"
|
||||
|
||||
# Patch vault encryption to just return the plaintext prefixed as mock
|
||||
with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt:
|
||||
mock_encrypt.side_effect = lambda val, name, **kwargs: f"$ANSIBLE_VAULT;1.1;AES256\n{val}"
|
||||
updated = gvc.apply_schema_to_inventory(
|
||||
schema=schema_data,
|
||||
inventory_data=inventory_data,
|
||||
app_id="demoapp",
|
||||
overrides={override_key: override_value},
|
||||
vault_password_file="dummy",
|
||||
ask_vault_pass=False
|
||||
)
|
||||
|
||||
actual = updated["applications"]["demoapp"]["credentials"]["shared_secret"]
|
||||
self.assertIn(override_value, str(actual), "The override value was not used during encryption.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
Loading…
x
Reference in New Issue
Block a user