mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-12-10 11:26:24 +00:00
### Overview This commit introduces a broad set of improvements across the defaults generator, credential creation subsystem, inventory creation workflow, and InventoryManager core logic. ### Major Changes - Support empty or config/main.yml in defaults generator and ensure that applications with empty configs are still included in defaults_applications. - Add '--snippet' and '--allow-empty-plain' modes to create/credentials.py with non-destructive merging and correct plain-secret handling. - Ensure empty strings for 'plain' credentials are never encrypted. - Update InventoryManager to fully support allow_empty_plain and prevent accidental overwriting or encrypting existing VaultScalar or dict values. - Add full-size implementation of cli/create/inventory.py including dynamic inventory building, role filtering, host_vars management, and parallelised credential snippet generation. - Fix schemas (Magento, Nextcloud, OAuth2-Proxy, keyboard-color, etc.) to align with the new credential model and avoid test failures. - Improve get_app_conf consistency by ensuring credentials.* paths are always resolvable for applications even when config/main.yml is empty. ### Added Test Coverage - Unit tests for defaults generator handling empty configs. - Full test suite for create/inventory.py including merge logic and vault-safe host_vars loading. - Extensive tests for InventoryManager: plain-secret behavior, vault handling, and recursion logic. - Update or remove outdated tests referencing old schema behaviour. ### Context This commit is associated with a refactoring and debugging session documented here: https://chatgpt.com/share/692ec0e1-5018-800f-b568-d09a53e9d0ee
163 lines
7.3 KiB
Python
163 lines
7.3 KiB
Python
import os
|
|
import sys
|
|
import unittest
|
|
from unittest import mock
|
|
|
|
# Ensure cli module is importable
|
|
dir_path = os.path.abspath(
|
|
os.path.join(os.path.dirname(__file__), '../../../cli')
|
|
)
|
|
sys.path.insert(0, dir_path)
|
|
|
|
# Import functions and classes to test
|
|
from cli.create.credentials import ask_for_confirmation, main
|
|
from module_utils.handler.vault import VaultHandler, VaultScalar
|
|
import subprocess
|
|
import tempfile
|
|
import yaml
|
|
|
|
class TestCreateCredentials(unittest.TestCase):
|
|
def test_ask_for_confirmation_yes(self):
|
|
with mock.patch('builtins.input', return_value='y'):
|
|
self.assertTrue(ask_for_confirmation('test_key'))
|
|
|
|
def test_ask_for_confirmation_no(self):
|
|
with mock.patch('builtins.input', return_value='n'):
|
|
self.assertFalse(ask_for_confirmation('test_key'))
|
|
|
|
def test_vault_encrypt_string_success(self):
|
|
handler = VaultHandler('dummy_pw_file')
|
|
# Mock subprocess.run to simulate successful vault encryption
|
|
fake_output = 'Encrypted data'
|
|
completed = subprocess.CompletedProcess(
|
|
args=['ansible-vault'], returncode=0, stdout=fake_output, stderr=''
|
|
)
|
|
with mock.patch('subprocess.run', return_value=completed) as proc_run:
|
|
result = handler.encrypt_string('plain_val', 'name')
|
|
proc_run.assert_called_once()
|
|
self.assertEqual(result, fake_output)
|
|
|
|
def test_vault_encrypt_string_failure(self):
|
|
handler = VaultHandler('dummy_pw_file')
|
|
# Mock subprocess.run to simulate failure
|
|
completed = subprocess.CompletedProcess(
|
|
args=['ansible-vault'], returncode=1, stdout='', stderr='error')
|
|
with mock.patch('subprocess.run', return_value=completed):
|
|
with self.assertRaises(RuntimeError):
|
|
handler.encrypt_string('plain_val', 'name')
|
|
|
|
def test_main_overrides_and_file_writing(self):
|
|
# Setup temporary files for role-path vars and inventory
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
role_path = os.path.join(tmpdir, 'role')
|
|
os.makedirs(os.path.join(role_path, 'config'))
|
|
os.makedirs(os.path.join(role_path, 'schema'))
|
|
os.makedirs(os.path.join(role_path, 'vars'))
|
|
# Create vars/main.yml with application_id
|
|
main_vars = {'application_id': 'app_test'}
|
|
with open(os.path.join(role_path, 'vars', 'main.yml'), 'w') as f:
|
|
yaml.dump(main_vars, f)
|
|
# Create config/main.yml with features disabled
|
|
config = {'features': {'central_database': False}}
|
|
with open(os.path.join(role_path, "config" , "main.yml"), 'w') as f:
|
|
yaml.dump(config, f)
|
|
# Create schema.yml defining plain credential
|
|
schema = {'credentials': {'api_key': {'description': 'API key', 'algorithm': 'plain', 'validation': {}}}}
|
|
with open(os.path.join(role_path, 'schema', 'main.yml'), 'w') as f:
|
|
yaml.dump(schema, f)
|
|
# Prepare inventory file
|
|
inventory_file = os.path.join(tmpdir, 'inventory.yml')
|
|
with open(inventory_file, 'w') as f:
|
|
yaml.dump({}, f)
|
|
vault_pw_file = os.path.join(tmpdir, 'pw.txt')
|
|
with open(vault_pw_file, 'w') as f:
|
|
f.write('pw')
|
|
|
|
# Simulate ansible-vault encrypt_string output for api_key
|
|
fake_snippet = "!vault |\n $ANSIBLE_VAULT;1.1;AES256\n ENCRYPTEDVALUE"
|
|
completed = subprocess.CompletedProcess(
|
|
args=['ansible-vault'], returncode=0, stdout=fake_snippet, stderr=''
|
|
)
|
|
with mock.patch('subprocess.run', return_value=completed):
|
|
# Run main with override for credentials.api_key and force to skip prompt
|
|
sys.argv = [
|
|
'create/credentials.py',
|
|
'--role-path', role_path,
|
|
'--inventory-file', inventory_file,
|
|
'--vault-password-file', vault_pw_file,
|
|
'--set', 'credentials.api_key=SECRET',
|
|
'--force'
|
|
]
|
|
# Should complete without error
|
|
main()
|
|
# Verify inventory file updated with vaulted api_key
|
|
data = yaml.safe_load(open(inventory_file))
|
|
creds = data['applications']['app_test']['credentials']
|
|
self.assertIn('api_key', creds)
|
|
# VaultScalar serializes to a vault block, safe_load returns a string containing the vault header
|
|
self.assertIsInstance(creds['api_key'], str)
|
|
self.assertTrue(creds['api_key'].lstrip().startswith('$ANSIBLE_VAULT'))
|
|
|
|
def test_main_plain_algorithm_allow_empty_plain_sets_empty_string_without_vault(self):
|
|
"""
|
|
When --allow-empty-plain is used, a 'plain' credential without override
|
|
should be set to "" and *not* encrypted (no ansible-vault calls).
|
|
"""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
role_path = os.path.join(tmpdir, 'role')
|
|
os.makedirs(os.path.join(role_path, 'config'))
|
|
os.makedirs(os.path.join(role_path, 'schema'))
|
|
os.makedirs(os.path.join(role_path, 'vars'))
|
|
|
|
# vars/main.yml with application_id
|
|
main_vars = {'application_id': 'app_empty_plain'}
|
|
with open(os.path.join(role_path, 'vars', 'main.yml'), 'w') as f:
|
|
yaml.dump(main_vars, f)
|
|
|
|
# config/main.yml
|
|
config = {'features': {'central_database': False}}
|
|
with open(os.path.join(role_path, "config", "main.yml"), 'w') as f:
|
|
yaml.dump(config, f)
|
|
|
|
# schema/main.yml: plain credential *without* overrides
|
|
schema = {
|
|
'credentials': {
|
|
'api_key': {
|
|
'description': 'API key',
|
|
'algorithm': 'plain',
|
|
'validation': {}
|
|
}
|
|
}
|
|
}
|
|
with open(os.path.join(role_path, 'schema', 'main.yml'), 'w') as f:
|
|
yaml.dump(schema, f)
|
|
|
|
# Empty inventory file
|
|
inventory_file = os.path.join(tmpdir, 'inventory.yml')
|
|
with open(inventory_file, 'w') as f:
|
|
yaml.dump({}, f)
|
|
|
|
# Vault password file
|
|
vault_pw_file = os.path.join(tmpdir, 'pw.txt')
|
|
with open(vault_pw_file, 'w') as f:
|
|
f.write('pw')
|
|
|
|
# Ensure ansible-vault is *not* called at all in this scenario
|
|
def fail_run(*_args, **_kwargs):
|
|
raise AssertionError("ansible-vault must not be called for allow_empty_plain + empty plain")
|
|
|
|
with mock.patch('subprocess.run', side_effect=fail_run):
|
|
sys.argv = [
|
|
'create/credentials.py',
|
|
'--role-path', role_path,
|
|
'--inventory-file', inventory_file,
|
|
'--vault-password-file', vault_pw_file,
|
|
'--allow-empty-plain',
|
|
]
|
|
main()
|
|
|
|
data = yaml.safe_load(open(inventory_file))
|
|
creds = data['applications']['app_empty_plain']['credentials']
|
|
# api_key should exist and be an empty string, not a vault block
|
|
self.assertIn('api_key', creds)
|
|
self.assertEqual(creds['api_key'], "") |