Files
computer-playbook/tests/unit/module_utils/manager/test_inventory_manager.py
Kevin Veen-Birkenbach c0e26275f8 Refactor defaults generation, credential creation, and inventory management
### Overview
This commit introduces a broad set of improvements across the defaults
generator, credential creation subsystem, inventory creation workflow,
and InventoryManager core logic.

### Major Changes
- Support empty or  config/main.yml in defaults generator and ensure that
  applications with empty configs are still included in defaults_applications.
- Add '--snippet' and '--allow-empty-plain' modes to create/credentials.py
  with non-destructive merging and correct plain-secret handling.
- Ensure empty strings for 'plain' credentials are never encrypted.
- Update InventoryManager to fully support allow_empty_plain and prevent
  accidental overwriting or encrypting existing VaultScalar or dict values.
- Add full-size implementation of cli/create/inventory.py including
  dynamic inventory building, role filtering, host_vars management, and
  parallelised credential snippet generation.
- Fix schemas (Magento, Nextcloud, OAuth2-Proxy, keyboard-color, etc.) to
  align with the new credential model and avoid test failures.
- Improve get_app_conf consistency by ensuring credentials.* paths are
  always resolvable for applications even when config/main.yml is empty.

### Added Test Coverage
- Unit tests for defaults generator handling empty configs.
- Full test suite for create/inventory.py including merge logic and
  vault-safe host_vars loading.
- Extensive tests for InventoryManager: plain-secret behavior,
  vault handling, and recursion logic.
- Update or remove outdated tests referencing old schema behaviour.

### Context
This commit is associated with a refactoring and debugging session documented here:
https://chatgpt.com/share/692ec0e1-5018-800f-b568-d09a53e9d0ee
2025-12-02 11:54:55 +01:00

375 lines
14 KiB
Python

