Compare commits

...

13 Commits

126 changed files with 969 additions and 650 deletions

View File

@@ -1,12 +1,11 @@
ROLES_DIR := ./roles
APPLICATIONS_OUT := ./group_vars/all/04_applications.yml
APPLICATIONS_SCRIPT := ./cli/generate_applications.py
APPLICATIONS_SCRIPT := ./cli/generate/defaults/applications.py
USERS_OUT := ./group_vars/all/03_users.yml
USERS_SCRIPT := ./cli/generate_users.py
INCLUDES_SCRIPT := ./cli/generate_playbook.py
USERS_SCRIPT := ./cli/generate/defaults/users.py
INCLUDES_SCRIPT := ./cli/generate/conditional_role_include.py
# Define the prefixes for which we want individual role-include files
INCLUDE_GROUPS := "drv-" "svc-" "desk-" "web-" "util-"
INCLUDE_GROUPS := $(shell python3 main.py meta invokable_paths -s "-" --no-signal | tr '\n' ' ')
# Directory where these include-files will be written
INCLUDES_OUT_DIR := ./tasks/groups

View File

@@ -5,8 +5,8 @@ from pathlib import Path
import yaml
from typing import Dict, Any
from utils.manager.inventory import InventoryManager
from utils.handler.vault import VaultHandler, VaultScalar
from utils.handler.yaml import YamlHandler
from utils.handler.vault import VaultHandler, VaultScalar
from utils.handler.yaml import YamlHandler
from yaml.dumper import SafeDumper

View File

@@ -7,7 +7,16 @@ import datetime
import sys
def run_ansible_playbook(inventory, modes, limit=None, allowed_applications=None, password_file=None, verbose=0, skip_tests=False, skip_validation=False):
def run_ansible_playbook(
inventory,
modes,
limit=None,
allowed_applications=None,
password_file=None,
verbose=0,
skip_tests=False,
skip_validation=False
):
start_time = datetime.datetime.now()
print(f"\n▶️ Script started at: {start_time.isoformat()}\n")
@@ -17,32 +26,40 @@ def run_ansible_playbook(inventory, modes, limit=None, allowed_applications=None
script_dir = os.path.dirname(os.path.realpath(__file__))
playbook = os.path.join(os.path.dirname(script_dir), "playbook.yml")
# Inventory validation step
if not skip_validation:
print("\n🔍 Validating inventory before deployment...\n")
try:
subprocess.run(
[sys.executable, os.path.join(script_dir, "validate_inventory.py"), os.path.dirname(inventory)],
[sys.executable,
os.path.join(script_dir, "validate.inventory.py"),
os.path.dirname(inventory)
],
check=True
)
except subprocess.CalledProcessError:
print("\n❌ Inventory validation failed. Deployment aborted.\n", file=sys.stderr)
print(
"\n❌ Inventory validation failed. Deployment aborted.\n",
file=sys.stderr
)
sys.exit(1)
else:
print("\n⚠️ Skipping inventory validation as requested.\n")
if not skip_tests:
print("\n🧪 Running tests (make test)...\n")
subprocess.run(["make", "test"], check=True)
# Build ansible-playbook command
cmd = ["ansible-playbook", "-i", inventory, playbook]
if limit:
cmd.extend(["--limit", limit])
# Pass application IDs parameter as extra var if provided
if allowed_applications:
joined = ",".join(allowed_applications)
cmd.extend(["-e", f"allowed_applications={joined}"])
# Pass other mode flags
for key, value in modes.items():
val = str(value).lower() if isinstance(value, bool) else str(value)
cmd.extend(["-e", f"{key}={val}"])
@@ -75,55 +92,56 @@ def main():
help="Path to the inventory file (INI or YAML) containing hosts and variables."
)
parser.add_argument(
"--limit",
"-l", "--limit",
help="Restrict execution to a specific host or host group from the inventory."
)
parser.add_argument(
"--host-type",
choices=["server", "personal-computer"],
"-T", "--host-type",
choices=["server", "desktop"],
default="server",
help="Specify whether the target is a server or a personal computer. Affects role selection and variables."
)
parser.add_argument(
"--reset", action="store_true",
"-r", "--reset", action="store_true",
help="Reset all CyMaIS files and configurations, and run the entire playbook (not just individual roles)."
)
parser.add_argument(
"--test", action="store_true",
"-t", "--test", action="store_true",
help="Run test routines instead of production tasks. Useful for local testing and CI pipelines."
)
parser.add_argument(
"--update", action="store_true",
"-u", "--update", action="store_true",
help="Enable the update procedure to bring software and roles up to date."
)
parser.add_argument(
"--backup", action="store_true",
"-b", "--backup", action="store_true",
help="Perform a full backup of critical data and configurations before the update process."
)
parser.add_argument(
"--cleanup", action="store_true",
"-c", "--cleanup", action="store_true",
help="Clean up unused files and outdated configurations after all tasks are complete."
)
parser.add_argument(
"--debug", action="store_true",
"-d", "--debug", action="store_true",
help="Enable detailed debug output for Ansible and this script."
)
parser.add_argument(
"--password-file",
"-p", "--password-file",
help="Path to the file containing the Vault password. If not provided, prompts for the password interactively."
)
parser.add_argument(
"--skip-tests", action="store_true",
"-s", "--skip-tests", action="store_true",
help="Skip running 'make test' even if tests are normally enabled."
)
parser.add_argument(
"--skip-validation", action="store_true",
"-V", "--skip-validation", action="store_true",
help="Skip inventory validation before deployment."
)
parser.add_argument(
"--id",
"-i", "--id",
nargs="+",
default=[],
dest="id",
help="List of application_id's for partial deploy. If not set, all application IDs defined in the inventory will be executed."
)
parser.add_argument(
@@ -152,8 +170,8 @@ def main():
verbose=args.verbose,
skip_tests=args.skip_tests,
skip_validation=args.skip_validation
)
if __name__ == "__main__":
main()

47
cli/fix/ini_py.py Normal file
View File

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""
This script creates __init__.py files in every subdirectory under the specified
folder relative to the project root.
"""
import os
import argparse
def create_init_files(root_folder):
"""
Walk through all subdirectories of root_folder and create an __init__.py file
in each directory if it doesn't already exist.
"""
for dirpath, dirnames, filenames in os.walk(root_folder):
init_file = os.path.join(dirpath, '__init__.py')
if not os.path.exists(init_file):
open(init_file, 'w').close()
print(f"Created: {init_file}")
else:
print(f"Skipped (already exists): {init_file}")
def main():
parser = argparse.ArgumentParser(
description='Create __init__.py files in every subdirectory.'
)
parser.add_argument(
'folder',
help='Relative path to the target folder (e.g., cli/fix)'
)
args = parser.parse_args()
# Determine the absolute path based on the current working directory
root_folder = os.path.abspath(args.folder)
if not os.path.isdir(root_folder):
print(f"Error: The folder '{args.folder}' does not exist or is not a directory.")
exit(1)
create_init_files(root_folder)
if __name__ == '__main__':
main()

View File

@@ -8,6 +8,10 @@ import sys
import yaml
from pathlib import Path
# Directory containing roles; can be overridden by tests
MODULE_DIR = Path(__file__).resolve().parent
ROLES_DIR = (MODULE_DIR.parent.parent / "roles").resolve()
def process_role(role_dir: Path, prefix: str, preview: bool, overwrite: bool):
name = role_dir.name
if not name.startswith(prefix):
@@ -50,6 +54,15 @@ def process_role(role_dir: Path, prefix: str, preview: bool, overwrite: bool):
print(f"Created {vars_file} with application_id: {expected_id}")
def run(prefix: str, preview: bool = False, overwrite: bool = False):
"""
Ensure vars/main.yml for roles under ROLES_DIR with the given prefix has correct application_id.
"""
for role in sorted(Path(ROLES_DIR).iterdir()):
if role.is_dir():
process_role(role, prefix, preview, overwrite)
def main():
parser = argparse.ArgumentParser(
description="Ensure vars/main.yml for roles with a given prefix has correct application_id"
@@ -68,16 +81,9 @@ def main():
)
args = parser.parse_args()
# Determine roles directory relative to this script
script_dir = Path(__file__).resolve().parent
roles_dir = (script_dir.parent / "roles").resolve()
if not roles_dir.is_dir():
print(f"Roles directory not found: {roles_dir}", file=sys.stderr)
sys.exit(1)
# Run processing
run(prefix=args.prefix, preview=args.preview, overwrite=args.overwrite)
for role in sorted(roles_dir.iterdir()):
if role.is_dir():
process_role(role, args.prefix, args.preview, args.overwrite)
if __name__ == "__main__":
main()

View File

@@ -156,7 +156,7 @@ def print_dependency_tree(graph):
for root in roots:
print_node(root)
def generate_playbook_entries(roles_dir, prefixes=None):
def gen_condi_role_incl(roles_dir, prefixes=None):
"""
Generate playbook entries based on the sorted order.
Raises a ValueError if application_id is missing.
@@ -209,7 +209,7 @@ def main():
print_dependency_tree(graph)
sys.exit(0)
entries = generate_playbook_entries(args.roles_dir, prefixes)
entries = gen_condi_role_incl(args.roles_dir, prefixes)
output = ''.join(entries)
if args.output:

