Compare commits

..

13 Commits

126 changed files with 969 additions and 650 deletions

View File

@@ -1,12 +1,11 @@
ROLES_DIR := ./roles
APPLICATIONS_OUT := ./group_vars/all/04_applications.yml
APPLICATIONS_SCRIPT := ./cli/generate_applications.py
APPLICATIONS_SCRIPT := ./cli/generate/defaults/applications.py
USERS_OUT := ./group_vars/all/03_users.yml
USERS_SCRIPT := ./cli/generate_users.py
INCLUDES_SCRIPT := ./cli/generate_playbook.py
USERS_SCRIPT := ./cli/generate/defaults/users.py
INCLUDES_SCRIPT := ./cli/generate/conditional_role_include.py
# Define the prefixes for which we want individual role-include files
INCLUDE_GROUPS := "drv-" "svc-" "desk-" "web-" "util-"
INCLUDE_GROUPS := $(shell python3 main.py meta invokable_paths -s "-" --no-signal | tr '\n' ' ')
# Directory where these include-files will be written
INCLUDES_OUT_DIR := ./tasks/groups

View File

@@ -7,7 +7,16 @@ import datetime
import sys
def run_ansible_playbook(inventory, modes, limit=None, allowed_applications=None, password_file=None, verbose=0, skip_tests=False, skip_validation=False):
def run_ansible_playbook(
inventory,
modes,
limit=None,
allowed_applications=None,
password_file=None,
verbose=0,
skip_tests=False,
skip_validation=False
):
start_time = datetime.datetime.now()
print(f"\n▶️ Script started at: {start_time.isoformat()}\n")
@@ -17,32 +26,40 @@ def run_ansible_playbook(inventory, modes, limit=None, allowed_applications=None
script_dir = os.path.dirname(os.path.realpath(__file__))
playbook = os.path.join(os.path.dirname(script_dir), "playbook.yml")
# Inventory validation step
if not skip_validation:
print("\n🔍 Validating inventory before deployment...\n")
try:
subprocess.run(
[sys.executable, os.path.join(script_dir, "validate_inventory.py"), os.path.dirname(inventory)],
[sys.executable,
os.path.join(script_dir, "validate.inventory.py"),
os.path.dirname(inventory)
],
check=True
)
except subprocess.CalledProcessError:
print("\n❌ Inventory validation failed. Deployment aborted.\n", file=sys.stderr)
print(
"\n❌ Inventory validation failed. Deployment aborted.\n",
file=sys.stderr
)
sys.exit(1)
else:
print("\n⚠️ Skipping inventory validation as requested.\n")
if not skip_tests:
print("\n🧪 Running tests (make test)...\n")
subprocess.run(["make", "test"], check=True)
# Build ansible-playbook command
cmd = ["ansible-playbook", "-i", inventory, playbook]
if limit:
cmd.extend(["--limit", limit])
# Pass application IDs parameter as extra var if provided
if allowed_applications:
joined = ",".join(allowed_applications)
cmd.extend(["-e", f"allowed_applications={joined}"])
# Pass other mode flags
for key, value in modes.items():
val = str(value).lower() if isinstance(value, bool) else str(value)
cmd.extend(["-e", f"{key}={val}"])
@@ -75,55 +92,56 @@ def main():
help="Path to the inventory file (INI or YAML) containing hosts and variables."
)
parser.add_argument(
"--limit",
"-l", "--limit",
help="Restrict execution to a specific host or host group from the inventory."
)
parser.add_argument(
"--host-type",
choices=["server", "personal-computer"],
"-T", "--host-type",
choices=["server", "desktop"],
default="server",
help="Specify whether the target is a server or a personal computer. Affects role selection and variables."
)
parser.add_argument(
"--reset", action="store_true",
"-r", "--reset", action="store_true",
help="Reset all CyMaIS files and configurations, and run the entire playbook (not just individual roles)."
)
parser.add_argument(
"--test", action="store_true",
"-t", "--test", action="store_true",
help="Run test routines instead of production tasks. Useful for local testing and CI pipelines."
)
parser.add_argument(
"--update", action="store_true",
"-u", "--update", action="store_true",
help="Enable the update procedure to bring software and roles up to date."
)
parser.add_argument(
"--backup", action="store_true",
"-b", "--backup", action="store_true",
help="Perform a full backup of critical data and configurations before the update process."
)
parser.add_argument(
"--cleanup", action="store_true",
"-c", "--cleanup", action="store_true",
help="Clean up unused files and outdated configurations after all tasks are complete."
)
parser.add_argument(
"--debug", action="store_true",
"-d", "--debug", action="store_true",
help="Enable detailed debug output for Ansible and this script."
)
parser.add_argument(
"--password-file",
"-p", "--password-file",
help="Path to the file containing the Vault password. If not provided, prompts for the password interactively."
)
parser.add_argument(
"--skip-tests", action="store_true",
"-s", "--skip-tests", action="store_true",
help="Skip running 'make test' even if tests are normally enabled."
)
parser.add_argument(
"--skip-validation", action="store_true",
"-V", "--skip-validation", action="store_true",
help="Skip inventory validation before deployment."
)
parser.add_argument(
"--id",
"-i", "--id",
nargs="+",
default=[],
dest="id",
help="List of application_id's for partial deploy. If not set, all application IDs defined in the inventory will be executed."
)
parser.add_argument(
@@ -152,8 +170,8 @@ def main():
verbose=args.verbose,
skip_tests=args.skip_tests,
skip_validation=args.skip_validation
)
if __name__ == "__main__":
main()

47
cli/fix/ini_py.py Normal file
View File

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""
This script creates __init__.py files in every subdirectory under the specified
folder relative to the project root.
"""
import os
import argparse
def create_init_files(root_folder):
    """Ensure every directory below *root_folder* contains an ``__init__.py``.

    Walks the tree rooted at *root_folder*; directories that already have the
    marker file are reported and left untouched, all others get an empty one.
    """
    for current_dir, _subdirs, _files in os.walk(root_folder):
        marker = os.path.join(current_dir, '__init__.py')
        if os.path.exists(marker):
            print(f"Skipped (already exists): {marker}")
        else:
            # Create an empty file; `with` guarantees the handle is closed.
            with open(marker, 'w'):
                pass
            print(f"Created: {marker}")
def main():
    """CLI entry point: create ``__init__.py`` files under a target folder.

    Parses a single positional ``folder`` argument (resolved relative to the
    current working directory), validates it, and delegates to
    ``create_init_files()``.  Exits with status 1 when the folder is missing
    or not a directory.
    """
    import sys  # local import: the module header only pulls in os/argparse

    parser = argparse.ArgumentParser(
        description='Create __init__.py files in every subdirectory.'
    )
    parser.add_argument(
        'folder',
        help='Relative path to the target folder (e.g., cli/fix)'
    )
    args = parser.parse_args()
    # Determine the absolute path based on the current working directory
    root_folder = os.path.abspath(args.folder)
    if not os.path.isdir(root_folder):
        # Fix: diagnostics belong on stderr, and sys.exit() is safer than the
        # site-provided exit() builtin (absent under `python -S`).
        print(
            f"Error: The folder '{args.folder}' does not exist or is not a directory.",
            file=sys.stderr,
        )
        sys.exit(1)
    create_init_files(root_folder)
if __name__ == '__main__':
main()

View File

@@ -8,6 +8,10 @@ import sys
import yaml
from pathlib import Path
# Directory containing roles; can be overridden by tests
MODULE_DIR = Path(__file__).resolve().parent
ROLES_DIR = (MODULE_DIR.parent.parent / "roles").resolve()
def process_role(role_dir: Path, prefix: str, preview: bool, overwrite: bool):
name = role_dir.name
if not name.startswith(prefix):
@@ -50,6 +54,15 @@ def process_role(role_dir: Path, prefix: str, preview: bool, overwrite: bool):
print(f"Created {vars_file} with application_id: {expected_id}")
def run(prefix: str, preview: bool = False, overwrite: bool = False):
"""
Ensure vars/main.yml for roles under ROLES_DIR with the given prefix has correct application_id.
"""
for role in sorted(Path(ROLES_DIR).iterdir()):
if role.is_dir():
process_role(role, prefix, preview, overwrite)
def main():
parser = argparse.ArgumentParser(
description="Ensure vars/main.yml for roles with a given prefix has correct application_id"
@@ -68,16 +81,9 @@ def main():
)
args = parser.parse_args()
# Determine roles directory relative to this script
script_dir = Path(__file__).resolve().parent
roles_dir = (script_dir.parent / "roles").resolve()
if not roles_dir.is_dir():
print(f"Roles directory not found: {roles_dir}", file=sys.stderr)
sys.exit(1)
# Run processing
run(prefix=args.prefix, preview=args.preview, overwrite=args.overwrite)
for role in sorted(roles_dir.iterdir()):
if role.is_dir():
process_role(role, args.prefix, args.preview, args.overwrite)
if __name__ == "__main__":
main()

View File

@@ -156,7 +156,7 @@ def print_dependency_tree(graph):
for root in roots:
print_node(root)
def generate_playbook_entries(roles_dir, prefixes=None):
def gen_condi_role_incl(roles_dir, prefixes=None):
"""
Generate playbook entries based on the sorted order.
Raises a ValueError if application_id is missing.
@@ -209,7 +209,7 @@ def main():
print_dependency_tree(graph)
sys.exit(0)
entries = generate_playbook_entries(args.roles_dir, prefixes)
entries = gen_condi_role_incl(args.roles_dir, prefixes)
output = ''.join(entries)
if args.output:

