Restructured CLI logic

This commit is contained in:
2025-07-10 21:26:44 +02:00
parent 8457325b5c
commit c160c58a5c
44 changed files with 97 additions and 155 deletions

0
utils/__init__.py Normal file
View File

View File

50
utils/handler/vault.py Normal file
View File

@@ -0,0 +1,50 @@
import subprocess
from typing import Any, Dict
from yaml.loader import SafeLoader
from yaml.dumper import SafeDumper
class VaultScalar(str):
"""A subclass of str to represent vault-encrypted strings."""
pass
def _vault_constructor(loader, node):
"""Custom constructor to handle !vault tag as plain text."""
return node.value
def _vault_representer(dumper, data):
"""Custom representer to dump VaultScalar as literal blocks."""
return dumper.represent_scalar('!vault', data, style='|')
SafeLoader.add_constructor('!vault', _vault_constructor)
SafeDumper.add_representer(VaultScalar, _vault_representer)
class VaultHandler:
def __init__(self, vault_password_file: str):
self.vault_password_file = vault_password_file
def encrypt_string(self, value: str, name: str) -> str:
"""Encrypt a string using ansible-vault."""
cmd = [
"ansible-vault", "encrypt_string",
value, f"--name={name}",
"--vault-password-file", self.vault_password_file
]
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
raise RuntimeError(f"ansible-vault encrypt_string failed:\n{proc.stderr}")
return proc.stdout
def encrypt_leaves(self, branch: Dict[str, Any], vault_pw: str):
"""Recursively encrypt all leaves (plain text values) under the credentials section."""
for key, value in branch.items():
if isinstance(value, dict):
self.encrypt_leaves(value, vault_pw) # Recurse into nested dictionaries
else:
# Skip if already vaulted (i.e., starts with $ANSIBLE_VAULT)
if isinstance(value, str) and not value.lstrip().startswith("$ANSIBLE_VAULT"):
snippet = self.encrypt_string(value, key)
lines = snippet.splitlines()
indent = len(lines[1]) - len(lines[1].lstrip())
body = "\n".join(line[indent:] for line in lines[1:])
branch[key] = VaultScalar(body) # Store encrypted value as VaultScalar

23
utils/handler/yaml.py Normal file
View File

@@ -0,0 +1,23 @@
import yaml
from yaml.loader import SafeLoader
from typing import Any, Dict
from utils.handler.vault import VaultScalar
class YamlHandler:
@staticmethod
def load_yaml(path) -> Dict:
"""Load the YAML file and wrap existing !vault entries."""
text = path.read_text()
data = yaml.load(text, Loader=SafeLoader) or {}
return YamlHandler.wrap_existing_vaults(data)
@staticmethod
def wrap_existing_vaults(node: Any) -> Any:
"""Recursively wrap any str that begins with '$ANSIBLE_VAULT' in a VaultScalar so it dumps as a literal block."""
if isinstance(node, dict):
return {k: YamlHandler.wrap_existing_vaults(v) for k, v in node.items()}
if isinstance(node, list):
return [YamlHandler.wrap_existing_vaults(v) for v in node]
if isinstance(node, str) and node.lstrip().startswith("$ANSIBLE_VAULT"):
return VaultScalar(node)
return node

View File

165
utils/manager/inventory.py Normal file
View File