View File

View File

@@ -6,7 +6,7 @@ import yaml
import sys
from pathlib import Path
plugin_path = Path(__file__).resolve().parent / ".." / "lookup_plugins"
plugin_path = Path(__file__).resolve().parent / ".." / ".." / ".." /"lookup_plugins"
sys.path.insert(0, str(plugin_path))
from application_gid import LookupModule

0
cli/meta/__init__.py Normal file
View File

49
cli/meta/applications.py Normal file
View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import argparse
import glob
import os
import sys
try:
import yaml
except ImportError:
sys.stderr.write("PyYAML is required. Install with `pip install pyyaml`.\n")
sys.exit(1)
def find_application_ids():
"""
Searches all files matching roles/*/vars/main.yml for the key 'application_id'
and returns a list of all found IDs.
"""
pattern = os.path.join('roles', '*', 'vars', 'main.yml')
app_ids = []
for filepath in glob.glob(pattern):
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
except Exception as e:
sys.stderr.write(f"Error reading {filepath}: {e}\n")
continue
if isinstance(data, dict) and 'application_id' in data:
app_ids.append(data['application_id'])
return sorted(set(app_ids))
def main():
parser = argparse.ArgumentParser(
description='Output a list of all application_id values defined in roles/*/vars/main.yml'
)
# No arguments other than --help
parser.parse_args()
ids = find_application_ids()
for app_id in ids:
print(app_id)
if __name__ == '__main__':
main()

View File

@@ -1,105 +0,0 @@
import argparse
import subprocess
from ansible.parsing.vault import VaultLib, VaultSecret
import sys
import yaml
import re
from utils.handler.vault import VaultScalar
from yaml.loader import SafeLoader
from yaml.dumper import SafeDumper
# Register the custom constructor and representer for VaultScalar in PyYAML
SafeLoader.add_constructor('!vault', lambda loader, node: VaultScalar(node.value))
SafeDumper.add_representer(VaultScalar, lambda dumper, data: dumper.represent_scalar('!vault', data))
def is_vault_encrypted_data(data: str) -> bool:
"""Check if the given data is encrypted with Ansible Vault by looking for the vault header."""
return data.lstrip().startswith('$ANSIBLE_VAULT')
def decrypt_vault_data(encrypted_data: str, vault_secret: VaultSecret) -> str:
"""
Decrypt the given encrypted data using the provided vault_secret.
:param encrypted_data: Encrypted string to be decrypted
:param vault_secret: The VaultSecret instance used to decrypt the data
:return: Decrypted data as a string
"""
vault = VaultLib()
decrypted_data = vault.decrypt(encrypted_data, vault_secret)
return decrypted_data
def decrypt_vault_file(vault_file: str, vault_password_file: str):
"""
Decrypt the Ansible Vault file and return its contents.
:param vault_file: Path to the encrypted Ansible Vault file
:param vault_password_file: Path to the file containing the Vault password
:return: Decrypted contents of the Vault file
"""
# Read the vault password
with open(vault_password_file, 'r') as f:
vault_password = f.read().strip()
# Create a VaultSecret instance from the password
vault_secret = VaultSecret(vault_password.encode())
# Read the encrypted file
with open(vault_file, 'r') as f:
file_content = f.read()
# If the file is partially encrypted, we'll decrypt only the encrypted values
decrypted_data = file_content # Start with the unmodified content
# Find all vault-encrypted values (i.e., values starting with $ANSIBLE_VAULT)
encrypted_values = re.findall(r'^\s*([\w\.\-_]+):\s*["\']?\$ANSIBLE_VAULT[^\n]+', file_content, flags=re.MULTILINE)
# If there are encrypted values, decrypt them
for value in encrypted_values:
# Extract the encrypted value and decrypt it
encrypted_value = re.search(r'(["\']?\$ANSIBLE_VAULT[^\n]+)', value)
if encrypted_value:
# Remove any newlines or extra spaces from the encrypted value
encrypted_value = encrypted_value.group(0).replace('\n', '').replace('\r', '')
decrypted_value = decrypt_vault_data(encrypted_value, vault_secret)
# Replace the encrypted value with the decrypted value in the content
decrypted_data = decrypted_data.replace(encrypted_value, decrypted_value.strip())
return decrypted_data
def decrypt_and_display(vault_file: str, vault_password_file: str):
"""
Decrypts the Ansible Vault file and its values, then display the result.
Supports both full file and partial value encryption.
:param vault_file: Path to the encrypted Ansible Vault file
:param vault_password_file: Path to the file containing the Vault password
"""
decrypted_data = decrypt_vault_file(vault_file, vault_password_file)
# Convert the decrypted data to a string format (YAML or JSON)
output_data = yaml.dump(yaml.safe_load(decrypted_data), default_flow_style=False)
# Use subprocess to call `less` for paginated, scrollable output
subprocess.run(["less"], input=output_data, text=True)
def main():
# Set up the argument parser
parser = argparse.ArgumentParser(description="Decrypt and display variables from an Ansible Vault file.")
# Add arguments for the vault file and vault password file
parser.add_argument(
'vault_file',
type=str,
help="Path to the encrypted Ansible Vault file"
)
parser.add_argument(
'vault_password_file',
type=str,
help="Path to the file containing the Vault password"
)
# Parse the arguments
args = parser.parse_args()
# Display vault variables in a scrollable manner
decrypt_and_display(args.vault_file, args.vault_password_file)
if __name__ == "__main__":
main()

0
cli/validate/__init__.py Normal file
View File

