Another bulk of refaktoring cleanup

This commit is contained in:
Kevin Veen-Birkenbach 2025-07-11 18:57:40 +02:00
parent 168c5c0da6
commit 33276263b0
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E
12 changed files with 210 additions and 56 deletions

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
CLI for extracting invokable role paths from a nested roles YAML file using argparse.
CLI for extracting invokable or non-invokable role paths from a nested roles YAML file using argparse.
Assumes a default roles file at the project root if none is provided.
"""
@ -18,12 +18,12 @@ sys.path.insert(0, project_root)
import argparse
import yaml
from filter_plugins.invokable_paths import get_invokable_paths
from filter_plugins.invokable_paths import get_invokable_paths, get_non_invokable_paths
def main():
parser = argparse.ArgumentParser(
description="Extract invokable role paths from a nested roles YAML file."
description="Extract invokable or non-invokable role paths from a nested roles YAML file."
)
parser.add_argument(
"roles_file",
@ -33,13 +33,33 @@ def main():
)
parser.add_argument(
"--suffix", "-s",
help="Optional suffix to append to each invokable path.",
help="Optional suffix to append to each path.",
default=None
)
mode_group = parser.add_mutually_exclusive_group()
mode_group.add_argument(
"--non-invokable", "-n",
action='store_true',
help="List paths where 'invokable' is False or not set."
)
mode_group.add_argument(
"--invokable", "-i",
action='store_true',
help="List paths where 'invokable' is True. (default behavior)"
)
args = parser.parse_args()
# Default to invokable if neither flag is provided
list_non = args.non_invokable
list_inv = args.invokable or not (args.non_invokable or args.invokable)
try:
paths = get_invokable_paths(args.roles_file, args.suffix)
if list_non:
paths = get_non_invokable_paths(args.roles_file, args.suffix)
else:
paths = get_invokable_paths(args.roles_file, args.suffix)
except FileNotFoundError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)

View File

@ -2,29 +2,18 @@ import os
import yaml
from typing import Dict, List, Optional
def get_invokable_paths(
roles_file: Optional[str] = None,
suffix: Optional[str] = None
) -> List[str]:
"""
Load nested roles YAML from the given file (or default at project root) and return
dash-joined paths where 'invokable' is True. Appends suffix if provided.
:param roles_file: Optional path to YAML file. Defaults to '<project_root>/roles/categories.yml'.
:param suffix: Optional suffix to append to each invokable path.
:return: List of invokable paths.
:raises FileNotFoundError: If the YAML file cannot be found.
:raises yaml.YAMLError: If the YAML file cannot be parsed.
:raises ValueError: If the root of the YAML is not a dictionary.
Load nested roles YAML and return dash-joined paths where 'invokable' is True. Appends suffix if provided.
"""
# Determine default roles_file if not provided
if not roles_file:
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
roles_file = os.path.join(project_root, 'roles', 'categories.yml')
# Load and validate YAML
try:
with open(roles_file, 'r') as f:
data = yaml.safe_load(f) or {}
@ -36,7 +25,6 @@ def get_invokable_paths(
if not isinstance(data, dict):
raise ValueError("YAML root is not a dictionary")
# Unwrap if single 'roles' key
roles = data
if 'roles' in roles and isinstance(roles['roles'], dict) and len(roles) == 1:
roles = roles['roles']
@ -54,7 +42,6 @@ def get_invokable_paths(
p += suffix
found.append(p)
# Recurse into non-metadata child dicts
children = {
ck: cv for ck, cv in cfg.items()
if ck not in METADATA and isinstance(cv, dict)
@ -66,6 +53,61 @@ def get_invokable_paths(
return _recurse(roles)
def get_non_invokable_paths(
roles_file: Optional[str] = None,
suffix: Optional[str] = None
) -> List[str]:
"""
Load nested roles YAML and return dash-joined paths where 'invokable' is False or missing.
Appends suffix if provided.
"""
if not roles_file:
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
roles_file = os.path.join(project_root, 'roles', 'categories.yml')
try:
with open(roles_file, 'r') as f:
data = yaml.safe_load(f) or {}
except FileNotFoundError:
raise FileNotFoundError(f"Roles file not found: {roles_file}")
except yaml.YAMLError as e:
raise yaml.YAMLError(f"Error parsing YAML {roles_file}: {e}")
if not isinstance(data, dict):
raise ValueError("YAML root is not a dictionary")
roles = data
if 'roles' in roles and isinstance(roles['roles'], dict) and len(roles) == 1:
roles = roles['roles']
def _recurse_non(subroles: Dict[str, dict], parent: List[str] = None) -> List[str]:
parent = parent or []
found: List[str] = []
METADATA = {'title', 'description', 'icon', 'invokable'}
for key, cfg in subroles.items():
path = parent + [key]
p = '-'.join(path)
inv = cfg.get('invokable', False)
if not inv:
entry = p + (suffix or "")
found.append(entry)
children = {
ck: cv for ck, cv in cfg.items()
if ck not in METADATA and isinstance(cv, dict)
}
if children:
found.extend(_recurse_non(children, path))
return found
return _recurse_non(roles)
class FilterModule:
def filters(self):
return {'invokable_paths': get_invokable_paths}
return {
'invokable_paths': get_invokable_paths,
'non_invokable_paths': get_non_invokable_paths
}

View File

@ -1,6 +1,7 @@
roles:
sys:
title: "System"
invokable: false
alm:
title: "Alerting"
description: "Notification handlers for system events"
@ -31,11 +32,11 @@ roles:
description: "Roles for installing and configuring hardware drivers—covering printers, graphics, input devices, and other peripheral support."
icon: "fas fa-microchip"
invokable: true
core:
title: "Core & System"
description: "Fundamental system configuration"
icon: "fas fa-cogs"
invokable: true
# core:
# title: "Core & System"
# description: "Fundamental system configuration"
# icon: "fas fa-cogs"
# invokable: true
gen:
title: "Generic"
description: "Helper roles & installers (git, locales, timer, etc.)"

View File

@ -27,7 +27,7 @@ galaxy_info:
issue_tracker_url: "https://s.veen.world/cymaisissues"
documentation: "https://s.veen.world/cymais"
dependencies:
- gen-certbot
- srv-web-7-7-certbot
- srv-web-7-4-core
- sys-alm-compose
- sys-cln-certs

View File

@ -104,12 +104,8 @@
- name: "Load base roles"
include_tasks: "./tasks/groups/{{ item }}-roles.yml"
loop:
- core
- drv
- gen
- net
- mon
- maint
- update
loop_control:
label: "{{ item }}-roles.yml"

View File

@ -0,0 +1,47 @@
import unittest
import yaml
import os
import glob
from filter_plugins.invokable_paths import get_invokable_paths, get_non_invokable_paths
class TestSysRolesApplicationId(unittest.TestCase):
"""
Integration tests for sys-* roles based on categories.yml prefixes:
For each actual sys-* directory under roles/:
- If dash-joined prefix is in invokable_paths -> vars/main.yml must exist and contain application_id.
- Otherwise (non-invokable or undeclared) -> if vars/main.yml exists, it must NOT contain application_id.
"""
@classmethod
def setUpClass(cls):
cls.base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
cat_file = os.path.join(cls.base_dir, 'roles', 'categories.yml')
cls.invokable_prefixes = set(get_invokable_paths(cat_file))
# collect actual sys dirs
pattern = os.path.join(cls.base_dir, 'roles', 'sys-*')
cls.actual_dirs = [d for d in glob.glob(pattern) if os.path.isdir(d)]
def test_sys_roles_application_id(self):
for role_dir in self.actual_dirs:
name = os.path.basename(role_dir)
prefix = f"sys-{name[len('sys-'):] if name.startswith('sys-') else name}"
vars_file = os.path.join(role_dir, 'vars', 'main.yml')
if prefix in self.invokable_prefixes:
# must exist with id
self.assertTrue(os.path.isfile(vars_file), f"Missing vars/main.yml for invokable role {prefix}")
with open(vars_file) as f:
content = yaml.safe_load(f) or {}
self.assertIn('application_id', content,
f"Expected 'application_id' in {vars_file} for invokable role {prefix}")
else:
# if exists, must not contain id
if not os.path.isfile(vars_file):
continue
with open(vars_file) as f:
content = yaml.safe_load(f) or {}
self.assertNotIn('application_id', content,
f"Unexpected 'application_id' in {vars_file} for non-invokable role {prefix}")
if __name__ == '__main__':
unittest.main()
unittest.main()

View File

@ -1,25 +0,0 @@
# tests/integration/test_no_application_id.py
import unittest
import yaml
import glob
import os
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
test_files = glob.glob(os.path.join(BASE_DIR, "roles/sys-*/vars/main.yml"))
class TestNoApplicationId(unittest.TestCase):
"""
Ensure that no sys-* role main.yml defines an application_id variable.
"""
def test_no_application_id_defined(self):
for file_path in test_files:
with open(file_path, 'r') as f:
content = yaml.safe_load(f) or {}
self.assertNotIn(
'application_id', content,
f"Unexpected 'application_id' defined in {file_path}"
)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,73 @@
import unittest
import tempfile
import yaml
import os
from filter_plugins.invokable_paths import get_non_invokable_paths
class TestNonInvokablePaths(unittest.TestCase):
def write_yaml(self, data):
tmp = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yml')
yaml.dump(data, tmp)
tmp.close()
return tmp.name
def test_empty_roles(self):
path = self.write_yaml({})
# No roles, so no non-invokable paths
self.assertEqual(get_non_invokable_paths(path), [])
os.unlink(path)
def test_single_non_invokable_false_and_missing(self):
data_false = {'role1': {'invokable': False}}
path_false = self.write_yaml(data_false)
self.assertEqual(get_non_invokable_paths(path_false), ['role1'])
os.unlink(path_false)
data_missing = {'role1': {}}
path_missing = self.write_yaml(data_missing)
self.assertEqual(get_non_invokable_paths(path_missing), ['role1'])
os.unlink(path_missing)
def test_single_invokable_true_excluded(self):
data = {'role1': {'invokable': True}}
path = self.write_yaml(data)
# invokable True should not appear in non-invokable list
self.assertEqual(get_non_invokable_paths(path), [])
os.unlink(path)
def test_nested_and_deeply_nested(self):
data = {
'parent': {
'invokable': True,
'child': {'invokable': False},
'other': {'invokable': True},
'sub': {
'deep': {}
}
}
}
path = self.write_yaml(data)
# 'parent-child' (explicit False), 'parent-sub' (missing invokable), and 'parent-sub-deep' (missing) are non-invokable
expected = ['parent-child', 'parent-sub', 'parent-sub-deep']
self.assertEqual(sorted(get_non_invokable_paths(path)), sorted(expected))
os.unlink(path)
def test_unwrap_roles_key(self):
data = {'roles': {
'role1': {'invokable': False},
'role2': {'invokable': True}
}}
path = self.write_yaml(data)
# Only role1 is non-invokable
self.assertEqual(get_non_invokable_paths(path), ['role1'])
os.unlink(path)
def test_suffix_appended(self):
data = {'role1': {'invokable': False}}
path = self.write_yaml(data)
self.assertEqual(get_non_invokable_paths(path, suffix='_suf'), ['role1_suf'])
os.unlink(path)
if __name__ == '__main__':
unittest.main()