import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest import mock
# Make project root importable so that `module_utils` can be imported
ROOT_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../../..")
)
sys.path.insert(0, ROOT_DIR)
from module_utils.manager.inventory import InventoryManager # type: ignore
from module_utils.handler.vault import VaultScalar # type: ignore
class TestInventoryManager(unittest.TestCase):
def test_load_application_id_missing_exits(self):
"""
If vars/main.yml does not contain application_id, InventoryManager
must print an error and exit with code 1.
"""
with tempfile.TemporaryDirectory() as tmpdir:
role_path = Path(tmpdir) / "role"
inv_path = Path(tmpdir) / "inventory.yml"
role_path.mkdir(parents=True, exist_ok=True)
(role_path / "schema").mkdir(parents=True, exist_ok=True)
(role_path / "vars").mkdir(parents=True, exist_ok=True)
(role_path / "config").mkdir(parents=True, exist_ok=True)
# Dummy files so that Path comparisons in the fake loader work
(role_path / "schema" / "main.yml").write_text("{}", encoding="utf-8")
(role_path / "vars" / "main.yml").write_text("{}", encoding="utf-8")
(role_path / "config" / "main.yml").write_text("{}", encoding="utf-8")
inv_path.write_text("{}", encoding="utf-8")
inventory_path = inv_path
def fake_load_yaml(path):
p = Path(path)
if p == inventory_path:
return {}
if p == role_path / "schema" / "main.yml":
return {}
if p == role_path / "vars" / "main.yml":
# Missing application_id on purpose
return {}
if p == role_path / "config" / "main.yml":
return {"features": {}}
return {}
with mock.patch(
"module_utils.manager.inventory.YamlHandler.load_yaml",
side_effect=fake_load_yaml,
), mock.patch(
"module_utils.manager.inventory.VaultHandler"
):
with self.assertRaises(SystemExit) as ctx:
InventoryManager(
role_path=role_path,
inventory_path=inventory_path,
vault_pw="dummy",
overrides={},
)
self.assertEqual(ctx.exception.code, 1)
def test_plain_without_override_and_allow_empty_plain_exits(self):
"""
For a `plain` algorithm credential, if no override is provided and
allow_empty_plain=False, recurse_credentials/apply_schema must exit.
"""
with tempfile.TemporaryDirectory() as tmpdir:
role_path = Path(tmpdir) / "role"
inv_path = Path(tmpdir) / "inventory.yml"
role_path.mkdir(parents=True, exist_ok=True)
(role_path / "schema").mkdir(parents=True, exist_ok=True)
(role_path / "vars").mkdir(parents=True, exist_ok=True)
(role_path / "config").mkdir(parents=True, exist_ok=True)
inv_path.write_text("{}", encoding="utf-8")
inventory_path = inv_path
schema_data = {
"credentials": {
"api_key": {
"description": "API key",
"algorithm": "plain",
"validation": {},
}
}
}
def fake_load_yaml(path):
p = Path(path)
if p == inventory_path:
return {"applications": {}}
if p == role_path / "schema" / "main.yml":
return schema_data
if p == role_path / "vars" / "main.yml":
return {"application_id": "app_test"}
if p == role_path / "config" / "main.yml":
return {"features": {}}
return {}
with mock.patch(
"module_utils.manager.inventory.YamlHandler.load_yaml",
side_effect=fake_load_yaml,
), mock.patch(
"module_utils.manager.inventory.VaultHandler"
):
mgr = InventoryManager(
role_path=role_path,
inventory_path=inventory_path,
vault_pw="dummy",
overrides={}, # no plain override
allow_empty_plain=False,
)
with self.assertRaises(SystemExit) as ctx:
mgr.apply_schema()
self.assertEqual(ctx.exception.code, 1)
def test_plain_with_allow_empty_plain_sets_empty_string_unencrypted(self):
"""
For a `plain` algorithm credential, if no override is provided and
allow_empty_plain=True, the credential should be set to an empty string
and must NOT be encrypted.
"""
with tempfile.TemporaryDirectory() as tmpdir:
role_path = Path(tmpdir) / "role"
inv_path = Path(tmpdir) / "inventory.yml"
role_path.mkdir(parents=True, exist_ok=True)
(role_path / "schema").mkdir(parents=True, exist_ok=True)
(role_path / "vars").mkdir(parents=True, exist_ok=True)
(role_path / "config").mkdir(parents=True, exist_ok=True)
inv_path.write_text("{}", encoding="utf-8")
inventory_path = inv_path
schema_data = {
"credentials": {
"api_key": {
"description": "API key",
"algorithm": "plain",
"validation": {},
}
}
}
def fake_load_yaml(path):
p = Path(path)
if p == inventory_path:
return {"applications": {}}
if p == role_path / "schema" / "main.yml":
return schema_data
if p == role_path / "vars" / "main.yml":
return {"application_id": "app_test"}
if p == role_path / "config" / "main.yml":
return {"features": {}}
return {}
with mock.patch(
"module_utils.manager.inventory.YamlHandler.load_yaml",
side_effect=fake_load_yaml,
), mock.patch(
"module_utils.manager.inventory.VaultHandler"
) as mock_vault_cls:
# VaultHandler instance
mock_vault = mock_vault_cls.return_value
mock_vault.encrypt_string.return_value = "!vault |\n $ANSIBLE_VAULT;1.1;AES256\n ENCRYPTED"
mgr = InventoryManager(
role_path=role_path,
inventory_path=inventory_path,
vault_pw="dummy",
overrides={}, # no override for plain
allow_empty_plain=True,
)
inv = mgr.apply_schema()
apps = inv.get("applications", {})
app_block = apps.get("app_test", {})
creds = app_block.get("credentials", {})
# api_key must be present and must be a literal empty string
self.assertIn("api_key", creds)
self.assertEqual(creds["api_key"], "")
# Empty string must not trigger encryption
mock_vault.encrypt_string.assert_not_called()
def test_non_plain_algorithm_encrypts_and_sets_vaultscalar(self):
"""
For non-plain algorithms, recurse_credentials must generate a value
and encrypt it into a VaultScalar, unless an existing VaultScalar
is already present.
"""
with tempfile.TemporaryDirectory() as tmpdir:
role_path = Path(tmpdir) / "role"
inv_path = Path(tmpdir) / "inventory.yml"
role_path.mkdir(parents=True, exist_ok=True)
(role_path / "schema").mkdir(parents=True, exist_ok=True)
(role_path / "vars").mkdir(parents=True, exist_ok=True)
(role_path / "config").mkdir(parents=True, exist_ok=True)
inv_path.write_text("{}", encoding="utf-8")
inventory_path = inv_path
schema_data = {
"credentials": {
"api_key": {
"description": "API key",
"algorithm": "random_hex_16",
"validation": {},
}
}
}
def fake_load_yaml(path):
p = Path(path)
if p == inventory_path:
return {"applications": {}}
if p == role_path / "schema" / "main.yml":
return schema_data
if p == role_path / "vars" / "main.yml":
return {"application_id": "app_test"}
if p == role_path / "config" / "main.yml":
return {"features": {}}
return {}
fake_snippet = "!vault |\n $ANSIBLE_VAULT;1.1;AES256\n ENCRYPTEDVALUE"
with mock.patch(
"module_utils.manager.inventory.YamlHandler.load_yaml",
side_effect=fake_load_yaml,
), mock.patch(
"module_utils.manager.inventory.VaultHandler"
) as mock_vault_cls, mock.patch.object(
InventoryManager,
"generate_value",
return_value="PLAINVAL",
):
mock_vault = mock_vault_cls.return_value
mock_vault.encrypt_string.return_value = fake_snippet
mgr = InventoryManager(
role_path=role_path,
inventory_path=inventory_path,
vault_pw="dummy",
overrides={},
allow_empty_plain=False,
)
inv = mgr.apply_schema()
apps = inv.get("applications", {})
app_block = apps.get("app_test", {})
creds = app_block.get("credentials", {})
self.assertIn("api_key", creds)
value = creds["api_key"]
# api_key must be a VaultScalar
self.assertIsInstance(value, VaultScalar)
# Its underlying body should contain the vault header line
self.assertIn("$ANSIBLE_VAULT", str(value))
# Encryption must have been called with generated plaintext and key
mock_vault.encrypt_string.assert_called_once_with("PLAINVAL", "api_key")
def test_recurse_skips_existing_dict_and_vaultscalar(self):
"""
If the destination already contains:
- a dict for a credential key, or
- a VaultScalar for a credential key,
recurse_credentials must skip re-encryption and leave existing values
untouched.
"""
with tempfile.TemporaryDirectory() as tmpdir:
role_path = Path(tmpdir) / "role"
inv_path = Path(tmpdir) / "inventory.yml"
role_path.mkdir(parents=True, exist_ok=True)
(role_path / "schema").mkdir(parents=True, exist_ok=True)
(role_path / "vars").mkdir(parents=True, exist_ok=True)
(role_path / "config").mkdir(parents=True, exist_ok=True)
inv_path.write_text("{}", encoding="utf-8")
inventory_path = inv_path
# Existing credentials in inventory
existing_vault = VaultScalar("EXISTING_BODY")
existing_dict = {"nested": "value"}
inventory_data = {
"applications": {
"app_test": {
"credentials": {
"already_vaulted": existing_vault,
"complex": existing_dict,
}
}
}
}
schema_data = {
"credentials": {
"already_vaulted": {
"description": "Vaulted",
"algorithm": "random_hex_16",
"validation": {},
},
"complex": {
"description": "Complex dict",
"algorithm": "random_hex_16",
"validation": {},
},
}
}
def fake_load_yaml(path):
p = Path(path)
if p == inventory_path:
return inventory_data
if p == role_path / "schema" / "main.yml":
return schema_data
if p == role_path / "vars" / "main.yml":
return {"application_id": "app_test"}
if p == role_path / "config" / "main.yml":
return {"features": {}}
return {}
with mock.patch(
"module_utils.manager.inventory.YamlHandler.load_yaml",
side_effect=fake_load_yaml,
), mock.patch(
"module_utils.manager.inventory.VaultHandler"
) as mock_vault_cls, mock.patch.object(
InventoryManager,
"generate_value",
return_value="IGNORED",
):
mock_vault = mock_vault_cls.return_value
mock_vault.encrypt_string.side_effect = AssertionError(
"encrypt_string should not be called for existing VaultScalar/dict"
)
mgr = InventoryManager(
role_path=role_path,
inventory_path=inventory_path,
vault_pw="dummy",
overrides={},
allow_empty_plain=False,
)
inv = mgr.apply_schema()
apps = inv.get("applications", {})
app_block = apps.get("app_test", {})
creds = app_block.get("credentials", {})
# Both keys must still be present
self.assertIn("already_vaulted", creds)
self.assertIn("complex", creds)
# Types and values must be preserved
self.assertIs(creds["already_vaulted"], existing_vault)
self.assertIs(creds["complex"], existing_dict)
if __name__ == "__main__":
unittest.main()