154
cli/validate/inventory.py Normal file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
import argparse
import sys
import yaml
import re
from pathlib import Path
# Ensure imports work when run directly
script_dir = Path(__file__).resolve().parent
repo_root = script_dir.parent.parent
sys.path.insert(0, str(repo_root))
from cli.meta.applications import find_application_ids
def load_yaml_file(path):
try:
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
content = re.sub(r'(?m)^([ \t]*[^\s:]+):\s*!vault[\s\S]+?(?=^\S|\Z)', r"\1: \"<vaulted>\"\n", content)
return yaml.safe_load(content)
except Exception as e:
print(f"Warning: Could not parse {path}: {e}", file=sys.stderr)
return None
def recursive_keys(d, prefix=''):
keys = set()
if isinstance(d, dict):
for k, v in d.items():
full = f"{prefix}.{k}" if prefix else k
keys.add(full)
keys.update(recursive_keys(v, full))
return keys
def compare_application_keys(applications, defaults, source):
errs = []
for app_id, conf in applications.items():
if app_id not in defaults:
errs.append(f"{source}: Unknown application '{app_id}' (not in defaults_applications)")
continue
default = defaults[app_id]
app_keys = recursive_keys(conf)
def_keys = recursive_keys(default)
for key in app_keys:
if key.startswith('credentials'):
continue
if key not in def_keys:
errs.append(f"{source}: Missing default for {app_id}: {key}")
return errs
def compare_user_keys(users, default_users, source):
errs = []
for user, conf in users.items():
if user not in default_users:
print(f"Warning: {source}: Unknown user '{user}' (not in default_users)", file=sys.stderr)
continue
def_conf = default_users[user]
for key in conf:
if key in ('password','credentials','mailu_token'):
continue
if key not in def_conf:
errs.append(f"Missing default for user '{user}': key '{key}'")
return errs
def load_inventory_files(inv_dir):
all_data = {}
p = Path(inv_dir)
for f in p.glob('*.yml'):
data = load_yaml_file(f)
if isinstance(data, dict):
apps = data.get('applications') or data.get('defaults_applications')
if apps:
all_data[str(f)] = apps
for d in p.glob('*_vars'):
if d.is_dir():
for f in d.rglob('*.yml'):
data = load_yaml_file(f)
if isinstance(data, dict):
apps = data.get('applications') or data.get('defaults_applications')
if apps:
all_data[str(f)] = apps
return all_data
def validate_host_keys(app_ids, inv_dir):
errs = []
p = Path(inv_dir)
# Scan all top-level YAMLs for 'all.children'
for f in p.glob('*.yml'):
data = load_yaml_file(f)
if not isinstance(data, dict):
continue
all_node = data.get('all', {})
children = all_node.get('children')
if not isinstance(children, dict):
continue
for grp in children.keys():
if grp not in app_ids:
errs.append(f"{f}: Invalid group '{grp}' (not in application_ids)")
return errs
def find_single_file(pattern):
c = list(Path('group_vars/all').glob(pattern))
if len(c)!=1:
raise RuntimeError(f"Expected exactly one {pattern} in group_vars/all, found {len(c)}")
return c[0]
def main():
p = argparse.ArgumentParser()
p.add_argument('inventory_dir')
args = p.parse_args()
# defaults
dfile = find_single_file('*_applications.yml')
ufile = find_single_file('*users.yml')
ddata = load_yaml_file(dfile) or {}
udata = load_yaml_file(ufile) or {}
defaults = ddata.get('defaults_applications',{})
default_users = udata.get('default_users',{})
if not defaults:
print(f"Error: No 'defaults_applications' found in {dfile}", file=sys.stderr)
sys.exit(1)
if not default_users:
print(f"Error: No 'default_users' found in {ufile}", file=sys.stderr)
sys.exit(1)
app_errs = []
inv_files = load_inventory_files(args.inventory_dir)
for src, apps in inv_files.items():
app_errs.extend(compare_application_keys(apps, defaults, src))
user_errs = []
for fpath in Path(args.inventory_dir).rglob('*.yml'):
data = load_yaml_file(fpath)
if isinstance(data, dict) and 'users' in data:
errs = compare_user_keys(data['users'], default_users, str(fpath))
for e in errs:
print(e, file=sys.stderr)
user_errs.extend(errs)
host_errs = validate_host_keys(find_application_ids(), args.inventory_dir)
app_errs.extend(host_errs)
if app_errs or user_errs:
if app_errs:
print('Validation failed with the following issues:')
for e in app_errs:
print(f"- {e}")
sys.exit(1)
print('Inventory directory is valid against defaults and hosts.')
sys.exit(0)
if __name__=='__main__':
main()

View File

@@ -1,144 +0,0 @@
#!/usr/bin/env python3
import argparse
import sys
import yaml
import re
from pathlib import Path
def load_yaml_file(path):
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
content = re.sub(r'(?m)^([ \t]*[^\s:]+):\s*!vault[\s\S]+?(?=^\S|\Z)', r'\1: "<vaulted>"\n', content)
return yaml.safe_load(content)
except Exception as e:
print(f"Warning: Could not parse {path}: {e}", file=sys.stderr)
return None
def recursive_keys(d, prefix=""):
keys = set()
if isinstance(d, dict):
for k, v in d.items():
full_key = f"{prefix}.{k}" if prefix else k
keys.add(full_key)
keys.update(recursive_keys(v, full_key))
return keys
def compare_application_keys(applications, defaults, source_file):
errors = []
for app_id, app_conf in applications.items():
if app_id not in defaults:
errors.append(f"{source_file}: Unknown application '{app_id}' (not in defaults_applications)")
continue
default_conf = defaults.get(app_id, {})
app_keys = recursive_keys(app_conf)
default_keys = recursive_keys(default_conf)
for key in app_keys:
if key.startswith("credentials"):
continue # explicitly ignore credentials
if key not in default_keys:
errors.append(f"{source_file}: Missing default for {app_id}: {key}")
return errors
def compare_user_keys(users, default_users, source_file):
errors = []
for username, user_conf in users.items():
if username not in default_users:
print(f"Warning: {source_file}: Unknown user '{username}' (not in default_users)", file=sys.stderr)
continue
default_conf = default_users.get(username, {})
for key in user_conf:
if key in ("password", "credentials", "mailu_token"):
continue # ignore credentials/password
if key not in default_conf:
raise Exception(f"{source_file}: Missing default for user '{username}': key '{key}'")
return errors
def load_inventory_files(inventory_dir):
all_data = {}
inventory_path = Path(inventory_dir)
for path in inventory_path.glob("*.yml"):
data = load_yaml_file(path)
if isinstance(data, dict):
applications = data.get("applications") or data.get("defaults_applications")
if applications:
all_data[path] = applications
for vars_folder in inventory_path.glob("*_vars"):
if vars_folder.is_dir():
for subfile in vars_folder.rglob("*.yml"):
data = load_yaml_file(subfile)
if isinstance(data, dict):
applications = data.get("applications") or data.get("defaults_applications")
if applications:
all_data[subfile] = applications
return all_data
def find_single_file(pattern):
candidates = list(Path("group_vars/all").glob(pattern))
if len(candidates) != 1:
raise RuntimeError(f"Expected exactly one {pattern} file in group_vars/all, found {len(candidates)}")
return candidates[0]
def main():
parser = argparse.ArgumentParser(description="Verify application and user variable consistency with defaults.")
parser.add_argument("inventory_dir", help="Path to inventory directory (contains inventory.yml and *_vars/)")
args = parser.parse_args()
defaults_path = find_single_file("*_applications.yml")
users_path = find_single_file("*users.yml")
defaults_data = load_yaml_file(defaults_path)
default_users_data = load_yaml_file(users_path)
defaults = defaults_data.get("defaults_applications", {}) if defaults_data else {}
default_users = default_users_data.get("default_users", {}) if default_users_data else {}
if not defaults:
print(f"Error: No 'defaults_applications' found in {defaults_path}.", file=sys.stderr)
sys.exit(1)
if not default_users:
print(f"Error: No 'default_users' found in {users_path}.", file=sys.stderr)
sys.exit(1)
all_errors = []
inventory_files = load_inventory_files(args.inventory_dir)
for source_path, app_data in inventory_files.items():
errors = compare_application_keys(app_data, defaults, str(source_path))
all_errors.extend(errors)
# Load all users.yml files from inventory
for path in Path(args.inventory_dir).rglob("*.yml"):
data = load_yaml_file(path)
if isinstance(data, dict) and "users" in data:
try:
compare_user_keys(data["users"], default_users, str(path))
except Exception as e:
print(e, file=sys.stderr)
sys.exit(1)
if all_errors:
print("Validation failed with the following issues:")
for err in all_errors:
print("-", err)
sys.exit(1)
else:
print("Inventory directory is valid against defaults.")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -28,7 +28,7 @@ system_maintenance_cleanup_services:
system_maintenance_manipulation_services:
- "maint-docker-heal"
- "update-docker"
- "maint-docker-storage-optimizer"
- "cln-docker-storage-optimizer"
- "maint-docker-restart"
## Total System Maintenance Services

