diff --git a/cli/README.md b/cli/README.md new file mode 100644 index 00000000..5a0bd3f1 --- /dev/null +++ b/cli/README.md @@ -0,0 +1 @@ +Checkout https://chatgpt.com/c/681d9e2b-7b28-800f-aef8-4f1427e9021d \ No newline at end of file diff --git a/cli/generate_vaulted_credentials.py b/cli/generate_vaulted_credentials.py new file mode 100644 index 00000000..f53dafc2 --- /dev/null +++ b/cli/generate_vaulted_credentials.py @@ -0,0 +1,144 @@ +import yaml +import argparse +import secrets +import hashlib +import bcrypt +import subprocess +from pathlib import Path + +def prompt(text, default=None): + """Prompt the user for input, with optional default value.""" + prompt_text = f"[?] {text}" + (f" [{default}]" if default else "") + ": " + response = input(prompt_text) + return response.strip() or default + +def generate_value(algorithm): + """Generate a value based on the provided algorithm.""" + if algorithm == "random_hex": + return secrets.token_hex(64) + elif algorithm == "sha256": + return hashlib.sha256(secrets.token_bytes(32)).hexdigest() + elif algorithm == "sha1": + return hashlib.sha1(secrets.token_bytes(20)).hexdigest() + elif algorithm == "bcrypt": + password = secrets.token_urlsafe(16).encode() + return bcrypt.hashpw(password, bcrypt.gensalt()).decode() + elif algorithm == "plain": + return secrets.token_urlsafe(32) + else: + return "undefined" + +def encrypt_with_vault(value, name, vault_password_file=None, ask_vault_pass=False): + """Encrypt the given string using Ansible Vault.""" + cmd = ["ansible-vault", "encrypt_string", value, f"--name={name}"] + if vault_password_file: + cmd += ["--vault-password-file", vault_password_file] + elif ask_vault_pass: + cmd += ["--ask-vault-pass"] + else: + raise RuntimeError("You must provide --vault-password-file or use --ask-vault-pass.") + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + raise RuntimeError(f"Vault encryption failed:\n{result.stderr}") + return result.stdout.strip() + +def load_yaml_file(path): + """Load a YAML file or return an empty dict if not found.""" + if path.exists(): + with open(path, "r") as f: + return yaml.safe_load(f) or {} + return {} + +def save_yaml_file(path, data): + """Save a dictionary to a YAML file.""" + with open(path, "w") as f: + yaml.dump(data, f, sort_keys=False) + +def parse_overrides(pairs): + """Parse key=value overrides into a dictionary.""" + result = {} + for pair in pairs: + if "=" not in pair: + continue + k, v = pair.split("=", 1) + result[k.strip()] = v.strip() + return result + +def apply_schema_to_inventory(schema, inventory_data, app_id, overrides, vault_password_file, ask_vault_pass): + """Merge schema into inventory under applications.{app_id}, encrypting all values.""" + inventory_data.setdefault("applications", {}) + applications = inventory_data["applications"] + + if "applications" not in schema or app_id not in schema["applications"]: + raise KeyError(f"Schema must contain 'applications.{app_id}'") + + app_schema = schema["applications"][app_id] + applications.setdefault(app_id, {}) + + def process_branch(branch, target, path_prefix=""): + for key, meta in branch.items(): + full_key_path = f"{path_prefix}.{key}" if path_prefix else key + if isinstance(meta, dict) and all(k in meta for k in ["description", "algorithm", "validation"]): + if key in target: + overwrite = prompt(f"Key '{full_key_path}' already exists. Overwrite?", "n").lower() == "y" + if not overwrite: + continue + plain_value = overrides.get(full_key_path, generate_value(meta["algorithm"])) + vaulted_value = encrypt_with_vault(plain_value, key, vault_password_file, ask_vault_pass) + target[key] = yaml.load(vaulted_value, Loader=yaml.SafeLoader) + elif isinstance(meta, dict): + target.setdefault(key, {}) + process_branch(meta, target[key], full_key_path) + else: + target[key] = meta + + process_branch(app_schema, applications[app_id]) + return inventory_data + +def main(): + parser = argparse.ArgumentParser(description="Generate Vault-encrypted credentials from schema and write to inventory.") + parser.add_argument("--role-path", help="Path to the Ansible role") + parser.add_argument("--inventory-file", help="Path to the inventory file to update") + parser.add_argument("--application-id", help="Application ID to process (e.g. bigbluebutton)") + parser.add_argument("--vault-password-file", help="Path to Ansible Vault password file") + parser.add_argument("--ask-vault-pass", action="store_true", help="Prompt for vault password") + parser.add_argument("--set", nargs="*", default=[], help="Override values as key=value") + args = parser.parse_args() + + # Prompt for missing values + role_path = Path(args.role_path or prompt("Path to Ansible role", "./roles/docker-bigbluebutton")) + inventory_file = Path(args.inventory_file or prompt("Path to inventory file", "./host_vars/localhost.yml")) + app_id = args.application_id or prompt("Application ID", "bigbluebutton") + + if not args.vault_password_file and not args.ask_vault_pass: + print("[?] No Vault password method provided.") + print(" 1) Provide path to --vault-password-file") + print(" 2) Use interactive prompt (--ask-vault-pass)") + choice = prompt("Select method", "1") + if choice == "1": + args.vault_password_file = prompt("Vault password file", "~/.vault_pass.txt").replace("~", str(Path.home())) + else: + args.ask_vault_pass = True + + # Load files + schema_path = role_path / "meta" / "schema.yml" + schema_data = load_yaml_file(schema_path) + inventory_data = load_yaml_file(inventory_file) + overrides = parse_overrides(args.set) + + # Process and save + updated = apply_schema_to_inventory( + schema=schema_data, + inventory_data=inventory_data, + app_id=app_id, + overrides=overrides, + vault_password_file=args.vault_password_file, + ask_vault_pass=args.ask_vault_pass + ) + + save_yaml_file(inventory_file, updated) + print(f"\n✅ Inventory file updated at: {inventory_file}") + +if __name__ == "__main__": + main() diff --git a/tests/unit/test_generate_vaulted_credentials.py b/tests/unit/test_generate_vaulted_credentials.py new file mode 100644 index 00000000..f18396e4 --- /dev/null +++ b/tests/unit/test_generate_vaulted_credentials.py @@ -0,0 +1,133 @@ +import os +import sys +import tempfile +import unittest +import shutil +import yaml +from pathlib import Path +from unittest.mock import patch + +# Ensure cli directory is importable +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../cli"))) + +import generate_vaulted_credentials as gvc + + +class TestGenerateVaultedCredentials(unittest.TestCase): + def setUp(self): + # Create temporary directory structure for a fake role and inventory + self.temp_dir = tempfile.mkdtemp() + self.role_path = Path(self.temp_dir) / "roles" / "docker-demoapp" + self.meta_path = self.role_path / "meta" + self.meta_path.mkdir(parents=True) + + # Define schema with no "applications" root (direct app-specific structure) + self.schema = { + "credentials": { + "shared_secret": { + "description": "A shared secret", + "algorithm": "sha256", + "validation": "^[a-f0-9]{64}$" + }, + "postgresql_secret": { + "description": "Postgres password", + "algorithm": "bcrypt", + "validation": "^\\$2[aby]\\$.{56}$" + } + } + } + + with open(self.meta_path / "schema.yml", "w") as f: + yaml.dump(self.schema, f) + + # Create an empty inventory file + self.inventory_path = Path(self.temp_dir) / "host_vars" / "testhost.yml" + self.inventory_path.parent.mkdir(parents=True) + with open(self.inventory_path, "w") as f: + f.write("") + + self.vault_mock = "$ANSIBLE_VAULT;1.1;AES256\nmockedvaultdata==" + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def test_apply_schema_creates_vaulted_credentials(self): + schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml") + inventory_data = gvc.load_yaml_file(self.inventory_path) + + with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt: + mock_encrypt.return_value = self.vault_mock + updated = gvc.apply_schema_to_inventory( + schema=schema_data, + inventory_data=inventory_data, + app_id="demoapp", + overrides={}, + vault_password_file="dummy", + ask_vault_pass=False + ) + + # Expect credentials to be written under applications.demoapp + self.assertIn("applications", updated) + self.assertIn("demoapp", updated["applications"]) + creds = updated["applications"]["demoapp"]["credentials"] + self.assertIn("shared_secret", creds) + self.assertIn("postgresql_secret", creds) + + for key in creds: + self.assertTrue(str(creds[key]).startswith("!vault") or "$ANSIBLE_VAULT" in str(creds[key])) + + def test_existing_key_prompts_before_overwriting(self): + # Pre-populate the inventory with one value + pre_existing = { + "applications": { + "demoapp": { + "credentials": { + "shared_secret": "unchanged" + } + } + } + } + gvc.save_yaml_file(self.inventory_path, pre_existing) + + schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml") + inventory_data = gvc.load_yaml_file(self.inventory_path) + + with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt, \ + patch("builtins.input", return_value="n"): + mock_encrypt.return_value = self.vault_mock + updated = gvc.apply_schema_to_inventory( + schema=schema_data, + inventory_data=inventory_data, + app_id="demoapp", + overrides={}, + vault_password_file="dummy", + ask_vault_pass=False + ) + + # Value should remain unchanged + self.assertEqual(updated["applications"]["demoapp"]["credentials"]["shared_secret"], "unchanged") + + def test_set_override_applies_correctly(self): + schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml") + inventory_data = gvc.load_yaml_file(self.inventory_path) + + override_value = "custom-override-value" + override_key = "credentials.shared_secret" + + # Patch vault encryption to just return the plaintext prefixed as mock + with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt: + mock_encrypt.side_effect = lambda val, name, **kwargs: f"$ANSIBLE_VAULT;1.1;AES256\n{val}" + updated = gvc.apply_schema_to_inventory( + schema=schema_data, + inventory_data=inventory_data, + app_id="demoapp", + overrides={override_key: override_value}, + vault_password_file="dummy", + ask_vault_pass=False + ) + + actual = updated["applications"]["demoapp"]["credentials"]["shared_secret"] + self.assertIn(override_value, str(actual), "The override value was not used during encryption.") + +if __name__ == "__main__": + unittest.main()