computer-playbook/tests/unit/test_inventory_manager.py

144 lines
5.2 KiB
Python

import unittest
import sys
import os
import tempfile
import shutil
from pathlib import Path
from unittest.mock import patch
# Ensure the cli package is on sys.path
sys.path.insert(
0,
os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../cli")
),
)
from utils.handler.yaml import YamlHandler
from utils.handler.vault import VaultHandler, VaultScalar
from cli.utils.manager.inventory import InventoryManager
class TestInventoryManager(unittest.TestCase):
def setUp(self):
# Create a temporary directory for role and inventory files
self.tmpdir = Path(tempfile.mkdtemp())
# Patch YamlHandler.load_yaml
self.load_yaml_patcher = patch.object(
YamlHandler,
'load_yaml',
side_effect=self.fake_load_yaml
)
self.load_yaml_patcher.start()
# Patch VaultHandler.encrypt_string with correct signature
self.encrypt_patcher = patch.object(
VaultHandler,
'encrypt_string',
new=lambda self, plain, key: f"{key}: !vault |\n encrypted_{plain}"
)
self.encrypt_patcher.start()
def tearDown(self):
# Stop patchers
patch.stopall()
# Remove temporary directory
shutil.rmtree(self.tmpdir)
def fake_load_yaml(self, path):
path = Path(path)
# Return schema for meta/schema.yml
if path.match("*/meta/schema.yml"):
return {
"credentials": {
"plain_cred": {"description": "desc", "algorithm": "plain", "validation": {}},
"nested": {"inner": {"description": "desc2", "algorithm": "sha256", "validation": {}}}
}
}
# Return application_id for vars/main.yml
if path.match("*/vars/main.yml"):
return {"application_id": "testapp"}
# Return feature flags for vars/configuration.yml
if path.match("*/vars/configuration.yml"):
return {"features": {"central_database": True}}
# Return empty inventory for inventory.yml
if path.name == "inventory.yml":
return {}
raise FileNotFoundError(f"Unexpected load_yaml path: {path}")
def test_load_application_id_missing(self):
"""Loading application_id without it should raise SystemExit."""
role_dir = self.tmpdir / "role"
(role_dir / "vars").mkdir(parents=True)
(role_dir / "vars" / "main.yml").write_text("{}")
with patch.object(YamlHandler, 'load_yaml', return_value={}):
with self.assertRaises(SystemExit):
InventoryManager(role_dir, self.tmpdir / "inventory.yml", "pw", {}).load_application_id(role_dir)
def test_generate_value_algorithms(self):
"""Verify generate_value produces outputs of the expected form."""
# Bypass __init__ to avoid YAML loading
im = InventoryManager.__new__(InventoryManager)
# random_hex → 64 bytes hex = 128 chars
hex_val = im.generate_value("random_hex")
self.assertEqual(len(hex_val), 128)
self.assertTrue(all(c in "0123456789abcdef" for c in hex_val))
# sha256 → 64 hex chars
sha256_val = im.generate_value("sha256")
self.assertEqual(len(sha256_val), 64)
# sha1 → 40 hex chars
sha1_val = im.generate_value("sha1")
self.assertEqual(len(sha1_val), 40)
# bcrypt → starts with bcrypt prefix
bcrypt_val = im.generate_value("bcrypt")
self.assertTrue(bcrypt_val.startswith("$2"))
# alphanumeric → 64 chars
alnum = im.generate_value("alphanumeric")
self.assertEqual(len(alnum), 64)
self.assertTrue(alnum.isalnum())
# base64_prefixed_32 → starts with "base64:"
b64 = im.generate_value("base64_prefixed_32")
self.assertTrue(b64.startswith("base64:"))
def test_apply_schema_and_recurse(self):
"""
apply_schema should inject central_database password and vault nested.inner
"""
# Setup role directory
role_dir = self.tmpdir / "role"
(role_dir / "meta").mkdir(parents=True)
(role_dir / "vars").mkdir(parents=True)
# Create empty inventory.yml
inv_file = self.tmpdir / "inventory.yml"
inv_file.write_text(" ")
# Provide override for plain_cred to avoid SystemExit
overrides = {'credentials.plain_cred': 'OVERRIDE_PLAIN'}
# Instantiate manager with overrides
mgr = InventoryManager(role_dir, inv_file, "pw", overrides=overrides)
# Patch generate_value locally for predictable values
with patch.object(InventoryManager, 'generate_value', lambda self, alg: f"GEN_{alg}"):
result = mgr.apply_schema()
apps = result["applications"]["testapp"]
# central_database entry
self.assertEqual(apps["credentials"]["database_password"], "GEN_alphanumeric")
# plain_cred vaulted from override
self.assertIsInstance(apps["credentials"]["plain_cred"], VaultScalar)
# nested.inner should not be vaulted due to code's prefix check
self.assertEqual(
apps["credentials"]["nested"]["inner"],
{"description": "desc2", "algorithm": "sha256", "validation": {}},
)