261
main.py
View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3
import argparse
import os
import subprocess
import sys
@@ -10,7 +9,21 @@ import signal
from datetime import datetime
import pty
from cli.sounds import Sound
# Color support
try:
from colorama import init as colorama_init, Fore, Back, Style
colorama_init(autoreset=True)
except ImportError:
class Dummy:
def __getattr__(self, name): return ''
Fore = Back = Style = Dummy()
from cli.sounds import Sound # ensure Sound imported
def color_text(text, color):
return f"{color}{text}{Style.RESET_ALL}"
def format_command_help(name, description, indent=2, col_width=36, width=80):
prefix = " " * indent + f"{name:<{col_width - indent}}"
@@ -21,16 +34,41 @@ def format_command_help(name, description, indent=2, col_width=36, width=80):
)
return wrapper.fill(description)
def list_cli_commands(cli_dir):
return sorted(
os.path.splitext(f.name)[0] for f in os.scandir(cli_dir)
if f.is_file() and f.name.endswith(".py") and not f.name.startswith("__")
)
"""Recursively list all .py files under cli_dir that use argparse (without .py)."""
cmds = []
for root, _, files in os.walk(cli_dir):
for f in files:
if not f.endswith(".py") or f.startswith("__"):
continue
path = os.path.join(root, f)
try:
with open(path, 'r', encoding='utf-8') as fh:
content = fh.read()
if 'argparse' not in content:
continue
except Exception:
continue
rel_dir = os.path.relpath(root, cli_dir)
name = os.path.splitext(f)[0]
if rel_dir == ".":
cmd = (None, name)
else:
cmd = (rel_dir.replace(os.sep, "/"), name)
cmds.append(cmd)
return sorted(cmds, key=lambda x: (x[0] or "", x[1]))
def extract_description_via_help(cli_script_path):
try:
script_dir = os.path.dirname(os.path.realpath(__file__))
cli_dir = os.path.join(script_dir, "cli")
rel = os.path.relpath(cli_script_path, cli_dir)
module = "cli." + rel[:-3].replace(os.sep, ".")
result = subprocess.run(
[sys.executable, cli_script_path, "--help"],
[sys.executable, "-m", module, "--help"],
capture_output=True,
text=True,
check=True
@@ -39,7 +77,7 @@ def extract_description_via_help(cli_script_path):
for i, line in enumerate(lines):
if line.strip().startswith("usage:"):
continue
if line.strip() == "":
if not line.strip():
for j in range(i+1, len(lines)):
desc = lines[j].strip()
if desc:
@@ -48,102 +86,174 @@ def extract_description_via_help(cli_script_path):
except Exception:
return "-"
def git_clean_repo():
"""Remove all Git-ignored files and directories in the current repository."""
subprocess.run(['git', 'clean', '-Xfd'], check=True)
def play_start_intro():
Sound.play_start_sound()
Sound.play_cymais_intro_sound()
def failure_with_warning_loop():
Sound.play_finished_failed_sound()
print("Warning: command failed. Press Ctrl+C to stop sound warnings.")
def failure_with_warning_loop(no_signal, sound_enabled):
if not no_signal:
Sound.play_finished_failed_sound()
print(color_text("Warning: command failed. Press Ctrl+C to stop warnings.", Fore.RED))
try:
while True:
Sound.play_warning_sound()
if not no_signal:
Sound.play_warning_sound()
except KeyboardInterrupt:
print("Warnings stopped by user.")
print(color_text("Warnings stopped by user.", Fore.YELLOW))
from cli.sounds import Sound # ensure Sound imported
if __name__ == "__main__":
# Parse special flags early and remove from args
no_sound = False
log_enabled = False
git_clean = False
infinite = False
if '--no-sound' in sys.argv:
no_sound = True
sys.argv.remove('--no-sound')
if '--log' in sys.argv:
log_enabled = True
sys.argv.remove('--log')
if '--git-clean' in sys.argv:
git_clean = True
sys.argv.remove('--git-clean')
if '--infinite' in sys.argv:
infinite = True
sys.argv.remove('--infinite')
# Parse flags
sound_enabled = '--sound' in sys.argv and (sys.argv.remove('--sound') or True)
no_signal = '--no-signal' in sys.argv and (sys.argv.remove('--no-signal') or True)
log_enabled = '--log' in sys.argv and (sys.argv.remove('--log') or True)
git_clean = '--git-clean' in sys.argv and (sys.argv.remove('--git-clean') or True)
infinite = '--infinite' in sys.argv and (sys.argv.remove('--infinite') or True)
# Setup segfault handler to catch crashes
# Segfault handler
def segv_handler(signum, frame):
if not no_sound:
if not no_signal:
Sound.play_finished_failed_sound()
try:
while True:
Sound.play_warning_sound()
except KeyboardInterrupt:
pass
print("Segmentation fault detected. Exiting.")
print(color_text("Segmentation fault detected. Exiting.", Fore.RED))
sys.exit(1)
signal.signal(signal.SIGSEGV, segv_handler)
# Play intro sounds
if not no_sound:
# Play intro melody if requested
if sound_enabled:
threading.Thread(target=play_start_intro, daemon=True).start()
# Change to script directory
script_dir = os.path.dirname(os.path.realpath(__file__))
cli_dir = os.path.join(script_dir, "cli")
os.chdir(script_dir)
# If requested, clean git-ignored files
if git_clean:
git_clean_repo()
available_cli_commands = list_cli_commands(cli_dir)
# Collect available commands
available = list_cli_commands(cli_dir)
args = sys.argv[1:]
# Handle help invocation
if len(sys.argv) == 1 or sys.argv[1] in ('-h', '--help'):
print("CyMaIS CLI proxy to tools in ./cli/")
print("Usage: cymais [--no-sound] [--log] [--git-clean] [--infinite] <command> [options]")
print("Options:")
print(" --no-sound Suppress all sounds during execution")
print(" --log Log all proxied command output to logfile.log")
print(" --git-clean Remove all Git-ignored files before running")
print(" --infinite Run the proxied command in an infinite loop")
print(" -h, --help Show this help message and exit")
print("Available commands:")
for cmd in available_cli_commands:
path = os.path.join(cli_dir, f"{cmd}.py")
desc = extract_description_via_help(path)
print(format_command_help(cmd, desc))
# Global help
if not args or args[0] in ('-h', '--help'):
print(color_text("CyMaIS CLI 🦫🌐🖥️", Fore.CYAN + Style.BRIGHT))
print()
print(color_text("Your Gateway to Automated IT Infrastructure Setup", Style.DIM))
print()
print(color_text(
"Usage: cymais [--sound] [--no-signal] [--log] [--git-clean] [--infinite] <command> [options]",
Fore.GREEN
))
print()
# Use bright style for headings
print(color_text("Options:", Style.BRIGHT))
print(color_text(" --sound Play startup melody and warning sounds", Fore.YELLOW))
print(color_text(" --no-signal Suppress success/failure signals", Fore.YELLOW))
print(color_text(" --log Log all proxied command output to logfile.log", Fore.YELLOW))
print(color_text(" --git-clean Remove all Git-ignored files before running", Fore.YELLOW))
print(color_text(" --infinite Run the proxied command in an infinite loop", Fore.YELLOW))
print(color_text(" -h, --help Show this help message and exit", Fore.YELLOW))
print()
print(color_text("Available commands:", Style.BRIGHT))
print()
current_folder = None
for folder, cmd in available:
if folder != current_folder:
if folder:
print(color_text(f"{folder}/", Fore.MAGENTA))
print()
current_folder = folder
desc = extract_description_via_help(
os.path.join(cli_dir, *(folder.split('/') if folder else []), f"{cmd}.py")
)
print(color_text(format_command_help(cmd, desc, indent=2), ''), "\n")
print()
print(color_text(
"🔗 You can chain subcommands by specifying nested directories,",
Fore.CYAN
))
print(color_text(
" e.g. `cymais generate defaults applications` →",
Fore.CYAN
))
print(color_text(
" corresponds to `cli/generate/defaults/applications.py`.",
Fore.CYAN
))
print()
print(color_text(
"CyMaIS is a product of Kevin Veen-Birkenbach, https://cybermaster.space .\n",
Style.DIM
))
print(color_text(
"Test and use productively on https://cymais.cloud .\n",
Style.DIM
))
print(color_text(
"For commercial use, a license agreement with Kevin Veen-Birkenbach is required. \n",
Style.DIM
))
print(color_text("License: https://s.veen.world/cncl", Style.DIM))
print()
print(color_text("🎉🌈 Happy IT Infrastructuring! 🚀🔧✨", Fore.MAGENTA + Style.BRIGHT))
print()
sys.exit(0)
# Special-case per-command help
if len(sys.argv) >= 3 and sys.argv[1] in available_cli_commands and sys.argv[2] in ('-h', '--help'):
subprocess.run([sys.executable, os.path.join(cli_dir, f"{sys.argv[1]}.py"), "--help"])
sys.exit(0)
# Directory-specific help
if len(args) > 1 and args[-1] in ('-h', '--help'):
dir_parts = args[:-1]
candidate_dir = os.path.join(cli_dir, *dir_parts)
if os.path.isdir(candidate_dir):
print(color_text(
f"Overview of commands in: {'/'.join(dir_parts)}",
Fore.CYAN + Style.BRIGHT
))
print()
for folder, cmd in available:
if folder == "/".join(dir_parts):
desc = extract_description_via_help(
os.path.join(candidate_dir, f"{cmd}.py")
)
print(color_text(format_command_help(cmd, desc, indent=2), ''))
sys.exit(0)
# Execute chosen command
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('cli_command', choices=available_cli_commands)
parser.add_argument('cli_args', nargs=argparse.REMAINDER)
args = parser.parse_args()
# Per-command help
for n in range(len(args), 0, -1):
candidate = os.path.join(cli_dir, *args[:n]) + ".py"
if os.path.isfile(candidate) and len(args) > n and args[n] in ('-h', '--help'):
rel = os.path.relpath(candidate, cli_dir)
module = "cli." + rel[:-3].replace(os.sep, ".")
subprocess.run([sys.executable, "-m", module, args[n]])
sys.exit(0)
cmd_path = os.path.join(cli_dir, f"{args.cli_command}.py")
full_cmd = [sys.executable, cmd_path] + args.cli_args
# Resolve script path
script_path = None
cli_args = []
module = None
for n in range(len(args), 0, -1):
candidate = os.path.join(cli_dir, *args[:n]) + ".py"
if os.path.isfile(candidate):
script_path = candidate
cli_args = args[n:]
rel = os.path.relpath(candidate, cli_dir)
module = "cli." + rel[:-3].replace(os.sep, ".")
break
if not module:
print(color_text(f"Error: command '{' '.join(args)}' not found.", Fore.RED))
sys.exit(1)
log_file = None
if log_enabled:
@@ -152,8 +262,9 @@ if __name__ == "__main__":
timestamp = datetime.now().strftime('%Y%m%dT%H%M%S')
log_file_path = os.path.join(log_dir, f'{timestamp}.log')
log_file = open(log_file_path, 'a', encoding='utf-8')
# 📖 Tip: Check your logs at the path below
print(f"📖 Tip: Log file created at {log_file_path}")
print(color_text(f"Tip: Log file created at {log_file_path}", Fore.GREEN))
full_cmd = [sys.executable, "-m", module] + cli_args
def run_once():
try:
@@ -189,24 +300,22 @@ if __name__ == "__main__":
log_file.close()
if rc != 0:
print(f"Command '{args.cli_command}' failed with exit code {rc}.")
failure_with_warning_loop()
failure_with_warning_loop(no_signal, sound_enabled)
sys.exit(rc)
else:
if not no_sound:
if not no_signal:
Sound.play_finished_successfully_sound()
return True
except Exception as e:
print(f"Exception running command: {e}")
failure_with_warning_loop()
print(color_text(f"Exception running command: {e}", Fore.RED))
failure_with_warning_loop(no_signal, sound_enabled)
sys.exit(1)
if infinite:
# ♾️ Infinite mode activated
print("♾️ Starting infinite execution mode...")
print(color_text("Starting infinite execution mode...", Fore.CYAN))
count = 1
while True:
print(f"🔄 Execution #{count}")
print(color_text(f"Run #{count}", Style.BRIGHT))
run_once()
count += 1
else:

View File

@@ -2,9 +2,9 @@
hosts: all
tasks:
- name: "Load 'constructor' tasks"
include_tasks: "tasks/plays/01_constructor.yml"
include_tasks: "tasks/stages/01_constructor.yml"
- name: "Load '{{host_type}}' tasks"
include_tasks: "tasks/plays/02_{{host_type}}.yml"
include_tasks: "tasks/stages/02_{{host_type}}.yml"
- name: "Load 'destructor' tasks"
include_tasks: "tasks/plays/03_destructor.yml"
include_tasks: "tasks/stages/03_destructor.yml"
become: true

View File

@@ -1,4 +1,6 @@
backup_to_usb_script_path: "/usr/local/sbin/bkp-data-to-usb.python"
backup_to_usb_destination: "{{backup_to_usb_mount}}{{backup_to_usb_destination_subdirectory}}"
backups_folder_path: "{{backup_to_usb_destination}}"
systemctl_mount_service_name: "{{ backup_to_usb_mount | trim('/') | replace('/', '-') }}.mount"
backup_to_usb_script_path: /usr/local/sbin/bkp-data-to-usb.python
backup_to_usb_destination: '{{backup_to_usb_mount}}{{backup_to_usb_destination_subdirectory}}'
backups_folder_path: '{{backup_to_usb_destination}}'
systemctl_mount_service_name: '{{ backup_to_usb_mount | trim(''/'') | replace(''/'',
''-'') }}.mount'
application_id: data-to-usb