View File

View File

@@ -6,7 +6,7 @@ import yaml
import sys
from pathlib import Path
plugin_path = Path(__file__).resolve().parent / ".." / "lookup_plugins"
plugin_path = Path(__file__).resolve().parent / ".." / ".." / ".." /"lookup_plugins"
sys.path.insert(0, str(plugin_path))
from application_gid import LookupModule

0
cli/meta/__init__.py Normal file
View File

49
cli/meta/applications.py Normal file
View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import argparse
import glob
import os
import sys
try:
import yaml
except ImportError:
sys.stderr.write("PyYAML is required. Install with `pip install pyyaml`.\n")
sys.exit(1)
def find_application_ids():
    """Collect every ``application_id`` declared in ``roles/*/vars/main.yml``.

    Files are resolved relative to the current working directory.  Unreadable
    or unparsable files are reported on stderr and skipped.

    :return: sorted list of unique application IDs.
    """
    found = set()
    for vars_path in glob.glob(os.path.join('roles', '*', 'vars', 'main.yml')):
        try:
            with open(vars_path, 'r', encoding='utf-8') as handle:
                parsed = yaml.safe_load(handle)
        except Exception as exc:
            sys.stderr.write(f"Error reading {vars_path}: {exc}\n")
            continue
        if isinstance(parsed, dict) and 'application_id' in parsed:
            found.add(parsed['application_id'])
    return sorted(found)
def main():
    """Print each discovered application_id on its own line."""
    arg_parser = argparse.ArgumentParser(
        description='Output a list of all application_id values defined in roles/*/vars/main.yml'
    )
    # Only --help is accepted; parse_args() still rejects stray arguments.
    arg_parser.parse_args()
    for application_id in find_application_ids():
        print(application_id)
if __name__ == '__main__':
main()

View File

@@ -1,105 +0,0 @@
import argparse
import subprocess
from ansible.parsing.vault import VaultLib, VaultSecret
import sys
import yaml
import re
from utils.handler.vault import VaultScalar
from yaml.loader import SafeLoader
from yaml.dumper import SafeDumper
# Register the custom constructor and representer for VaultScalar in PyYAML
SafeLoader.add_constructor('!vault', lambda loader, node: VaultScalar(node.value))
SafeDumper.add_representer(VaultScalar, lambda dumper, data: dumper.represent_scalar('!vault', data))
def is_vault_encrypted_data(data: str) -> bool:
    """Return ``True`` when *data* carries the Ansible Vault header.

    Leading whitespace is ignored before the header check.
    """
    stripped = data.lstrip()
    return stripped.startswith('$ANSIBLE_VAULT')
def decrypt_vault_data(encrypted_data: str, vault_secret: VaultSecret) -> str:
    """Decrypt *encrypted_data* using the provided vault secret.

    :param encrypted_data: Vault-encrypted payload to decrypt.
    :param vault_secret: The VaultSecret instance used for decryption.
    :return: Decrypted data as a string.
    """
    # VaultLib needs no configured secrets for decrypt() when the secret is
    # passed explicitly.
    return VaultLib().decrypt(encrypted_data, vault_secret)
def decrypt_vault_file(vault_file: str, vault_password_file: str):
    """
    Decrypt the Ansible Vault file and return its contents.

    Supports fully encrypted files as well as files where only individual
    single-line ``key: $ANSIBLE_VAULT...`` values are encrypted.

    :param vault_file: Path to the encrypted Ansible Vault file
    :param vault_password_file: Path to the file containing the Vault password
    :return: Decrypted contents of the Vault file
    """
    # Read the vault password and build the secret used for decryption.
    with open(vault_password_file, 'r') as f:
        vault_password = f.read().strip()
    vault_secret = VaultSecret(vault_password.encode())

    # Read the (possibly partially) encrypted file.
    with open(vault_file, 'r') as f:
        file_content = f.read()

    # Fully encrypted file: decrypt the whole payload at once.
    if is_vault_encrypted_data(file_content):
        return decrypt_vault_data(file_content, vault_secret)

    # Partially encrypted file: decrypt each inline vault value in place.
    # BUG FIX: the previous version used re.findall() with a capture group
    # around the *key* name and then searched that key name for the vault
    # header, which can never match — inline values were never decrypted.
    # finditer() with the group on the encrypted literal fixes this.
    decrypted_data = file_content
    inline_vault = re.compile(
        r'^\s*[\w\.\-_]+:\s*(["\']?\$ANSIBLE_VAULT[^\n]+)',
        flags=re.MULTILINE,
    )
    for match in inline_vault.finditer(file_content):
        # Normalize the literal before handing it to VaultLib.
        encrypted_value = match.group(1).replace('\n', '').replace('\r', '')
        decrypted_value = decrypt_vault_data(encrypted_value, vault_secret)
        decrypted_data = decrypted_data.replace(encrypted_value, decrypted_value.strip())
    return decrypted_data
def decrypt_and_display(vault_file: str, vault_password_file: str) -> None:
    """
    Decrypts the Ansible Vault file and its values, then display the result.
    Supports both full file and partial value encryption.

    :param vault_file: Path to the encrypted Ansible Vault file
    :param vault_password_file: Path to the file containing the Vault password
    """
    decrypted_data = decrypt_vault_file(vault_file, vault_password_file)
    # Round-trip through YAML so the output is consistently block-formatted
    # regardless of how the source file was laid out.
    output_data = yaml.dump(yaml.safe_load(decrypted_data), default_flow_style=False)
    # Page the output through `less` so large vaults are scrollable;
    # this blocks until the pager exits.
    subprocess.run(["less"], input=output_data, text=True)
def main():
    """CLI entry point: decrypt a vault file and page it through ``less``."""
    arg_parser = argparse.ArgumentParser(
        description="Decrypt and display variables from an Ansible Vault file."
    )
    arg_parser.add_argument(
        'vault_file',
        type=str,
        help="Path to the encrypted Ansible Vault file"
    )
    arg_parser.add_argument(
        'vault_password_file',
        type=str,
        help="Path to the file containing the Vault password"
    )
    parsed = arg_parser.parse_args()
    # Hand off to the worker that decrypts and pages the output.
    decrypt_and_display(parsed.vault_file, parsed.vault_password_file)
if __name__ == "__main__":
main()

0
cli/validate/__init__.py Normal file
View File

154
cli/validate/inventory.py Normal file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
import argparse
import sys
import yaml
import re
from pathlib import Path
# Ensure imports work when run directly
script_dir = Path(__file__).resolve().parent
repo_root = script_dir.parent.parent
sys.path.insert(0, str(repo_root))
from cli.meta.applications import find_application_ids
def load_yaml_file(path):
    """Parse a YAML file, masking inline ``!vault`` blocks so parsing succeeds.

    :return: the parsed document, or ``None`` (with a warning on stderr)
             when the file cannot be read or parsed.
    """
    vault_block = r'(?m)^([ \t]*[^\s:]+):\s*!vault[\s\S]+?(?=^\S|\Z)'
    try:
        with open(path, 'r', encoding='utf-8') as handle:
            raw = handle.read()
        # Replace each vaulted scalar with a harmless placeholder.
        masked = re.sub(vault_block, r"\1: \"<vaulted>\"\n", raw)
        return yaml.safe_load(masked)
    except Exception as exc:
        print(f"Warning: Could not parse {path}: {exc}", file=sys.stderr)
        return None
