From 7791bd8c04a14c9b378a7ecc2013969f7065976d Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Mon, 1 Sep 2025 11:47:51 +0200 Subject: [PATCH] Implement filter checks: ensure all defined filters are used and remove dead code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Integration tests added/updated: - tests/integration/test_filters_usage.py: AST-based detection of filter definitions (FilterModule.filters), robust Jinja detection ({{ ... }}, {% ... %}, {% filter ... %}), plus Python call tracking; fails if a filter is used only under tests/. - tests/integration/test_filters_are_defined.py: inverse check — every filter used in .yml/.yaml/.j2/.jinja2/.tmpl must be defined locally. Scans only inside Jinja blocks and ignores pipes inside strings (e.g., lookup('pipe', "... | grep ... | awk ...")) to avoid false positives like trusted_hosts, woff/woff2, etc. Bug fixes & robustness: - Build regexes without %-string formatting to avoid ValueError from literal '%' in Jinja tags. - Strip quoted strings in usage analysis so sed/grep/awk pipes are not miscounted as filters. - Prevent self-matches in the defining file. Cleanup / removal of dead code: - Removed unused filter plugins and related unit tests: * filter_plugins/alias_domains_map.py * filter_plugins/get_application_id.py * filter_plugins/load_configuration.py * filter_plugins/safe.py * filter_plugins/safe_join.py * roles/svc-db-openldap/filter_plugins/build_ldap_nested_group_entries.py * roles/sys-ctl-bkp-docker-2-loc/filter_plugins/dict_to_cli_args.py * corresponding tests under tests/unit/* - roles/svc-db-postgres/filter_plugins/split_postgres_connections.py: dropped no-longer-needed list_postgres_roles API; adjusted tests. Misc: - sys-stk-front-proxy/defaults/main.yml: clarified valid vhost_flavour values (comma-separated). Ref: https://chatgpt.com/share/68b56bac-c4f8-800f-aeef-6708dbb44199 --- filter_plugins/alias_domains_map.py | 86 ------ filter_plugins/get_application_id.py | 49 ---- filter_plugins/load_configuration.py | 122 -------- filter_plugins/safe.py | 55 ---- filter_plugins/safe_join.py | 29 -- .../build_ldap_nested_group_entries.py | 77 ------ .../split_postgres_connections.py | 16 +- .../filter_plugins/dict_to_cli_args.py | 36 --- roles/sys-stk-front-proxy/defaults/main.yml | 2 +- tests/integration/test_filters_are_defined.py | 252 +++++++++++++++++ tests/integration/test_filters_usage.py | 260 ++++++++++++++++++ .../test_domain_filters_alias.py | 111 -------- .../filter_plugins/test_get_application_id.py | 63 ----- .../filter_plugins/test_load_configuration.py | 122 -------- tests/unit/filter_plugins/test_safe_join.py | 56 ---- .../test_safe_placeholders_filter.py | 59 ---- tests/unit/filter_plugins/test_safe_var.py | 44 --- .../unit/lookup_plugins/test_docker_cards.py | 1 - .../test_split_postgres_connections.py | 6 - .../filter_plugins/test_dict_to_cli_args.py | 61 ---- 20 files changed, 514 insertions(+), 993 deletions(-) delete mode 100644 filter_plugins/alias_domains_map.py delete mode 100644 filter_plugins/get_application_id.py delete mode 100644 filter_plugins/load_configuration.py delete mode 100644 filter_plugins/safe.py delete mode 100644 filter_plugins/safe_join.py delete mode 100644 roles/svc-db-openldap/filter_plugins/build_ldap_nested_group_entries.py delete mode 100644 roles/sys-ctl-bkp-docker-2-loc/filter_plugins/dict_to_cli_args.py create mode 100644 tests/integration/test_filters_are_defined.py create mode 100644 tests/integration/test_filters_usage.py delete mode 100644 tests/unit/filter_plugins/test_domain_filters_alias.py delete mode 100644 tests/unit/filter_plugins/test_get_application_id.py delete mode 100644 tests/unit/filter_plugins/test_load_configuration.py delete mode 100644 tests/unit/filter_plugins/test_safe_join.py delete mode 100644 tests/unit/filter_plugins/test_safe_placeholders_filter.py delete mode 100644 tests/unit/filter_plugins/test_safe_var.py delete mode 100644 tests/unit/roles/sys-ctl-bkp-docker-2-loc/filter_plugins/test_dict_to_cli_args.py diff --git a/filter_plugins/alias_domains_map.py b/filter_plugins/alias_domains_map.py deleted file mode 100644 index 748536c9..00000000 --- a/filter_plugins/alias_domains_map.py +++ /dev/null @@ -1,86 +0,0 @@ -from ansible.errors import AnsibleFilterError - -class FilterModule(object): - def filters(self): - return {'alias_domains_map': self.alias_domains_map} - - def alias_domains_map(self, apps, PRIMARY_DOMAIN): - """ - Build a map of application IDs to their alias domains. - - - If no `domains` key → [] - - If `domains` exists but is an empty dict → return the original cfg - - Explicit `aliases` are used (default appended if missing) - - If only `canonical` defined and it doesn't include default, default is added - - Invalid types raise AnsibleFilterError - """ - def parse_entry(domains_cfg, key, app_id): - if key not in domains_cfg: - return None - entry = domains_cfg[key] - if isinstance(entry, dict): - values = list(entry.values()) - elif isinstance(entry, list): - values = entry - else: - raise AnsibleFilterError( - f"Unexpected type for 'domains.{key}' in application '{app_id}': {type(entry).__name__}" - ) - for d in values: - if not isinstance(d, str) or not d.strip(): - raise AnsibleFilterError( - f"Invalid domain entry in '{key}' for application '{app_id}': {d!r}" - ) - return values - - def default_domain(app_id, primary): - return f"{app_id}.{primary}" - - # 1) Precompute canonical domains per app (fallback to default) - canonical_map = {} - for app_id, cfg in apps.items(): - domains_cfg = cfg.get('server',{}).get('domains',{}) - entry = domains_cfg.get('canonical') - if entry is None: - canonical_map[app_id] = [default_domain(app_id, PRIMARY_DOMAIN)] - elif isinstance(entry, dict): - canonical_map[app_id] = list(entry.values()) - elif isinstance(entry, list): - canonical_map[app_id] = list(entry) - else: - raise AnsibleFilterError( - f"Unexpected type for 'server.domains.canonical' in application '{app_id}': {type(entry).__name__}" - ) - - # 2) Build alias list per app - result = {} - for app_id, cfg in apps.items(): - domains_cfg = cfg.get('server',{}).get('domains') - - # no domains key → no aliases - if domains_cfg is None: - result[app_id] = [] - continue - - # empty domains dict → return the original cfg - if isinstance(domains_cfg, dict) and not domains_cfg: - result[app_id] = cfg - continue - - # otherwise, compute aliases - aliases = parse_entry(domains_cfg, 'aliases', app_id) or [] - default = default_domain(app_id, PRIMARY_DOMAIN) - has_aliases = 'aliases' in domains_cfg - has_canon = 'canonical' in domains_cfg - - if has_aliases: - if default not in aliases: - aliases.append(default) - elif has_canon: - canon = canonical_map.get(app_id, []) - if default not in canon and default not in aliases: - aliases.append(default) - - result[app_id] = aliases - - return result \ No newline at end of file diff --git a/filter_plugins/get_application_id.py b/filter_plugins/get_application_id.py deleted file mode 100644 index 318b3288..00000000 --- a/filter_plugins/get_application_id.py +++ /dev/null @@ -1,49 +0,0 @@ -import os -import re -import yaml -from ansible.errors import AnsibleFilterError - - -def get_application_id(role_name): - """ - Jinja2/Ansible filter: given a role name, load its vars/main.yml and return the application_id value. - """ - # Construct path: assumes current working directory is project root - vars_file = os.path.join(os.getcwd(), 'roles', role_name, 'vars', 'main.yml') - - if not os.path.isfile(vars_file): - raise AnsibleFilterError(f"Vars file not found for role '{role_name}': {vars_file}") - - try: - # Read entire file content to avoid lazy stream issues - with open(vars_file, 'r', encoding='utf-8') as f: - content = f.read() - data = yaml.safe_load(content) - except Exception as e: - raise AnsibleFilterError(f"Error reading YAML from {vars_file}: {e}") - - # Ensure parsed data is a mapping - if not isinstance(data, dict): - raise AnsibleFilterError( - f"Error reading YAML from {vars_file}: expected mapping, got {type(data).__name__}" - ) - - # Detect malformed YAML: no valid identifier-like keys - valid_key_pattern = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$') - if data and not any(valid_key_pattern.match(k) for k in data.keys()): - raise AnsibleFilterError(f"Error reading YAML from {vars_file}: invalid top-level keys") - - if 'application_id' not in data: - raise AnsibleFilterError(f"Key 'application_id' not found in {vars_file}") - - return data['application_id'] - - -class FilterModule(object): - """ - Ansible filter plugin entry point. - """ - def filters(self): - return { - 'get_application_id': get_application_id, - } diff --git a/filter_plugins/load_configuration.py b/filter_plugins/load_configuration.py deleted file mode 100644 index 38213092..00000000 --- a/filter_plugins/load_configuration.py +++ /dev/null @@ -1,122 +0,0 @@ -import os -import yaml -import re -from ansible.errors import AnsibleFilterError - -# in-memory cache: application_id → (parsed_yaml, is_nested) -_cfg_cache = {} - -def load_configuration(application_id, key): - if not isinstance(key, str): - raise AnsibleFilterError("Key must be a dotted-string, e.g. 'features.matomo'") - - # locate roles/ - here = os.path.dirname(__file__) - root = os.path.abspath(os.path.join(here, '..')) - roles_dir = os.path.join(root, 'roles') - if not os.path.isdir(roles_dir): - raise AnsibleFilterError(f"Roles directory not found at {roles_dir}") - - # first time? load & cache - if application_id not in _cfg_cache: - config_path = None - - # 1) primary: vars/main.yml declares it - for role in os.listdir(roles_dir): - mv = os.path.join(roles_dir, role, 'vars', 'main.yml') - if os.path.exists(mv): - try: - md = yaml.safe_load(open(mv)) or {} - except Exception: - md = {} - if md.get('application_id') == application_id: - cf = os.path.join(roles_dir, role, "config" , "main.yml") - if not os.path.exists(cf): - raise AnsibleFilterError( - f"Role '{role}' declares '{application_id}' but missing config/main.yml" - ) - config_path = cf - break - - # 2) fallback nested - if config_path is None: - for role in os.listdir(roles_dir): - cf = os.path.join(roles_dir, role, "config" , "main.yml") - if not os.path.exists(cf): - continue - try: - dd = yaml.safe_load(open(cf)) or {} - except Exception: - dd = {} - if isinstance(dd, dict) and application_id in dd: - config_path = cf - break - - # 3) fallback flat - if config_path is None: - for role in os.listdir(roles_dir): - cf = os.path.join(roles_dir, role, "config" , "main.yml") - if not os.path.exists(cf): - continue - try: - dd = yaml.safe_load(open(cf)) or {} - except Exception: - dd = {} - # flat style: dict with all non-dict values - if isinstance(dd, dict) and not any(isinstance(v, dict) for v in dd.values()): - config_path = cf - break - - if config_path is None: - return None - - # parse once - try: - parsed = yaml.safe_load(open(config_path)) or {} - except Exception as e: - raise AnsibleFilterError(f"Error loading config/main.yml at {config_path}: {e}") - - # detect nested vs flat - is_nested = isinstance(parsed, dict) and (application_id in parsed) - _cfg_cache[application_id] = (parsed, is_nested) - - parsed, is_nested = _cfg_cache[application_id] - - # pick base entry - entry = parsed[application_id] if is_nested else parsed - - # resolve dotted key - key_parts = key.split('.') - for part in key_parts: - # Check if part has an index (e.g., domains.canonical[0]) - match = re.match(r'([^\[]+)\[([0-9]+)\]', part) - if match: - part, index = match.groups() - index = int(index) - if isinstance(entry, dict) and part in entry: - entry = entry[part] - # Check if entry is a list and access the index - if isinstance(entry, list) and 0 <= index < len(entry): - entry = entry[index] - else: - raise AnsibleFilterError( - f"Index '{index}' out of range for key '{part}' in application '{application_id}'" - ) - else: - raise AnsibleFilterError( - f"Key '{part}' not found under application '{application_id}'" - ) - else: - if isinstance(entry, dict) and part in entry: - entry = entry[part] - else: - raise AnsibleFilterError( - f"Key '{part}' not found under application '{application_id}'" - ) - - return entry - - -class FilterModule(object): - def filters(self): - return {'load_configuration': load_configuration} diff --git a/filter_plugins/safe.py b/filter_plugins/safe.py deleted file mode 100644 index f1452d68..00000000 --- a/filter_plugins/safe.py +++ /dev/null @@ -1,55 +0,0 @@ -from jinja2 import Undefined - - -def safe_placeholders(template: str, mapping: dict = None) -> str: - """ - Format a template like "{url}/logo.png". - If mapping is provided (not None) and ANY placeholder is missing or maps to None/empty string, the function will raise KeyError. - If mapping is None, missing placeholders or invalid templates return empty string. - Numerical zero or False are considered valid values. - Any other formatting errors return an empty string. - """ - # Non-string templates yield empty - if not isinstance(template, str): - return '' - - class SafeDict(dict): - def __getitem__(self, key): - val = super().get(key, None) - # Treat None or empty string as missing - if val is None or (isinstance(val, str) and val == ''): - raise KeyError(key) - return val - def __missing__(self, key): - raise KeyError(key) - - silent = mapping is None - data = mapping or {} - try: - return template.format_map(SafeDict(data)) - except KeyError: - if silent: - return '' - raise - except Exception: - return '' - -def safe_var(value): - """ - Ansible filter: returns the value unchanged unless it's Undefined or None, - in which case returns an empty string. - Catches all exceptions and yields ''. - """ - try: - if isinstance(value, Undefined) or value is None: - return '' - return value - except Exception: - return '' - -class FilterModule(object): - def filters(self): - return { - 'safe_var': safe_var, - 'safe_placeholders': safe_placeholders, - } diff --git a/filter_plugins/safe_join.py b/filter_plugins/safe_join.py deleted file mode 100644 index 2f4d9d44..00000000 --- a/filter_plugins/safe_join.py +++ /dev/null @@ -1,29 +0,0 @@ -""" -Ansible filter plugin that joins a base string and a tail path safely. -Raises ValueError if base or tail is None. -""" - -def safe_join(base, tail): - """ - Safely join base and tail into a path or URL. - - - base: the base string. Must not be None. - - tail: the string to append. Must not be None. - - On ValueError, caller should handle it. - """ - if base is None or tail is None: - raise ValueError("safe_join: base and tail must not be None") - - try: - base_str = str(base).rstrip('/') - tail_str = str(tail).lstrip('/') - return f"{base_str}/{tail_str}" - except Exception: - return '' - - -class FilterModule(object): - def filters(self): - return { - 'safe_join': safe_join, - } diff --git a/roles/svc-db-openldap/filter_plugins/build_ldap_nested_group_entries.py b/roles/svc-db-openldap/filter_plugins/build_ldap_nested_group_entries.py deleted file mode 100644 index 0aa172a0..00000000 --- a/roles/svc-db-openldap/filter_plugins/build_ldap_nested_group_entries.py +++ /dev/null @@ -1,77 +0,0 @@ -def build_ldap_nested_group_entries(applications, users, ldap): - """ - Builds structured LDAP role entries using the global `ldap` configuration. - Supports objectClasses: posixGroup (adds gidNumber, memberUid), groupOfNames (adds member). - Now nests roles under an application-level OU: application-id/role. - """ - - result = {} - - # Base DN components - role_dn_base = ldap["DN"]["OU"]["ROLES"] - user_dn_base = ldap["DN"]["OU"]["USERS"] - ldap_user_attr = ldap["USER"]["ATTRIBUTES"]["ID"] - - # Supported objectClass flavors - flavors = ldap.get("RBAC").get("FLAVORS") - - for application_id, app_config in applications.items(): - # Compute the DN for the application-level OU - app_ou_dn = f"ou={application_id},{role_dn_base}" - - ou_entry = { - "dn": app_ou_dn, - "objectClass": ["top", "organizationalUnit"], - "ou": application_id, - "description": f"Roles for application {application_id}" - } - result[app_ou_dn] = ou_entry - - # Standard roles with an extra 'administrator' - base_roles = app_config.get("rbac", {}).get("roles", {}) - roles = { - **base_roles, - "administrator": { - "description": "Has full administrative access: manage themes, plugins, settings, and users" - } - } - - group_id = app_config.get("group_id") - - for role_name, role_conf in roles.items(): - # Build CN under the application OU - cn = role_name - dn = f"cn={cn},{app_ou_dn}" - - entry = { - "dn": dn, - "cn": cn, - "description": role_conf.get("description", ""), - "objectClass": ["top"] + flavors, - } - - member_dns = [] - member_uids = [] - for username, user_conf in users.items(): - if role_name in user_conf.get("roles", []): - member_dns.append(f"{ldap_user_attr}={username},{user_dn_base}") - member_uids.append(username) - - if "posixGroup" in flavors: - entry["gidNumber"] = group_id - if member_uids: - entry["memberUid"] = member_uids - - if "groupOfNames" in flavors and member_dns: - entry["member"] = member_dns - - result[dn] = entry - - return result - - -class FilterModule(object): - def filters(self): - return { - "build_ldap_nested_group_entries": build_ldap_nested_group_entries - } diff --git a/roles/svc-db-postgres/filter_plugins/split_postgres_connections.py b/roles/svc-db-postgres/filter_plugins/split_postgres_connections.py index 38af789e..fab4eb32 100644 --- a/roles/svc-db-postgres/filter_plugins/split_postgres_connections.py +++ b/roles/svc-db-postgres/filter_plugins/split_postgres_connections.py @@ -37,22 +37,8 @@ def split_postgres_connections(total_connections, roles_dir="roles"): denom = max(count, 1) return max(1, total // denom) -def list_postgres_roles(roles_dir="roles"): - """ - Helper: return a list of role names that declare database_type: postgres in vars/main.yml. - """ - names = [] - if not os.path.isdir(roles_dir): - return names - for name in os.listdir(roles_dir): - vars_main = os.path.join(roles_dir, name, "vars", "main.yml") - if os.path.isfile(vars_main) and _is_postgres_role(vars_main): - names.append(name) - return names - class FilterModule(object): def filters(self): return { - "split_postgres_connections": split_postgres_connections, - "list_postgres_roles": list_postgres_roles, + "split_postgres_connections": split_postgres_connections } diff --git a/roles/sys-ctl-bkp-docker-2-loc/filter_plugins/dict_to_cli_args.py b/roles/sys-ctl-bkp-docker-2-loc/filter_plugins/dict_to_cli_args.py deleted file mode 100644 index 5d01f81b..00000000 --- a/roles/sys-ctl-bkp-docker-2-loc/filter_plugins/dict_to_cli_args.py +++ /dev/null @@ -1,36 +0,0 @@ -def dict_to_cli_args(data): - """ - Convert a dictionary into CLI argument string. - Example: - { - "backup-dir": "/mnt/backups", - "shutdown": True, - "ignore-volumes": ["redis", "memcached"] - } - becomes: - --backup-dir=/mnt/backups --shutdown --ignore-volumes="redis memcached" - """ - if not isinstance(data, dict): - raise TypeError("Expected a dictionary for CLI argument conversion") - - args = [] - - for key, value in data.items(): - cli_key = f"--{key}" - - if isinstance(value, bool): - if value: - args.append(cli_key) - elif isinstance(value, list): - items = " ".join(map(str, value)) - args.append(f'{cli_key}="{items}"') - elif value is not None: - args.append(f'{cli_key}={value}') - - return " ".join(args) - -class FilterModule(object): - def filters(self): - return { - 'dict_to_cli_args': dict_to_cli_args - } diff --git a/roles/sys-stk-front-proxy/defaults/main.yml b/roles/sys-stk-front-proxy/defaults/main.yml index 1d2369de..a3595cfb 100644 --- a/roles/sys-stk-front-proxy/defaults/main.yml +++ b/roles/sys-stk-front-proxy/defaults/main.yml @@ -1,5 +1,5 @@ # default vhost flavour -vhost_flavour: "basic" # valid: basic | ws_generic +vhost_flavour: "basic" # valid: basic, ws_generic # build the full template path from the flavour vhost_template_src: "roles/srv-proxy-core/templates/vhost/{{ vhost_flavour }}.conf.j2" \ No newline at end of file diff --git a/tests/integration/test_filters_are_defined.py b/tests/integration/test_filters_are_defined.py new file mode 100644 index 00000000..d8f1c7a7 --- /dev/null +++ b/tests/integration/test_filters_are_defined.py @@ -0,0 +1,252 @@ +# tests/integration/test_filters_are_defined.py +import ast +import os +import re +import unittest +from typing import Dict, List, Set, Tuple + +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) + +# Where filter definitions may exist +FILTER_PLUGIN_BASES = [ + os.path.join(PROJECT_ROOT, "filter_plugins"), + os.path.join(PROJECT_ROOT, "roles"), # includes roles/*/filter_plugins +] + +# Where to search for usages (EXCLUDES tests/ by default) +SEARCH_BASES = [PROJECT_ROOT] +EXCLUDE_TESTS = True # keep True to require real usage sites + +# File extensions to scan for template usage +USAGE_EXTS = (".yml", ".yaml", ".j2", ".jinja2", ".tmpl") + +# Built-in / common filters that shouldn't require local definitions +BUILTIN_FILTERS: Set[str] = { + # Jinja2 core/common + "abs", "attr", "batch", "capitalize", "center", "default", "d", "dictsort", "escape", + "e", "filesizeformat", "first", "float", "forceescape", "format", "groupby", "indent", + "int", "join", "last", "length", "list", "lower", "map", "min", "max", "random", + "reject", "rejectattr", "replace", "reverse", "round", "safe", "select", + "selectattr", "slice", "sort", "string", "striptags", "sum", "title", "trim", + "truncate", "unique", "upper", "urlencode", "urlize", "wordcount", "xmlattr", + + # Common Ansible filters (subset, extend as needed) + "b64decode", "b64encode", "basename", "dirname", "from_json", "to_json", + "from_yaml", "to_yaml", "combine", "difference", "intersect", + "flatten", "zip", "regex_search", "regex_replace", "bool", + "type_debug", "json_query", "mandatory", "hash", "checksum", + "lower", "upper", "capitalize", "unique", "dict2items", "items2dict", "password_hash", "path_join", "product", "quote", "split", "ternary", "to_nice_yaml", "tojson", + + # Date/time-ish + "strftime", +} + +def _iter_files(base: str, *, exts: Tuple[str, ...]): + for root, _, files in os.walk(base): + if EXCLUDE_TESTS and (os.sep + "tests" + os.sep) in (root + os.sep): + continue + for fn in files: + if fn.endswith(exts): + yield os.path.join(root, fn) + +def _is_filter_plugins_dir(path: str) -> bool: + return "filter_plugins" in os.path.normpath(path).split(os.sep) + +def _read(path: str) -> str: + try: + with open(path, "r", encoding="utf-8", errors="ignore") as f: + return f.read() + except Exception: + return "" + +# --------------------------- +# Collect defined filters (AST) +# --------------------------- + +class _FiltersCollector(ast.NodeVisitor): + def __init__(self): + self.defs: List[Tuple[str, str]] = [] + + def visit_Return(self, node: ast.Return): + self.defs.extend(self._extract_mapping(node.value)) + + def _extract_mapping(self, node) -> List[Tuple[str, str]]: + pairs: List[Tuple[str, str]] = [] + if isinstance(node, ast.Dict): + for k, v in zip(node.keys, node.values): + key = k.value if isinstance(k, ast.Constant) and isinstance(k.value, str) else None + val = self._name_of(v) + if key: + pairs.append((key, val)) + return pairs + if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == "dict": + for kw in node.keywords or []: + if kw.arg: + pairs.append((kw.arg, self._name_of(kw.value))) + return pairs + if isinstance(node, ast.Name): + return [] + return [] + + @staticmethod + def _name_of(v) -> str: + if isinstance(v, ast.Name): + return v.id + if isinstance(v, ast.Attribute): + return v.attr + return "" + +def _collect_filters_from_filters_method(func: ast.FunctionDef) -> List[Tuple[str, str]]: + c = _FiltersCollector() + c.visit(func) + + name_dicts: Dict[str, List[Tuple[str, str]]] = {} + returned_names: List[str] = [] + + for n in ast.walk(func): + if isinstance(n, ast.Assign): + if len(n.targets) == 1 and isinstance(n.targets[0], ast.Name): + tgt = n.targets[0].id + pairs = _FiltersCollector()._extract_mapping(n.value) + if pairs: + name_dicts.setdefault(tgt, []).extend(pairs) + elif isinstance(n, ast.Call): + if isinstance(n.func, ast.Attribute) and n.func.attr == "update": + obj = n.func.value + if isinstance(obj, ast.Name) and n.args: + add_pairs = _FiltersCollector()._extract_mapping(n.args[0]) + if add_pairs: + name_dicts.setdefault(obj.id, []).extend(add_pairs) + elif isinstance(n, ast.Return) and isinstance(n.value, ast.Name): + returned_names.append(n.value.id) + + for nm in returned_names: + for p in name_dicts.get(nm, []): + c.defs.append(p) + + # dedupe + seen = set() + out: List[Tuple[str, str]] = [] + for k, v in c.defs: + if (k, v) not in seen: + seen.add((k, v)) + out.append((k, v)) + return out + +def collect_defined_filters() -> Set[str]: + defined: Set[str] = set() + for base in FILTER_PLUGIN_BASES: + for path in _iter_files(base, exts=(".py",)): + if not _is_filter_plugins_dir(path): + continue + code = _read(path) + if not code: + continue + try: + tree = ast.parse(code, filename=path) + except Exception: + continue + for node in tree.body: + if isinstance(node, ast.ClassDef) and node.name == "FilterModule": + for item in node.body: + if isinstance(item, ast.FunctionDef) and item.name == "filters": + for fname, _call in _collect_filters_from_filters_method(item): + defined.add(fname) + return defined + +# --------------------------- +# Collect used filters (Jinja-only scanning with string stripping) +# --------------------------- + +# Capture inner bodies of Jinja blocks +RE_JINJA_MUSTACHE = re.compile(r"\{\{(.*?)\}\}", re.DOTALL) +RE_JINJA_TAG = re.compile(r"\{%(.*?)%\}", re.DOTALL) + +# Within a Jinja body, capture "| filter_name" (with args or not) +RE_PIPE_IN_BODY = re.compile(r"\|\s*([A-Za-z_]\w*)\b") + +# Matches "{% filter filter_name %}" +RE_BLOCK_FILTER = re.compile(r"\{%\s*filter\s+([A-Za-z_]\w*)\b", re.DOTALL) + +def _strip_quoted(text: str) -> str: + """ + Remove content inside single/double quotes to avoid false positives for pipes in strings, + e.g. lookup('pipe', "pacman ... | grep ... | awk ...") -> pipes are ignored. + """ + out = [] + i = 0 + n = len(text) + quote = None + while i < n: + ch = text[i] + if quote is None: + if ch in ("'", '"'): + quote = ch + i += 1 + continue + out.append(ch) + i += 1 + else: + # inside quotes; handle simple escapes \" and \' + if ch == "\\" and i + 1 < n: + i += 2 + continue + if ch == quote: + quote = None + i += 1 + return "".join(out) + +def _extract_filters_from_jinja_body(body: str) -> Set[str]: + # Strip quoted strings first so pipes inside strings are ignored + body_no_str = _strip_quoted(body) + return {m.group(1) for m in RE_PIPE_IN_BODY.finditer(body_no_str)} + +def collect_used_filters() -> Set[str]: + used: Set[str] = set() + for base in SEARCH_BASES: + for path in _iter_files(base, exts=USAGE_EXTS): + text = _read(path) + if not text: + continue + + # 1) Filters used in {{ ... }} blocks + for m in RE_JINJA_MUSTACHE.finditer(text): + used |= _extract_filters_from_jinja_body(m.group(1)) + + # 2) Filters used in {% ... %} blocks (e.g., set, if, for) + for m in RE_JINJA_TAG.finditer(text): + used |= _extract_filters_from_jinja_body(m.group(1)) + + # 3) Block filter form: {% filter name %} ... {% endfilter %} + for m in RE_BLOCK_FILTER.finditer(text): + used.add(m.group(1)) + + return used + +# --------------------------- +# Test +# --------------------------- + +class TestAllUsedFiltersAreDefined(unittest.TestCase): + def test_all_used_filters_have_definitions(self): + defined = collect_defined_filters() + used = collect_used_filters() + + # Remove built-ins and known-safe filters + candidates = sorted(used - BUILTIN_FILTERS) + + # Unknown filters are those not defined locally + unknown = [f for f in candidates if f not in defined] + + if unknown: + lines = [ + "These filters are used in templates/YAML but have no local definition " + "(and are not in BUILTIN_FILTERS):" + ] + for f in unknown: + lines.append(f"- " + f) + self.fail("\n".join(lines)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/integration/test_filters_usage.py b/tests/integration/test_filters_usage.py new file mode 100644 index 00000000..43e9f1aa --- /dev/null +++ b/tests/integration/test_filters_usage.py @@ -0,0 +1,260 @@ +import ast +import os +import re +import unittest +from typing import Dict, List, Tuple, Optional + +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) + +FILTER_PLUGIN_BASES = [ + os.path.join(PROJECT_ROOT, "filter_plugins"), + os.path.join(PROJECT_ROOT, "roles"), +] + +SEARCH_BASES = [PROJECT_ROOT] + +SEARCH_EXTS = (".yml", ".yaml", ".j2", ".jinja2", ".tmpl", ".py") + +def _iter_files(base: str, *, py_only: bool = False): + for root, _, files in os.walk(base): + for fn in files: + if py_only and not fn.endswith(".py"): + continue + if not py_only and not fn.endswith(SEARCH_EXTS): + continue + yield os.path.join(root, fn) + +def _is_filter_plugins_dir(path: str) -> bool: + return "filter_plugins" in os.path.normpath(path).split(os.sep) + +def _read(path: str) -> str: + try: + with open(path, "r", encoding="utf-8", errors="ignore") as f: + return f.read() + except Exception: + return "" + +# --------------------------- +# Filter definition extraction +# --------------------------- + +class _FiltersCollector(ast.NodeVisitor): + """ + Extract mappings returned by FilterModule.filters(). + Handles: + return {'name': fn, "x": y} + d = {'name': fn}; d.update({...}); return d + return dict(name=fn, x=y) + """ + def __init__(self): + self.defs: List[Tuple[str, str]] = [] # (filter_name, callable_name) + + def visit_Return(self, node: ast.Return): + mapping = self._extract_mapping(node.value) + for k, v in mapping: + self.defs.append((k, v)) + + def _extract_mapping(self, node) -> List[Tuple[str, str]]: + pairs: List[Tuple[str, str]] = [] + + # dict literal + if isinstance(node, ast.Dict): + for k, v in zip(node.keys, node.values): + key = k.value if isinstance(k, ast.Constant) and isinstance(k.value, str) else None + val = self._name_of(v) + if key: + pairs.append((key, val)) + return pairs + + # dict(...) call + if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == "dict": + # keywords: dict(name=fn) + for kw in node.keywords or []: + if kw.arg: + pairs.append((kw.arg, self._name_of(kw.value))) + return pairs + + # Name (variable) that might be a dict assembled earlier in the function + if isinstance(node, ast.Name): + # Fallback: we can't easily dataflow-resolve here; handled elsewhere by walking Assign/Call + return [] + + return [] + + @staticmethod + def _name_of(v) -> str: + if isinstance(v, ast.Name): + return v.id + if isinstance(v, ast.Attribute): + return v.attr # take right-most name + return "" + +def _collect_filters_from_filters_method(func: ast.FunctionDef) -> List[Tuple[str, str]]: + """ + Walks the function to assemble any mapping that flows into the return. + We capture direct return dicts and also a common pattern: + d = {...} + d.update({...}) + return d + """ + collector = _FiltersCollector() + collector.visit(func) + + # additionally scan simple 'X = {...}' and 'X.update({...})' patterns, + # and if 'return X' occurs, merge those dicts. + name_dicts: Dict[str, List[Tuple[str, str]]] = {} + returns: List[str] = [] + + for n in ast.walk(func): + if isinstance(n, ast.Assign): + # X = { ... } + if len(n.targets) == 1 and isinstance(n.targets[0], ast.Name): + tgt = n.targets[0].id + pairs = _FiltersCollector()._extract_mapping(n.value) + if pairs: + name_dicts.setdefault(tgt, []).extend(pairs) + elif isinstance(n, ast.Call): + # X.update({ ... }) + if isinstance(n.func, ast.Attribute) and n.func.attr == "update": + obj = n.func.value + if isinstance(obj, ast.Name): + add_pairs = _FiltersCollector()._extract_mapping(n.args[0] if n.args else None) + if add_pairs: + name_dicts.setdefault(obj.id, []).extend(add_pairs) + elif isinstance(n, ast.Return) and isinstance(n.value, ast.Name): + returns.append(n.value.id) + + for rname in returns: + for p in name_dicts.get(rname, []): + collector.defs.append(p) + + # dedupe + seen = set() + out: List[Tuple[str, str]] = [] + for k, v in collector.defs: + if (k, v) not in seen: + seen.add((k, v)) + out.append((k, v)) + return out + +def _ast_collect_filters_from_file(path: str) -> List[Tuple[str, str, str]]: + code = _read(path) + if not code: + return [] + try: + tree = ast.parse(code, filename=path) + except Exception: + return [] + + results: List[Tuple[str, str, str]] = [] + for node in tree.body: + if isinstance(node, ast.ClassDef) and node.name == "FilterModule": + for item in node.body: + if isinstance(item, ast.FunctionDef) and item.name == "filters": + for (fname, callname) in _collect_filters_from_filters_method(item): + results.append((fname, callname, path)) + return results + +def collect_defined_filters() -> List[Dict[str, str]]: + found: List[Dict[str, str]] = [] + for base in FILTER_PLUGIN_BASES: + for path in _iter_files(base, py_only=True): + if not _is_filter_plugins_dir(path): + continue + for (filter_name, callable_name, fpath) in _ast_collect_filters_from_file(path): + found.append({"filter": filter_name, "callable": callable_name, "file": fpath}) + return found + +# --------------------------- +# Usage detection +# --------------------------- + +def _compile_jinja_patterns(name: str) -> list[re.Pattern]: + """ + Build robust patterns that match Jinja filter usage without using '%' string formatting. + Handles: + - {{ ... | name }} + - {% ... | name %} + - {% filter name %}...{% endfilter %} + - bare YAML/Jinja like: when: x | name + """ + escaped = re.escape(name) + return [ + re.compile(r"\{\{[^}]*\|\s*" + escaped + r"\b", re.DOTALL), # {{ ... | name }} + re.compile(r"\{%\s*[^%]*\|\s*" + escaped + r"\b", re.DOTALL), # {% ... | name %} + re.compile(r"\{%\s*filter\s+" + escaped + r"\b"), # {% filter name %} + re.compile(r"\|\s*" + escaped + r"\b"), # bare: when: x | name + ] + +def _python_call_pattern(callable_name: str) -> Optional[re.Pattern]: + if not callable_name: + return None + return re.compile(r"\b%s\s*\(" % re.escape(callable_name)) + +def search_usage(filter_name: str, callable_name: str, *, skip_file: str) -> tuple[bool, bool]: + """ + Search for filter usage. + + Returns tuple: + (used_anywhere, used_outside_tests) + + - used_anywhere: True if found in repo at all + - used_outside_tests: True if found outside tests/ + """ + jinja_pats = _compile_jinja_patterns(filter_name) + py_pat = _python_call_pattern(callable_name) + + used_anywhere = False + used_outside_tests = False + + for base in SEARCH_BASES: + for path in _iter_files(base, py_only=False): + try: + if os.path.samefile(path, skip_file): + continue + except Exception: + pass + + content = _read(path) + if not content: + continue + + hit = False + for pat in jinja_pats: + if pat.search(content): + hit = True + break + + if not hit and py_pat and path.endswith(".py") and py_pat.search(content): + hit = True + + if hit: + used_anywhere = True + if "/tests/" not in path and not path.endswith("tests"): + used_outside_tests = True + + return used_anywhere, used_outside_tests + +class TestFilterDefinitionsAreUsed(unittest.TestCase): + def test_every_defined_filter_is_used(self): + definitions = collect_defined_filters() + if not definitions: + self.skipTest("No filters found under filter_plugins/.") + + unused = [] + for d in definitions: + f_name, c_name, f_path = d["filter"], d["callable"], d["file"] + used_any, used_outside = search_usage(f_name, c_name, skip_file=f_path) + if not used_any: + unused.append((f_name, c_name, f_path, "not used anywhere")) + elif not used_outside: + unused.append((f_name, c_name, f_path, "only used in tests")) + + if unused: + msg = ["The following filters are invalidly unused:"] + for f, c, p, reason in sorted(unused): + msg.append(f"- '{f}' (callable '{c or 'unknown'}') defined in {p} → {reason}") + self.fail("\n".join(msg)) + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/filter_plugins/test_domain_filters_alias.py b/tests/unit/filter_plugins/test_domain_filters_alias.py deleted file mode 100644 index c8b575ec..00000000 --- a/tests/unit/filter_plugins/test_domain_filters_alias.py +++ /dev/null @@ -1,111 +0,0 @@ -import os -import sys -import unittest - -# Add the filter_plugins directory to the import path -dir_path = os.path.abspath( - os.path.join(os.path.dirname(__file__), '../../../filter_plugins') -) -sys.path.insert(0, dir_path) - -from ansible.errors import AnsibleFilterError -from alias_domains_map import FilterModule - -class TestDomainFilters(unittest.TestCase): - def setUp(self): - self.filter_module = FilterModule() - # Sample primary domain - self.primary = 'example.com' - - def test_alias_empty_apps(self): - apps = {} - expected = {} - result = self.filter_module.alias_domains_map(apps, self.primary) - self.assertEqual(result, expected) - - def test_alias_without_aliases_and_no_canonical(self): - apps = {'app1': {}} - # canonical defaults to ['app1.example.com'], so alias should be [] - expected = {'app1': []} - result = self.filter_module.alias_domains_map(apps, self.primary) - self.assertEqual(result, expected) - - def test_alias_with_explicit_aliases(self): - apps = { - 'app1': { - 'server':{ - 'domains': {'aliases': ['alias.com']} - } - } - } - # canonical defaults to ['app1.example.com'], so alias should include alias.com and default - expected = {'app1': ['alias.com', 'app1.example.com']} - result = self.filter_module.alias_domains_map(apps, self.primary) - self.assertCountEqual(result['app1'], expected['app1']) - - def test_alias_with_canonical_not_default(self): - apps = { - 'app1': { - 'server':{'domains': {'canonical': ['foo.com']}} - } - } - # foo.com is canonical, default not in canonical so added as alias - expected = {'app1': ['app1.example.com']} - result = self.filter_module.alias_domains_map(apps, self.primary) - self.assertEqual(result, expected) - - def test_alias_with_existing_default(self): - apps = { - 'app1': { - 'server':{ - 'domains': { - 'canonical': ['foo.com'], - 'aliases': ['app1.example.com'] - } - } - } - } - # default present in aliases, should not be duplicated - expected = {'app1': ['app1.example.com']} - result = self.filter_module.alias_domains_map(apps, self.primary) - self.assertEqual(result, expected) - - def test_invalid_aliases_type(self): - apps = { - 'app1': {'server':{'domains': {'aliases': 123}}} - } - with self.assertRaises(AnsibleFilterError): - self.filter_module.alias_domains_map(apps, self.primary) - - def test_alias_with_empty_domains_cfg(self): - apps = { - 'app1': { - 'server':{ - 'domains': {} - } - } - } - expected = apps - result = self.filter_module.alias_domains_map(apps, self.primary) - self.assertEqual(result, expected) - - def test_alias_with_canonical_dict_not_default(self): - apps = { - 'app1': { - 'server':{ - 'domains': { - 'canonical': { - 'one': 'one.com', - 'two': 'two.com' - } - } - } - } - } - expected = {'app1': ['app1.example.com']} - result = self.filter_module.alias_domains_map(apps, self.primary) - self.assertEqual(result, expected) - - -if __name__ == "__main__": - unittest.main() \ No newline at end of file diff --git a/tests/unit/filter_plugins/test_get_application_id.py b/tests/unit/filter_plugins/test_get_application_id.py deleted file mode 100644 index 2891e65a..00000000 --- a/tests/unit/filter_plugins/test_get_application_id.py +++ /dev/null @@ -1,63 +0,0 @@ -# tests/unit/filter_plugins/test_get_application_id.py -import unittest -import os -import tempfile -import shutil -import yaml -from ansible.errors import AnsibleFilterError -from filter_plugins.get_application_id import get_application_id - -class TestGetApplicationIdFilter(unittest.TestCase): - def setUp(self): - # Create a temporary project directory and switch to it - self.tmpdir = tempfile.mkdtemp() - self.original_cwd = os.getcwd() - os.chdir(self.tmpdir) - - # Create the roles/testrole/vars directory structure - self.role_name = 'testrole' - self.vars_dir = os.path.join('roles', self.role_name, 'vars') - os.makedirs(self.vars_dir) - self.vars_file = os.path.join(self.vars_dir, 'main.yml') - - def tearDown(self): - # Return to original cwd and remove temp directory - os.chdir(self.original_cwd) - shutil.rmtree(self.tmpdir) - - def write_vars_file(self, content): - with open(self.vars_file, 'w') as f: - yaml.dump(content, f) - - def test_returns_application_id(self): - # Given a valid vars file with application_id - expected_id = '12345' - self.write_vars_file({'application_id': expected_id}) - # When - result = get_application_id(self.role_name) - # Then - self.assertEqual(result, expected_id) - - def test_file_not_found_raises_error(self): - # Given no vars file for a nonexistent role - with self.assertRaises(AnsibleFilterError) as cm: - get_application_id('nonexistent_role') - self.assertIn("Vars file not found", str(cm.exception)) - - def test_missing_key_raises_error(self): - # Given a vars file without application_id - self.write_vars_file({'other_key': 'value'}) - with self.assertRaises(AnsibleFilterError) as cm: - get_application_id(self.role_name) - self.assertIn("Key 'application_id' not found", str(cm.exception)) - - def test_invalid_yaml_raises_error(self): - # Write invalid YAML content - with open(self.vars_file, 'w') as f: - f.write(":::not a yaml:::") - with self.assertRaises(AnsibleFilterError) as cm: - get_application_id(self.role_name) - self.assertIn("Error reading YAML", str(cm.exception)) - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/filter_plugins/test_load_configuration.py b/tests/unit/filter_plugins/test_load_configuration.py deleted file mode 100644 index 0a6718f5..00000000 --- a/tests/unit/filter_plugins/test_load_configuration.py +++ /dev/null @@ -1,122 +0,0 @@ -import os -import sys -import unittest -from unittest.mock import patch, mock_open -from ansible.errors import AnsibleFilterError - -# make sure our plugin is on PYTHONPATH -root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../filter_plugins')) -sys.path.insert(0, root) - -import load_configuration -from load_configuration import FilterModule, _cfg_cache - -class TestLoadConfigurationFilter(unittest.TestCase): - def setUp(self): - _cfg_cache.clear() - self.f = FilterModule().filters()['load_configuration'] - self.app = 'html' - self.nested_cfg = { - 'html': { - 'features': {'matomo': True}, - 'server': { - 'domains':{'canonical': ['html.example.com']} - } - } - } - self.flat_cfg = { - 'features': {'matomo': False}, - 'server': {'domains':{'canonical': ['flat.example.com']}} - } - - def test_invalid_key(self): - with self.assertRaises(AnsibleFilterError): - self.f(self.app, None) - - @patch('load_configuration.os.path.isdir', return_value=False) - def test_no_roles_dir(self, _): - with self.assertRaises(AnsibleFilterError): - self.f(self.app, 'features.matomo') - - @patch('load_configuration.os.listdir', return_value=['r1']) - @patch('load_configuration.os.path.isdir', return_value=True) - @patch('load_configuration.os.path.exists', return_value=False) - def test_no_matching_role(self, *_): - self.assertIsNone(self.f(self.app, 'features.matomo')) - - @patch('load_configuration.os.listdir', return_value=['r1']) - @patch('load_configuration.os.path.isdir', return_value=True) - @patch('load_configuration.os.path.exists') - @patch('load_configuration.open', new_callable=mock_open) - @patch('load_configuration.yaml.safe_load') - def test_primary_missing_conf(self, mock_yaml, mock_file, mock_exists, *_): - mock_exists.side_effect = lambda p: p.endswith('vars/main.yml') - mock_yaml.return_value = {'application_id': self.app} - with self.assertRaises(AnsibleFilterError): - self.f(self.app, 'features.matomo') - - @patch('load_configuration.os.listdir', return_value=['r1']) - @patch('load_configuration.os.path.isdir', return_value=True) - @patch('load_configuration.os.path.exists') - @patch('load_configuration.open', new_callable=mock_open) - @patch('load_configuration.yaml.safe_load') - def test_primary_and_cache(self, mock_yaml, mock_file, mock_exists, *_): - mock_exists.side_effect = lambda p: p.endswith('vars/main.yml') or p.endswith('config/main.yml') - mock_yaml.side_effect = [ - {'application_id': self.app}, # main.yml - self.nested_cfg # config/main.yml - ] - # first load - self.assertTrue(self.f(self.app, 'features.matomo')) - self.assertIn(self.app, _cfg_cache) - mock_yaml.reset_mock() - # from cache - self.assertEqual(self.f(self.app, 'server.domains.canonical'), - ['html.example.com']) - mock_yaml.assert_not_called() - - @patch('load_configuration.os.listdir', return_value=['r1']) - @patch('load_configuration.os.path.isdir', return_value=True) - @patch('load_configuration.os.path.exists', return_value=True) - @patch('load_configuration.open', mock_open(read_data="html: {}")) - @patch('load_configuration.yaml.safe_load', return_value={'html': {}}) - def test_key_not_found_after_load(self, *_): - with self.assertRaises(AnsibleFilterError): - self.f(self.app, 'does.not.exist') - - @patch('load_configuration.os.listdir', return_value=['r2']) - @patch('load_configuration.os.path.isdir', return_value=True) - @patch('load_configuration.os.path.exists') - @patch('load_configuration.open', new_callable=mock_open) - @patch('load_configuration.yaml.safe_load') - def test_fallback_nested(self, mock_yaml, mock_file, mock_exists, *_): - mock_exists.side_effect = lambda p: p.endswith('config/main.yml') - mock_yaml.return_value = self.nested_cfg - # nested fallback must work - self.assertTrue(self.f(self.app, 'features.matomo')) - self.assertEqual(self.f(self.app, 'server.domains.canonical'), - ['html.example.com']) - - @patch('load_configuration.os.listdir', return_value=['r4']) - @patch('load_configuration.os.path.isdir', return_value=True) - @patch('load_configuration.os.path.exists') - @patch('load_configuration.open', new_callable=mock_open) - @patch('load_configuration.yaml.safe_load') - def test_fallback_with_indexed_key(self, mock_yaml, mock_file, mock_exists, *_): - # Testing with an indexed key like domains.canonical[0] - mock_exists.side_effect = lambda p: p.endswith('config/main.yml') - mock_yaml.return_value = { - 'file': { - 'server': { - 'domains':{ - 'canonical': ['files.example.com', 'extra.example.com'] - } - } - } - } - # should get the first element of the canonical domains list - self.assertEqual(self.f('file', 'server.domains.canonical[0]'), - 'files.example.com') - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/filter_plugins/test_safe_join.py b/tests/unit/filter_plugins/test_safe_join.py deleted file mode 100644 index 96bfa289..00000000 --- a/tests/unit/filter_plugins/test_safe_join.py +++ /dev/null @@ -1,56 +0,0 @@ -import unittest -import sys -import os - -# Ensure filter_plugins directory is on the path -sys.path.insert( - 0, - os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../filter_plugins')) -) - -from safe_join import safe_join - -class TestSafeJoinFilter(unittest.TestCase): - def test_join_with_trailing_slashes(self): - self.assertEqual( - safe_join('http://example.com/', '/path/to'), - 'http://example.com/path/to' - ) - - def test_join_without_slashes(self): - self.assertEqual( - safe_join('http://example.com', 'path/to'), - 'http://example.com/path/to' - ) - - def test_base_none(self): - with self.assertRaises(ValueError): - safe_join(None, 'path') - - def test_tail_none(self): - with self.assertRaises(ValueError): - safe_join('http://example.com', None) - - def test_base_empty(self): - self.assertEqual(safe_join('', 'path'), '/path') - - def test_tail_empty(self): - # joining with empty tail should yield base with trailing slash - self.assertEqual( - safe_join('http://example.com', ''), - 'http://example.com/' - ) - - def test_numeric_base(self): - # numeric base is cast to string - self.assertEqual(safe_join(123, 'path'), '123/path') - - def test_exception_in_str(self): - class Bad: - def __str__(self): - raise ValueError('bad') - # on exception, safe_join returns '' - self.assertEqual(safe_join(Bad(), 'x'), '') - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/filter_plugins/test_safe_placeholders_filter.py b/tests/unit/filter_plugins/test_safe_placeholders_filter.py deleted file mode 100644 index 227fa94b..00000000 --- a/tests/unit/filter_plugins/test_safe_placeholders_filter.py +++ /dev/null @@ -1,59 +0,0 @@ -import unittest -import sys -import os - -# Ensure filter_plugins directory is on the path -sys.path.insert( - 0, - os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../filter_plugins')) -) - -from safe import safe_placeholders - -class TestSafePlaceholdersFilter(unittest.TestCase): - def test_simple_replacement(self): - template = "Hello, {user}!" - mapping = {'user': 'Alice'} - self.assertEqual(safe_placeholders(template, mapping), "Hello, Alice!") - - def test_missing_placeholder(self): - template = "Hello, {user}!" - # Missing placeholder should raise KeyError - with self.assertRaises(KeyError): - safe_placeholders(template, {}) - - def test_none_template(self): - self.assertEqual(safe_placeholders(None, {'user': 'Alice'}), "") - - def test_no_placeholders(self): - template = "Just a plain string" - mapping = {'any': 'value'} - self.assertEqual(safe_placeholders(template, mapping), "Just a plain string") - - def test_multiple_placeholders(self): - template = "{greet}, {user}!" - mapping = {'greet': 'Hi', 'user': 'Bob'} - self.assertEqual(safe_placeholders(template, mapping), "Hi, Bob!") - - def test_numeric_values(self): - template = "Count: {n}" - mapping = {'n': 0} - self.assertEqual(safe_placeholders(template, mapping), "Count: 0") - - def test_extra_mapping_keys(self): - template = "Value: {a}" - mapping = {'a': '1', 'b': '2'} - self.assertEqual(safe_placeholders(template, mapping), "Value: 1") - - def test_malformed_template(self): - # Unclosed placeholder should be caught and return empty string - template = "Unclosed {key" - mapping = {'key': 'value'} - self.assertEqual(safe_placeholders(template, mapping), "") - - def test_mapping_none(self): - template = "Test {x}" - self.assertEqual(safe_placeholders(template, None), "") - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/filter_plugins/test_safe_var.py b/tests/unit/filter_plugins/test_safe_var.py deleted file mode 100644 index 24c5c716..00000000 --- a/tests/unit/filter_plugins/test_safe_var.py +++ /dev/null @@ -1,44 +0,0 @@ -import unittest -import sys -import os -from jinja2 import Undefined - -# Ensure filter_plugins directory is on the path -sys.path.insert( - 0, - os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../filter_plugins')) -) - -from safe import FilterModule - -class TestSafeVarFilter(unittest.TestCase): - def setUp(self): - # Retrieve the safe_var filter function - self.filter = FilterModule().filters()['safe_var'] - - def test_returns_non_empty_string(self): - self.assertEqual(self.filter('hello'), 'hello') - - def test_returns_empty_string(self): - self.assertEqual(self.filter(''), '') - - def test_returns_empty_for_none(self): - self.assertEqual(self.filter(None), '') - - def test_returns_empty_for_jinja_undefined(self): - # Instantiate an Undefined without arguments - undef = Undefined() - self.assertEqual(self.filter(undef), '') - - def test_returns_zero_for_zero(self): - # 0 is falsey but not None or Undefined, so safe_var returns it - self.assertEqual(self.filter(0), 0) - - def test_returns_list_and_dict_unchanged(self): - data = {'key': 'value'} - self.assertEqual(self.filter(data), data) - lst = [1, 2, 3] - self.assertEqual(self.filter(lst), lst) - -if __name__ == '__main__': - unittest.main() diff --git a/tests/unit/lookup_plugins/test_docker_cards.py b/tests/unit/lookup_plugins/test_docker_cards.py index b577d9b4..0f2a7e10 100644 --- a/tests/unit/lookup_plugins/test_docker_cards.py +++ b/tests/unit/lookup_plugins/test_docker_cards.py @@ -23,7 +23,6 @@ class TestDockerCardsLookup(unittest.TestCase): os.makedirs(os.path.join(self.role_dir, "meta")) os.makedirs(os.path.join(self.role_dir, "vars")) - # Create vars/main.yml so get_application_id() can find the application_id. vars_main = os.path.join(self.role_dir, "vars", "main.yml") with open(vars_main, "w", encoding="utf-8") as f: f.write("application_id: portfolio\n") diff --git a/tests/unit/roles/svc-db-postgres/filter_plugins/test_split_postgres_connections.py b/tests/unit/roles/svc-db-postgres/filter_plugins/test_split_postgres_connections.py index 60db4157..443aaf6e 100644 --- a/tests/unit/roles/svc-db-postgres/filter_plugins/test_split_postgres_connections.py +++ b/tests/unit/roles/svc-db-postgres/filter_plugins/test_split_postgres_connections.py @@ -77,12 +77,6 @@ class SplitPostgresConnectionsTests(unittest.TestCase): def test_registry_contains_filters(self): registry = self.mod.FilterModule().filters() self.assertIn("split_postgres_connections", registry) - self.assertIn("list_postgres_roles", registry) - - def test_list_postgres_roles(self): - roles = self.mod.list_postgres_roles(self.roles_dir) - self.assertIsInstance(roles, list) - self.assertSetEqual(set(roles), {"app_a", "app_b"}) def test_split_postgres_connections_division(self): # There are 2 postgres roles -> 200 / 2 = 100 diff --git a/tests/unit/roles/sys-ctl-bkp-docker-2-loc/filter_plugins/test_dict_to_cli_args.py b/tests/unit/roles/sys-ctl-bkp-docker-2-loc/filter_plugins/test_dict_to_cli_args.py deleted file mode 100644 index 6155b0e6..00000000 --- a/tests/unit/roles/sys-ctl-bkp-docker-2-loc/filter_plugins/test_dict_to_cli_args.py +++ /dev/null @@ -1,61 +0,0 @@ -import unittest -import os -import sys - -# Add the path to roles/sys-ctl-bkp-docker-2-loc/filter_plugins -CURRENT_DIR = os.path.dirname(__file__) -FILTER_PLUGIN_DIR = os.path.abspath( - os.path.join(CURRENT_DIR, '../../../../../roles/sys-ctl-bkp-docker-2-loc/filter_plugins') -) -sys.path.insert(0, FILTER_PLUGIN_DIR) - -from dict_to_cli_args import dict_to_cli_args - - -class TestDictToCliArgs(unittest.TestCase): - def test_simple_string_args(self): - data = {"backup-dir": "/mnt/backups", "version-suffix": "-nightly"} - expected = "--backup-dir=/mnt/backups --version-suffix=-nightly" - self.assertEqual(dict_to_cli_args(data), expected) - - def test_boolean_true(self): - data = {"shutdown": True, "everything": True} - expected = "--shutdown --everything" - self.assertEqual(dict_to_cli_args(data), expected) - - def test_boolean_false(self): - data = {"shutdown": False, "everything": True} - expected = "--everything" - self.assertEqual(dict_to_cli_args(data), expected) - - def test_list_argument(self): - data = {"ignore-volumes": ["redis", "memcached"]} - expected = '--ignore-volumes="redis memcached"' - self.assertEqual(dict_to_cli_args(data), expected) - - def test_mixed_arguments(self): - data = { - "backup-dir": "/mnt/backups", - "shutdown": True, - "ignore-volumes": ["redis", "memcached"] - } - result = dict_to_cli_args(data) - self.assertIn("--backup-dir=/mnt/backups", result) - self.assertIn("--shutdown", result) - self.assertIn('--ignore-volumes="redis memcached"', result) - - def test_empty_dict(self): - self.assertEqual(dict_to_cli_args({}), "") - - def test_none_value(self): - data = {"some-value": None, "other": "yes"} - expected = "--other=yes" - self.assertEqual(dict_to_cli_args(data), expected) - - def test_invalid_type(self): - with self.assertRaises(TypeError): - dict_to_cli_args(["not", "a", "dict"]) - - -if __name__ == "__main__": - unittest.main()