View File

@@ -1 +1,2 @@
backup_directory_validator_folder: "{{path_administrator_scripts}}directory-validator/"
backup_directory_validator_folder: '{{path_administrator_scripts}}directory-validator/'
application_id: directory-validator

View File

@@ -1 +1,2 @@
bkp_docker_to_local_pkg: backup-docker-to-local
bkp_docker_to_local_pkg: backup-docker-to-local
application_id: docker-to-local

View File

@@ -1,2 +1,3 @@
authorized_keys_path: "{{ inventory_dir }}/files/{{ inventory_hostname }}/home/backup/.ssh/authorized_keys"
authorized_keys_list: "{{ lookup('file', authorized_keys_path).splitlines() }}"
authorized_keys_path: '{{ inventory_dir }}/files/{{ inventory_hostname }}/home/backup/.ssh/authorized_keys'
authorized_keys_list: '{{ lookup(''file'', authorized_keys_path).splitlines() }}'
application_id: provider-user

View File

@@ -0,0 +1 @@
application_id: provider

View File

@@ -1 +1,2 @@
docker_backup_remote_to_local_folder: "{{path_administrator_scripts}}bkp-remote-to-local/"
docker_backup_remote_to_local_folder: '{{path_administrator_scripts}}bkp-remote-to-local/'
application_id: remote-to-local