def recursive_keys(d, prefix=''):
    """Return the set of dotted key paths reachable in a nested dict.

    Non-dict values terminate recursion; non-dict input yields an empty set.
    """
    collected = set()
    if not isinstance(d, dict):
        return collected
    for key, value in d.items():
        path = key if not prefix else f"{prefix}.{key}"
        collected.add(path)
        collected |= recursive_keys(value, path)
    return collected
def compare_application_keys(applications, defaults, source):
    """Check per-host application overrides against the defaults catalogue.

    :return: list of error strings — unknown application ids, and keys that
             have no counterpart in the defaults (``credentials.*`` exempt).
    """
    problems = []
    for app_id, conf in applications.items():
        if app_id not in defaults:
            problems.append(
                f"{source}: Unknown application '{app_id}' (not in defaults_applications)"
            )
            continue
        known = recursive_keys(defaults[app_id])
        for key in recursive_keys(conf):
            if key.startswith('credentials'):
                continue  # credentials are host-specific by design
            if key not in known:
                problems.append(f"{source}: Missing default for {app_id}: {key}")
    return problems
def compare_user_keys(users, default_users, source):
    """Validate per-host user settings against the ``default_users`` catalogue.

    Unknown users produce only a warning on stderr; keys with no default
    (other than secret-carrying keys) are returned as error strings.
    """
    issues = []
    secret_keys = ('password', 'credentials', 'mailu_token')
    for username, settings in users.items():
        if username not in default_users:
            print(
                f"Warning: {source}: Unknown user '{username}' (not in default_users)",
                file=sys.stderr,
            )
            continue
        defaults_for_user = default_users[username]
        issues.extend(
            f"Missing default for user '{username}': key '{key}'"
            for key in settings
            if key not in secret_keys and key not in defaults_for_user
        )
    return issues
def load_inventory_files(inv_dir):
    """Map YAML file paths (as strings) to their applications mapping.

    Scans the top-level ``*.yml`` files plus every ``*.yml`` under ``*_vars``
    directories; a file contributes when it defines ``applications`` or
    ``defaults_applications``.
    """
    def _apps_in(yaml_path):
        # Return the applications mapping of one file, or None.
        parsed = load_yaml_file(yaml_path)
        if isinstance(parsed, dict):
            return parsed.get('applications') or parsed.get('defaults_applications')
        return None

    base = Path(inv_dir)
    collected = {}
    for yml in base.glob('*.yml'):
        apps = _apps_in(yml)
        if apps:
            collected[str(yml)] = apps
    for vars_dir in base.glob('*_vars'):
        if not vars_dir.is_dir():
            continue
        for yml in vars_dir.rglob('*.yml'):
            apps = _apps_in(yml)
            if apps:
                collected[str(yml)] = apps
    return collected
def validate_host_keys(app_ids, inv_dir):
    """Check that every group under ``all.children`` is a known application id.

    Scans the top-level ``*.yml`` files of *inv_dir*; returns a list of
    error strings for unknown groups.
    """
    errors = []
    for inventory_file in Path(inv_dir).glob('*.yml'):
        document = load_yaml_file(inventory_file)
        if not isinstance(document, dict):
            continue
        children = document.get('all', {}).get('children')
        if not isinstance(children, dict):
            continue
        errors.extend(
            f"{inventory_file}: Invalid group '{group}' (not in application_ids)"
            for group in children
            if group not in app_ids
        )
    return errors
def find_single_file(pattern):
    """Resolve *pattern* inside ``group_vars/all`` to exactly one file.

    :raises RuntimeError: when zero or multiple files match.
    """
    matches = list(Path('group_vars/all').glob(pattern))
    if len(matches) == 1:
        return matches[0]
    raise RuntimeError(
        f"Expected exactly one {pattern} in group_vars/all, found {len(matches)}"
    )
def main():
    """Validate an inventory directory against the global defaults.

    Exits 1 when the defaults files are missing their expected top-level
    keys, or when any application/user/host-group issue is found; exits 0
    otherwise.
    """
    p = argparse.ArgumentParser()
    p.add_argument('inventory_dir')  # directory containing inventory YAMLs and *_vars/
    args = p.parse_args()
    # defaults
    # Locate the single defaults file for applications and the one for users.
    dfile = find_single_file('*_applications.yml')
    ufile = find_single_file('*users.yml')
    ddata = load_yaml_file(dfile) or {}
    udata = load_yaml_file(ufile) or {}
    defaults = ddata.get('defaults_applications',{})
    default_users = udata.get('default_users',{})
    if not defaults:
        print(f"Error: No 'defaults_applications' found in {dfile}", file=sys.stderr)
        sys.exit(1)
    if not default_users:
        print(f"Error: No 'default_users' found in {ufile}", file=sys.stderr)
        sys.exit(1)
    app_errs = []
    inv_files = load_inventory_files(args.inventory_dir)
    for src, apps in inv_files.items():
        app_errs.extend(compare_application_keys(apps, defaults, src))
    user_errs = []
    # User errors are printed to stderr immediately but still counted below
    # so they affect the exit status.
    for fpath in Path(args.inventory_dir).rglob('*.yml'):
        data = load_yaml_file(fpath)
        if isinstance(data, dict) and 'users' in data:
            errs = compare_user_keys(data['users'], default_users, str(fpath))
            for e in errs:
                print(e, file=sys.stderr)
            user_errs.extend(errs)
    # Every host group must correspond to a known application id.
    host_errs = validate_host_keys(find_application_ids(), args.inventory_dir)
    app_errs.extend(host_errs)
    if app_errs or user_errs:
        # NOTE(review): only app_errs are listed in the summary; user_errs
        # were already printed to stderr above.
        if app_errs:
            print('Validation failed with the following issues:')
            for e in app_errs:
                print(f"- {e}")
        sys.exit(1)
    print('Inventory directory is valid against defaults and hosts.')
    sys.exit(0)
if __name__=='__main__':
main()

View File