@@ -0,0 +1,165 @@
import secrets
import hashlib
import bcrypt
from pathlib import Path
from typing import Dict
from utils.handler.yaml import YamlHandler
from utils.handler.vault import VaultHandler, VaultScalar
import string
import sys
import base64
class InventoryManager:
def __init__(self, role_path: Path, inventory_path: Path, vault_pw: str, overrides: Dict[str, str]):
"""Initialize the Inventory Manager."""
self.role_path = role_path
self.inventory_path = inventory_path
self.vault_pw = vault_pw
self.overrides = overrides
self.inventory = YamlHandler.load_yaml(inventory_path)
self.schema = YamlHandler.load_yaml(role_path / "schema" / "main.yml")
self.app_id = self.load_application_id(role_path)
self.vault_handler = VaultHandler(vault_pw)
def load_application_id(self, role_path: Path) -> str:
"""Load the application ID from the role's vars/main.yml file."""
vars_file = role_path / "vars" / "main.yml"
data = YamlHandler.load_yaml(vars_file)
app_id = data.get("application_id")
if not app_id:
print(f"ERROR: 'application_id' missing in {vars_file}", file=sys.stderr)
sys.exit(1)
return app_id
def apply_schema(self) -> Dict:
"""Apply the schema and return the updated inventory."""
apps = self.inventory.setdefault("applications", {})
target = apps.setdefault(self.app_id, {})
# Load the data from vars/main.yml
vars_file = self.role_path / "config" / "main.yml"
data = YamlHandler.load_yaml(vars_file)
# Check if 'central-database' is enabled in the features section of data
if "features" in data:
if "central_database" in data["features"] and \
data["features"]["central_database"]:
# Add 'central_database' value (password) to credentials
target.setdefault("credentials", {})["database_password"] = self.generate_value("alphanumeric")
if "oauth2" in data["features"] and \
data["features"]["oauth2"]:
target.setdefault("credentials", {})["oauth2_proxy_cookie_secret"] = self.generate_value("random_hex_16")
# Apply recursion only for the `credentials` section
self.recurse_credentials(self.schema, target)
return self.inventory
def recurse_credentials(self, branch: dict, dest: dict, prefix: str = ""):
"""Recursively process only the 'credentials' section and generate values."""
for key, meta in branch.items():
full_key = f"{prefix}.{key}" if prefix else key
# Only process 'credentials' section for encryption
if prefix == "credentials" and isinstance(meta, dict) and all(k in meta for k in ("description", "algorithm", "validation")):
alg = meta["algorithm"]
if alg == "plain":
# Must be supplied via --set
if full_key not in self.overrides:
print(f"ERROR: Plain algorithm for '{full_key}' requires override via --set {full_key}=<value>", file=sys.stderr)
sys.exit(1)
plain = self.overrides[full_key]
else:
plain = self.overrides.get(full_key, self.generate_value(alg))
# Check if the value is already vaulted or if it's a dictionary
existing_value = dest.get(key)
# If existing_value is a dictionary, print a warning and skip encryption
if isinstance(existing_value, dict):
print(f"Skipping encryption for '{key}', as it is a dictionary.")
continue
# Check if the value is a VaultScalar and already vaulted
if existing_value and isinstance(existing_value, VaultScalar):
print(f"Skipping encryption for '{key}', as it is already vaulted.")
continue
# Encrypt only if it's not already vaulted
snippet = self.vault_handler.encrypt_string(plain, key)
lines = snippet.splitlines()
indent = len(lines[1]) - len(lines[1].lstrip())
body = "\n".join(line[indent:] for line in lines[1:])
dest[key] = VaultScalar(body)
elif isinstance(meta, dict):
sub = dest.setdefault(key, {})
self.recurse_credentials(meta, sub, full_key)
else:
dest[key] = meta
def generate_secure_alphanumeric(self, length: int) -> str:
"""Generate a cryptographically secure random alphanumeric string of the given length."""
characters = string.ascii_letters + string.digits # a-zA-Z0-9
return ''.join(secrets.choice(characters) for _ in range(length))
def generate_value(self, algorithm: str) -> str:
"""
Generate a random secret value according to the specified algorithm.
Supported algorithms:
"random_hex"
Returns a 64-byte (512-bit) secure random string, encoded as 128 hexadecimal characters.
Use when you need maximum entropy in a hex-only format.
"sha256"
Generates 32 random bytes, hashes them with SHA-256, and returns a 64-character hex digest.
Good for when you want a fixed-length (256-bit) hash output.
"sha1"
Generates 20 random bytes, hashes them with SHA-1, and returns a 40-character hex digest.
Only use in legacy contexts; SHA-1 is considered weaker than SHA-256.
"bcrypt"
Creates a random 16-byte URL-safe password, then applies a bcrypt hash.
Suitable for storing user-style passwords where bcrypt verification is needed.
"alphanumeric"
Produces a 64-character string drawn from [AZ, az, 09].
Offers ≈380 bits of entropy; human-friendly charset.
"base64_prefixed_32"
Generates 32 random bytes, encodes them in Base64, and prefixes the result with "base64:".
Useful when downstream systems expect a Base64 format.
"random_hex_16"
Returns 16 random bytes (128 bits) encoded as 32 hexadecimal characters.
Handy for shorter tokens or salts.
Returns:
A securely generated string according to the chosen algorithm.
"""
if algorithm == "random_hex":
return secrets.token_hex(64)
if algorithm == "sha256":
return hashlib.sha256(secrets.token_bytes(32)).hexdigest()
if algorithm == "sha1":
return hashlib.sha1(secrets.token_bytes(20)).hexdigest()
if algorithm == "bcrypt":
# Generate a random password and hash it with bcrypt
pw = secrets.token_urlsafe(16).encode()
raw_hash = bcrypt.hashpw(pw, bcrypt.gensalt()).decode()
# Replace every '$' with a random lowercase alphanumeric character
alnum = string.digits + string.ascii_lowercase
escaped = "".join(secrets.choice(alnum) if ch == '$' else ch for ch in raw_hash)
return escaped
if algorithm == "alphanumeric":
return self.generate_secure_alphanumeric(64)
if algorithm == "base64_prefixed_32":
return "base64:" + base64.b64encode(secrets.token_bytes(32)).decode()
if algorithm == "random_hex_16":
# 16 Bytes → 32 Hex-Characters
return secrets.token_hex(16)
return "undefined"