View File

@@ -92,7 +92,7 @@ roles:
title: "Backup & Restore"
description: "Backup strategies & restore procedures"
icon: "fas fa-hdd"
invokable: false
invokable: true
update:
title: "Updates & Package Management"
description: "OS & package updates"
@@ -103,3 +103,8 @@ roles:
description: "User accounts & access control"
icon: "fas fa-users"
invokable: false
cln:
title: "Cleanup"
description: "Roles for cleaning up various system resources—old backups, unused certificates, temporary files, Docker volumes, disk caches, deprecated domains, and more."
icon: "fas fa-trash-alt"
invokable: true

View File

@@ -1 +1,2 @@
cleanup_backups_directory: "{{path_administrator_scripts}}cln-backups/"
cleanup_backups_directory: '{{path_administrator_scripts}}cln-backups/'
application_id: backups-service

View File

@@ -0,0 +1 @@
application_id: backups-timer

View File

@@ -0,0 +1 @@
application_id: certs

View File

@@ -1 +1,2 @@
cleanup_disc_space_folder: "{{path_administrator_scripts}}cln-disc-space/"
cleanup_disc_space_folder: '{{path_administrator_scripts}}cln-disc-space/'
application_id: disc-space

View File

@@ -0,0 +1 @@
application_id: docker-anonymous-volumes

View File

@@ -0,0 +1,5 @@
- name: "reload cln-docker-storage-optimizer.cymais.service"
systemd:
name: cln-docker-storage-optimizer.cymais.service
state: reloaded
daemon_reload: yes

View File

@@ -0,0 +1,22 @@
- name: "create {{storage_optimizer_directory}}"
file:
path: "{{storage_optimizer_directory}}"
state: directory
mode: 0755
- name: create cln-docker-storage-optimizer.cymais.service
template:
src: cln-docker-storage-optimizer.service.j2
dest: /etc/systemd/system/cln-docker-storage-optimizer.cymais.service
notify: reload cln-docker-storage-optimizer.cymais.service
- name: create cln-docker-storage-optimizer.py
copy:
src: cln-docker-storage-optimizer.py
dest: "{{storage_optimizer_script}}"
mode: 0755
- name: "optimize storage performance"
systemd:
name: cln-docker-storage-optimizer.cymais.service
state: started

View File

@@ -4,5 +4,5 @@ OnFailure=alert-compose.cymais@%n.service
[Service]
Type=oneshot
ExecStartPre=/bin/sh -c '/usr/bin/python {{ path_system_lock_script }} {{ system_maintenance_services | join(' ') }} --ignore maint-docker-storage-optimizer bkp-remote-to-local --timeout "{{system_maintenance_lock_timeout_storage_optimizer}}"'
ExecStartPre=/bin/sh -c '/usr/bin/python {{ path_system_lock_script }} {{ system_maintenance_services | join(' ') }} --ignore cln-docker-storage-optimizer bkp-remote-to-local --timeout "{{system_maintenance_lock_timeout_storage_optimizer}}"'
ExecStart=/bin/sh -c '/usr/bin/python {{storage_optimizer_script}} --rapid-storage-path {{path_rapid_storage}} --mass-storage-path {{path_mass_storage}}'

View File

@@ -0,0 +1,3 @@
storage_optimizer_directory: '{{path_administrator_scripts}}cln-docker-storage-optimizer/'
storage_optimizer_script: '{{storage_optimizer_directory}}cln-docker-storage-optimizer.py'
application_id: docker-storage-optimizer

View File

@@ -0,0 +1 @@
application_id: domains

View File

@@ -1 +1,2 @@
cln_failed_docker_backups_pkg: cleanup-failed-docker-backups
cln_failed_docker_backups_pkg: cleanup-failed-docker-backups
application_id: failed-docker-backups

View File

@@ -1 +1 @@
application_id: docker
application_id: desk-docker

View File

@@ -17,4 +17,4 @@ galaxy_info:
- git
- configuration
- pacman
- personal-computer
- desktop

View File

@@ -16,7 +16,7 @@
group: administrator
when: run_once_docker is not defined
- name: Set docker_enabled to true, to activate maint-docker-storage-optimizer
- name: Set docker_enabled to true, to activate cln-docker-storage-optimizer
set_fact:
docker_enabled: true
when: run_once_docker is not defined

View File

@@ -1 +1,2 @@
system_btrfs_auto_balancer_folder: "{{path_administrator_scripts}}auto-btrfs-balancer/"
system_btrfs_auto_balancer_folder: '{{path_administrator_scripts}}auto-btrfs-balancer/'
application_id: btrfs-auto-balancer

View File

@@ -1 +1,2 @@
heal_docker: "{{path_administrator_scripts}}maint-docker-heal/"
heal_docker: '{{path_administrator_scripts}}maint-docker-heal/'
application_id: docker-heal

View File

@@ -1,2 +1,3 @@
restart_docker_folder: "{{path_administrator_scripts}}maint-docker-restart/"
restart_docker_script: "{{restart_docker_folder}}maint-docker-restart.py"
restart_docker_folder: '{{path_administrator_scripts}}maint-docker-restart/'
restart_docker_script: '{{restart_docker_folder}}maint-docker-restart.py'
application_id: docker-restart

View File

@@ -1,5 +0,0 @@
- name: "reload maint-docker-storage-optimizer.cymais.service"
systemd:
name: maint-docker-storage-optimizer.cymais.service
state: reloaded
daemon_reload: yes

View File

@@ -1,22 +0,0 @@
- name: "create {{storage_optimizer_directory}}"
file:
path: "{{storage_optimizer_directory}}"
state: directory
mode: 0755
- name: create maint-docker-storage-optimizer.cymais.service
template:
src: maint-docker-storage-optimizer.service.j2
dest: /etc/systemd/system/maint-docker-storage-optimizer.cymais.service
notify: reload maint-docker-storage-optimizer.cymais.service
- name: create maint-docker-storage-optimizer.py
copy:
src: maint-docker-storage-optimizer.py
dest: "{{storage_optimizer_script}}"
mode: 0755
- name: "optimize storage performance"
systemd:
name: maint-docker-storage-optimizer.cymais.service
state: started

View File

@@ -1,2 +0,0 @@
storage_optimizer_directory: "{{path_administrator_scripts}}maint-docker-storage-optimizer/"
storage_optimizer_script: "{{storage_optimizer_directory}}maint-docker-storage-optimizer.py"

View File

@@ -0,0 +1 @@
application_id: lock

View File

@@ -0,0 +1 @@
application_id: swapfile

View File

@@ -0,0 +1 @@
application_id: dns-records

View File

@@ -1,8 +1,5 @@
caa_entries:
- tag: issue
value: "letsencrypt.org"
# - tag: issuewild
# value: "letsencrypt.org"
# - tag: iodef
# value: "mailto:{{ users.administrator.email }}"
base_sld_domains: "{{ current_play_domains_all | generate_base_sld_domains }}"
- tag: issue
value: letsencrypt.org
base_sld_domains: '{{ current_play_domains_all | generate_base_sld_domains }}'
application_id: letsencrypt

