diff --git a/cli/deploy.py b/cli/deploy.py index fb045a99..a89dc860 100644 --- a/cli/deploy.py +++ b/cli/deploy.py @@ -6,7 +6,8 @@ import os import datetime import sys -def run_ansible_playbook(inventory, playbook, modes, limit=None, password_file=None, verbose=0, skip_tests=False): + +def run_ansible_playbook(inventory, playbook, modes, limit=None, allowed_applications=None, password_file=None, verbose=0, skip_tests=False): start_time = datetime.datetime.now() print(f"\n▶️ Script started at: {start_time.isoformat()}\n") @@ -22,6 +23,15 @@ def run_ansible_playbook(inventory, playbook, modes, limit=None, password_file=N if limit: cmd.extend(["--limit", limit]) + # Pass application IDs parameter as extra var if provided + if allowed_applications: + joined = ",".join(allowed_applications) + cmd.extend(["-e", f"allowed_applications={joined}"]) + else: + # No IDs provided: execute all applications defined in the inventory + cmd.extend(["-e", "allowed_applications=all"]) + + # Pass other mode flags for key, value in modes.items(): val = str(value).lower() if isinstance(value, bool) else str(value) cmd.extend(["-e", f"{key}={val}"]) @@ -43,6 +53,7 @@ def run_ansible_playbook(inventory, playbook, modes, limit=None, password_file=N duration = end_time - start_time print(f"⏱️ Total execution time: {duration}\n") + def main(): script_dir = os.path.dirname(os.path.realpath(__file__)) parser = argparse.ArgumentParser( @@ -99,6 +110,12 @@ def main(): "--skip-validation", action="store_true", help="Skip inventory validation before deployment." ) + parser.add_argument( + "--id", + nargs="+", + default=[], + help="List of application_id's for partial deploy. If not set, all application IDs defined in the inventory will be executed." + ) parser.add_argument( "-v", "--verbose", action="count", default=0, help="Increase verbosity level. Multiple -v flags increase detail (e.g., -vvv for maximum log output)." @@ -134,6 +151,7 @@ def main(): playbook=playbook_file, modes=modes, limit=args.limit, + allowed_applications=args.allowed_applications, password_file=args.password_file, verbose=args.verbose, skip_tests=args.skip_tests diff --git a/cli/generate_playbook.py b/cli/generate_playbook.py index 7102d5cc..fd35ef52 100644 --- a/cli/generate_playbook.py +++ b/cli/generate_playbook.py @@ -115,7 +115,7 @@ def generate_playbook_entries(roles_dir, prefix=None): role = roles[role_name] entries.append( f"- name: setup {role['application_id']}\n" - f" when: ('{role['application_id']}' in group_names)\n" + f" when: {role['application_id']} | application_allowed(group_names, allowed_applications)\n" f" include_role:\n" f" name: {role['role_name']}\n" ) diff --git a/filter_plugins/application_allowed.py b/filter_plugins/application_allowed.py new file mode 100644 index 00000000..a03cf31b --- /dev/null +++ b/filter_plugins/application_allowed.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 + +# Provides a filter to control which applications (roles) should be deployed + +from ansible.errors import AnsibleFilterError + + +def application_allowed(application_id: str, group_names: list, allowed_applications: list = []): + """ + Return True if: + - application_id exists in group_names, AND + - either allowed_applications is not provided (or empty), OR application_id is in allowed_applications. + + Parameters: + application_id (str): Name of the application/role to check. + group_names (list): List of groups the current host belongs to. + allowed_applications (list, optional): List of application IDs to allow. + + Returns: + bool: True if this application is allowed to deploy, False otherwise. + """ + # Ensure group_names is iterable + if not isinstance(group_names, (list, tuple)): + raise AnsibleFilterError(f"Expected group_names to be a list or tuple, got {type(group_names)}") + + # Must be part of the host's groups + if application_id not in group_names: + return False + + # If allowed_applications provided, only allow if ID is in that list + if allowed_applications: + if not isinstance(allowed_applications, (list, tuple)): + raise AnsibleFilterError(f"allowed_applications must be a list or tuple if provided, got {type(allowed_applications)}") + return application_id in allowed_applications + + # No filter provided → allow all in group_names + return True + + +class FilterModule(object): + def filters(self): + return { + 'application_allowed': application_allowed, + } diff --git a/group_vars/all/00_general.yml b/group_vars/all/00_general.yml index 799088a6..e05a8cdd 100644 --- a/group_vars/all/00_general.yml +++ b/group_vars/all/00_general.yml @@ -54,4 +54,7 @@ certbot_cert_path: "/etc/letsencrypt/live" # Path contain docker_restart_policy: "unless-stopped" # helper -_applications_nextcloud_oidc_flavor: "{{ applications.nextcloud.oidc.flavor | default('oidc_login' if applications.nextcloud.features.ldap | default(true) else 'sociallogin') }}" \ No newline at end of file +_applications_nextcloud_oidc_flavor: "{{ applications.nextcloud.oidc.flavor | default('oidc_login' if applications.nextcloud.features.ldap | default(true) else 'sociallogin') }}" + +# default value if not set via CLI (-e) or in playbook vars +allowed_applications: [] diff --git a/tests/unit/filter_plugins/test_application_allowed.py b/tests/unit/filter_plugins/test_application_allowed.py new file mode 100644 index 00000000..c7f47972 --- /dev/null +++ b/tests/unit/filter_plugins/test_application_allowed.py @@ -0,0 +1,46 @@ +import unittest +from filter_plugins.application_allowed import application_allowed +from ansible.errors import AnsibleFilterError + + +class TestApplicationAllowed(unittest.TestCase): + def test_application_not_in_group(self): + # application not in group_names should always return False + self.assertFalse(application_allowed('app1', ['other_group'], None)) + self.assertFalse(application_allowed('app1', ['other_group'], [])) + self.assertFalse(application_allowed('app1', ['other_group'], ['app1'])) + + def test_no_allowed_applications_allows_group_items(self): + # allowed_applications is None or empty -> allow if in group + self.assertTrue(application_allowed('app1', ['app1', 'app2'], None)) + # empty list treated as no filter -> allow all in group + self.assertTrue(application_allowed('app2', ['app1', 'app2'], [])) + + def test_allowed_applications_list(self): + group = ['app1', 'app2', 'app3'] + allowed = ['app2', 'app3'] + self.assertFalse(application_allowed('app1', group, allowed)) + self.assertTrue(application_allowed('app2', group, allowed)) + self.assertTrue(application_allowed('app3', group, allowed)) + + def test_allowed_applications_wrong_type(self): + # invalid allowed_applications type + with self.assertRaises(AnsibleFilterError): + application_allowed('app1', ['app1'], allowed_applications=123) + + def test_group_names_wrong_type(self): + # invalid group_names type + with self.assertRaises(AnsibleFilterError): + application_allowed('app1', 'not_a_list', None) + + def test_allowed_applications_edge_cases(self): + # whitespace-only entries do not affect result + group = ['app1'] + allowed = ['app1', ' ', ''] + self.assertTrue(application_allowed('app1', group, allowed)) + # application in group but not listed -> false + self.assertFalse(application_allowed('app2', ['app2'], allowed)) + + +if __name__ == '__main__': + unittest.main()