@@ -1,144 +0,0 @@
#!/usr/bin/env python3
import argparse
import sys
import yaml
import re
from pathlib import Path
def load_yaml_file(path):
    """Parse *path* as YAML, substituting ``!vault`` blocks with a placeholder.

    :return: parsed document, or ``None`` (warning on stderr) when reading
             or parsing fails.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            text = handle.read()
        # Mask vaulted scalars so yaml.safe_load does not choke on !vault.
        text = re.sub(
            r'(?m)^([ \t]*[^\s:]+):\s*!vault[\s\S]+?(?=^\S|\Z)',
            r'\1: "<vaulted>"\n',
            text,
        )
        return yaml.safe_load(text)
    except Exception as exc:
        print(f"Warning: Could not parse {path}: {exc}", file=sys.stderr)
        return None
def recursive_keys(d, prefix=""):
    """Return every dotted key path found in the nested mapping *d*."""
    if not isinstance(d, dict):
        return set()
    paths = set()
    for key, value in d.items():
        dotted = f"{prefix}.{key}" if prefix else key
        paths.add(dotted)
        paths |= recursive_keys(value, dotted)
    return paths
def compare_application_keys(applications, defaults, source_file):
    """Report application keys that lack a counterpart in the defaults.

    :return: list of error strings; ``credentials.*`` keys are skipped.
    """
    errors = []
    for app_id, app_conf in applications.items():
        if app_id not in defaults:
            errors.append(
                f"{source_file}: Unknown application '{app_id}' (not in defaults_applications)"
            )
            continue
        default_keys = recursive_keys(defaults.get(app_id, {}))
        for key in recursive_keys(app_conf):
            if key.startswith("credentials"):
                continue  # explicitly ignore credentials
            if key not in default_keys:
                errors.append(f"{source_file}: Missing default for {app_id}: {key}")
    return errors
def compare_user_keys(users, default_users, source_file):
    """Check per-host user settings against ``default_users``.

    Unknown users produce only a stderr warning.  A key without a default
    raises immediately.

    NOTE(review): despite building an ``errors`` list like
    compare_application_keys, this function raises on the FIRST missing
    key, so the returned list is always empty — callers catch the
    Exception, so confirm their expectations before unifying the styles.
    """
    errors = []
    for username, user_conf in users.items():
        if username not in default_users:
            # Unknown users are tolerated with a warning only.
            print(f"Warning: {source_file}: Unknown user '{username}' (not in default_users)", file=sys.stderr)
            continue
        default_conf = default_users.get(username, {})
        for key in user_conf:
            if key in ("password", "credentials", "mailu_token"):
                continue  # ignore credentials/password
            if key not in default_conf:
                raise Exception(f"{source_file}: Missing default for user '{username}': key '{key}'")
    return errors
def load_inventory_files(inventory_dir):
    """Collect applications mappings from the inventory directory.

    Keys of the returned dict are ``Path`` objects; values are the
    ``applications`` or ``defaults_applications`` mapping of each file.
    """
    def _extract(yaml_path):
        # Return the applications mapping of one file, or None.
        parsed = load_yaml_file(yaml_path)
        if isinstance(parsed, dict):
            return parsed.get("applications") or parsed.get("defaults_applications")
        return None

    inventory_path = Path(inventory_dir)
    collected = {}
    for candidate in inventory_path.glob("*.yml"):
        apps = _extract(candidate)
        if apps:
            collected[candidate] = apps
    for vars_folder in inventory_path.glob("*_vars"):
        if not vars_folder.is_dir():
            continue
        for candidate in vars_folder.rglob("*.yml"):
            apps = _extract(candidate)
            if apps:
                collected[candidate] = apps
    return collected
def find_single_file(pattern):
    """Return the unique file matching *pattern* under ``group_vars/all``.

    :raises RuntimeError: unless exactly one file matches.
    """
    candidates = list(Path("group_vars/all").glob(pattern))
    if len(candidates) == 1:
        return candidates[0]
    raise RuntimeError(
        f"Expected exactly one {pattern} file in group_vars/all, found {len(candidates)}"
    )
def main():
    """Validate inventory applications and users against the global defaults.

    Exits 1 on missing defaults files/keys, on any application key mismatch,
    or on the first user key mismatch; exits 0 when everything is consistent.
    """
    parser = argparse.ArgumentParser(description="Verify application and user variable consistency with defaults.")
    parser.add_argument("inventory_dir", help="Path to inventory directory (contains inventory.yml and *_vars/)")
    args = parser.parse_args()
    # Locate the single defaults file for applications and the one for users.
    defaults_path = find_single_file("*_applications.yml")
    users_path = find_single_file("*users.yml")
    defaults_data = load_yaml_file(defaults_path)
    default_users_data = load_yaml_file(users_path)
    defaults = defaults_data.get("defaults_applications", {}) if defaults_data else {}
    default_users = default_users_data.get("default_users", {}) if default_users_data else {}
    if not defaults:
        print(f"Error: No 'defaults_applications' found in {defaults_path}.", file=sys.stderr)
        sys.exit(1)
    if not default_users:
        print(f"Error: No 'default_users' found in {users_path}.", file=sys.stderr)
        sys.exit(1)
    all_errors = []
    inventory_files = load_inventory_files(args.inventory_dir)
    for source_path, app_data in inventory_files.items():
        errors = compare_application_keys(app_data, defaults, str(source_path))
        all_errors.extend(errors)
    # Load all users.yml files from inventory
    # compare_user_keys raises on the first problem, so a single bad user
    # key aborts validation immediately with exit code 1.
    for path in Path(args.inventory_dir).rglob("*.yml"):
        data = load_yaml_file(path)
        if isinstance(data, dict) and "users" in data:
            try:
                compare_user_keys(data["users"], default_users, str(path))
            except Exception as e:
                print(e, file=sys.stderr)
                sys.exit(1)
    if all_errors:
        print("Validation failed with the following issues:")
        for err in all_errors:
            print("-", err)
        sys.exit(1)
    else:
        print("Inventory directory is valid against defaults.")
        sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -28,7 +28,7 @@ system_maintenance_cleanup_services:
system_maintenance_manipulation_services:
- "maint-docker-heal"
- "update-docker"
- "maint-docker-storage-optimizer"
- "cln-docker-storage-optimizer"
- "maint-docker-restart"
## Total System Maintenance Services

255
main.py
View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3
import argparse
import os
import subprocess
import sys
@@ -10,7 +9,21 @@ import signal
from datetime import datetime
import pty
from cli.sounds import Sound
# Color support
try:
from colorama import init as colorama_init, Fore, Back, Style
colorama_init(autoreset=True)
except ImportError:
class Dummy:
def __getattr__(self, name): return ''
Fore = Back = Style = Dummy()
from cli.sounds import Sound # ensure Sound imported
def color_text(text, color):
return f"{color}{text}{Style.RESET_ALL}"
def format_command_help(name, description, indent=2, col_width=36, width=80):
prefix = " " * indent + f"{name:<{col_width - indent}}"
@@ -21,16 +34,41 @@ def format_command_help(name, description, indent=2, col_width=36, width=80):
)
return wrapper.fill(description)
def list_cli_commands(cli_dir):
return sorted(
os.path.splitext(f.name)[0] for f in os.scandir(cli_dir)
if f.is_file() and f.name.endswith(".py") and not f.name.startswith("__")
)
"""Recursively list all .py files under cli_dir that use argparse (without .py)."""
cmds = []
for root, _, files in os.walk(cli_dir):
for f in files:
if not f.endswith(".py") or f.startswith("__"):
continue
path = os.path.join(root, f)
try:
with open(path, 'r', encoding='utf-8') as fh:
content = fh.read()
if 'argparse' not in content:
continue
except Exception:
continue
rel_dir = os.path.relpath(root, cli_dir)
name = os.path.splitext(f)[0]
if rel_dir == ".":
cmd = (None, name)
else:
cmd = (rel_dir.replace(os.sep, "/"), name)
cmds.append(cmd)
return sorted(cmds, key=lambda x: (x[0] or "", x[1]))
def extract_description_via_help(cli_script_path):
try:
script_dir = os.path.dirname(os.path.realpath(__file__))
cli_dir = os.path.join(script_dir, "cli")
rel = os.path.relpath(cli_script_path, cli_dir)
module = "cli." + rel[:-3].replace(os.sep, ".")
result = subprocess.run(
[sys.executable, cli_script_path, "--help"],
[sys.executable, "-m", module, "--help"],
capture_output=True,
text=True,
check=True
@@ -39,7 +77,7 @@ def extract_description_via_help(cli_script_path):
for i, line in enumerate(lines):
if line.strip().startswith("usage:"):
continue
if line.strip() == "":
if not line.strip():
for j in range(i+1, len(lines)):
desc = lines[j].strip()
if desc:
@@ -48,102 +86,174 @@ def extract_description_via_help(cli_script_path):
except Exception:
return "-"
def git_clean_repo():
"""Remove all Git-ignored files and directories in the current repository."""
subprocess.run(['git', 'clean', '-Xfd'], check=True)
def play_start_intro():
Sound.play_start_sound()
Sound.play_cymais_intro_sound()
def failure_with_warning_loop():
def failure_with_warning_loop(no_signal, sound_enabled):
if not no_signal:
Sound.play_finished_failed_sound()
print("Warning: command failed. Press Ctrl+C to stop sound warnings.")
print(color_text("Warning: command failed. Press Ctrl+C to stop warnings.", Fore.RED))
try:
while True:
if not no_signal:
Sound.play_warning_sound()
except KeyboardInterrupt:
print("Warnings stopped by user.")
print(color_text("Warnings stopped by user.", Fore.YELLOW))
from cli.sounds import Sound # ensure Sound imported
if __name__ == "__main__":
# Parse special flags early and remove from args
no_sound = False
log_enabled = False
git_clean = False
infinite = False
if '--no-sound' in sys.argv:
no_sound = True
sys.argv.remove('--no-sound')
if '--log' in sys.argv:
log_enabled = True
sys.argv.remove('--log')
if '--git-clean' in sys.argv:
git_clean = True
sys.argv.remove('--git-clean')
if '--infinite' in sys.argv:
infinite = True
sys.argv.remove('--infinite')
# Parse flags
sound_enabled = '--sound' in sys.argv and (sys.argv.remove('--sound') or True)
no_signal = '--no-signal' in sys.argv and (sys.argv.remove('--no-signal') or True)
log_enabled = '--log' in sys.argv and (sys.argv.remove('--log') or True)
git_clean = '--git-clean' in sys.argv and (sys.argv.remove('--git-clean') or True)
infinite = '--infinite' in sys.argv and (sys.argv.remove('--infinite') or True)
# Setup segfault handler to catch crashes
# Segfault handler
def segv_handler(signum, frame):
if not no_sound:
if not no_signal:
Sound.play_finished_failed_sound()
try:
while True:
Sound.play_warning_sound()
except KeyboardInterrupt:
pass
print("Segmentation fault detected. Exiting.")
print(color_text("Segmentation fault detected. Exiting.", Fore.RED))
sys.exit(1)
signal.signal(signal.SIGSEGV, segv_handler)
# Play intro sounds
if not no_sound:
# Play intro melody if requested
if sound_enabled:
threading.Thread(target=play_start_intro, daemon=True).start()
# Change to script directory
script_dir = os.path.dirname(os.path.realpath(__file__))
cli_dir = os.path.join(script_dir, "cli")
os.chdir(script_dir)
# If requested, clean git-ignored files
if git_clean:
git_clean_repo()
available_cli_commands = list_cli_commands(cli_dir)
# Collect available commands
available = list_cli_commands(cli_dir)
args = sys.argv[1:]
# Handle help invocation
if len(sys.argv) == 1 or sys.argv[1] in ('-h', '--help'):
print("CyMaIS CLI proxy to tools in ./cli/")
print("Usage: cymais [--no-sound] [--log] [--git-clean] [--infinite] <command> [options]")
print("Options:")
print(" --no-sound Suppress all sounds during execution")
print(" --log Log all proxied command output to logfile.log")
print(" --git-clean Remove all Git-ignored files before running")
print(" --infinite Run the proxied command in an infinite loop")
print(" -h, --help Show this help message and exit")
print("Available commands:")
for cmd in available_cli_commands:
path = os.path.join(cli_dir, f"{cmd}.py")
desc = extract_description_via_help(path)
print(format_command_help(cmd, desc))
# Global help
if not args or args[0] in ('-h', '--help'):
print(color_text("CyMaIS CLI 🦫🌐🖥️", Fore.CYAN + Style.BRIGHT))
print()
print(color_text("Your Gateway to Automated IT Infrastructure Setup", Style.DIM))
print()
print(color_text(
"Usage: cymais [--sound] [--no-signal] [--log] [--git-clean] [--infinite] <command> [options]",
Fore.GREEN
))
print()
# Use bright style for headings
print(color_text("Options:", Style.BRIGHT))
print(color_text(" --sound Play startup melody and warning sounds", Fore.YELLOW))
print(color_text(" --no-signal Suppress success/failure signals", Fore.YELLOW))
print(color_text(" --log Log all proxied command output to logfile.log", Fore.YELLOW))
print(color_text(" --git-clean Remove all Git-ignored files before running", Fore.YELLOW))
print(color_text(" --infinite Run the proxied command in an infinite loop", Fore.YELLOW))
print(color_text(" -h, --help Show this help message and exit", Fore.YELLOW))
print()
print(color_text("Available commands:", Style.BRIGHT))
print()
current_folder = None
for folder, cmd in available:
if folder != current_folder:
if folder:
print(color_text(f"{folder}/", Fore.MAGENTA))
print()
current_folder = folder
desc = extract_description_via_help(
os.path.join(cli_dir, *(folder.split('/') if folder else []), f"{cmd}.py")
)
print(color_text(format_command_help(cmd, desc, indent=2), ''), "\n")
print()
print(color_text(
"🔗 You can chain subcommands by specifying nested directories,",
Fore.CYAN
))
print(color_text(
" e.g. `cymais generate defaults applications` →",
Fore.CYAN
))
print(color_text(
" corresponds to `cli/generate/defaults/applications.py`.",
Fore.CYAN
))
print()
print(color_text(
"CyMaIS is a product of Kevin Veen-Birkenbach, https://cybermaster.space .\n",
Style.DIM
))
print(color_text(
"Test and use productively on https://cymais.cloud .\n",
Style.DIM
))
print(color_text(
"For commercial use, a license agreement with Kevin Veen-Birkenbach is required. \n",
Style.DIM
))
print(color_text("License: https://s.veen.world/cncl", Style.DIM))
print()
print(color_text("🎉🌈 Happy IT Infrastructuring! 🚀🔧✨", Fore.MAGENTA + Style.BRIGHT))
print()
sys.exit(0)
# Special-case per-command help
if len(sys.argv) >= 3 and sys.argv[1] in available_cli_commands and sys.argv[2] in ('-h', '--help'):
subprocess.run([sys.executable, os.path.join(cli_dir, f"{sys.argv[1]}.py"), "--help"])
# Directory-specific help
if len(args) > 1 and args[-1] in ('-h', '--help'):
dir_parts = args[:-1]
candidate_dir = os.path.join(cli_dir, *dir_parts)
if os.path.isdir(candidate_dir):
print(color_text(
f"Overview of commands in: {'/'.join(dir_parts)}",
Fore.CYAN + Style.BRIGHT
))
print()
for folder, cmd in available:
if folder == "/".join(dir_parts):
desc = extract_description_via_help(
os.path.join(candidate_dir, f"{cmd}.py")
)
print(color_text(format_command_help(cmd, desc, indent=2), ''))
sys.exit(0)
# Execute chosen command
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('cli_command', choices=available_cli_commands)
parser.add_argument('cli_args', nargs=argparse.REMAINDER)
args = parser.parse_args()
# Per-command help
for n in range(len(args), 0, -1):
candidate = os.path.join(cli_dir, *args[:n]) + ".py"
if os.path.isfile(candidate) and len(args) > n and args[n] in ('-h', '--help'):
rel = os.path.relpath(candidate, cli_dir)
module = "cli." + rel[:-3].replace(os.sep, ".")
subprocess.run([sys.executable, "-m", module, args[n]])
sys.exit(0)
cmd_path = os.path.join(cli_dir, f"{args.cli_command}.py")
full_cmd = [sys.executable, cmd_path] + args.cli_args
# Resolve script path
script_path = None
cli_args = []
module = None
for n in range(len(args), 0, -1):
candidate = os.path.join(cli_dir, *args[:n]) + ".py"
if os.path.isfile(candidate):
script_path = candidate
cli_args = args[n:]
rel = os.path.relpath(candidate, cli_dir)
module = "cli." + rel[:-3].replace(os.sep, ".")
break
if not module:
print(color_text(f"Error: command '{' '.join(args)}' not found.", Fore.RED))
sys.exit(1)
log_file = None
if log_enabled:
@@ -152,8 +262,9 @@ if __name__ == "__main__":
timestamp = datetime.now().strftime('%Y%m%dT%H%M%S')
log_file_path = os.path.join(log_dir, f'{timestamp}.log')
log_file = open(log_file_path, 'a', encoding='utf-8')
# 📖 Tip: Check your logs at the path below
print(f"📖 Tip: Log file created at {log_file_path}")
print(color_text(f"Tip: Log file created at {log_file_path}", Fore.GREEN))
full_cmd = [sys.executable, "-m", module] + cli_args
def run_once():
try:
@@ -189,24 +300,22 @@ if __name__ == "__main__":
log_file.close()
if rc != 0:
print(f"Command '{args.cli_command}' failed with exit code {rc}.")
failure_with_warning_loop()
failure_with_warning_loop(no_signal, sound_enabled)
sys.exit(rc)
else:
if not no_sound:
if not no_signal:
Sound.play_finished_successfully_sound()
return True
except Exception as e:
print(f"Exception running command: {e}")
failure_with_warning_loop()
print(color_text(f"Exception running command: {e}", Fore.RED))
failure_with_warning_loop(no_signal, sound_enabled)
sys.exit(1)
if infinite:
# ♾️ Infinite mode activated
print("♾️ Starting infinite execution mode...")
print(color_text("Starting infinite execution mode...", Fore.CYAN))
count = 1
while True:
print(f"🔄 Execution #{count}")
print(color_text(f"Run #{count}", Style.BRIGHT))
run_once()
count += 1
else:

View File

@@ -2,9 +2,9 @@
hosts: all
tasks:
- name: "Load 'constructor' tasks"
include_tasks: "tasks/plays/01_constructor.yml"
include_tasks: "tasks/stages/01_constructor.yml"
- name: "Load '{{host_type}}' tasks"
include_tasks: "tasks/plays/02_{{host_type}}.yml"
include_tasks: "tasks/stages/02_{{host_type}}.yml"
- name: "Load 'destructor' tasks"
include_tasks: "tasks/plays/03_destructor.yml"
include_tasks: "tasks/stages/03_destructor.yml"
become: true

View File

@@ -1,4 +1,6 @@
backup_to_usb_script_path: "/usr/local/sbin/bkp-data-to-usb.python"
backup_to_usb_destination: "{{backup_to_usb_mount}}{{backup_to_usb_destination_subdirectory}}"
backups_folder_path: "{{backup_to_usb_destination}}"
systemctl_mount_service_name: "{{ backup_to_usb_mount | trim('/') | replace('/', '-') }}.mount"
backup_to_usb_script_path: /usr/local/sbin/bkp-data-to-usb.python
backup_to_usb_destination: '{{backup_to_usb_mount}}{{backup_to_usb_destination_subdirectory}}'
backups_folder_path: '{{backup_to_usb_destination}}'
systemctl_mount_service_name: '{{ backup_to_usb_mount | trim(''/'') | replace(''/'',
''-'') }}.mount'
application_id: data-to-usb

View File

@@ -1 +1,2 @@
backup_directory_validator_folder: "{{path_administrator_scripts}}directory-validator/"
backup_directory_validator_folder: '{{path_administrator_scripts}}directory-validator/'
application_id: directory-validator

View File

@@ -1 +1,2 @@
bkp_docker_to_local_pkg: backup-docker-to-local
application_id: docker-to-local

View File

@@ -1,2 +1,3 @@
authorized_keys_path: "{{ inventory_dir }}/files/{{ inventory_hostname }}/home/backup/.ssh/authorized_keys"
authorized_keys_list: "{{ lookup('file', authorized_keys_path).splitlines() }}"
authorized_keys_path: '{{ inventory_dir }}/files/{{ inventory_hostname }}/home/backup/.ssh/authorized_keys'
authorized_keys_list: '{{ lookup(''file'', authorized_keys_path).splitlines() }}'
application_id: provider-user

View File

@@ -0,0 +1 @@
application_id: provider

View File

@@ -1 +1,2 @@
docker_backup_remote_to_local_folder: "{{path_administrator_scripts}}bkp-remote-to-local/"
docker_backup_remote_to_local_folder: '{{path_administrator_scripts}}bkp-remote-to-local/'
application_id: remote-to-local

View File

@@ -92,7 +92,7 @@ roles:
title: "Backup & Restore"
description: "Backup strategies & restore procedures"
icon: "fas fa-hdd"
invokable: false
invokable: true
update:
title: "Updates & Package Management"
description: "OS & package updates"
@@ -103,3 +103,8 @@ roles:
description: "User accounts & access control"
icon: "fas fa-users"
invokable: false
cln:
title: "Cleanup"
description: "Roles for cleaning up various system resources—old backups, unused certificates, temporary files, Docker volumes, disk caches, deprecated domains, and more."
icon: "fas fa-trash-alt"
invokable: true

View File

@@ -1 +1,2 @@
cleanup_backups_directory: "{{path_administrator_scripts}}cln-backups/"
cleanup_backups_directory: '{{path_administrator_scripts}}cln-backups/'
application_id: backups-service

View File

@@ -0,0 +1 @@
application_id: backups-timer

View File

@@ -0,0 +1 @@
application_id: certs

View File

@@ -1 +1,2 @@
cleanup_disc_space_folder: "{{path_administrator_scripts}}cln-disc-space/"
cleanup_disc_space_folder: '{{path_administrator_scripts}}cln-disc-space/'
application_id: disc-space

View File

@@ -0,0 +1 @@
application_id: docker-anonymous-volumes

View File

@@ -0,0 +1,5 @@
- name: "reload cln-docker-storage-optimizer.cymais.service"
systemd:
name: cln-docker-storage-optimizer.cymais.service
state: reloaded
daemon_reload: yes

View File

@@ -0,0 +1,22 @@
- name: "create {{storage_optimizer_directory}}"
file:
path: "{{storage_optimizer_directory}}"
state: directory
mode: 0755
- name: create cln-docker-storage-optimizer.cymais.service
template:
src: cln-docker-storage-optimizer.service.j2
dest: /etc/systemd/system/cln-docker-storage-optimizer.cymais.service
notify: reload cln-docker-storage-optimizer.cymais.service
- name: create cln-docker-storage-optimizer.py
copy:
src: cln-docker-storage-optimizer.py
dest: "{{storage_optimizer_script}}"
mode: 0755
- name: "optimize storage performance"
systemd:
name: cln-docker-storage-optimizer.cymais.service
state: started

View File

@@ -4,5 +4,5 @@ OnFailure=alert-compose.cymais@%n.service
[Service]
Type=oneshot
ExecStartPre=/bin/sh -c '/usr/bin/python {{ path_system_lock_script }} {{ system_maintenance_services | join(' ') }} --ignore maint-docker-storage-optimizer bkp-remote-to-local --timeout "{{system_maintenance_lock_timeout_storage_optimizer}}"'
ExecStartPre=/bin/sh -c '/usr/bin/python {{ path_system_lock_script }} {{ system_maintenance_services | join(' ') }} --ignore cln-docker-storage-optimizer bkp-remote-to-local --timeout "{{system_maintenance_lock_timeout_storage_optimizer}}"'
ExecStart=/bin/sh -c '/usr/bin/python {{storage_optimizer_script}} --rapid-storage-path {{path_rapid_storage}} --mass-storage-path {{path_mass_storage}}'

View File

@@ -0,0 +1,3 @@
storage_optimizer_directory: '{{path_administrator_scripts}}cln-docker-storage-optimizer/'
storage_optimizer_script: '{{storage_optimizer_directory}}cln-docker-storage-optimizer.py'
application_id: docker-storage-optimizer

View File

@@ -0,0 +1 @@
application_id: domains

View File

@@ -1 +1,2 @@
cln_failed_docker_backups_pkg: cleanup-failed-docker-backups
application_id: failed-docker-backups

View File

@@ -1 +1 @@
application_id: docker
application_id: desk-docker

View File

@@ -17,4 +17,4 @@ galaxy_info:
- git
- configuration
- pacman
- personal-computer
- desktop

View File

@@ -16,7 +16,7 @@
group: administrator
when: run_once_docker is not defined
- name: Set docker_enabled to true, to activate maint-docker-storage-optimizer
- name: Set docker_enabled to true, to activate cln-docker-storage-optimizer
set_fact:
docker_enabled: true
when: run_once_docker is not defined

View File

@@ -1 +1,2 @@
system_btrfs_auto_balancer_folder: "{{path_administrator_scripts}}auto-btrfs-balancer/"
system_btrfs_auto_balancer_folder: '{{path_administrator_scripts}}auto-btrfs-balancer/'
application_id: btrfs-auto-balancer

View File

@@ -1 +1,2 @@
heal_docker: "{{path_administrator_scripts}}maint-docker-heal/"
heal_docker: '{{path_administrator_scripts}}maint-docker-heal/'
application_id: docker-heal

View File

@@ -1,2 +1,3 @@
restart_docker_folder: "{{path_administrator_scripts}}maint-docker-restart/"
restart_docker_script: "{{restart_docker_folder}}maint-docker-restart.py"
restart_docker_folder: '{{path_administrator_scripts}}maint-docker-restart/'
restart_docker_script: '{{restart_docker_folder}}maint-docker-restart.py'
application_id: docker-restart

View File

@@ -1,5 +0,0 @@
- name: "reload maint-docker-storage-optimizer.cymais.service"
systemd:
name: maint-docker-storage-optimizer.cymais.service
state: reloaded
daemon_reload: yes

View File

@@ -1,22 +0,0 @@
- name: "create {{storage_optimizer_directory}}"
file:
path: "{{storage_optimizer_directory}}"
state: directory
mode: 0755
- name: create maint-docker-storage-optimizer.cymais.service
template:
src: maint-docker-storage-optimizer.service.j2
dest: /etc/systemd/system/maint-docker-storage-optimizer.cymais.service
notify: reload maint-docker-storage-optimizer.cymais.service
- name: create maint-docker-storage-optimizer.py
copy:
src: maint-docker-storage-optimizer.py
dest: "{{storage_optimizer_script}}"
mode: 0755
- name: "optimize storage performance"
systemd:
name: maint-docker-storage-optimizer.cymais.service
state: started

View File

@@ -1,2 +0,0 @@
storage_optimizer_directory: "{{path_administrator_scripts}}maint-docker-storage-optimizer/"
storage_optimizer_script: "{{storage_optimizer_directory}}maint-docker-storage-optimizer.py"

View File

@@ -0,0 +1 @@
application_id: lock

View File

@@ -0,0 +1 @@
application_id: swapfile

View File

@@ -0,0 +1 @@
application_id: dns-records

View File

@@ -1,8 +1,5 @@
caa_entries:
- tag: issue
value: "letsencrypt.org"
# - tag: issuewild
# value: "letsencrypt.org"
# - tag: iodef
# value: "mailto:{{ users.administrator.email }}"
base_sld_domains: "{{ current_play_domains_all | generate_base_sld_domains }}"
- tag: issue
value: letsencrypt.org
base_sld_domains: '{{ current_play_domains_all | generate_base_sld_domains }}'
application_id: letsencrypt

View File

@@ -0,0 +1 @@
application_id: wireguard-core

View File

@@ -0,0 +1 @@
application_id: wireguard-firewalled

View File

@@ -0,0 +1 @@
application_id: wireguard-plain

View File

@@ -1 +1 @@
application_id: "mariadb"
application_id: rdbms-mariadb

View File

@@ -1 +1 @@
application_id: postgres
application_id: rdbms-postgres

View File

@@ -0,0 +1 @@
application_id: apt

View File

@@ -1 +1,2 @@
update_docker_script: "{{path_administrator_scripts}}update-docker.py"
update_docker_script: '{{path_administrator_scripts}}update-docker.py'
application_id: docker

View File

@@ -0,0 +1 @@
application_id: pacman

View File

@@ -0,0 +1 @@
application_id: pip

View File

@@ -0,0 +1 @@
application_id: pkgmgr

View File

@@ -0,0 +1 @@
application_id: yay

View File

@@ -1 +1 @@
application_id: desk-browser
application_id: browser

View File

@@ -1 +1 @@
application_id: desk-design
application_id: design

View File

@@ -1 +1 @@
application_id: desk-dev-arduino
application_id: dev-arduino

View File

@@ -1 +1 @@
application_id: desk-dev-core
application_id: dev-core

View File

@@ -1 +1 @@
application_id: desk-dev-java
application_id: dev-java

View File

@@ -1 +1 @@
application_id: desk-dev-php
application_id: dev-php

View File

@@ -1 +1 @@
application_id: desk-dev-python
application_id: dev-python

View File

@@ -1 +1 @@
application_id: desk-dev-shell
application_id: dev-shell

View File

@@ -1 +1 @@
application_id: desk-game-compose
application_id: game-compose

View File

@@ -5,4 +5,4 @@ gamer_default_games:
- gnuchess
- sauerbraten
- mari0
application_id: desk-game-os
application_id: game-os

View File

@@ -1 +1 @@
application_id: desk-game-windows
application_id: game-windows

View File

@@ -1 +1 @@
application_id: desk-office-tools
application_id: office-tools

View File

@@ -1 +1 @@
application_id: srv-corporate-identity
application_id: corporate-identity

View File

@@ -24,5 +24,3 @@ galaxy_info:
documentation: "https://github.com/kevinveenbirkenbach/portfolio#readme"
logo:
class: "fa-solid fa-briefcase"
run_after:
- web-svc-simpleicons

View File

@@ -26,8 +26,9 @@ galaxy_info:
repository: "https://s.veen.world/cymais"
issue_tracker_url: "https://s.veen.world/cymaisissues"
documentation: "https://s.veen.world/cymais"
run_after:
- web-app-matomo
# This probably leads to problems at some point, @todo solve it
# run_after:
# - web-app-matomo
dependencies:
- srv-web-7-6-https
- gen-git

View File

@@ -1,83 +0,0 @@
---
## pc applications
- name: general host setup
when: ("personal_computers" in group_names)
include_role:
name: "{{ item }}"
loop:
- util-gen-admin
- drv-non-free
- name: util-desk-office-tools
when: ("collection_officetools" in group_names)
include_role:
name: "{{ item }}"
loop:
- util-desk-office-tools
- desk-jrnl
- name: personal computer for business
when: ("business_personal_computer" in group_names)
include_role:
name: desk-gnucash
- name: util-desk-design
when: ("collection_designer" in group_names)
include_role:
name: util-desk-design
- name: desk-qbittorrent
when: ("collection_torrent" in group_names)
include_role:
name: desk-qbittorrent
- name: desk-obs
when: ("collection_streamer" in group_names)
include_role:
name: desk-obs
- name: desk-bluray-player
when: ("collection_bluray_player" in group_names)
include_role:
name: desk-bluray-player
- name: GNOME setup
when: ("gnome" in group_names)
include_role:
name: desk-gnome
- name: setup ssh client
when: ("ssh-client" in group_names)
include_role:
name: desk-ssh
- name: setup gaming hosts
when: ("gaming" in group_names)
include_role:
name: util-desk-game-compose
- name: setup entertainment hosts
when: ("entertainment" in group_names)
include_role:
name: desk-spotify
- name: setup torbrowser hosts
when: ("torbrowser" in group_names)
include_role:
name: desk-torbrowser
- name: setup nextcloud-client
when: ("nextcloud_client" in group_names)
include_role:
name: desk-nextcloud-client
- name: setup docker
when: ("docker_client" in group_names)
include_role:
name: desk-docker
# driver
- name: setup msi rgb keyboard
when: ("msi_perkeyrgb" in group_names)
include_role:
name: drv-msi-keyboard-color

View File

@@ -1,15 +0,0 @@
- name: optimize storage performance
include_role:
name: maint-docker-storage-optimizer
when: ('storage-optimizer' | application_allowed(group_names, allowed_applications))
- name: Cleanup Docker Anonymous Volumes
import_role:
name: cln-docker-anonymous-volumes
when: mode_cleanup | bool
- name: Show all facts
debug:
var: ansible_facts
when: enable_debug | bool

View File

@@ -72,7 +72,7 @@
recursive=True
)) |
generate_all_domains(
('www_redirect' in group_names)
('redir-www' in group_names)
)
}}
@@ -101,52 +101,18 @@
name: update
when: mode_update | bool
- name: setup standard wireguard
when: ('wireguard_server' | application_allowed(group_names, allowed_applications))
include_role:
name: net-wireguard-core
# vpn setup
- name: setup wireguard client behind firewall/NAT
when: ('wireguard_behind_firewall' | application_allowed(group_names, allowed_applications))
include_role:
name: net-wireguard-firewalled
- name: setup wireguard client
when: ('wireguard_client' | application_allowed(group_names, allowed_applications))
include_role:
name: net-wireguard-plain
## backup setup
- name: setup replica backup hosts
when: ('backup_remote_to_local' | application_allowed(group_names, allowed_applications))
include_role:
name: bkp-remote-to-local
- name: setup backup to swappable
when: ('backup_to_usb' | application_allowed(group_names, allowed_applications))
include_role:
name: bkp-data-to-usb
## driver setup
- name: drv-intel
when: ('intel' | application_allowed(group_names, allowed_applications))
include_role:
name: drv-intel
- name: setup multiprinter hosts
when: ('epson_multiprinter' | application_allowed(group_names, allowed_applications))
include_role:
name: drv-epson-multiprinter
- name: setup hibernate lid switch
when: ('drv-lid-switch' | application_allowed(group_names, allowed_applications))
include_role:
name: drv-lid-switch
## system setup
- name: setup swapfile hosts
when: ('swapfile' | application_allowed(group_names, allowed_applications))
include_role:
name: maint-swapfile
- name: "Load base roles"
include_tasks: "./tasks/groups/{{ item }}-roles.yml"
loop:
- core
- drv
- gen
- net
- alert
- mon
- maint
- update
- bkp
- cln
loop_control:
label: "{{ item }}-roles.yml"

View File

@@ -0,0 +1,15 @@
---
- name: "setup docker role includes for desktop pc"
include_tasks: "./tasks/groups/{{ item }}-roles.yml"
loop:
- util-srv # Services need to run before applications
- util-desk
loop_control:
label: "{{ item }}-roles.yml"
- name: general host setup
include_role:
name: "{{ item }}"
loop:
- util-gen-admin
- drv-non-free

View File

@@ -1,6 +1,5 @@
---
- name: servers host setup
when: ("servers" in group_names)
- name: Setup server base
include_role:
name: "{{ item }}"
loop:
@@ -11,15 +10,10 @@
- mon-bot-btrfs
- maint-btrfs-auto-balancer
- name: "Integrate Docker Role includes"
- name: "Include server roles"
include_tasks: "./tasks/groups/{{ item }}-roles.yml"
loop:
- svc
- web
- web-svc # Services need to run before applications
- web-app
loop_control:
label: "{{ item }}-roles.yml"
- name: "setup corporate identity"
include_role:
name: util-srv-corporate-identity
when: ('corporate_identity' | application_allowed(group_names, allowed_applications))

View File

@@ -0,0 +1,6 @@
- name: "Load destruction roles"
include_tasks: "./tasks/groups/{{ item }}-roles.yml"
loop:
- cln
loop_control:
label: "{{ item }}-roles.yml"

View File

@@ -39,7 +39,7 @@ class TestApplicationIdConsistency(unittest.TestCase):
continue
actual_id = vars_data.get("application_id")
if actual_id != expected_id:
if actual_id not in [expected_id, role_name]:
failed_roles.append((
role_name,
f"application_id is '{actual_id}', expected '{expected_id}'"

View File

@@ -2,7 +2,7 @@ import os
import unittest
# import the functions from your CLI script
from cli.generate_playbook import build_dependency_graph, find_cycle
from cli.generate.conditional_role_include import build_dependency_graph, find_cycle
class TestCircularDependencies(unittest.TestCase):
"""

