mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-06-25 03:38:59 +02:00
144 lines
5.2 KiB
Python
144 lines
5.2 KiB
Python
import unittest
|
|
import sys
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
# Ensure the cli package is on sys.path
|
|
sys.path.insert(
|
|
0,
|
|
os.path.abspath(
|
|
os.path.join(os.path.dirname(__file__), "../../cli")
|
|
),
|
|
)
|
|
|
|
from utils.handler.yaml import YamlHandler
|
|
from utils.handler.vault import VaultHandler, VaultScalar
|
|
from cli.utils.manager.inventory import InventoryManager
|
|
|
|
|
|
class TestInventoryManager(unittest.TestCase):
|
|
def setUp(self):
|
|
# Create a temporary directory for role and inventory files
|
|
self.tmpdir = Path(tempfile.mkdtemp())
|
|
|
|
# Patch YamlHandler.load_yaml
|
|
self.load_yaml_patcher = patch.object(
|
|
YamlHandler,
|
|
'load_yaml',
|
|
side_effect=self.fake_load_yaml
|
|
)
|
|
self.load_yaml_patcher.start()
|
|
|
|
# Patch VaultHandler.encrypt_string with correct signature
|
|
self.encrypt_patcher = patch.object(
|
|
VaultHandler,
|
|
'encrypt_string',
|
|
new=lambda self, plain, key: f"{key}: !vault |\n encrypted_{plain}"
|
|
)
|
|
self.encrypt_patcher.start()
|
|
|
|
def tearDown(self):
|
|
# Stop patchers
|
|
patch.stopall()
|
|
# Remove temporary directory
|
|
shutil.rmtree(self.tmpdir)
|
|
|
|
def fake_load_yaml(self, path):
|
|
path = Path(path)
|
|
# Return schema for meta/schema.yml
|
|
if path.match("*/meta/schema.yml"):
|
|
return {
|
|
"credentials": {
|
|
"plain_cred": {"description": "desc", "algorithm": "plain", "validation": {}},
|
|
"nested": {"inner": {"description": "desc2", "algorithm": "sha256", "validation": {}}}
|
|
}
|
|
}
|
|
# Return application_id for vars/main.yml
|
|
if path.match("*/vars/main.yml"):
|
|
return {"application_id": "testapp"}
|
|
# Return feature flags for vars/configuration.yml
|
|
if path.match("*/vars/configuration.yml"):
|
|
return {"features": {"central_database": True}}
|
|
# Return empty inventory for inventory.yml
|
|
if path.name == "inventory.yml":
|
|
return {}
|
|
raise FileNotFoundError(f"Unexpected load_yaml path: {path}")
|
|
|
|
def test_load_application_id_missing(self):
|
|
"""Loading application_id without it should raise SystemExit."""
|
|
role_dir = self.tmpdir / "role"
|
|
(role_dir / "vars").mkdir(parents=True)
|
|
(role_dir / "vars" / "main.yml").write_text("{}")
|
|
|
|
with patch.object(YamlHandler, 'load_yaml', return_value={}):
|
|
with self.assertRaises(SystemExit):
|
|
InventoryManager(role_dir, self.tmpdir / "inventory.yml", "pw", {}).load_application_id(role_dir)
|
|
|
|
def test_generate_value_algorithms(self):
|
|
"""Verify generate_value produces outputs of the expected form."""
|
|
# Bypass __init__ to avoid YAML loading
|
|
im = InventoryManager.__new__(InventoryManager)
|
|
|
|
# random_hex → 64 bytes hex = 128 chars
|
|
hex_val = im.generate_value("random_hex")
|
|
self.assertEqual(len(hex_val), 128)
|
|
self.assertTrue(all(c in "0123456789abcdef" for c in hex_val))
|
|
|
|
# sha256 → 64 hex chars
|
|
sha256_val = im.generate_value("sha256")
|
|
self.assertEqual(len(sha256_val), 64)
|
|
|
|
# sha1 → 40 hex chars
|
|
sha1_val = im.generate_value("sha1")
|
|
self.assertEqual(len(sha1_val), 40)
|
|
|
|
# bcrypt → starts with bcrypt prefix
|
|
bcrypt_val = im.generate_value("bcrypt")
|
|
self.assertTrue(bcrypt_val.startswith("$2"))
|
|
|
|
# alphanumeric → 64 chars
|
|
alnum = im.generate_value("alphanumeric")
|
|
self.assertEqual(len(alnum), 64)
|
|
self.assertTrue(alnum.isalnum())
|
|
|
|
# base64_prefixed_32 → starts with "base64:"
|
|
b64 = im.generate_value("base64_prefixed_32")
|
|
self.assertTrue(b64.startswith("base64:"))
|
|
|
|
def test_apply_schema_and_recurse(self):
|
|
"""
|
|
apply_schema should inject central_database password and vault nested.inner
|
|
"""
|
|
# Setup role directory
|
|
role_dir = self.tmpdir / "role"
|
|
(role_dir / "meta").mkdir(parents=True)
|
|
(role_dir / "vars").mkdir(parents=True)
|
|
|
|
# Create empty inventory.yml
|
|
inv_file = self.tmpdir / "inventory.yml"
|
|
inv_file.write_text(" ")
|
|
|
|
# Provide override for plain_cred to avoid SystemExit
|
|
overrides = {'credentials.plain_cred': 'OVERRIDE_PLAIN'}
|
|
|
|
# Instantiate manager with overrides
|
|
mgr = InventoryManager(role_dir, inv_file, "pw", overrides=overrides)
|
|
|
|
# Patch generate_value locally for predictable values
|
|
with patch.object(InventoryManager, 'generate_value', lambda self, alg: f"GEN_{alg}"):
|
|
result = mgr.apply_schema()
|
|
|
|
apps = result["applications"]["testapp"]
|
|
# central_database entry
|
|
self.assertEqual(apps["credentials"]["database_password"], "GEN_alphanumeric")
|
|
# plain_cred vaulted from override
|
|
self.assertIsInstance(apps["credentials"]["plain_cred"], VaultScalar)
|
|
# nested.inner should not be vaulted due to code's prefix check
|
|
self.assertEqual(
|
|
apps["credentials"]["nested"]["inner"],
|
|
{"description": "desc2", "algorithm": "sha256", "validation": {}},
|
|
)
|