Reorgenized test structure and added validation of inventory before deployment

This commit is contained in:
2025-07-04 01:14:00 +02:00
parent fe04f1955f
commit a9f55579a2
37 changed files with 241 additions and 42 deletions

View File

View File

@@ -0,0 +1,101 @@
import os
import sys
import unittest
from unittest import mock
# Ensure cli module is importable
dir_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), '../../../cli')
)
sys.path.insert(0, dir_path)
# Import functions and classes to test
from create_credentials import ask_for_confirmation, main
from utils.handler.vault import VaultHandler, VaultScalar
import subprocess
import tempfile
import yaml
class TestCreateCredentials(unittest.TestCase):
def test_ask_for_confirmation_yes(self):
with mock.patch('builtins.input', return_value='y'):
self.assertTrue(ask_for_confirmation('test_key'))
def test_ask_for_confirmation_no(self):
with mock.patch('builtins.input', return_value='n'):
self.assertFalse(ask_for_confirmation('test_key'))
def test_vault_encrypt_string_success(self):
handler = VaultHandler('dummy_pw_file')
# Mock subprocess.run to simulate successful vault encryption
fake_output = 'Encrypted data'
completed = subprocess.CompletedProcess(
args=['ansible-vault'], returncode=0, stdout=fake_output, stderr=''
)
with mock.patch('subprocess.run', return_value=completed) as proc_run:
result = handler.encrypt_string('plain_val', 'name')
proc_run.assert_called_once()
self.assertEqual(result, fake_output)
def test_vault_encrypt_string_failure(self):
handler = VaultHandler('dummy_pw_file')
# Mock subprocess.run to simulate failure
completed = subprocess.CompletedProcess(
args=['ansible-vault'], returncode=1, stdout='', stderr='error')
with mock.patch('subprocess.run', return_value=completed):
with self.assertRaises(RuntimeError):
handler.encrypt_string('plain_val', 'name')
def test_main_overrides_and_file_writing(self):
# Setup temporary files for role-path vars and inventory
with tempfile.TemporaryDirectory() as tmpdir:
role_path = os.path.join(tmpdir, 'role')
os.makedirs(os.path.join(role_path, 'vars'))
os.makedirs(os.path.join(role_path, 'meta'))
# Create vars/main.yml with application_id
main_vars = {'application_id': 'app_test'}
with open(os.path.join(role_path, 'vars', 'main.yml'), 'w') as f:
yaml.dump(main_vars, f)
# Create vars/configuration.yml with features disabled
config = {'features': {'central_database': False}}
with open(os.path.join(role_path, 'vars', 'configuration.yml'), 'w') as f:
yaml.dump(config, f)
# Create schema.yml defining plain credential
schema = {'credentials': {'api_key': {'description': 'API key', 'algorithm': 'plain', 'validation': {}}}}
with open(os.path.join(role_path, 'meta', 'schema.yml'), 'w') as f:
yaml.dump(schema, f)
# Prepare inventory file
inventory_file = os.path.join(tmpdir, 'inventory.yml')
with open(inventory_file, 'w') as f:
yaml.dump({}, f)
vault_pw_file = os.path.join(tmpdir, 'pw.txt')
with open(vault_pw_file, 'w') as f:
f.write('pw')
# Simulate ansible-vault encrypt_string output for api_key
fake_snippet = "!vault |\n $ANSIBLE_VAULT;1.1;AES256\n ENCRYPTEDVALUE"
completed = subprocess.CompletedProcess(
args=['ansible-vault'], returncode=0, stdout=fake_snippet, stderr=''
)
with mock.patch('subprocess.run', return_value=completed):
# Run main with override for credentials.api_key and force to skip prompt
sys.argv = [
'create_credentials.py',
'--role-path', role_path,
'--inventory-file', inventory_file,
'--vault-password-file', vault_pw_file,
'--set', 'credentials.api_key=SECRET',
'--force'
]
# Should complete without error
main()
# Verify inventory file updated with vaulted api_key
data = yaml.safe_load(open(inventory_file))
creds = data['applications']['app_test']['credentials']
self.assertIn('api_key', creds)
# VaultScalar serializes to a vault block, safe_load returns a string containing the vault header
self.assertIsInstance(creds['api_key'], str)
self.assertTrue(creds['api_key'].lstrip().startswith('$ANSIBLE_VAULT'))
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,65 @@
import os
import unittest
import tempfile
import shutil
import yaml
from pathlib import Path
import subprocess
class TestGenerateDefaultApplicationsUsers(unittest.TestCase):
def setUp(self):
# Setup temporary roles directory
self.temp_dir = Path(tempfile.mkdtemp())
self.roles_dir = self.temp_dir / "roles"
self.roles_dir.mkdir()
# Sample role with users meta
self.role = self.roles_dir / "docker-app-with-users"
(self.role / "vars").mkdir(parents=True)
(self.role / "meta").mkdir(parents=True)
# Write application_id and configuration
(self.role / "vars" / "main.yml").write_text("application_id: app_with_users\n")
(self.role / "vars" / "configuration.yml").write_text("setting: value\n")
# Write users meta
users_meta = {
'users': {
'alice': {'uid': 2001, 'gid': 2001},
'bob': {'uid': 2002, 'gid': 2002}
}
}
with (self.role / "meta" / "users.yml").open('w', encoding='utf-8') as f:
yaml.dump(users_meta, f)
# Output file path
self.output_file = self.temp_dir / "output.yml"
def tearDown(self):
shutil.rmtree(self.temp_dir)
def test_users_injection(self):
"""
When a users.yml exists with defined users, the script should inject a 'users'
mapping in the generated YAML, mapping each username to a Jinja2 reference.
"""
script_path = Path(__file__).resolve().parents[3] / "cli" / "generate_applications.py"
result = subprocess.run([
"python3", str(script_path),
"--roles-dir", str(self.roles_dir),
"--output-file", str(self.output_file)
], capture_output=True, text=True)
self.assertEqual(result.returncode, 0, msg=result.stderr)
data = yaml.safe_load(self.output_file.read_text())
apps = data.get('defaults_applications', {})
# Only the app with users should be present
self.assertIn('app_with_users', apps)
# 'users' section should be present and correct
users_map = apps['app_with_users']['users']
expected = {'alice': '{{ users["alice"] }}', 'bob': '{{ users["bob"] }}'}
self.assertEqual(users_map, expected)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,55 @@
import os
import unittest
import tempfile
import shutil
import yaml
from pathlib import Path
import subprocess
class TestGenerateDefaultApplications(unittest.TestCase):
def setUp(self):
# Create temp role structure
self.temp_dir = Path(tempfile.mkdtemp())
self.roles_dir = self.temp_dir / "roles"
self.roles_dir.mkdir()
# Sample role
self.sample_role = self.roles_dir / "docker-testapp"
(self.sample_role / "vars").mkdir(parents=True)
# Write application_id and configuration
(self.sample_role / "vars" / "main.yml").write_text("application_id: testapp\n")
(self.sample_role / "vars" / "configuration.yml").write_text("foo: bar\nbaz: 123\n")
# Output file path
self.output_file = self.temp_dir / "group_vars" / "all" / "04_applications.yml"
def tearDown(self):
shutil.rmtree(self.temp_dir)
def test_script_generates_expected_yaml(self):
script_path = Path(__file__).resolve().parent.parent.parent.parent / "cli" / "generate_applications.py"
result = subprocess.run(
[
"python3", str(script_path),
"--roles-dir", str(self.roles_dir),
"--output-file", str(self.output_file)
],
capture_output=True,
text=True,
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
self.assertTrue(self.output_file.exists(), "Output file was not created.")
data = yaml.safe_load(self.output_file.read_text())
self.assertIn("defaults_applications", data)
self.assertIn("testapp", data["defaults_applications"])
self.assertEqual(data["defaults_applications"]["testapp"]["foo"], "bar")
self.assertEqual(data["defaults_applications"]["testapp"]["baz"], 123)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,85 @@
#!/usr/bin/env python3
import os
import sys
import unittest
import tempfile
import shutil
import yaml
# Adjust path to include cli/ folder
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../..", "cli")))
from generate_playbook import build_dependency_graph, topological_sort, generate_playbook_entries
class TestGeneratePlaybook(unittest.TestCase):
def setUp(self):
# Create a temporary directory to simulate roles
self.temp_dir = tempfile.mkdtemp()
# Define mock roles and dependencies
self.roles = {
'role-a': {'run_after': [], 'application_id': 'a'},
'role-b': {'run_after': ['role-a'], 'application_id': 'b'},
'role-c': {'run_after': ['role-b'], 'application_id': 'c'},
'role-d': {'run_after': [], 'application_id': 'd'},
}
for role_name, meta in self.roles.items():
role_path = os.path.join(self.temp_dir, role_name)
os.makedirs(os.path.join(role_path, 'meta'), exist_ok=True)
os.makedirs(os.path.join(role_path, 'vars'), exist_ok=True)
meta_file = {
'galaxy_info': {
'run_after': meta['run_after']
}
}
vars_file = {
'application_id': meta['application_id']
}
with open(os.path.join(role_path, 'meta', 'main.yml'), 'w') as f:
yaml.dump(meta_file, f)
with open(os.path.join(role_path, 'vars', 'main.yml'), 'w') as f:
yaml.dump(vars_file, f)
def tearDown(self):
# Clean up the temporary directory
shutil.rmtree(self.temp_dir)
def test_dependency_graph_and_sort(self):
graph, in_degree, roles = build_dependency_graph(self.temp_dir)
self.assertIn('role-a', graph)
self.assertIn('role-b', graph)
self.assertEqual(graph['role-a'], ['role-b'])
self.assertEqual(graph['role-b'], ['role-c'])
self.assertEqual(graph['role-c'], [])
self.assertEqual(in_degree['role-c'], 1)
self.assertEqual(in_degree['role-b'], 1)
self.assertEqual(in_degree['role-a'], 0)
self.assertEqual(in_degree['role-d'], 0)
sorted_roles = topological_sort(graph, in_degree)
# The expected order must be a → b → c, d can be anywhere before or after
self.assertTrue(sorted_roles.index('role-a') < sorted_roles.index('role-b') < sorted_roles.index('role-c'))
def test_generate_playbook_entries(self):
entries = generate_playbook_entries(self.temp_dir)
text = ''.join(entries)
self.assertIn("setup a", text)
self.assertIn("setup b", text)
self.assertIn("setup c", text)
self.assertIn("setup d", text)
# Order must preserve run_after
a_index = text.index("setup a")
b_index = text.index("setup b")
c_index = text.index("setup c")
self.assertTrue(a_index < b_index < c_index)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,136 @@
import os
import sys
import unittest
import tempfile
import shutil
import yaml
from collections import OrderedDict
# Add cli/ to import path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../..", "cli")))
import generate_users
class TestGenerateUsers(unittest.TestCase):
def test_build_users_auto_increment_and_overrides(self):
defs = {
'alice': {},
'bob': {'uid': 2000, 'email': 'bob@custom.com', 'description': 'Custom user'},
'carol': {}
}
users = generate_users.build_users(
defs=defs,
primary_domain='example.com',
start_id=1001,
become_pwd='pw'
)
# alice should get uid/gid 1001
self.assertEqual(users['alice']['uid'], 1001)
self.assertEqual(users['alice']['gid'], 1001)
self.assertEqual(users['alice']['email'], 'alice@example.com')
# bob overrides
self.assertEqual(users['bob']['uid'], 2000)
self.assertEqual(users['bob']['gid'], 2000)
self.assertEqual(users['bob']['email'], 'bob@custom.com')
self.assertIn('description', users['bob'])
# carol should get next free id = 1002
self.assertEqual(users['carol']['uid'], 1002)
self.assertEqual(users['carol']['gid'], 1002)
def test_build_users_default_lookup_password(self):
"""
When no 'password' override is provided,
the become_pwd lookup template string must be used as the password.
"""
defs = {'frank': {}}
lookup_template = '{{ lookup("password", "/dev/null length=42 chars=ascii_letters,digits") }}'
users = generate_users.build_users(
defs=defs,
primary_domain='example.com',
start_id=1001,
become_pwd=lookup_template
)
self.assertEqual(
users['frank']['password'],
lookup_template,
"The lookup template string was not correctly applied as the default password"
)
def test_build_users_override_password(self):
"""
When a 'password' override is provided,
that custom password must be used instead of become_pwd.
"""
defs = {'eva': {'password': 'custompw'}}
lookup_template = '{{ lookup("password", "/dev/null length=42 chars=ascii_letters,digits") }}'
users = generate_users.build_users(
defs=defs,
primary_domain='example.com',
start_id=1001,
become_pwd=lookup_template
)
self.assertEqual(
users['eva']['password'],
'custompw',
"The override password was not correctly applied"
)
def test_build_users_duplicate_override_uid(self):
defs = {
'u1': {'uid': 1001},
'u2': {'uid': 1001}
}
with self.assertRaises(ValueError):
generate_users.build_users(defs, 'ex.com', 1001, 'pw')
def test_build_users_shared_gid_allowed(self):
# Allow two users to share the same GID when one overrides gid and the other uses that as uid
defs = {
'a': {'uid': 1500},
'b': {'gid': 1500}
}
users = generate_users.build_users(defs, 'ex.com', 1500, 'pw')
# Both should have gid 1500
self.assertEqual(users['a']['gid'], 1500)
self.assertEqual(users['b']['gid'], 1500)
def test_build_users_duplicate_username_email(self):
defs = {
'u1': {'username': 'same', 'email': 'same@ex.com'},
'u2': {'username': 'same'}
}
# second user with same username should raise
with self.assertRaises(ValueError):
generate_users.build_users(defs, 'ex.com', 1001, 'pw')
def test_dictify_converts_ordereddict(self):
od = generate_users.OrderedDict([('a', 1), ('b', {'c': 2})])
result = generate_users.dictify(OrderedDict(od))
self.assertIsInstance(result, dict)
self.assertEqual(result, {'a': 1, 'b': {'c': 2}})
def test_load_user_defs_and_conflict(self):
# create temp roles structure
tmp = tempfile.mkdtemp()
try:
os.makedirs(os.path.join(tmp, 'role1/meta'))
os.makedirs(os.path.join(tmp, 'role2/meta'))
# role1 defines user x
with open(os.path.join(tmp, 'role1/meta/users.yml'), 'w') as f:
yaml.safe_dump({'users': {'x': {'email': 'x@a'}}}, f)
# role2 defines same user x with same value
with open(os.path.join(tmp, 'role2/meta/users.yml'), 'w') as f:
yaml.safe_dump({'users': {'x': {'email': 'x@a'}}}, f)
defs = generate_users.load_user_defs(tmp)
self.assertIn('x', defs)
# now conflict definition
with open(os.path.join(tmp, 'role2/meta/users.yml'), 'w') as f:
yaml.safe_dump({'users': {'x': {'email': 'x@b'}}}, f)
with self.assertRaises(ValueError):
generate_users.load_user_defs(tmp)
finally:
shutil.rmtree(tmp)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,156 @@
import unittest
import sys
import os
import tempfile
import shutil
from pathlib import Path
from unittest.mock import patch
# Ensure the cli package is on sys.path
sys.path.insert(
0,
os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../../cli")
),
)
from utils.handler.yaml import YamlHandler
from utils.handler.vault import VaultHandler, VaultScalar
from cli.utils.manager.inventory import InventoryManager
class TestInventoryManager(unittest.TestCase):
def setUp(self):
# Create a temporary directory for role and inventory files
self.tmpdir = Path(tempfile.mkdtemp())
# Patch YamlHandler.load_yaml
self.load_yaml_patcher = patch.object(
YamlHandler,
'load_yaml',
side_effect=self.fake_load_yaml
)
self.load_yaml_patcher.start()
# Patch VaultHandler.encrypt_string with correct signature
self.encrypt_patcher = patch.object(
VaultHandler,
'encrypt_string',
new=lambda self, plain, key: f"{key}: !vault |\n encrypted_{plain}"
)
self.encrypt_patcher.start()
def tearDown(self):
# Stop patchers
patch.stopall()
# Remove temporary directory
shutil.rmtree(self.tmpdir)
def fake_load_yaml(self, path):
path = Path(path)
# Return schema for meta/schema.yml
if path.match("*/meta/schema.yml"):
return {
"credentials": {
"plain_cred": {"description": "desc", "algorithm": "plain", "validation": {}},
"nested": {"inner": {"description": "desc2", "algorithm": "sha256", "validation": {}}}
}
}
# Return application_id for vars/main.yml
if path.match("*/vars/main.yml"):
return {"application_id": "testapp"}
# Return feature flags for vars/configuration.yml
if path.match("*/vars/configuration.yml"):
return {"features": {"central_database": True}}
# Return empty inventory for inventory.yml
if path.name == "inventory.yml":
return {}
raise FileNotFoundError(f"Unexpected load_yaml path: {path}")
def test_load_application_id_missing(self):
"""Loading application_id without it should raise SystemExit."""
role_dir = self.tmpdir / "role"
(role_dir / "vars").mkdir(parents=True)
(role_dir / "vars" / "main.yml").write_text("{}")
with patch.object(YamlHandler, 'load_yaml', return_value={}):
with self.assertRaises(SystemExit):
InventoryManager(role_dir, self.tmpdir / "inventory.yml", "pw", {}).load_application_id(role_dir)
def test_generate_value_algorithms(self):
"""Verify generate_value produces outputs of the expected form and contains no dollar signs."""
# Bypass __init__ to avoid YAML loading
im = InventoryManager.__new__(InventoryManager)
# random_hex → 64 bytes hex = 128 chars
hex_val = im.generate_value("random_hex")
self.assertEqual(len(hex_val), 128)
self.assertTrue(all(c in "0123456789abcdef" for c in hex_val))
self.assertNotIn('$', hex_val) # no dollar sign
# sha256 → 64 hex chars
sha256_val = im.generate_value("sha256")
self.assertEqual(len(sha256_val), 64)
self.assertNotIn('$', sha256_val) # no dollar sign
# sha1 → 40 hex chars
sha1_val = im.generate_value("sha1")
self.assertEqual(len(sha1_val), 40)
self.assertNotIn('$', sha1_val) # no dollar sign
# bcrypt → should *not* start with '$2' after escaping, and contain no '$'
bcrypt_val = im.generate_value("bcrypt")
self.assertFalse(bcrypt_val.startswith("$2"))
self.assertNotIn('$', bcrypt_val) # no dollar sign
# alphanumeric → 64 chars
alnum = im.generate_value("alphanumeric")
self.assertEqual(len(alnum), 64)
self.assertTrue(alnum.isalnum())
self.assertNotIn('$', alnum) # no dollar sign
# base64_prefixed_32 → starts with "base64:"
b64 = im.generate_value("base64_prefixed_32")
self.assertTrue(b64.startswith("base64:"))
self.assertNotIn('$', b64) # no dollar sign
# random_hex_16 → 32 hex chars
hex16 = im.generate_value("random_hex_16")
self.assertEqual(len(hex16), 32)
self.assertTrue(all(c in "0123456789abcdef" for c in hex16))
self.assertNotIn('$', hex16) # no dollar sign
def test_apply_schema_and_recurse(self):
"""
apply_schema should inject central_database password and vault nested.inner
"""
# Setup role directory
role_dir = self.tmpdir / "role"
(role_dir / "meta").mkdir(parents=True)
(role_dir / "vars").mkdir(parents=True)
# Create empty inventory.yml
inv_file = self.tmpdir / "inventory.yml"
inv_file.write_text(" ")
# Provide override for plain_cred to avoid SystemExit
overrides = {'credentials.plain_cred': 'OVERRIDE_PLAIN'}
# Instantiate manager with overrides
mgr = InventoryManager(role_dir, inv_file, "pw", overrides=overrides)
# Patch generate_value locally for predictable values
with patch.object(InventoryManager, 'generate_value', lambda self, alg: f"GEN_{alg}"):
result = mgr.apply_schema()
apps = result["applications"]["testapp"]
# central_database entry
self.assertEqual(apps["credentials"]["database_password"], "GEN_alphanumeric")
# plain_cred vaulted from override
self.assertIsInstance(apps["credentials"]["plain_cred"], VaultScalar)
# nested.inner should not be vaulted due to code's prefix check
self.assertEqual(
apps["credentials"]["nested"]["inner"],
{"description": "desc2", "algorithm": "sha256", "validation": {}},
)

