Added run_after integration test

This commit is contained in:
Kevin Veen-Birkenbach 2025-07-11 10:15:58 +02:00
parent 23bbe0520c
commit 6780950257
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E
7 changed files with 294 additions and 3 deletions

3
.gitignore vendored
View File

@ -3,4 +3,5 @@ site.retry
venv
*.log
*.bak
*tree.json
*tree.json
roles/list.json

View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""
Generate a JSON file listing all Ansible role directories.
Usage:
python roles_list.py [--roles-dir path/to/roles] [--output path/to/roles/list.json | console]
"""
import os
import json
import argparse
def find_roles(roles_dir: str):
"""Return sorted list of role names under roles_dir."""
return sorted([
entry for entry in os.listdir(roles_dir)
if os.path.isdir(os.path.join(roles_dir, entry))
])
def write_roles_list(roles, out_file):
"""Write the list of roles to out_file as JSON."""
os.makedirs(os.path.dirname(out_file), exist_ok=True)
with open(out_file, 'w', encoding='utf-8') as f:
json.dump(roles, f, indent=2)
print(f"Wrote roles list to {out_file}")
def main():
# Determine default roles_dir relative to this script: ../../.. -> roles
script_dir = os.path.dirname(os.path.abspath(__file__))
default_roles_dir = os.path.abspath(
os.path.join(script_dir, '..', '..', 'roles')
)
default_output = os.path.join(default_roles_dir, 'list.json')
parser = argparse.ArgumentParser(description='Generate roles/list.json')
parser.add_argument(
'--roles-dir', '-r',
default=default_roles_dir,
help=f'Directory containing role subfolders (default: {default_roles_dir})'
)
parser.add_argument(
'--output', '-o',
default=default_output,
help=(
'Output path for roles list JSON '
'(or "console" to print to stdout, default: %(default)s)'
)
)
args = parser.parse_args()
if not os.path.isdir(args.roles_dir):
parser.error(f"Roles directory not found: {args.roles_dir}")
roles = find_roles(args.roles_dir)
if args.output.lower() == 'console':
# Print JSON to stdout
print(json.dumps(roles, indent=2))
else:
write_roles_list(roles, args.output)
if __name__ == '__main__':
main()

76
cli/meta/role_folder.py Normal file
View File

@ -0,0 +1,76 @@
#!/usr/bin/env python3
"""
CLI Script: get_role_folder_cli.py
This script determines the appropriate Ansible role folder based on the provided application_id
by inspecting each role's vars/main.yml within the roles directory. By default, it assumes the
roles directory is located at the project root, relative to this script's location.
Example:
./get_role_folder_cli.py --application-id my-app-id
"""
import os
import sys
import argparse
import yaml
def get_role_folder(application_id, roles_path):
"""
Find the role directory under `roles_path` whose vars/main.yml contains the specified application_id.
:param application_id: The application_id to match.
:param roles_path: Path to the roles directory.
:return: The name of the matching role directory.
:raises RuntimeError: If no match is found or if an error occurs while reading files.
"""
if not os.path.isdir(roles_path):
raise RuntimeError(f"Roles path not found: {roles_path}")
for role in sorted(os.listdir(roles_path)):
role_dir = os.path.join(roles_path, role)
vars_file = os.path.join(role_dir, 'vars', 'main.yml')
if os.path.isfile(vars_file):
try:
with open(vars_file, 'r') as f:
data = yaml.safe_load(f) or {}
except Exception as e:
raise RuntimeError(f"Failed to load {vars_file}: {e}")
if data.get('application_id') == application_id:
return role
raise RuntimeError(f"No role found with application_id '{application_id}' in {roles_path}")
def main():
parser = argparse.ArgumentParser(
description='Determine the Ansible role folder by application_id'
)
parser.add_argument(
'application_id',
help='The application_id defined in vars/main.yml to search for'
)
parser.add_argument(
'-r', '--roles-path',
default=os.path.join(
os.path.dirname(os.path.realpath(__file__)),
os.pardir, os.pardir,
'roles'
),
help='Path to the roles directory (default: roles/ at project root)'
)
args = parser.parse_args()
try:
folder = get_role_folder(args.application_id, args.roles_path)
print(folder)
sys.exit(0)
except RuntimeError as err:
print(f"Error: {err}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()

View File

@ -5,14 +5,12 @@ from ansible.errors import AnsibleFilterError
class FilterModule(object):
def filters(self):
# module_util-Verzeichnis ermitteln und zum Import-Pfad hinzufügen
plugin_dir = os.path.dirname(__file__)
project_root = os.path.dirname(plugin_dir)
module_utils = os.path.join(project_root, 'module_utils')
if module_utils not in sys.path:
sys.path.append(module_utils)
# jetzt kannst Du domain_utils importieren
try:
from domain_utils import get_domain
except ImportError as e:

View File

@ -0,0 +1,48 @@
'''
Ansible filter plugin: get_role_folder
This filter inspects each role under the given roles directory, loads its vars/main.yml,
and returns the role folder name whose application_id matches the provided value.
'''
from ansible.errors import AnsibleFilterError
import os
import yaml
def get_role_folder(application_id, roles_path='roles'):
"""
Find the role directory under `roles_path` whose vars/main.yml contains the given application_id.
:param application_id: The application_id to match.
:param roles_path: Path to the roles directory (default: 'roles').
:return: The name of the matching role directory.
:raises AnsibleFilterError: If vars file is unreadable or no match is found.
"""
if not os.path.isdir(roles_path):
raise AnsibleFilterError(f"Roles path not found: {roles_path}")
for role in os.listdir(roles_path):
role_dir = os.path.join(roles_path, role)
vars_file = os.path.join(role_dir, 'vars', 'main.yml')
if os.path.isfile(vars_file):
try:
with open(vars_file, 'r') as f:
data = yaml.safe_load(f) or {}
except Exception as e:
raise AnsibleFilterError(f"Failed to load {vars_file}: {e}")
if data.get('application_id') == application_id:
return role
raise AnsibleFilterError(f"No role found with application_id '{application_id}' in {roles_path}")
class FilterModule(object):
"""
Register the get_role_folder filter
"""
def filters(self):
return {
'get_role_folder': get_role_folder,
}

View File

@ -0,0 +1,37 @@
#!/usr/bin/env python3
import os
import unittest
import yaml
from pathlib import Path
class TestRunAfterRoles(unittest.TestCase):
def setUp(self):
self.roles_dir = Path(__file__).resolve().parent.parent.parent / "roles"
self.valid_role_names = {p.name for p in self.roles_dir.iterdir() if p.is_dir()}
def test_run_after_roles_are_valid(self):
invalid_refs = []
for role in self.valid_role_names:
meta_path = self.roles_dir / role / "meta" / "main.yml"
if not meta_path.exists():
continue
try:
with open(meta_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
except Exception as e:
self.fail(f"Failed to parse {meta_path}: {e}")
continue
run_after = data.get("galaxy_info", {}).get("run_after", [])
for ref in run_after:
if ref not in self.valid_role_names:
invalid_refs.append((role, ref))
if invalid_refs:
msg = "\n".join(f"{role}: invalid run_after → {ref}" for role, ref in invalid_refs)
self.fail(f"Found invalid run_after references:\n{msg}")
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,66 @@
import os
import shutil
import tempfile
import unittest
import yaml
from ansible.errors import AnsibleFilterError
from filter_plugins.get_role_folder import get_role_folder
class TestGetRoleFolder(unittest.TestCase):
def setUp(self):
# Create a temporary directory to simulate roles structure
self.tempdir = tempfile.mkdtemp()
self.roles_dir = os.path.join(self.tempdir, 'roles')
os.makedirs(self.roles_dir)
# Role1: matching application_id
role1_path = os.path.join(self.roles_dir, 'role1', 'vars')
os.makedirs(role1_path)
with open(os.path.join(role1_path, 'main.yml'), 'w') as f:
yaml.dump({'application_id': 'app-123'}, f)
# Role2: non-matching application_id
role2_path = os.path.join(self.roles_dir, 'role2', 'vars')
os.makedirs(role2_path)
with open(os.path.join(role2_path, 'main.yml'), 'w') as f:
yaml.dump({'application_id': 'app-456'}, f)
# Role3: missing vars directory
os.makedirs(os.path.join(self.roles_dir, 'role3'))
def tearDown(self):
# Clean up temporary directory
shutil.rmtree(self.tempdir)
def test_find_existing_role(self):
# Should find role1 for application_id 'app-123'
result = get_role_folder('app-123', roles_path=self.roles_dir)
self.assertEqual(result, 'role1')
def test_no_match_raises(self):
# No role has application_id 'nonexistent'
with self.assertRaises(AnsibleFilterError) as cm:
get_role_folder('nonexistent', roles_path=self.roles_dir)
self.assertIn("No role found with application_id 'nonexistent'", str(cm.exception))
def test_missing_roles_path(self):
# Path does not exist
invalid_path = os.path.join(self.tempdir, 'invalid')
with self.assertRaises(AnsibleFilterError) as cm:
get_role_folder('any', roles_path=invalid_path)
self.assertIn(f"Roles path not found: {invalid_path}", str(cm.exception))
def test_invalid_yaml_raises(self):
# Create a role with invalid YAML
bad_role_path = os.path.join(self.roles_dir, 'badrole', 'vars')
os.makedirs(bad_role_path)
with open(os.path.join(bad_role_path, 'main.yml'), 'w') as f:
f.write("::: invalid yaml :::")
with self.assertRaises(AnsibleFilterError) as cm:
get_role_folder('app-123', roles_path=self.roles_dir)
self.assertIn('Failed to load', str(cm.exception))
if __name__ == '__main__':
unittest.main()