View File

@@ -0,0 +1 @@
application_id: wireguard-core

View File

@@ -0,0 +1 @@
application_id: wireguard-firewalled

View File

@@ -0,0 +1 @@
application_id: wireguard-plain

View File

@@ -1 +1 @@
application_id: "mariadb"
application_id: rdbms-mariadb

View File

@@ -1 +1 @@
application_id: postgres
application_id: rdbms-postgres

View File

@@ -0,0 +1 @@
application_id: apt

View File

@@ -1 +1,2 @@
update_docker_script: "{{path_administrator_scripts}}update-docker.py"
update_docker_script: '{{path_administrator_scripts}}update-docker.py'
application_id: docker

View File

@@ -0,0 +1 @@
application_id: pacman

View File

@@ -0,0 +1 @@
application_id: pip

View File

@@ -0,0 +1 @@
application_id: pkgmgr

View File

@@ -0,0 +1 @@
application_id: yay

View File

@@ -1 +1 @@
application_id: desk-browser
application_id: browser

View File

@@ -1 +1 @@
application_id: desk-design
application_id: design

View File

@@ -1 +1 @@
application_id: desk-dev-arduino
application_id: dev-arduino

View File

@@ -1 +1 @@
application_id: desk-dev-core
application_id: dev-core

View File

@@ -1 +1 @@
application_id: desk-dev-java
application_id: dev-java

View File

@@ -1 +1 @@
application_id: desk-dev-php
application_id: dev-php

View File

@@ -1 +1 @@
application_id: desk-dev-python
application_id: dev-python

View File

@@ -1 +1 @@
application_id: desk-dev-shell
application_id: dev-shell

View File

@@ -1 +1 @@
application_id: desk-game-compose
application_id: game-compose

View File

@@ -5,4 +5,4 @@ gamer_default_games:
- gnuchess
- sauerbraten
- mari0
application_id: desk-game-os
application_id: game-os

View File

@@ -1 +1 @@
application_id: desk-game-windows
application_id: game-windows

View File

@@ -1 +1 @@
application_id: desk-office-tools
application_id: office-tools

View File

@@ -1 +1 @@
application_id: srv-corporate-identity
application_id: corporate-identity

View File

@@ -23,6 +23,4 @@ galaxy_info:
issue_tracker_url: "https://github.com/kevinveenbirkenbach/portfolio/issues"
documentation: "https://github.com/kevinveenbirkenbach/portfolio#readme"
logo:
class: "fa-solid fa-briefcase"
run_after:
- web-svc-simpleicons
class: "fa-solid fa-briefcase"

View File

@@ -26,8 +26,9 @@ galaxy_info:
repository: "https://s.veen.world/cymais"
issue_tracker_url: "https://s.veen.world/cymaisissues"
documentation: "https://s.veen.world/cymais"
run_after:
- web-app-matomo
# This propably leads to problems at a point, @todo solve it
# run_after:
# - web-app-matomo
dependencies:
- srv-web-7-6-https
- gen-git

View File

@@ -1,4 +1,4 @@
application_id: simpleicons
application_id: simpleicons
container_port: 3000
simpleicons_host_server_file: "{{docker_compose.directories.config}}server.js"
simpleicons_host_package_file: "{{docker_compose.directories.config}}package.json"

View File

@@ -1,83 +0,0 @@
---
## pc applications
- name: general host setup
when: ("personal_computers" in group_names)
include_role:
name: "{{ item }}"
loop:
- util-gen-admin
- drv-non-free
- name: util-desk-office-tools
when: ("collection_officetools" in group_names)
include_role:
name: "{{ item }}"
loop:
- util-desk-office-tools
- desk-jrnl
- name: personal computer for business
when: ("business_personal_computer" in group_names)
include_role:
name: desk-gnucash
- name: util-desk-design
when: ("collection_designer" in group_names)
include_role:
name: util-desk-design
- name: desk-qbittorrent
when: ("collection_torrent" in group_names)
include_role:
name: desk-qbittorrent
- name: desk-obs
when: ("collection_streamer" in group_names)
include_role:
name: desk-obs
- name: desk-bluray-player
when: ("collection_bluray_player" in group_names)
include_role:
name: desk-bluray-player
- name: GNOME setup
when: ("gnome" in group_names)
include_role:
name: desk-gnome
- name: setup ssh client
when: ("ssh-client" in group_names)
include_role:
name: desk-ssh
- name: setup gaming hosts
when: ("gaming" in group_names)
include_role:
name: util-desk-game-compose
- name: setup entertainment hosts
when: ("entertainment" in group_names)
include_role:
name: desk-spotify
- name: setup torbrowser hosts
when: ("torbrowser" in group_names)
include_role:
name: desk-torbrowser
- name: setup nextcloud-client
when: ("nextcloud_client" in group_names)
include_role:
name: desk-nextcloud-client
- name: setup docker
when: ("docker_client" in group_names)
include_role:
name: desk-docker
# driver
- name: setup msi rgb keyboard
when: ("msi_perkeyrgb" in group_names)
include_role:
name: drv-msi-keyboard-color

View File

@@ -1,15 +0,0 @@
- name: optimize storage performance
include_role:
name: maint-docker-storage-optimizer
when: ('storage-optimizer' | application_allowed(group_names, allowed_applications))
- name: Cleanup Docker Anonymous Volumes
import_role:
name: cln-docker-anonymous-volumes
when: mode_cleanup | bool
- name: Show all facts
debug:
var: ansible_facts
when: enable_debug | bool

View File

@@ -72,7 +72,7 @@
recursive=True
)) |
generate_all_domains(
('www_redirect' in group_names)
('redir-www' in group_names)
)
}}
@@ -101,52 +101,18 @@
name: update
when: mode_update | bool
- name: setup standard wireguard
when: ('wireguard_server' | application_allowed(group_names, allowed_applications))
include_role:
name: net-wireguard-core
# vpn setup
- name: setup wireguard client behind firewall\nat
when: ('wireguard_behind_firewall' | application_allowed(group_names, allowed_applications))
include_role:
name: net-wireguard-firewalled
- name: setup wireguard client
when: ('wireguard_client' | application_allowed(group_names, allowed_applications))
include_role:
name: net-wireguard-plain
## backup setup
- name: setup replica backup hosts
when: ('backup_remote_to_local' | application_allowed(group_names, allowed_applications))
include_role:
name: bkp-remote-to-local
- name: setup backup to swappable
when: ('backup_to_usb' | application_allowed(group_names, allowed_applications))
include_role:
name: bkp-data-to-usb
## driver setup
- name: drv-intel
when: ('intel' | application_allowed(group_names, allowed_applications))
include_role:
name: drv-intel
- name: setup multiprinter hosts
when: ('epson_multiprinter' | application_allowed(group_names, allowed_applications))
include_role:
name: drv-epson-multiprinter
- name: setup hibernate lid switch
when: ('drv-lid-switch' | application_allowed(group_names, allowed_applications))
include_role:
name: drv-lid-switch
## system setup
- name: setup swapfile hosts
when: ('swapfile' | application_allowed(group_names, allowed_applications))
include_role:
name: maint-swapfile
- name: "Load base roles"
include_tasks: "./tasks/groups/{{ item }}-roles.yml"
loop:
- core
- drv
- gen
- net
- alert
- mon
- maint
- update
- bkp
- cln
loop_control:
label: "{{ item }}-roles.yml"

View File

