import unittest from unittest.mock import patch, MagicMock from pathlib import Path import base64 import re # Import target module import module_utils.manager.inventory as inv_mod class TestInventoryManager(unittest.TestCase): def setUp(self): # Dummy paths (no real files will be read) self.role_path = Path("/tmp/role") self.inv_path = Path("/tmp/inventory.yml") self.vault_pw = "secret" # Minimal default data self.inventory_yaml = {"applications": {}} self.config_yaml = { "features": { "central_database": True, "oauth2": True, } } # Schema with one autogenerated key, one plain key, and one nested dict self.schema_yaml = { "credentials": { "etherpad_api_key": { "description": "API key", "algorithm": "random_hex_32", "validation": {"regex": "^[0-9a-f]{64}$"}, }, "plain_needed": { "description": "Needs --set", "algorithm": "plain", "validation": {}, }, "skip_dict": { "description": "will be skipped if dict", "algorithm": "random_hex_16", "validation": {}, } }, "non_credentials": { # not encrypted, just copied "flag": True } } # Dummy VaultHandler output self.vault_snippet = "VAULT|\n MAGIC\n SECRET\n" # Mock YamlHandler self.yaml_loader = MagicMock() def _yaml_side_effect(pathlike): p = str(pathlike) if p.endswith("/vars/main.yml"): return {"application_id": "web-app-bigbluebutton"} if p.endswith("/schema/main.yml"): return self.schema_yaml if p.endswith("/config/main.yml"): return self.config_yaml if p == str(self.inv_path): return self.inventory_yaml return {} self.yaml_loader.side_effect = _yaml_side_effect # Dummy VaultScalar (only type matters) class _DummyVaultScalar(str): pass self.DummyVaultScalar = _DummyVaultScalar # Dummy VaultHandler self.vault_handler_mock = MagicMock() self.vault_handler_mock.encrypt_string.return_value = self.vault_snippet # Start patches self.p_yaml = patch.object(inv_mod.YamlHandler, "load_yaml", self.yaml_loader) self.p_vault_scalar = patch.object(inv_mod, "VaultScalar", self.DummyVaultScalar) self.p_vault_handler_ctor = patch.object(inv_mod, "VaultHandler", return_value=self.vault_handler_mock) self.p_yaml.start() self.p_vault_scalar.start() self.p_vault_handler_ctor.start() def tearDown(self): self.p_yaml.stop() self.p_vault_scalar.stop() self.p_vault_handler_ctor.stop() def _make_manager(self, overrides=None): return inv_mod.InventoryManager( role_path=self.role_path, inventory_path=self.inv_path, vault_pw=self.vault_pw, overrides=overrides or {} ) # ---------- Tests ---------- def test_load_application_id_missing_exits(self): # Vars/main.yml without application_id def _yaml_side_effect_missing(pathlike): p = str(pathlike) if p.endswith("/vars/main.yml"): return {} # missing if p.endswith("/schema/main.yml"): return self.schema_yaml if p.endswith("/config/main.yml"): return self.config_yaml if p == str(self.inv_path): return self.inventory_yaml return {} self.yaml_loader.side_effect = _yaml_side_effect_missing with self.assertRaises(SystemExit): inv_mod.InventoryManager( role_path=self.role_path, inventory_path=self.inv_path, vault_pw=self.vault_pw, overrides={} ) def test_apply_schema_with_features_and_plain_override(self): overrides = {"credentials.plain_needed": "my-plain-secret"} mgr = self._make_manager(overrides=overrides) out = mgr.apply_schema() app = out["applications"]["web-app-bigbluebutton"] creds = app["credentials"] # Feature-based values SHOULD BE PLAIN STRINGS in Option B self.assertIsInstance(creds["database_password"], str) self.assertIsInstance(creds["oauth2_proxy_cookie_secret"], str) # Schema-driven secret (autogenerated) SHOULD be vaulted self.assertIsInstance(creds["etherpad_api_key"], self.DummyVaultScalar) # Plain secret via override SHOULD be vaulted self.assertIsInstance(creds["plain_needed"], self.DummyVaultScalar) # Non-credentials section is just copied self.assertEqual(app["non_credentials"]["flag"], True) # encrypt_string should be called at least for 'etherpad_api_key' and 'plain_needed' self.assertGreaterEqual(self.vault_handler_mock.encrypt_string.call_count, 2) # Body extracted correctly for vaulted item self.assertEqual(str(creds["etherpad_api_key"]), "MAGIC\nSECRET") def test_plain_without_override_exits(self): mgr = self._make_manager(overrides={}) with self.assertRaises(SystemExit): mgr.apply_schema() def test_skip_if_value_is_dict_or_vaultscalar(self): # Pretend inventory already has dict and VaultScalar self.inventory_yaml = { "applications": { "web-app-bigbluebutton": { "credentials": { "skip_dict": {"already": "a dict"}, "etherpad_api_key": self.DummyVaultScalar("EXISTING"), } } } } overrides = {"credentials.plain_needed": "x"} mgr = self._make_manager(overrides=overrides) out = mgr.apply_schema() creds = out["applications"]["web-app-bigbluebutton"]["credentials"] # dict preserved self.assertIsInstance(creds["skip_dict"], dict) self.assertEqual(creds["skip_dict"]["already"], "a dict") # VaultScalar preserved self.assertIsInstance(creds["etherpad_api_key"], self.DummyVaultScalar) self.assertEqual(str(creds["etherpad_api_key"]), "EXISTING") def test_generate_value_variants(self): mgr = self._make_manager() rh = mgr.generate_value("random_hex") self.assertTrue(re.fullmatch(r"[0-9a-f]{128}", rh)) rh32 = mgr.generate_value("random_hex_32") self.assertTrue(re.fullmatch(r"[0-9a-f]{64}", rh32)) rh16 = mgr.generate_value("random_hex_16") self.assertTrue(re.fullmatch(r"[0-9a-f]{32}", rh16)) sha256 = mgr.generate_value("sha256") self.assertTrue(re.fullmatch(r"[0-9a-f]{64}", sha256)) sha1 = mgr.generate_value("sha1") self.assertTrue(re.fullmatch(r"[0-9a-f]{40}", sha1)) b64p = mgr.generate_value("base64_prefixed_32") self.assertTrue(b64p.startswith("base64:")) decoded = base64.b64decode(b64p.split("base64:", 1)[1].encode()) self.assertEqual(len(decoded), 32) alnum = mgr.generate_value("alphanumeric") self.assertEqual(len(alnum), 64) self.assertTrue(re.fullmatch(r"[A-Za-z0-9]{64}", alnum)) bcrypt_val = mgr.generate_value("bcrypt") self.assertNotIn("$", bcrypt_val) self.assertGreater(len(bcrypt_val), 20) undef = mgr.generate_value("does_not_exist") self.assertEqual(undef, "undefined") if __name__ == "__main__": unittest.main()