83
utils/test_main.py Normal file
View File

@@ -0,0 +1,83 @@
# tests/unit/test_main.py
import os
import sys
import stat
import tempfile
import unittest
from unittest import mock
# Insert project root into import path so we can import main.py
sys.path.insert(
0,
os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
)
import main # assumes main.py lives at the project root
class TestMainHelpers(unittest.TestCase):
def test_format_command_help_basic(self):
name = "cmd"
description = "A basic description"
output = main.format_command_help(
name, description,
indent=2, col_width=20, width=40
)
# Should start with two spaces and the command name
self.assertTrue(output.startswith(" cmd"))
# Description should appear somewhere in the wrapped text
self.assertIn("A basic description", output)
def test_list_cli_commands_filters_and_sorts(self):
# Create a temporary directory with sample files
with tempfile.TemporaryDirectory() as tmpdir:
open(os.path.join(tmpdir, "one.py"), "w").close()
open(os.path.join(tmpdir, "__init__.py"), "w").close()
open(os.path.join(tmpdir, "ignore.txt"), "w").close()
open(os.path.join(tmpdir, "two.py"), "w").close()
# Only 'one' and 'two' should be returned, in sorted order
commands = main.list_cli_commands(tmpdir)
self.assertEqual(commands, ["one", "two"])
def test_git_clean_repo_invokes_git_clean(self):
with mock.patch('main.subprocess.run') as mock_run:
main.git_clean_repo()
mock_run.assert_called_once_with(['git', 'clean', '-Xfd'], check=True)
def test_extract_description_via_help_with_description(self):
# Create a dummy script that prints a help description
with tempfile.TemporaryDirectory() as tmpdir:
script_path = os.path.join(tmpdir, "dummy.py")
with open(script_path, "w") as f:
f.write(
"#!/usr/bin/env python3\n"
"import sys\n"
"if '--help' in sys.argv:\n"
" print('usage: dummy.py [options]')\n"
" print()\n"
" print('This is a help description.')\n"
)
# Make it executable
mode = os.stat(script_path).st_mode
os.chmod(script_path, mode | stat.S_IXUSR)
description = main.extract_description_via_help(script_path)
self.assertEqual(description, "This is a help description.")
def test_extract_description_via_help_without_description(self):
# Script that has no help description
with tempfile.TemporaryDirectory() as tmpdir:
script_path = os.path.join(tmpdir, "empty.py")
with open(script_path, "w") as f:
f.write(
"#!/usr/bin/env python3\n"
"print('no help here')\n"
)
description = main.extract_description_via_help(script_path)
self.assertEqual(description, "-")
if __name__ == "__main__":
unittest.main()