mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-05-18 18:50:32 +02:00
Updated tests
This commit is contained in:
parent
db095b77dc
commit
c2eed82353
@ -1,136 +1,76 @@
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
import shutil
|
||||
import yaml
|
||||
# tests/unit/test_generate_vaulted_credentials.py
|
||||
|
||||
import pytest
|
||||
import sys, os
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
# Ensure cli directory is importable
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../cli")))
|
||||
# 1) Add project root (two levels up) so 'cli' is on the path
|
||||
PROJECT_ROOT = Path(__file__).parent.parent.parent.resolve()
|
||||
sys.path.insert(0, str(PROJECT_ROOT))
|
||||
|
||||
import generate_vaulted_credentials as gvc
|
||||
# 2) Import from the cli package
|
||||
import cli.generate_vaulted_credentials as gvc
|
||||
|
||||
class DummyProc:
|
||||
def __init__(self, returncode, stdout, stderr=''):
|
||||
self.returncode = returncode
|
||||
self.stdout = stdout
|
||||
self.stderr = stderr
|
||||
|
||||
class TestGenerateVaultedCredentials(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Create temporary directory structure for a fake role and inventory
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.role_path = Path(self.temp_dir) / "roles" / "docker-demoapp"
|
||||
self.meta_path = self.role_path / "meta"
|
||||
self.meta_path.mkdir(parents=True)
|
||||
# Monkeypatch subprocess.run for encrypt_with_vault
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_subprocess_run(monkeypatch):
|
||||
def fake_run(cmd, capture_output, text):
|
||||
name = None
|
||||
# find --name=<key> in args
|
||||
for arg in cmd:
|
||||
if arg.startswith("--name="):
|
||||
name = arg.split("=",1)[1]
|
||||
val = cmd[ cmd.index(name) - 1 ] if name else "key"
|
||||
# simulate Ansible output
|
||||
snippet = f"{name or 'key'}: !vault |\n encrypted_{val}"
|
||||
return DummyProc(0, snippet)
|
||||
monkeypatch.setattr(gvc.subprocess, 'run', fake_run)
|
||||
|
||||
# Define schema with no "applications" root (direct app-specific structure)
|
||||
self.schema = {
|
||||
"credentials": {
|
||||
"shared_secret": {
|
||||
"description": "A shared secret",
|
||||
"algorithm": "sha256",
|
||||
"validation": "^[a-f0-9]{64}$"
|
||||
},
|
||||
"postgresql_secret": {
|
||||
"description": "Postgres password",
|
||||
"algorithm": "bcrypt",
|
||||
"validation": "^\\$2[aby]\\$.{56}$"
|
||||
}
|
||||
def test_wrap_existing_vaults():
|
||||
data = {
|
||||
'a': '$ANSIBLE_VAULT;1.1;AES256...blob',
|
||||
'b': {'c': 'normal', 'd': '$ANSIBLE_VAULT;1.1;AES256...other'},
|
||||
'e': ['x', '$ANSIBLE_VAULT;1.1;AES256...list']
|
||||
}
|
||||
wrapped = gvc.wrap_existing_vaults(data)
|
||||
assert isinstance(wrapped['a'], gvc.VaultScalar)
|
||||
assert isinstance(wrapped['b']['d'], gvc.VaultScalar)
|
||||
assert isinstance(wrapped['e'][1], gvc.VaultScalar)
|
||||
assert wrapped['b']['c'] == 'normal'
|
||||
assert wrapped['e'][0] == 'x'
|
||||
|
||||
@pytest.mark.parametrize("pairs,expected", [
|
||||
(['k=v'], {'k': 'v'}),
|
||||
(['a.b=1', 'c=two'], {'a.b': '1', 'c': 'two'}),
|
||||
(['noeq'], {}),
|
||||
])
|
||||
def test_parse_overrides(pairs, expected):
|
||||
assert gvc.parse_overrides(pairs) == expected
|
||||
|
||||
def test_apply_schema_and_vault(tmp_path):
|
||||
schema = {
|
||||
'cred': {'description':'d','algorithm':'plain','validation':{}},
|
||||
'nested': {'inner': {'description':'d2','algorithm':'plain','validation':{}}}
|
||||
}
|
||||
inv = {}
|
||||
updated = gvc.apply_schema(schema, inv, 'app', {}, 'pwfile')
|
||||
apps = updated['applications']['app']
|
||||
assert isinstance(apps['cred'], gvc.VaultScalar)
|
||||
assert isinstance(apps['nested']['inner'], gvc.VaultScalar)
|
||||
|
||||
with open(self.meta_path / "schema.yml", "w") as f:
|
||||
yaml.dump(self.schema, f)
|
||||
def test_encrypt_leaves_and_credentials():
|
||||
branch = {'p':'v','nested':{'q':'u'}}
|
||||
gvc.encrypt_leaves(branch, 'pwfile')
|
||||
assert isinstance(branch['p'], gvc.VaultScalar)
|
||||
assert isinstance(branch['nested']['q'], gvc.VaultScalar)
|
||||
|
||||
# Create an empty inventory file
|
||||
self.inventory_path = Path(self.temp_dir) / "host_vars" / "testhost.yml"
|
||||
self.inventory_path.parent.mkdir(parents=True)
|
||||
with open(self.inventory_path, "w") as f:
|
||||
f.write("")
|
||||
|
||||
self.vault_mock = "$ANSIBLE_VAULT;1.1;AES256\nmockedvaultdata=="
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_apply_schema_creates_vaulted_credentials(self):
|
||||
schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml")
|
||||
inventory_data = gvc.load_yaml_file(self.inventory_path)
|
||||
|
||||
with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt:
|
||||
mock_encrypt.return_value = self.vault_mock
|
||||
updated = gvc.apply_schema_to_inventory(
|
||||
schema=schema_data,
|
||||
inventory_data=inventory_data,
|
||||
app_id="demoapp",
|
||||
overrides={},
|
||||
vault_password_file="dummy",
|
||||
ask_vault_pass=False
|
||||
)
|
||||
|
||||
# Expect credentials to be written under applications.demoapp
|
||||
self.assertIn("applications", updated)
|
||||
self.assertIn("demoapp", updated["applications"])
|
||||
creds = updated["applications"]["demoapp"]["credentials"]
|
||||
self.assertIn("shared_secret", creds)
|
||||
self.assertIn("postgresql_secret", creds)
|
||||
|
||||
for key in creds:
|
||||
self.assertTrue(str(creds[key]).startswith("!vault") or "$ANSIBLE_VAULT" in str(creds[key]))
|
||||
|
||||
def test_existing_key_prompts_before_overwriting(self):
|
||||
# Pre-populate the inventory with one value
|
||||
pre_existing = {
|
||||
"applications": {
|
||||
"demoapp": {
|
||||
"credentials": {
|
||||
"shared_secret": "unchanged"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
gvc.save_yaml_file(self.inventory_path, pre_existing)
|
||||
|
||||
schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml")
|
||||
inventory_data = gvc.load_yaml_file(self.inventory_path)
|
||||
|
||||
with patch("generate_vaulted_credentials.encrypt_with_vault") as mock_encrypt, \
|
||||
patch("builtins.input", return_value="n"):
|
||||
mock_encrypt.return_value = self.vault_mock
|
||||
updated = gvc.apply_schema_to_inventory(
|
||||
schema=schema_data,
|
||||
inventory_data=inventory_data,
|
||||
app_id="demoapp",
|
||||
overrides={},
|
||||
vault_password_file="dummy",
|
||||
ask_vault_pass=False
|
||||
)
|
||||
|
||||
# Value should remain unchanged
|
||||
self.assertEqual(updated["applications"]["demoapp"]["credentials"]["shared_secret"], "unchanged")
|
||||
|
||||
def test_set_override_applies_correctly(self):
|
||||
schema_data = gvc.load_yaml_file(self.meta_path / "schema.yml")
|
||||
inventory_data = gvc.load_yaml_file(self.inventory_path)
|
||||
|
||||
override_value = "custom-override-value"
|
||||
override_key = "credentials.shared_secret"
|
||||
|
||||
# 👇 Patch die Methode innerhalb des importierten Moduls gvc
|
||||
with patch.object(gvc, "encrypt_with_vault") as mock_encrypt, \
|
||||
patch("builtins.input", return_value="n"):
|
||||
mock_encrypt.side_effect = lambda val, name, *_args, **_kwargs: f"$ANSIBLE_VAULT;1.1;AES256\n{val}"
|
||||
|
||||
updated = gvc.apply_schema_to_inventory(
|
||||
schema=schema_data,
|
||||
inventory_data=inventory_data,
|
||||
app_id="demoapp",
|
||||
overrides={override_key: override_value},
|
||||
vault_password_file="dummy",
|
||||
ask_vault_pass=False
|
||||
)
|
||||
|
||||
actual = updated["applications"]["demoapp"]["credentials"]["shared_secret"]
|
||||
self.assertIn(override_value, str(actual), "The override value was not used during encryption.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
inv = {'credentials':{'a':'b'}, 'x':{'credentials':{'c':'d'}}}
|
||||
gvc.encrypt_credentials_branch(inv, 'pwfile')
|
||||
assert isinstance(inv['credentials']['a'], gvc.VaultScalar)
|
||||
assert isinstance(inv['x']['credentials']['c'], gvc.VaultScalar)
|
||||
|
Loading…
x
Reference in New Issue
Block a user