View File

@@ -0,0 +1,51 @@
import unittest
import os
import sys
import subprocess
class CLIHelpIntegrationTest(unittest.TestCase):
    """Integration test: every CLI script must respond to ``--help``.

    Walks the ``cli/`` directory, derives the subcommand path for each
    ``.py`` file, and verifies that ``python main.py <cmd> --help``
    exits with status 0.
    """

    @classmethod
    def setUpClass(cls):
        # Resolve the project root relative to this test file
        # (tests/integration -> project root is two levels up).
        cls.project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), '..', '..')
        )
        cls.main_py = os.path.join(cls.project_root, 'main.py')
        cls.cli_dir = os.path.join(cls.project_root, 'cli')
        cls.python = sys.executable

    def test_all_cli_commands_help(self):
        """
        Iterate over all .py files in cli/, build the subcommand
        path from their location, and check that
        `python main.py <cmd> --help` exits with code 0.
        """
        for root, _, files in os.walk(self.cli_dir):
            for fname in files:
                # Skip non-Python files and private/dunder modules.
                if not fname.endswith('.py') or fname.startswith('__'):
                    continue
                # Determine the subcommand segments from the relative path.
                rel_dir = os.path.relpath(root, self.cli_dir)
                cmd_name = os.path.splitext(fname)[0]
                if rel_dir == '.':
                    segments = [cmd_name]
                else:
                    segments = rel_dir.split(os.sep) + [cmd_name]

                with self.subTest(command=' '.join(segments)):
                    # --no-signal suppresses sound/alert side effects in CI.
                    cmd = [self.python, self.main_py] + segments + ['--help', '--no-signal']
                    result = subprocess.run(
                        cmd, capture_output=True, text=True
                    )
                    self.assertEqual(
                        result.returncode, 0,
                        msg=(
                            f"Command `{ ' '.join(cmd) }` failed\n"
                            f"stdout:\n{result.stdout}\n"
                            f"stderr:\n{result.stderr}"
                        )
                    )
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,47 @@
import os
import sys
import re
import unittest

