Added application ids filter for easier partial deployment

This commit is contained in:
Kevin Veen-Birkenbach 2025-07-04 21:52:37 +02:00
parent 9f1d153053
commit 52f467c15c
No known key found for this signature in database
GPG Key ID: 44D8F11FD62F878E
5 changed files with 114 additions and 3 deletions

View File

@ -6,7 +6,8 @@ import os
import datetime
import sys
def run_ansible_playbook(inventory, playbook, modes, limit=None, password_file=None, verbose=0, skip_tests=False):
def run_ansible_playbook(inventory, playbook, modes, limit=None, allowed_applications=None, password_file=None, verbose=0, skip_tests=False):
start_time = datetime.datetime.now()
print(f"\n▶️ Script started at: {start_time.isoformat()}\n")
@ -22,6 +23,15 @@ def run_ansible_playbook(inventory, playbook, modes, limit=None, password_file=N
if limit:
cmd.extend(["--limit", limit])
# Pass application IDs parameter as extra var if provided
if allowed_applications:
joined = ",".join(allowed_applications)
cmd.extend(["-e", f"allowed_applications={joined}"])
else:
# No IDs provided: execute all applications defined in the inventory
cmd.extend(["-e", "allowed_applications=all"])
# Pass other mode flags
for key, value in modes.items():
val = str(value).lower() if isinstance(value, bool) else str(value)
cmd.extend(["-e", f"{key}={val}"])
@ -43,6 +53,7 @@ def run_ansible_playbook(inventory, playbook, modes, limit=None, password_file=N
duration = end_time - start_time
print(f"⏱️ Total execution time: {duration}\n")
def main():
script_dir = os.path.dirname(os.path.realpath(__file__))
parser = argparse.ArgumentParser(
@ -99,6 +110,12 @@ def main():
"--skip-validation", action="store_true",
help="Skip inventory validation before deployment."
)
parser.add_argument(
"--id",
nargs="+",
default=[],
help="List of application_id's for partial deploy. If not set, all application IDs defined in the inventory will be executed."
)
parser.add_argument(
"-v", "--verbose", action="count", default=0,
help="Increase verbosity level. Multiple -v flags increase detail (e.g., -vvv for maximum log output)."
@ -134,6 +151,7 @@ def main():
playbook=playbook_file,
modes=modes,
limit=args.limit,
allowed_applications=args.allowed_applications,
password_file=args.password_file,
verbose=args.verbose,
skip_tests=args.skip_tests

View File

@ -115,7 +115,7 @@ def generate_playbook_entries(roles_dir, prefix=None):
role = roles[role_name]
entries.append(
f"- name: setup {role['application_id']}\n"
f" when: ('{role['application_id']}' in group_names)\n"
f" when: {role['application_id']} | application_allowed(group_names, allowed_applications)\n"
f" include_role:\n"
f" name: {role['role_name']}\n"
)

View File

@ -0,0 +1,44 @@
#!/usr/bin/env python3
# Provides a filter to control which applications (roles) should be deployed
from ansible.errors import AnsibleFilterError
def application_allowed(application_id: str, group_names: list, allowed_applications: list = []):
"""
Return True if:
- application_id exists in group_names, AND
- either allowed_applications is not provided (or empty), OR application_id is in allowed_applications.
Parameters:
application_id (str): Name of the application/role to check.
group_names (list): List of groups the current host belongs to.
allowed_applications (list, optional): List of application IDs to allow.
Returns:
bool: True if this application is allowed to deploy, False otherwise.
"""
# Ensure group_names is iterable
if not isinstance(group_names, (list, tuple)):
raise AnsibleFilterError(f"Expected group_names to be a list or tuple, got {type(group_names)}")
# Must be part of the host's groups
if application_id not in group_names:
return False
# If allowed_applications provided, only allow if ID is in that list
if allowed_applications:
if not isinstance(allowed_applications, (list, tuple)):
raise AnsibleFilterError(f"allowed_applications must be a list or tuple if provided, got {type(allowed_applications)}")
return application_id in allowed_applications
# No filter provided → allow all in group_names
return True
class FilterModule(object):
def filters(self):
return {
'application_allowed': application_allowed,
}

View File

@ -54,4 +54,7 @@ certbot_cert_path: "/etc/letsencrypt/live" # Path contain
docker_restart_policy: "unless-stopped"
# helper
_applications_nextcloud_oidc_flavor: "{{ applications.nextcloud.oidc.flavor | default('oidc_login' if applications.nextcloud.features.ldap | default(true) else 'sociallogin') }}"
_applications_nextcloud_oidc_flavor: "{{ applications.nextcloud.oidc.flavor | default('oidc_login' if applications.nextcloud.features.ldap | default(true) else 'sociallogin') }}"
# default value if not set via CLI (-e) or in playbook vars
allowed_applications: []

View File

@ -0,0 +1,46 @@
import unittest
from filter_plugins.application_allowed import application_allowed
from ansible.errors import AnsibleFilterError
class TestApplicationAllowed(unittest.TestCase):
def test_application_not_in_group(self):
# application not in group_names should always return False
self.assertFalse(application_allowed('app1', ['other_group'], None))
self.assertFalse(application_allowed('app1', ['other_group'], []))
self.assertFalse(application_allowed('app1', ['other_group'], ['app1']))
def test_no_allowed_applications_allows_group_items(self):
# allowed_applications is None or empty -> allow if in group
self.assertTrue(application_allowed('app1', ['app1', 'app2'], None))
# empty list treated as no filter -> allow all in group
self.assertTrue(application_allowed('app2', ['app1', 'app2'], []))
def test_allowed_applications_list(self):
group = ['app1', 'app2', 'app3']
allowed = ['app2', 'app3']
self.assertFalse(application_allowed('app1', group, allowed))
self.assertTrue(application_allowed('app2', group, allowed))
self.assertTrue(application_allowed('app3', group, allowed))
def test_allowed_applications_wrong_type(self):
# invalid allowed_applications type
with self.assertRaises(AnsibleFilterError):
application_allowed('app1', ['app1'], allowed_applications=123)
def test_group_names_wrong_type(self):
# invalid group_names type
with self.assertRaises(AnsibleFilterError):
application_allowed('app1', 'not_a_list', None)
def test_allowed_applications_edge_cases(self):
# whitespace-only entries do not affect result
group = ['app1']
allowed = ['app1', ' ', '']
self.assertTrue(application_allowed('app1', group, allowed))
# application in group but not listed -> false
self.assertFalse(application_allowed('app2', ['app2'], allowed))
if __name__ == '__main__':
unittest.main()