diff --git a/module_utils/manager/inventory.py b/module_utils/manager/inventory.py index 7a1fafdc..75a9905e 100644 --- a/module_utils/manager/inventory.py +++ b/module_utils/manager/inventory.py @@ -142,7 +142,8 @@ class InventoryManager: """ if algorithm == "random_hex": return secrets.token_hex(64) - + if algorithm == "random_hex_32": + return secrets.token_hex(32) if algorithm == "sha256": return hashlib.sha256(secrets.token_bytes(32)).hexdigest() if algorithm == "sha1": diff --git a/roles/web-app-bigbluebutton/schema/main.yml b/roles/web-app-bigbluebutton/schema/main.yml index 8f90f369..fcee8ee6 100644 --- a/roles/web-app-bigbluebutton/schema/main.yml +++ b/roles/web-app-bigbluebutton/schema/main.yml @@ -5,7 +5,7 @@ credentials: validation: "^[a-f0-9]{64}$" etherpad_api_key: description: "API key for Etherpad integration" - algorithm: "plain" + algorithm: "random_hex_32" validation: "^[a-zA-Z0-9]{32}$" rails_secret: description: "Secret key for Rails backend" @@ -17,7 +17,7 @@ credentials: validation: "^\\$2[aby]\\$.{56}$" fsesl_password: description: "Password for FreeSWITCH ESL connection" - algorithm: "plain" + algorithm: "alphanumeric_32" validation: "^.{8,}$" turn_secret: description: "TURN server shared secret" diff --git a/tests/unit/module_utils/test_inventory_manager.py b/tests/unit/module_utils/test_inventory_manager.py new file mode 100644 index 00000000..b81e5340 --- /dev/null +++ b/tests/unit/module_utils/test_inventory_manager.py @@ -0,0 +1,220 @@ +import unittest +from unittest.mock import patch, MagicMock +from pathlib import Path +import base64 +import re + +# Import target module +import module_utils.manager.inventory as inv_mod + + +class TestInventoryManager(unittest.TestCase): + + def setUp(self): + # Dummy paths (no real files will be read) + self.role_path = Path("/tmp/role") + self.inv_path = Path("/tmp/inventory.yml") + self.vault_pw = "secret" + + # Minimal default data + self.inventory_yaml = {"applications": {}} + self.config_yaml = { + "features": { + "central_database": True, + "oauth2": True, + } + } + # Schema with one autogenerated key, one plain key, and one nested dict + self.schema_yaml = { + "credentials": { + "etherpad_api_key": { + "description": "API key", + "algorithm": "random_hex_32", + "validation": {"regex": "^[0-9a-f]{64}$"}, + }, + "plain_needed": { + "description": "Needs --set", + "algorithm": "plain", + "validation": {}, + }, + "skip_dict": { + "description": "will be skipped if dict", + "algorithm": "random_hex_16", + "validation": {}, + } + }, + "non_credentials": { # not encrypted, just copied + "flag": True + } + } + + # Dummy VaultHandler output + self.vault_snippet = "VAULT|\n MAGIC\n SECRET\n" + + # Mock YamlHandler + self.yaml_loader = MagicMock() + + def _yaml_side_effect(pathlike): + p = str(pathlike) + if p.endswith("/vars/main.yml"): + return {"application_id": "web-app-bigbluebutton"} + if p.endswith("/schema/main.yml"): + return self.schema_yaml + if p.endswith("/config/main.yml"): + return self.config_yaml + if p == str(self.inv_path): + return self.inventory_yaml + return {} + + self.yaml_loader.side_effect = _yaml_side_effect + + # Dummy VaultScalar (only type matters) + class _DummyVaultScalar(str): + pass + self.DummyVaultScalar = _DummyVaultScalar + + # Dummy VaultHandler + self.vault_handler_mock = MagicMock() + self.vault_handler_mock.encrypt_string.return_value = self.vault_snippet + + # Start patches + self.p_yaml = patch.object(inv_mod.YamlHandler, "load_yaml", self.yaml_loader) + self.p_vault_scalar = patch.object(inv_mod, "VaultScalar", self.DummyVaultScalar) + self.p_vault_handler_ctor = patch.object(inv_mod, "VaultHandler", return_value=self.vault_handler_mock) + + self.p_yaml.start() + self.p_vault_scalar.start() + self.p_vault_handler_ctor.start() + + def tearDown(self): + self.p_yaml.stop() + self.p_vault_scalar.stop() + self.p_vault_handler_ctor.stop() + + def _make_manager(self, overrides=None): + return inv_mod.InventoryManager( + role_path=self.role_path, + inventory_path=self.inv_path, + vault_pw=self.vault_pw, + overrides=overrides or {} + ) + + # ---------- Tests ---------- + + def test_load_application_id_missing_exits(self): + # Vars/main.yml without application_id + def _yaml_side_effect_missing(pathlike): + p = str(pathlike) + if p.endswith("/vars/main.yml"): + return {} # missing + if p.endswith("/schema/main.yml"): + return self.schema_yaml + if p.endswith("/config/main.yml"): + return self.config_yaml + if p == str(self.inv_path): + return self.inventory_yaml + return {} + self.yaml_loader.side_effect = _yaml_side_effect_missing + + with self.assertRaises(SystemExit): + inv_mod.InventoryManager( + role_path=self.role_path, + inventory_path=self.inv_path, + vault_pw=self.vault_pw, + overrides={} + ) + + def test_apply_schema_with_features_and_plain_override(self): + overrides = {"credentials.plain_needed": "my-plain-secret"} + mgr = self._make_manager(overrides=overrides) + out = mgr.apply_schema() + + app = out["applications"]["web-app-bigbluebutton"] + creds = app["credentials"] + + # Feature-based values SHOULD BE PLAIN STRINGS in Option B + self.assertIsInstance(creds["database_password"], str) + self.assertIsInstance(creds["oauth2_proxy_cookie_secret"], str) + + # Schema-driven secret (autogenerated) SHOULD be vaulted + self.assertIsInstance(creds["etherpad_api_key"], self.DummyVaultScalar) + + # Plain secret via override SHOULD be vaulted + self.assertIsInstance(creds["plain_needed"], self.DummyVaultScalar) + + # Non-credentials section is just copied + self.assertEqual(app["non_credentials"]["flag"], True) + + # encrypt_string should be called at least for 'etherpad_api_key' and 'plain_needed' + self.assertGreaterEqual(self.vault_handler_mock.encrypt_string.call_count, 2) + + # Body extracted correctly for vaulted item + self.assertEqual(str(creds["etherpad_api_key"]), "MAGIC\nSECRET") + + def test_plain_without_override_exits(self): + mgr = self._make_manager(overrides={}) + with self.assertRaises(SystemExit): + mgr.apply_schema() + + def test_skip_if_value_is_dict_or_vaultscalar(self): + # Pretend inventory already has dict and VaultScalar + self.inventory_yaml = { + "applications": { + "web-app-bigbluebutton": { + "credentials": { + "skip_dict": {"already": "a dict"}, + "etherpad_api_key": self.DummyVaultScalar("EXISTING"), + } + } + } + } + overrides = {"credentials.plain_needed": "x"} + mgr = self._make_manager(overrides=overrides) + out = mgr.apply_schema() + creds = out["applications"]["web-app-bigbluebutton"]["credentials"] + + # dict preserved + self.assertIsInstance(creds["skip_dict"], dict) + self.assertEqual(creds["skip_dict"]["already"], "a dict") + + # VaultScalar preserved + self.assertIsInstance(creds["etherpad_api_key"], self.DummyVaultScalar) + self.assertEqual(str(creds["etherpad_api_key"]), "EXISTING") + + def test_generate_value_variants(self): + mgr = self._make_manager() + + rh = mgr.generate_value("random_hex") + self.assertTrue(re.fullmatch(r"[0-9a-f]{128}", rh)) + + rh32 = mgr.generate_value("random_hex_32") + self.assertTrue(re.fullmatch(r"[0-9a-f]{64}", rh32)) + + rh16 = mgr.generate_value("random_hex_16") + self.assertTrue(re.fullmatch(r"[0-9a-f]{32}", rh16)) + + sha256 = mgr.generate_value("sha256") + self.assertTrue(re.fullmatch(r"[0-9a-f]{64}", sha256)) + + sha1 = mgr.generate_value("sha1") + self.assertTrue(re.fullmatch(r"[0-9a-f]{40}", sha1)) + + b64p = mgr.generate_value("base64_prefixed_32") + self.assertTrue(b64p.startswith("base64:")) + decoded = base64.b64decode(b64p.split("base64:", 1)[1].encode()) + self.assertEqual(len(decoded), 32) + + alnum = mgr.generate_value("alphanumeric") + self.assertEqual(len(alnum), 64) + self.assertTrue(re.fullmatch(r"[A-Za-z0-9]{64}", alnum)) + + bcrypt_val = mgr.generate_value("bcrypt") + self.assertNotIn("$", bcrypt_val) + self.assertGreater(len(bcrypt_val), 20) + + undef = mgr.generate_value("does_not_exist") + self.assertEqual(undef, "undefined") + + +if __name__ == "__main__": + unittest.main()