# Ensure the project root is on PYTHONPATH *before* importing project code;
# otherwise the `cli.meta.applications` import below may fail depending on
# the working directory the tests are launched from.
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
sys.path.insert(0, ROOT)

from cli.meta.applications import find_application_ids
class TestGroupApplications(unittest.TestCase):
    """Verify that every literal compared against ``group_names`` in any
    YAML file of the project is a known application ID."""

    # Captures literal membership tests of either form:
    #   'name' in group_names   /   "name" not in group_names
    GROUP_CHECK_RE = re.compile(
        r"['\"](?P<name>[^'\"]+)['\"]\s*(?:in|not in)\s*group_names"
    )

    def test_group_name_checks_use_valid_application_ids(self):
        """
        Ensures that any string checked against group_names corresponds to a valid application ID.
        """
        valid_apps = find_application_ids()

        # Scan every YAML file in the project tree.
        for dirpath, _, filenames in os.walk(ROOT):
            yaml_files = (
                f for f in filenames if f.lower().endswith(('.yml', '.yaml'))
            )
            for fname in yaml_files:
                filepath = os.path.join(dirpath, fname)
                try:
                    with open(filepath, 'r', encoding='utf-8') as handle:
                        text = handle.read()
                except Exception:
                    # Unreadable files are skipped, matching best-effort scanning.
                    continue

                # Every group_names check found must reference a known app ID.
                for match in self.GROUP_CHECK_RE.finditer(text):
                    name = match.group('name')
                    self.assertIn(
                        name,
                        valid_apps,
                        msg=(
                            f"{filepath}: group_names check uses '{name}', "
                            f"which is not a known application ID {valid_apps}"
                        )
                    )
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,67 @@
import unittest
from pathlib import Path
import re
import os
import sys
# Ensure your project root is on PYTHONPATH so filter_plugins can be imported
ROOT = Path(__file__).parents[2]
sys.path.insert(0, str(ROOT))
from filter_plugins.invokable_paths import get_invokable_paths
STAGES_DIR = ROOT / "tasks" / "stages"
GROUPS_DIR = ROOT / "tasks" / "groups"
class TestMetaRolesIntegration(unittest.TestCase):
    """Integration checks tying the invokable-paths filter plugin to the
    tasks/groups and tasks/stages directory layout."""

    @classmethod
    def setUpClass(cls):
        # Query the filter plugin for both flavours of path list.
        cls.role_files = get_invokable_paths(suffix="-roles.yml")
        cls.invokable_items = get_invokable_paths()

        # Cache the text of every stage playbook for the reference checks.
        cls.playbook_contents = {
            path: path.read_text(encoding="utf-8")
            for path in STAGES_DIR.rglob("*.yml")
        }

        # Matches an include_tasks line referencing {{ item }}-roles.yml.
        cls.include_pattern = re.compile(
            r'include_tasks:\s*["\']\./tasks/groups/\{\{\s*item\s*\}\}-roles\.yml["\']'
        )

    def test_all_role_files_exist(self):
        """Each '-roles.yml' path returned by the filter must exist in the project root."""
        missing = [
            fname for fname in self.role_files
            if not (GROUPS_DIR / fname).is_file()
        ]
        self.assertFalse(
            missing,
            f"The following role files are missing at project root: {missing}"
        )

    def test_each_invokable_item_referenced_in_playbooks(self):
        """
        Each invokable item (without suffix) must be looped through in at least one playbook
        and include its corresponding include_tasks entry.
        """
        not_referenced = []
        for item in self.invokable_items:
            # A playbook references the item if it both loops over it and
            # contains the generic include_tasks line.
            loop_entry = re.compile(rf"-\s*{re.escape(item)}\b")
            referenced = any(
                self.include_pattern.search(content) and loop_entry.search(content)
                for content in self.playbook_contents.values()
            )
            if not referenced:
                not_referenced.append(item)

        self.assertEqual(
            not_referenced, [],
            f"The following invokable items are not referenced in any playbook: {not_referenced}"
        )