@@ -0,0 +1,15 @@
---
- name: "setup docker role includes for desktop pc"
include_tasks: "./tasks/groups/{{ item }}-roles.yml"
loop:
- util-srv # Services need to run before applications
- util-desk
loop_control:
label: "{{ item }}-roles.yml"
- name: general host setup
include_role:
name: "{{ item }}"
loop:
- util-gen-admin
- drv-non-free

View File

@@ -1,6 +1,5 @@
---
- name: servers host setup
when: ("servers" in group_names)
- name: Setup server base
include_role:
name: "{{ item }}"
loop:
@@ -11,15 +10,10 @@
- mon-bot-btrfs
- maint-btrfs-auto-balancer
- name: "Integrate Docker Role includes"
- name: "Include server roles"
include_tasks: "./tasks/groups/{{ item }}-roles.yml"
loop:
- svc
- web
- web-svc # Services need to run before applications
- web-app
loop_control:
label: "{{ item }}-roles.yml"
- name: "setup corporate identity"
include_role:
name: util-srv-corporate-identity
when: ('corporate_identity' | application_allowed(group_names, allowed_applications))

View File

@@ -0,0 +1,6 @@
- name: "Load destruction roles"
include_tasks: "./tasks/groups/{{ item }}-roles.yml"
loop:
- cln
loop_control:
label: "{{ item }}-roles.yml"

View File

@@ -39,7 +39,7 @@ class TestApplicationIdConsistency(unittest.TestCase):
continue
actual_id = vars_data.get("application_id")
if actual_id != expected_id:
if actual_id not in [expected_id, role_name]:
failed_roles.append((
role_name,
f"application_id is '{actual_id}', expected '{expected_id}'"

View File

@@ -2,7 +2,7 @@ import os
import unittest
# import the functions from your CLI script
from cli.generate_playbook import build_dependency_graph, find_cycle
from cli.generate.conditional_role_include import build_dependency_graph, find_cycle
class TestCircularDependencies(unittest.TestCase):
"""

View File

@@ -0,0 +1,51 @@
import unittest
import os
import sys
import subprocess
class CLIHelpIntegrationTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Projekt-Root ermitteln
cls.project_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..')
)
cls.main_py = os.path.join(cls.project_root, 'main.py')
cls.cli_dir = os.path.join(cls.project_root, 'cli')
cls.python = sys.executable
def test_all_cli_commands_help(self):
"""
Iteriere über alle .py Dateien in cli/, baue daraus die
Subcommand-Pfade und prüfe, dass `python main.py <cmd> --help`
mit Exit-Code 0 endet.
"""
for root, _, files in os.walk(self.cli_dir):
for fname in files:
if not fname.endswith('.py') or fname.startswith('__'):
continue
# Bestimme Subcommand-Segmente
rel_dir = os.path.relpath(root, self.cli_dir)
cmd_name = os.path.splitext(fname)[0]
if rel_dir == '.':
segments = [cmd_name]
else:
segments = rel_dir.split(os.sep) + [cmd_name]
with self.subTest(command=' '.join(segments)):
cmd = [self.python, self.main_py] + segments + ['--help', '--no-signal']
result = subprocess.run(
cmd, capture_output=True, text=True
)
self.assertEqual(
result.returncode, 0,
msg=(
f"Command `{ ' '.join(cmd) }` failed\n"
f"stdout:\n{result.stdout}\n"
f"stderr:\n{result.stderr}"
)
)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,47 @@
import os
import sys
import re
import unittest
from cli.meta.applications import find_application_ids
# ensure project root is on PYTHONPATH so we can import your CLI code
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
sys.path.insert(0, ROOT)
class TestGroupApplications(unittest.TestCase):
# regex to capture any literal check in group_names: 'name' in/not in group_names
GROUP_CHECK_RE = re.compile(r"['\"](?P<name>[^'\"]+)['\"]\s*(?:in|not in)\s*group_names")
def test_group_name_checks_use_valid_application_ids(self):
"""
Ensures that any string checked against group_names corresponds to a valid application ID.
"""
valid_apps = find_application_ids()
# walk the entire project tree
for dirpath, _, filenames in os.walk(ROOT):
for filename in filenames:
if not filename.lower().endswith(('.yml', '.yaml')):
continue
filepath = os.path.join(dirpath, filename)
try:
with open(filepath, 'r', encoding='utf-8') as f:
text = f.read()
except Exception:
continue
# find all group_names checks in the file
for match in self.GROUP_CHECK_RE.finditer(text):
name = match.group('name')
# the checked name must be one of the valid application IDs
self.assertIn(
name,
valid_apps,
msg=(
f"{filepath}: group_names check uses '{name}', "
f"which is not a known application ID {valid_apps}"
)
)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,67 @@
import unittest
from pathlib import Path
import re
import os
import sys
# Ensure your project root is on PYTHONPATH so filter_plugins can be imported
ROOT = Path(__file__).parents[2]
sys.path.insert(0, str(ROOT))
from filter_plugins.invokable_paths import get_invokable_paths
STAGES_DIR = ROOT / "tasks" / "stages"
GROUPS_DIR = ROOT / "tasks" / "groups"
class TestMetaRolesIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Use the filter directly
cls.role_files = get_invokable_paths(suffix="-roles.yml")
cls.invokable_items = get_invokable_paths()
# Read all playbook YAML contents under tasks/stages
cls.playbook_contents = {}
for path in STAGES_DIR.rglob("*.yml"):
cls.playbook_contents[path] = path.read_text(encoding="utf-8")
# Regex for include_tasks line with {{ item }}-roles.yml
cls.include_pattern = re.compile(
r'include_tasks:\s*["\']\./tasks/groups/\{\{\s*item\s*\}\}-roles\.yml["\']'
)
def test_all_role_files_exist(self):
"""Each '-roles.yml' path returned by the filter must exist in the project root."""
missing = []
for fname in self.role_files:
path = GROUPS_DIR / fname
if not path.is_file():
missing.append(fname)
self.assertFalse(
missing,
f"The following role files are missing at project root: {missing}"
)
def test_each_invokable_item_referenced_in_playbooks(self):
"""
Each invokable item (without suffix) must be looped through in at least one playbook
and include its corresponding include_tasks entry.
"""
not_referenced = []
for item in self.invokable_items:
found = False
loop_entry = re.compile(rf"-\s*{re.escape(item)}\b")
for content in self.playbook_contents.values():
if self.include_pattern.search(content) and loop_entry.search(content):
found = True
break
if not found:
not_referenced.append(item)
self.assertEqual(
not_referenced, [],
f"The following invokable items are not referenced in any playbook: {not_referenced}"
)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,39 @@
import os
import glob
import yaml
import unittest
def find_application_ids():
"""
Scans all roles/*/vars/main.yml files and collects application_id values.
Returns a dict mapping application_id to list of file paths where it appears.
"""
ids = {}
# Wenn der Test unter tests/integration liegt, gehen wir zwei Ebenen hoch zum Projekt-Root
base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
pattern = os.path.join(base_dir, "roles", "*", "vars", "main.yml")
for file_path in glob.glob(pattern):
with open(file_path, 'r') as f:
data = yaml.safe_load(f) or {}
app_id = data.get('application_id')
if app_id is not None:
ids.setdefault(app_id, []).append(file_path)
return ids
class TestUniqueApplicationId(unittest.TestCase):
def test_application_ids_are_unique(self):
ids = find_application_ids()
duplicates = {app_id: paths for app_id, paths in ids.items() if len(paths) > 1}
if duplicates:
messages = []
for app_id, paths in duplicates.items():
file_list = '\n '.join(paths)
messages.append(f"application_id '{app_id}' found in multiple files:\n {file_list}")
self.fail("\n\n".join(messages))
if __name__ == '__main__':
unittest.main(verbosity=2)

Some files were not shown because too many files have changed in this diff Show More