mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-12-13 12:44:38 +00:00
Refactor defaults generation, credential creation, and inventory management
### Overview This commit introduces a broad set of improvements across the defaults generator, credential creation subsystem, inventory creation workflow, and InventoryManager core logic. ### Major Changes - Support empty or config/main.yml in defaults generator and ensure that applications with empty configs are still included in defaults_applications. - Add '--snippet' and '--allow-empty-plain' modes to create/credentials.py with non-destructive merging and correct plain-secret handling. - Ensure empty strings for 'plain' credentials are never encrypted. - Update InventoryManager to fully support allow_empty_plain and prevent accidental overwriting or encrypting existing VaultScalar or dict values. - Add full-size implementation of cli/create/inventory.py including dynamic inventory building, role filtering, host_vars management, and parallelised credential snippet generation. - Fix schemas (Magento, Nextcloud, OAuth2-Proxy, keyboard-color, etc.) to align with the new credential model and avoid test failures. - Improve get_app_conf consistency by ensuring credentials.* paths are always resolvable for applications even when config/main.yml is empty. ### Added Test Coverage - Unit tests for defaults generator handling empty configs. - Full test suite for create/inventory.py including merge logic and vault-safe host_vars loading. - Extensive tests for InventoryManager: plain-secret behavior, vault handling, and recursion logic. - Update or remove outdated tests referencing old schema behaviour. ### Context This commit is associated with a refactoring and debugging session documented here: https://chatgpt.com/share/692ec0e1-5018-800f-b568-d09a53e9d0ee
This commit is contained in:
0
tests/unit/module_utils/manager/__init__.py
Normal file
0
tests/unit/module_utils/manager/__init__.py
Normal file
374
tests/unit/module_utils/manager/test_inventory_manager.py
Normal file
374
tests/unit/module_utils/manager/test_inventory_manager.py
Normal file
@@ -0,0 +1,374 @@
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
# Make project root importable so that `module_utils` can be imported
|
||||
ROOT_DIR = os.path.abspath(
|
||||
os.path.join(os.path.dirname(__file__), "../../../..")
|
||||
)
|
||||
sys.path.insert(0, ROOT_DIR)
|
||||
|
||||
from module_utils.manager.inventory import InventoryManager # type: ignore
|
||||
from module_utils.handler.vault import VaultScalar # type: ignore
|
||||
|
||||
|
||||
class TestInventoryManager(unittest.TestCase):
|
||||
def test_load_application_id_missing_exits(self):
|
||||
"""
|
||||
If vars/main.yml does not contain application_id, InventoryManager
|
||||
must print an error and exit with code 1.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
role_path = Path(tmpdir) / "role"
|
||||
inv_path = Path(tmpdir) / "inventory.yml"
|
||||
|
||||
role_path.mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "schema").mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "vars").mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "config").mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Dummy files so that Path comparisons in the fake loader work
|
||||
(role_path / "schema" / "main.yml").write_text("{}", encoding="utf-8")
|
||||
(role_path / "vars" / "main.yml").write_text("{}", encoding="utf-8")
|
||||
(role_path / "config" / "main.yml").write_text("{}", encoding="utf-8")
|
||||
inv_path.write_text("{}", encoding="utf-8")
|
||||
|
||||
inventory_path = inv_path
|
||||
|
||||
def fake_load_yaml(path):
|
||||
p = Path(path)
|
||||
if p == inventory_path:
|
||||
return {}
|
||||
if p == role_path / "schema" / "main.yml":
|
||||
return {}
|
||||
if p == role_path / "vars" / "main.yml":
|
||||
# Missing application_id on purpose
|
||||
return {}
|
||||
if p == role_path / "config" / "main.yml":
|
||||
return {"features": {}}
|
||||
return {}
|
||||
|
||||
with mock.patch(
|
||||
"module_utils.manager.inventory.YamlHandler.load_yaml",
|
||||
side_effect=fake_load_yaml,
|
||||
), mock.patch(
|
||||
"module_utils.manager.inventory.VaultHandler"
|
||||
):
|
||||
with self.assertRaises(SystemExit) as ctx:
|
||||
InventoryManager(
|
||||
role_path=role_path,
|
||||
inventory_path=inventory_path,
|
||||
vault_pw="dummy",
|
||||
overrides={},
|
||||
)
|
||||
self.assertEqual(ctx.exception.code, 1)
|
||||
|
||||
def test_plain_without_override_and_allow_empty_plain_exits(self):
|
||||
"""
|
||||
For a `plain` algorithm credential, if no override is provided and
|
||||
allow_empty_plain=False, recurse_credentials/apply_schema must exit.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
role_path = Path(tmpdir) / "role"
|
||||
inv_path = Path(tmpdir) / "inventory.yml"
|
||||
|
||||
role_path.mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "schema").mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "vars").mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "config").mkdir(parents=True, exist_ok=True)
|
||||
inv_path.write_text("{}", encoding="utf-8")
|
||||
|
||||
inventory_path = inv_path
|
||||
|
||||
schema_data = {
|
||||
"credentials": {
|
||||
"api_key": {
|
||||
"description": "API key",
|
||||
"algorithm": "plain",
|
||||
"validation": {},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def fake_load_yaml(path):
|
||||
p = Path(path)
|
||||
if p == inventory_path:
|
||||
return {"applications": {}}
|
||||
if p == role_path / "schema" / "main.yml":
|
||||
return schema_data
|
||||
if p == role_path / "vars" / "main.yml":
|
||||
return {"application_id": "app_test"}
|
||||
if p == role_path / "config" / "main.yml":
|
||||
return {"features": {}}
|
||||
return {}
|
||||
|
||||
with mock.patch(
|
||||
"module_utils.manager.inventory.YamlHandler.load_yaml",
|
||||
side_effect=fake_load_yaml,
|
||||
), mock.patch(
|
||||
"module_utils.manager.inventory.VaultHandler"
|
||||
):
|
||||
mgr = InventoryManager(
|
||||
role_path=role_path,
|
||||
inventory_path=inventory_path,
|
||||
vault_pw="dummy",
|
||||
overrides={}, # no plain override
|
||||
allow_empty_plain=False,
|
||||
)
|
||||
with self.assertRaises(SystemExit) as ctx:
|
||||
mgr.apply_schema()
|
||||
self.assertEqual(ctx.exception.code, 1)
|
||||
|
||||
def test_plain_with_allow_empty_plain_sets_empty_string_unencrypted(self):
|
||||
"""
|
||||
For a `plain` algorithm credential, if no override is provided and
|
||||
allow_empty_plain=True, the credential should be set to an empty string
|
||||
and must NOT be encrypted.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
role_path = Path(tmpdir) / "role"
|
||||
inv_path = Path(tmpdir) / "inventory.yml"
|
||||
|
||||
role_path.mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "schema").mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "vars").mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "config").mkdir(parents=True, exist_ok=True)
|
||||
inv_path.write_text("{}", encoding="utf-8")
|
||||
|
||||
inventory_path = inv_path
|
||||
|
||||
schema_data = {
|
||||
"credentials": {
|
||||
"api_key": {
|
||||
"description": "API key",
|
||||
"algorithm": "plain",
|
||||
"validation": {},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def fake_load_yaml(path):
|
||||
p = Path(path)
|
||||
if p == inventory_path:
|
||||
return {"applications": {}}
|
||||
if p == role_path / "schema" / "main.yml":
|
||||
return schema_data
|
||||
if p == role_path / "vars" / "main.yml":
|
||||
return {"application_id": "app_test"}
|
||||
if p == role_path / "config" / "main.yml":
|
||||
return {"features": {}}
|
||||
return {}
|
||||
|
||||
with mock.patch(
|
||||
"module_utils.manager.inventory.YamlHandler.load_yaml",
|
||||
side_effect=fake_load_yaml,
|
||||
), mock.patch(
|
||||
"module_utils.manager.inventory.VaultHandler"
|
||||
) as mock_vault_cls:
|
||||
# VaultHandler instance
|
||||
mock_vault = mock_vault_cls.return_value
|
||||
mock_vault.encrypt_string.return_value = "!vault |\n $ANSIBLE_VAULT;1.1;AES256\n ENCRYPTED"
|
||||
|
||||
mgr = InventoryManager(
|
||||
role_path=role_path,
|
||||
inventory_path=inventory_path,
|
||||
vault_pw="dummy",
|
||||
overrides={}, # no override for plain
|
||||
allow_empty_plain=True,
|
||||
)
|
||||
inv = mgr.apply_schema()
|
||||
|
||||
apps = inv.get("applications", {})
|
||||
app_block = apps.get("app_test", {})
|
||||
creds = app_block.get("credentials", {})
|
||||
|
||||
# api_key must be present and must be a literal empty string
|
||||
self.assertIn("api_key", creds)
|
||||
self.assertEqual(creds["api_key"], "")
|
||||
|
||||
# Empty string must not trigger encryption
|
||||
mock_vault.encrypt_string.assert_not_called()
|
||||
|
||||
def test_non_plain_algorithm_encrypts_and_sets_vaultscalar(self):
|
||||
"""
|
||||
For non-plain algorithms, recurse_credentials must generate a value
|
||||
and encrypt it into a VaultScalar, unless an existing VaultScalar
|
||||
is already present.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
role_path = Path(tmpdir) / "role"
|
||||
inv_path = Path(tmpdir) / "inventory.yml"
|
||||
|
||||
role_path.mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "schema").mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "vars").mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "config").mkdir(parents=True, exist_ok=True)
|
||||
inv_path.write_text("{}", encoding="utf-8")
|
||||
|
||||
inventory_path = inv_path
|
||||
|
||||
schema_data = {
|
||||
"credentials": {
|
||||
"api_key": {
|
||||
"description": "API key",
|
||||
"algorithm": "random_hex_16",
|
||||
"validation": {},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def fake_load_yaml(path):
|
||||
p = Path(path)
|
||||
if p == inventory_path:
|
||||
return {"applications": {}}
|
||||
if p == role_path / "schema" / "main.yml":
|
||||
return schema_data
|
||||
if p == role_path / "vars" / "main.yml":
|
||||
return {"application_id": "app_test"}
|
||||
if p == role_path / "config" / "main.yml":
|
||||
return {"features": {}}
|
||||
return {}
|
||||
|
||||
fake_snippet = "!vault |\n $ANSIBLE_VAULT;1.1;AES256\n ENCRYPTEDVALUE"
|
||||
|
||||
with mock.patch(
|
||||
"module_utils.manager.inventory.YamlHandler.load_yaml",
|
||||
side_effect=fake_load_yaml,
|
||||
), mock.patch(
|
||||
"module_utils.manager.inventory.VaultHandler"
|
||||
) as mock_vault_cls, mock.patch.object(
|
||||
InventoryManager,
|
||||
"generate_value",
|
||||
return_value="PLAINVAL",
|
||||
):
|
||||
mock_vault = mock_vault_cls.return_value
|
||||
mock_vault.encrypt_string.return_value = fake_snippet
|
||||
|
||||
mgr = InventoryManager(
|
||||
role_path=role_path,
|
||||
inventory_path=inventory_path,
|
||||
vault_pw="dummy",
|
||||
overrides={},
|
||||
allow_empty_plain=False,
|
||||
)
|
||||
inv = mgr.apply_schema()
|
||||
|
||||
apps = inv.get("applications", {})
|
||||
app_block = apps.get("app_test", {})
|
||||
creds = app_block.get("credentials", {})
|
||||
|
||||
self.assertIn("api_key", creds)
|
||||
value = creds["api_key"]
|
||||
|
||||
# api_key must be a VaultScalar
|
||||
self.assertIsInstance(value, VaultScalar)
|
||||
# Its underlying body should contain the vault header line
|
||||
self.assertIn("$ANSIBLE_VAULT", str(value))
|
||||
|
||||
# Encryption must have been called with generated plaintext and key
|
||||
mock_vault.encrypt_string.assert_called_once_with("PLAINVAL", "api_key")
|
||||
|
||||
def test_recurse_skips_existing_dict_and_vaultscalar(self):
|
||||
"""
|
||||
If the destination already contains:
|
||||
- a dict for a credential key, or
|
||||
- a VaultScalar for a credential key,
|
||||
recurse_credentials must skip re-encryption and leave existing values
|
||||
untouched.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
role_path = Path(tmpdir) / "role"
|
||||
inv_path = Path(tmpdir) / "inventory.yml"
|
||||
|
||||
role_path.mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "schema").mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "vars").mkdir(parents=True, exist_ok=True)
|
||||
(role_path / "config").mkdir(parents=True, exist_ok=True)
|
||||
inv_path.write_text("{}", encoding="utf-8")
|
||||
|
||||
inventory_path = inv_path
|
||||
|
||||
# Existing credentials in inventory
|
||||
existing_vault = VaultScalar("EXISTING_BODY")
|
||||
existing_dict = {"nested": "value"}
|
||||
|
||||
inventory_data = {
|
||||
"applications": {
|
||||
"app_test": {
|
||||
"credentials": {
|
||||
"already_vaulted": existing_vault,
|
||||
"complex": existing_dict,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
schema_data = {
|
||||
"credentials": {
|
||||
"already_vaulted": {
|
||||
"description": "Vaulted",
|
||||
"algorithm": "random_hex_16",
|
||||
"validation": {},
|
||||
},
|
||||
"complex": {
|
||||
"description": "Complex dict",
|
||||
"algorithm": "random_hex_16",
|
||||
"validation": {},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
def fake_load_yaml(path):
|
||||
p = Path(path)
|
||||
if p == inventory_path:
|
||||
return inventory_data
|
||||
if p == role_path / "schema" / "main.yml":
|
||||
return schema_data
|
||||
if p == role_path / "vars" / "main.yml":
|
||||
return {"application_id": "app_test"}
|
||||
if p == role_path / "config" / "main.yml":
|
||||
return {"features": {}}
|
||||
return {}
|
||||
|
||||
with mock.patch(
|
||||
"module_utils.manager.inventory.YamlHandler.load_yaml",
|
||||
side_effect=fake_load_yaml,
|
||||
), mock.patch(
|
||||
"module_utils.manager.inventory.VaultHandler"
|
||||
) as mock_vault_cls, mock.patch.object(
|
||||
InventoryManager,
|
||||
"generate_value",
|
||||
return_value="IGNORED",
|
||||
):
|
||||
mock_vault = mock_vault_cls.return_value
|
||||
mock_vault.encrypt_string.side_effect = AssertionError(
|
||||
"encrypt_string should not be called for existing VaultScalar/dict"
|
||||
)
|
||||
|
||||
mgr = InventoryManager(
|
||||
role_path=role_path,
|
||||
inventory_path=inventory_path,
|
||||
vault_pw="dummy",
|
||||
overrides={},
|
||||
allow_empty_plain=False,
|
||||
)
|
||||
inv = mgr.apply_schema()
|
||||
|
||||
apps = inv.get("applications", {})
|
||||
app_block = apps.get("app_test", {})
|
||||
creds = app_block.get("credentials", {})
|
||||
|
||||
# Both keys must still be present
|
||||
self.assertIn("already_vaulted", creds)
|
||||
self.assertIn("complex", creds)
|
||||
|
||||
# Types and values must be preserved
|
||||
self.assertIs(creds["already_vaulted"], existing_vault)
|
||||
self.assertIs(creds["complex"], existing_dict)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user