unittest.main()

View File

@@ -0,0 +1,39 @@
import os
import glob
import yaml
import unittest
def find_application_ids():
    """
    Scan all roles/*/vars/main.yml files and collect application_id values.

    Returns:
        dict: mapping of application_id to the list of file paths where it
        appears, so callers can detect duplicate IDs.
    """
    ids = {}
    # This test lives under tests/integration, so the project root is two
    # directories up from this file.
    base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
    pattern = os.path.join(base_dir, "roles", "*", "vars", "main.yml")

    for file_path in glob.glob(pattern):
        # Explicit UTF-8: role vars files may contain non-ASCII text and
        # must not depend on the locale's default encoding.
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f) or {}
        app_id = data.get('application_id')
        if app_id is not None:
            ids.setdefault(app_id, []).append(file_path)
    return ids
class TestUniqueApplicationId(unittest.TestCase):
    """Guards against two roles declaring the same application_id."""

    def test_application_ids_are_unique(self):
        """Fail with a readable report if any application_id occurs in more than one vars file."""
        occurrences = find_application_ids()
        duplicates = {
            app_id: paths
            for app_id, paths in occurrences.items()
            if len(paths) > 1
        }
        if not duplicates:
            return

        messages = []
        for app_id, paths in duplicates.items():
            file_list = '\n '.join(paths)
            messages.append(f"application_id '{app_id}' found in multiple files:\n {file_list}")
        self.fail("\n\n".join(messages))
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

Some files were not shown because too many files have changed in this diff Show More