From c2eed82353cbd4e90c133052a6c2a04bd2c8ec30 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Thu, 15 May 2025 16:40:46 +0200 Subject: [PATCH] Updated tests --- .../unit/test_generate_vaulted_credentials.py | 192 ++++++------------ 1 file changed, 66 insertions(+), 126 deletions(-) diff --git a/tests/unit/test_generate_vaulted_credentials.py b/tests/unit/test_generate_vaulted_credentials.py index 1d2fda48..1b3329af 100644 --- a/tests/unit/test_generate_vaulted_credentials.py +++ b/tests/unit/test_generate_vaulted_credentials.py @@ -1,136 +1,76 @@ -import os -import sys -import tempfile -import unittest -import shutil -import yaml +# tests/unit/test_generate_vaulted_credentials.py + +import pytest +import sys, os from pathlib import Path -from unittest.mock import patch -# Ensure cli directory is importable -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../cli"))) +# 1) Add project root (two levels up) so 'cli' is on the path +PROJECT_ROOT = Path(__file__).parent.parent.parent.resolve() +sys.path.insert(0, str(PROJECT_ROOT)) -import generate_vaulted_credentials as gvc +# 2) Import from the cli package +import cli.generate_vaulted_credentials as gvc +class DummyProc: + def __init__(self, returncode, stdout, stderr=''): + self.returncode = returncode + self.stdout = stdout + self.stderr = stderr -class TestGenerateVaultedCredentials(unittest.TestCase): - def setUp(self): - # Create temporary directory structure for a fake role and inventory - self.temp_dir = tempfile.mkdtemp() - self.role_path = Path(self.temp_dir) / "roles" / "docker-demoapp" - self.meta_path = self.role_path / "meta" - self.meta_path.mkdir(parents=True) +# Monkeypatch subprocess.run for encrypt_with_vault +@pytest.fixture(autouse=True) +def mock_subprocess_run(monkeypatch): + def fake_run(cmd, capture_output, text): + name = None + # find --name= in args + for arg in cmd: + if arg.startswith("--name="): + name = arg.split("=",1)[1] + val = cmd[ cmd.index(name) - 1 ] if name else "key" + # simulate Ansible output + snippet = f"{name or 'key'}: !vault |\n encrypted_{val}" + return DummyProc(0, snippet) + monkeypatch.setattr(gvc.subprocess, 'run', fake_run) - # Define schema with no "applications" root (direct app-specific structure) - self.schema = { - "credentials": { - "shared_secret": { - "description": "A shared secret", - "algorithm": "sha256", - "validation": "^[a-f0-9]{64}$" - }, - "postgresql_secret": { - "description": "Postgres password", - "algorithm": "bcrypt", - "validation": "^\\$2[aby]\\$.{56}$" - } - } - } +def test_wrap_existing_vaults(): + data = { + 'a': '$ANSIBLE_VAULT;1.1;AES256...blob', + 'b': {'c': 'normal', 'd': '$ANSIBLE_VAULT;1.1;AES256...other'}, + 'e': ['x', '$ANSIBLE_VAULT;1.1;AES256...list'] + } + wrapped = gvc.wrap_existing_vaults(data) + assert isinstance(wrapped['a'], gvc.VaultScalar) + assert isinstance(wrapped['b']['d'], gvc.VaultScalar) + assert isinstance(wrapped['e'][1], gvc.VaultScalar) + assert wrapped['b']['c'] == 'normal' + assert wrapped['e'][0] == 'x' - with open(self.meta_path / "schema.yml", "w") as f: - yaml.dump(self.schema, f) +@pytest.mark.parametrize("pairs,expected", [ + (['k=v'], {'k': 'v'}), + (['a.b=1', 'c=two'], {'a.b': '1', 'c': 'two'}), + (['noeq'], {}), +]) +def test_parse_overrides(pairs, expected): + assert gvc.parse_overrides(pairs) == expected - # Create an empty inventory file - self.inventory_path = Path(self.temp_dir) / "host_vars" / "testhost.yml" - self.inventory_path.parent.mkdir(parents=True) - with open(self.inventory_path, "w") as f: - f.write("") +def test_apply_schema_and_vault(tmp_path): + schema = { + 'cred': {'description':'d','algorithm':'plain','validation':{}}, + 'nested': {'inner': {'description':'d2','algorithm':'plain','validation':{}}} + } + inv = {} + updated = gvc.apply_schema(schema, inv, 'app', {}, 'pwfile') + apps = updated['applications']['app'] + assert isinstance(apps['cred'], gvc.VaultScalar) + assert isinstance(apps['nested']['inner'], gvc.VaultScalar) - self.vault_mock = "$ANSIBLE_VAULT;1.1;AES256\nmockedvaultdata==" +def test_encrypt_leaves_and_credentials(): + branch = {'p':'v','nested':{'q':'u'}} + gvc.encrypt_leaves(branch, 'pwfile') + assert isinstance(branch['p'], gvc.VaultScalar) + assert isinstance(branch['nested']['q'], gvc.VaultScalar) - def tearDown(self): - shutil.rmtree(self.temp_dir) - - def test_apply_schema_creates_vaulted_credentials(self): - schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml") - inventory_data = gvc.load_yaml_file(self.inventory_path) - - with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt: - mock_encrypt.return_value = self.vault_mock - updated = gvc.apply_schema_to_inventory( - schema=schema_data, - inventory_data=inventory_data, - app_id="demoapp", - overrides={}, - vault_password_file="dummy", - ask_vault_pass=False - ) - - # Expect credentials to be written under applications.demoapp - self.assertIn("applications", updated) - self.assertIn("demoapp", updated["applications"]) - creds = updated["applications"]["demoapp"]["credentials"] - self.assertIn("shared_secret", creds) - self.assertIn("postgresql_secret", creds) - - for key in creds: - self.assertTrue(str(creds[key]).startswith("!vault") or "$ANSIBLE_VAULT" in str(creds[key])) - - def test_existing_key_prompts_before_overwriting(self): - # Pre-populate the inventory with one value - pre_existing = { - "applications": { - "demoapp": { - "credentials": { - "shared_secret": "unchanged" - } - } - } - } - gvc.save_yaml_file(self.inventory_path, pre_existing) - - schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml") - inventory_data = gvc.load_yaml_file(self.inventory_path) - - with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt, \ - patch("builtins.input", return_value="n"): - mock_encrypt.return_value = self.vault_mock - updated = gvc.apply_schema_to_inventory( - schema=schema_data, - inventory_data=inventory_data, - app_id="demoapp", - overrides={}, - vault_password_file="dummy", - ask_vault_pass=False - ) - - # Value should remain unchanged - self.assertEqual(updated["applications"]["demoapp"]["credentials"]["shared_secret"], "unchanged") - - def test_set_override_applies_correctly(self): - schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml") - inventory_data = gvc.load_yaml_file(self.inventory_path) - - override_value = "custom-override-value" - override_key = "credentials.shared_secret" - - # 👇 Patch die Methode innerhalb des importierten Moduls gvc - with patch.object(gvc, "encrypt_with_vault") as mock_encrypt, \ - patch("builtins.input", return_value="n"): - mock_encrypt.side_effect = lambda val, name, *_args, **_kwargs: f"$ANSIBLE_VAULT;1.1;AES256\n{val}" - - updated = gvc.apply_schema_to_inventory( - schema=schema_data, - inventory_data=inventory_data, - app_id="demoapp", - overrides={override_key: override_value}, - vault_password_file="dummy", - ask_vault_pass=False - ) - - actual = updated["applications"]["demoapp"]["credentials"]["shared_secret"] - self.assertIn(override_value, str(actual), "The override value was not used during encryption.") - - -if __name__ == "__main__": - unittest.main() + inv = {'credentials':{'a':'b'}, 'x':{'credentials':{'c':'d'}}} + gvc.encrypt_credentials_branch(inv, 'pwfile') + assert isinstance(inv['credentials']['a'], gvc.VaultScalar) + assert isinstance(inv['x']['credentials']['c'], gvc.VaultScalar)