View File

@@ -0,0 +1,134 @@
import unittest
import tempfile
import shutil
import os
from pathlib import Path
import subprocess
import sys
import yaml
SCRIPT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../cli/validate_inventory.py"))
class TestValidateInventory(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.group_vars_all = Path(self.temp_dir) / "group_vars" / "all"
self.group_vars_all.mkdir(parents=True)
self.inventory_dir = Path(self.temp_dir) / "inventory"
self.inventory_dir.mkdir()
# Create default applications file
self.default_applications = {
"defaults_applications": {
"app1": {
"port": 8080,
"enabled": True,
"settings": {
"theme": "dark"
}
}
}
}
(self.group_vars_all / "01_applications.yml").write_text(
yaml.dump(self.default_applications), encoding="utf-8"
)
# Create default users file
self.default_users = {
"default_users": {
"alice": {
"email": "alice@example.com",
"role": "admin"
}
}
}
(self.group_vars_all / "01_users.yml").write_text(
yaml.dump(self.default_users), encoding="utf-8"
)
def tearDown(self):
shutil.rmtree(self.temp_dir)
def run_script(self, expected_code=0):
result = subprocess.run(
[sys.executable, SCRIPT_PATH, str(self.inventory_dir)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
cwd=self.temp_dir
)
if result.returncode != expected_code:
print("STDOUT:", result.stdout)
print("STDERR:", result.stderr)
return result
def test_valid_inventory(self):
(self.inventory_dir / "group_vars.yml").write_text(yaml.dump({
"applications": {
"app1": {
"port": 8080,
"enabled": True,
"settings": {
"theme": "dark"
}
}
},
"users": {
"alice": {
"email": "alice@example.com",
"role": "admin",
"password": "secret"
}
}
}), encoding="utf-8")
result = self.run_script(expected_code=0)
self.assertIn("Inventory directory is valid against defaults", result.stdout)
def test_unknown_user_warning(self):
(self.inventory_dir / "invalid_users.yml").write_text(yaml.dump({
"users": {
"bob": {
"email": "bob@example.com",
"role": "user"
}
}
}), encoding="utf-8")
result = self.run_script(expected_code=0)
self.assertIn("Warning", result.stderr)
def test_missing_user_key_fails(self):
(self.inventory_dir / "invalid_key.yml").write_text(yaml.dump({
"users": {
"alice": {
"email": "alice@example.com",
"role": "admin",
"extra": "unexpected"
}
}
}), encoding="utf-8")
result = self.run_script(expected_code=1)
self.assertIn("Missing default for user 'alice': key 'extra'", result.stderr)
def test_missing_application_key_fails(self):
(self.inventory_dir / "missing_key.yml").write_text(yaml.dump({
"applications": {
"app1": {
"port": 8080,
"enabled": True,
"settings": {
"theme": "dark"
},
"extra_setting": True
}
}
}), encoding="utf-8")
result = self.run_script(expected_code=1)
self.assertIn("Missing default for app1: extra_setting", result.stdout)
if __name__ == "__main__":
unittest.main()