Implement filter checks: ensure all defined filters are used and remove dead code

Integration tests added/updated:
- tests/integration/test_filters_usage.py: AST-based detection of filter definitions (FilterModule.filters), robust Jinja detection ({{ ... }}, {% ... %}, {% filter ... %}), plus Python call tracking; fails if a defined filter is unused anywhere or used only under tests/.
- tests/integration/test_filters_are_defined.py: inverse check — every filter used in .yml/.yaml/.j2/.jinja2/.tmpl must be defined locally. Scans only inside Jinja blocks and ignores pipes inside strings (e.g., lookup('pipe', "... | grep ... | awk ...")) to avoid false positives like trusted_hosts, woff/woff2, etc.

Bug fixes & robustness:
- Build regexes without %-string formatting to avoid ValueError from literal '%' in Jinja tags.
- Strip quoted strings in usage analysis so sed/grep/awk pipes are not miscounted as filters.
- Prevent self-matches in the defining file.

Cleanup / removal of dead code:
- Removed unused filter plugins and related unit tests:
  * filter_plugins/alias_domains_map.py
  * filter_plugins/get_application_id.py
  * filter_plugins/load_configuration.py
  * filter_plugins/safe.py
  * filter_plugins/safe_join.py
  * roles/svc-db-openldap/filter_plugins/build_ldap_nested_group_entries.py
  * roles/sys-ctl-bkp-docker-2-loc/filter_plugins/dict_to_cli_args.py
  * corresponding tests under tests/unit/*
- roles/svc-db-postgres/filter_plugins/split_postgres_connections.py: dropped no-longer-needed list_postgres_roles API; adjusted tests.

Misc:
- sys-stk-front-proxy/defaults/main.yml: clarified valid vhost_flavour values (comma-separated).

Ref: https://chatgpt.com/share/68b56bac-c4f8-800f-aeef-6708dbb44199
This commit is contained in:
2025-09-01 11:47:51 +02:00
parent 34b3f3b0ad
commit 7791bd8c04
20 changed files with 514 additions and 993 deletions

View File

@@ -0,0 +1,252 @@
# tests/integration/test_filters_are_defined.py
import ast
import os
import re
import unittest
from typing import Dict, List, Set, Tuple
# Repository root (two levels up from this test file).
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))

# Where filter definitions may exist
FILTER_PLUGIN_BASES = [
    os.path.join(PROJECT_ROOT, "filter_plugins"),
    os.path.join(PROJECT_ROOT, "roles"),  # includes roles/*/filter_plugins
]

# Where to search for usages (EXCLUDES tests/ by default)
SEARCH_BASES = [PROJECT_ROOT]
EXCLUDE_TESTS = True  # keep True to require real usage sites

# File extensions to scan for template usage
USAGE_EXTS = (".yml", ".yaml", ".j2", ".jinja2", ".tmpl")

# Built-in / common filters that shouldn't require local definitions.
# NOTE: cleaned up — "lower", "upper", "capitalize" and "unique" were listed
# twice (harmless in a set literal, but confusing to maintain).
BUILTIN_FILTERS: Set[str] = {
    # Jinja2 core/common
    "abs", "attr", "batch", "capitalize", "center", "default", "d", "dictsort", "escape",
    "e", "filesizeformat", "first", "float", "forceescape", "format", "groupby", "indent",
    "int", "join", "last", "length", "list", "lower", "map", "min", "max", "random",
    "reject", "rejectattr", "replace", "reverse", "round", "safe", "select",
    "selectattr", "slice", "sort", "string", "striptags", "sum", "title", "trim",
    "truncate", "unique", "upper", "urlencode", "urlize", "wordcount", "xmlattr",
    # Common Ansible filters (subset, extend as needed)
    "b64decode", "b64encode", "basename", "dirname", "from_json", "to_json",
    "from_yaml", "to_yaml", "combine", "difference", "intersect",
    "flatten", "zip", "regex_search", "regex_replace", "bool",
    "type_debug", "json_query", "mandatory", "hash", "checksum",
    "dict2items", "items2dict", "password_hash", "path_join", "product",
    "quote", "split", "ternary", "to_nice_yaml", "tojson",
    # Date/time-ish
    "strftime",
}
def _iter_files(base: str, *, exts: Tuple[str, ...]):
    """Yield paths under *base* whose name ends in *exts*.

    Directories living under a ``tests/`` component are skipped whenever
    the module-level EXCLUDE_TESTS switch is on.
    """
    for dirpath, _, filenames in os.walk(base):
        if EXCLUDE_TESTS and (os.sep + "tests" + os.sep) in (dirpath + os.sep):
            continue
        for candidate in (os.path.join(dirpath, name) for name in filenames if name.endswith(exts)):
            yield candidate
def _is_filter_plugins_dir(path: str) -> bool:
return "filter_plugins" in os.path.normpath(path).split(os.sep)
def _read(path: str) -> str:
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
return f.read()
except Exception:
return ""
# ---------------------------
# Collect defined filters (AST)
# ---------------------------
class _FiltersCollector(ast.NodeVisitor):
    """
    AST visitor that extracts (filter_name, callable_name) pairs from
    `return {...}` / `return dict(...)` statements inside FilterModule.filters().
    `return <variable>` is resolved separately by the caller.
    """

    def __init__(self):
        # Collected (filter_name, callable_name) pairs, in discovery order.
        self.defs: List[Tuple[str, str]] = []

    def visit_Return(self, node: ast.Return):
        # Record every pair found in a direct `return <mapping>`.
        self.defs.extend(self._extract_mapping(node.value))

    def _extract_mapping(self, node) -> List[Tuple[str, str]]:
        """Return (name, callable) pairs from a dict literal or dict(...) call node."""
        pairs: List[Tuple[str, str]] = []
        if isinstance(node, ast.Dict):
            for k, v in zip(node.keys, node.values):
                # Only constant string keys can name a filter.
                key = k.value if isinstance(k, ast.Constant) and isinstance(k.value, str) else None
                val = self._name_of(v)
                if key:
                    pairs.append((key, val))
            return pairs
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == "dict":
            # dict(name=fn, ...) — keyword arguments name the filters.
            for kw in node.keywords or []:
                if kw.arg:
                    pairs.append((kw.arg, self._name_of(kw.value)))
            return pairs
        if isinstance(node, ast.Name):
            # `return some_dict` — handled by the caller via Assign/update scanning.
            return []
        return []

    @staticmethod
    def _name_of(v) -> str:
        """Best-effort name of the mapped callable (right-most attribute name)."""
        if isinstance(v, ast.Name):
            return v.id
        if isinstance(v, ast.Attribute):
            return v.attr
        return ""
def _collect_filters_from_filters_method(func: ast.FunctionDef) -> List[Tuple[str, str]]:
    """
    Extract (filter_name, callable_name) pairs from a FilterModule.filters() body.

    Covers direct `return {...}` / `return dict(...)` plus the common pattern of
    assembling a dict variable (`d = {...}`, `d.update({...})`) and returning it
    by name. Results are de-duplicated, preserving first-seen order.
    """
    c = _FiltersCollector()
    c.visit(func)
    # Dict contents assembled in the body, keyed by the variable name.
    name_dicts: Dict[str, List[Tuple[str, str]]] = {}
    # Variable names appearing in `return <name>` statements.
    returned_names: List[str] = []
    for n in ast.walk(func):
        if isinstance(n, ast.Assign):
            # X = { ... }
            if len(n.targets) == 1 and isinstance(n.targets[0], ast.Name):
                tgt = n.targets[0].id
                pairs = _FiltersCollector()._extract_mapping(n.value)
                if pairs:
                    name_dicts.setdefault(tgt, []).extend(pairs)
        elif isinstance(n, ast.Call):
            # X.update({ ... })
            if isinstance(n.func, ast.Attribute) and n.func.attr == "update":
                obj = n.func.value
                if isinstance(obj, ast.Name) and n.args:
                    add_pairs = _FiltersCollector()._extract_mapping(n.args[0])
                    if add_pairs:
                        name_dicts.setdefault(obj.id, []).extend(add_pairs)
        elif isinstance(n, ast.Return) and isinstance(n.value, ast.Name):
            returned_names.append(n.value.id)
    # Merge the assembled dict-variable contents for every `return <name>`.
    for nm in returned_names:
        for p in name_dicts.get(nm, []):
            c.defs.append(p)
    # dedupe (keep first occurrence order)
    seen = set()
    out: List[Tuple[str, str]] = []
    for k, v in c.defs:
        if (k, v) not in seen:
            seen.add((k, v))
            out.append((k, v))
    return out
def collect_defined_filters() -> Set[str]:
    """Return the names of all filters defined in local filter_plugins directories."""
    defined: Set[str] = set()
    for base in FILTER_PLUGIN_BASES:
        for path in _iter_files(base, exts=(".py",)):
            # Only .py files that actually live in a filter_plugins dir count.
            if not _is_filter_plugins_dir(path):
                continue
            code = _read(path)
            if not code:
                continue
            try:
                tree = ast.parse(code, filename=path)
            except Exception:
                # Unparsable files cannot define usable filters; skip them.
                continue
            for node in tree.body:
                if isinstance(node, ast.ClassDef) and node.name == "FilterModule":
                    for item in node.body:
                        if isinstance(item, ast.FunctionDef) and item.name == "filters":
                            for fname, _call in _collect_filters_from_filters_method(item):
                                defined.add(fname)
    return defined
# ---------------------------
# Collect used filters (Jinja-only scanning with string stripping)
# ---------------------------
# Capture inner bodies of Jinja blocks
RE_JINJA_MUSTACHE = re.compile(r"\{\{(.*?)\}\}", re.DOTALL)
RE_JINJA_TAG = re.compile(r"\{%(.*?)%\}", re.DOTALL)
# Within a Jinja body, capture "| filter_name" (with args or not)
RE_PIPE_IN_BODY = re.compile(r"\|\s*([A-Za-z_]\w*)\b")
# Matches "{% filter filter_name %}"
RE_BLOCK_FILTER = re.compile(r"\{%\s*filter\s+([A-Za-z_]\w*)\b", re.DOTALL)
def _strip_quoted(text: str) -> str:
"""
Remove content inside single/double quotes to avoid false positives for pipes in strings,
e.g. lookup('pipe', "pacman ... | grep ... | awk ...") -> pipes are ignored.
"""
out = []
i = 0
n = len(text)
quote = None
while i < n:
ch = text[i]
if quote is None:
if ch in ("'", '"'):
quote = ch
i += 1
continue
out.append(ch)
i += 1
else:
# inside quotes; handle simple escapes \" and \'
if ch == "\\" and i + 1 < n:
i += 2
continue
if ch == quote:
quote = None
i += 1
return "".join(out)
def _extract_filters_from_jinja_body(body: str) -> Set[str]:
    """Return filter names piped inside one Jinja body, ignoring pipes in strings."""
    # Quoted strings are removed first so shell pipelines inside them don't count.
    cleaned = _strip_quoted(body)
    return set(RE_PIPE_IN_BODY.findall(cleaned))
def collect_used_filters() -> Set[str]:
    """Return every filter name referenced from Jinja syntax in templates/YAML files."""
    used: Set[str] = set()
    for base in SEARCH_BASES:
        for path in _iter_files(base, exts=USAGE_EXTS):
            text = _read(path)
            if not text:
                continue
            # 1) Filters used in {{ ... }} blocks
            for m in RE_JINJA_MUSTACHE.finditer(text):
                used |= _extract_filters_from_jinja_body(m.group(1))
            # 2) Filters used in {% ... %} blocks (e.g., set, if, for)
            for m in RE_JINJA_TAG.finditer(text):
                used |= _extract_filters_from_jinja_body(m.group(1))
            # 3) Block filter form: {% filter name %} ... {% endfilter %}
            for m in RE_BLOCK_FILTER.finditer(text):
                used.add(m.group(1))
    return used
# ---------------------------
# Test
# ---------------------------
class TestAllUsedFiltersAreDefined(unittest.TestCase):
    """Inverse usage check: every non-builtin filter referenced in templates must exist locally."""

    def test_all_used_filters_have_definitions(self):
        defined = collect_defined_filters()
        used = collect_used_filters()
        # Drop built-ins/known-safe names, then keep only the undefined ones.
        unknown = sorted(name for name in used - BUILTIN_FILTERS if name not in defined)
        if unknown:
            bullet_list = "\n".join("- " + name for name in unknown)
            self.fail(
                "These filters are used in templates/YAML but have no local definition "
                "(and are not in BUILTIN_FILTERS):\n" + bullet_list
            )


if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,260 @@
import ast
import os
import re
import unittest
from typing import Dict, List, Tuple, Optional
# Repository root (two levels up from this test file).
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))

# Directories that may contain filter_plugins packages.
FILTER_PLUGIN_BASES = [
    os.path.join(PROJECT_ROOT, "filter_plugins"),
    os.path.join(PROJECT_ROOT, "roles"),
]

# Bases searched for filter usages.
SEARCH_BASES = [PROJECT_ROOT]
# Extensions scanned for usages (templates, YAML, and Python callers).
SEARCH_EXTS = (".yml", ".yaml", ".j2", ".jinja2", ".tmpl", ".py")
def _iter_files(base: str, *, py_only: bool = False):
for root, _, files in os.walk(base):
for fn in files:
if py_only and not fn.endswith(".py"):
continue
if not py_only and not fn.endswith(SEARCH_EXTS):
continue
yield os.path.join(root, fn)
def _is_filter_plugins_dir(path: str) -> bool:
return "filter_plugins" in os.path.normpath(path).split(os.sep)
def _read(path: str) -> str:
    """Return the text content of *path*, or '' when it cannot be read."""
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except Exception:
        # Treat unreadable files as empty rather than failing the whole scan.
        return ""
# ---------------------------
# Filter definition extraction
# ---------------------------
class _FiltersCollector(ast.NodeVisitor):
    """
    Extract mappings returned by FilterModule.filters().
    Handles:
      return {'name': fn, "x": y}
      d = {'name': fn}; d.update({...}); return d
      return dict(name=fn, x=y)
    """

    def __init__(self):
        self.defs: List[Tuple[str, str]] = []  # (filter_name, callable_name)

    def visit_Return(self, node: ast.Return):
        # Record every pair found in a direct `return <mapping>`.
        mapping = self._extract_mapping(node.value)
        for k, v in mapping:
            self.defs.append((k, v))

    def _extract_mapping(self, node) -> List[Tuple[str, str]]:
        """Return (filter_name, callable_name) pairs from a mapping AST node."""
        pairs: List[Tuple[str, str]] = []
        # dict literal
        if isinstance(node, ast.Dict):
            for k, v in zip(node.keys, node.values):
                # Only constant string keys can name a filter.
                key = k.value if isinstance(k, ast.Constant) and isinstance(k.value, str) else None
                val = self._name_of(v)
                if key:
                    pairs.append((key, val))
            return pairs
        # dict(...) call
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == "dict":
            # keywords: dict(name=fn)
            for kw in node.keywords or []:
                if kw.arg:
                    pairs.append((kw.arg, self._name_of(kw.value)))
            return pairs
        # Name (variable) that might be a dict assembled earlier in the function
        if isinstance(node, ast.Name):
            # Fallback: we can't easily dataflow-resolve here; handled elsewhere by walking Assign/Call
            return []
        return []

    @staticmethod
    def _name_of(v) -> str:
        """Best-effort callable name for a mapped value."""
        if isinstance(v, ast.Name):
            return v.id
        if isinstance(v, ast.Attribute):
            return v.attr  # take right-most name
        return ""
def _collect_filters_from_filters_method(func: ast.FunctionDef) -> List[Tuple[str, str]]:
    """
    Walks the function to assemble any mapping that flows into the return.
    We capture direct return dicts and also a common pattern:
      d = {...}
      d.update({...})
      return d
    Results are de-duplicated, preserving first-seen order.
    """
    collector = _FiltersCollector()
    collector.visit(func)
    # additionally scan simple 'X = {...}' and 'X.update({...})' patterns,
    # and if 'return X' occurs, merge those dicts.
    name_dicts: Dict[str, List[Tuple[str, str]]] = {}
    returns: List[str] = []
    for n in ast.walk(func):
        if isinstance(n, ast.Assign):
            # X = { ... }
            if len(n.targets) == 1 and isinstance(n.targets[0], ast.Name):
                tgt = n.targets[0].id
                pairs = _FiltersCollector()._extract_mapping(n.value)
                if pairs:
                    name_dicts.setdefault(tgt, []).extend(pairs)
        elif isinstance(n, ast.Call):
            # X.update({ ... })
            if isinstance(n.func, ast.Attribute) and n.func.attr == "update":
                obj = n.func.value
                if isinstance(obj, ast.Name):
                    add_pairs = _FiltersCollector()._extract_mapping(n.args[0] if n.args else None)
                    if add_pairs:
                        name_dicts.setdefault(obj.id, []).extend(add_pairs)
        elif isinstance(n, ast.Return) and isinstance(n.value, ast.Name):
            returns.append(n.value.id)
    # Merge the assembled dict-variable contents for every `return <name>`.
    for rname in returns:
        for p in name_dicts.get(rname, []):
            collector.defs.append(p)
    # dedupe
    seen = set()
    out: List[Tuple[str, str]] = []
    for k, v in collector.defs:
        if (k, v) not in seen:
            seen.add((k, v))
            out.append((k, v))
    return out
def _ast_collect_filters_from_file(path: str) -> List[Tuple[str, str, str]]:
    """Parse *path* and return (filter_name, callable_name, path) triples."""
    code = _read(path)
    if not code:
        return []
    try:
        tree = ast.parse(code, filename=path)
    except Exception:
        # Files that are not valid Python cannot define filters; skip them.
        return []
    results: List[Tuple[str, str, str]] = []
    for node in tree.body:
        if isinstance(node, ast.ClassDef) and node.name == "FilterModule":
            for item in node.body:
                if isinstance(item, ast.FunctionDef) and item.name == "filters":
                    for (fname, callname) in _collect_filters_from_filters_method(item):
                        results.append((fname, callname, path))
    return results
def collect_defined_filters() -> List[Dict[str, str]]:
    """Return records {filter, callable, file} for every locally defined filter."""
    found: List[Dict[str, str]] = []
    for base in FILTER_PLUGIN_BASES:
        for path in _iter_files(base, py_only=True):
            # Only .py files that actually live in a filter_plugins dir count.
            if not _is_filter_plugins_dir(path):
                continue
            for (filter_name, callable_name, fpath) in _ast_collect_filters_from_file(path):
                found.append({"filter": filter_name, "callable": callable_name, "file": fpath})
    return found
# ---------------------------
# Usage detection
# ---------------------------
def _compile_jinja_patterns(name: str) -> list[re.Pattern]:
"""
Build robust patterns that match Jinja filter usage without using '%' string formatting.
Handles:
- {{ ... | name }}
- {% ... | name %}
- {% filter name %}...{% endfilter %}
- bare YAML/Jinja like: when: x | name
"""
escaped = re.escape(name)
return [
re.compile(r"\{\{[^}]*\|\s*" + escaped + r"\b", re.DOTALL), # {{ ... | name }}
re.compile(r"\{%\s*[^%]*\|\s*" + escaped + r"\b", re.DOTALL), # {% ... | name %}
re.compile(r"\{%\s*filter\s+" + escaped + r"\b"), # {% filter name %}
re.compile(r"\|\s*" + escaped + r"\b"), # bare: when: x | name
]
def _python_call_pattern(callable_name: str) -> Optional[re.Pattern]:
if not callable_name:
return None
return re.compile(r"\b%s\s*\(" % re.escape(callable_name))
def search_usage(filter_name: str, callable_name: str, *, skip_file: str) -> tuple[bool, bool]:
    """
    Search the repository for usages of a filter.
    Returns tuple:
      (used_anywhere, used_outside_tests)
      - used_anywhere: True if found in repo at all
      - used_outside_tests: True if found outside tests/
    """
    jinja_pats = _compile_jinja_patterns(filter_name)
    py_pat = _python_call_pattern(callable_name)
    used_anywhere = False
    used_outside_tests = False
    tests_component = os.sep + "tests" + os.sep
    for base in SEARCH_BASES:
        for path in _iter_files(base, py_only=False):
            # Never count the defining file itself as a usage site.
            try:
                if os.path.samefile(path, skip_file):
                    continue
            except Exception:
                pass  # skip_file may be missing/uncomparable; keep scanning
            content = _read(path)
            if not content:
                continue
            hit = any(pat.search(content) for pat in jinja_pats)
            if not hit and py_pat and path.endswith(".py") and py_pat.search(content):
                hit = True
            if hit:
                used_anywhere = True
                # BUG FIX: compare against os.sep instead of a hard-coded "/"
                # so the tests/-exclusion also works on Windows paths.
                norm = os.path.normpath(path)
                if tests_component not in norm and not norm.endswith(os.sep + "tests"):
                    used_outside_tests = True
                if used_outside_tests:
                    # Both flags are True; nothing more can change the result.
                    return used_anywhere, used_outside_tests
    return used_anywhere, used_outside_tests
class TestFilterDefinitionsAreUsed(unittest.TestCase):
    """Dead-code check: every locally defined filter must be used outside tests/."""

    def test_every_defined_filter_is_used(self):
        definitions = collect_defined_filters()
        if not definitions:
            self.skipTest("No filters found under filter_plugins/.")
        unused = []
        for d in definitions:
            f_name, c_name, f_path = d["filter"], d["callable"], d["file"]
            used_any, used_outside = search_usage(f_name, c_name, skip_file=f_path)
            if not used_any:
                unused.append((f_name, c_name, f_path, "not used anywhere"))
            elif not used_outside:
                unused.append((f_name, c_name, f_path, "only used in tests"))
        if unused:
            msg = ["The following filters are invalidly unused:"]
            for f, c, p, reason in sorted(unused):
                # BUG FIX: separate path and reason; the message previously ran
                # them together ("... defined in {p}{reason}").
                msg.append(f"- '{f}' (callable '{c or 'unknown'}') defined in {p}: {reason}")
            self.fail("\n".join(msg))


if __name__ == "__main__":
    unittest.main()

View File

@@ -1,111 +0,0 @@
import os
import sys
import unittest
# Add the filter_plugins directory to the import path
dir_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), '../../../filter_plugins')
)
sys.path.insert(0, dir_path)
from ansible.errors import AnsibleFilterError
from alias_domains_map import FilterModule
class TestDomainFilters(unittest.TestCase):
    """Unit tests for the alias_domains_map filter (removed together with the plugin)."""

    def setUp(self):
        self.filter_module = FilterModule()
        # Sample primary domain
        self.primary = 'example.com'

    def test_alias_empty_apps(self):
        # No applications -> no alias mappings at all.
        apps = {}
        expected = {}
        result = self.filter_module.alias_domains_map(apps, self.primary)
        self.assertEqual(result, expected)

    def test_alias_without_aliases_and_no_canonical(self):
        apps = {'app1': {}}
        # canonical defaults to ['app1.example.com'], so alias should be []
        expected = {'app1': []}
        result = self.filter_module.alias_domains_map(apps, self.primary)
        self.assertEqual(result, expected)

    def test_alias_with_explicit_aliases(self):
        apps = {
            'app1': {
                'server':{
                    'domains': {'aliases': ['alias.com']}
                }
            }
        }
        # canonical defaults to ['app1.example.com'], so alias should include alias.com and default
        expected = {'app1': ['alias.com', 'app1.example.com']}
        result = self.filter_module.alias_domains_map(apps, self.primary)
        self.assertCountEqual(result['app1'], expected['app1'])

    def test_alias_with_canonical_not_default(self):
        apps = {
            'app1': {
                'server':{'domains': {'canonical': ['foo.com']}}
            }
        }
        # foo.com is canonical, default not in canonical so added as alias
        expected = {'app1': ['app1.example.com']}
        result = self.filter_module.alias_domains_map(apps, self.primary)
        self.assertEqual(result, expected)

    def test_alias_with_existing_default(self):
        apps = {
            'app1': {
                'server':{
                    'domains': {
                        'canonical': ['foo.com'],
                        'aliases': ['app1.example.com']
                    }
                }
            }
        }
        # default present in aliases, should not be duplicated
        expected = {'app1': ['app1.example.com']}
        result = self.filter_module.alias_domains_map(apps, self.primary)
        self.assertEqual(result, expected)

    def test_invalid_aliases_type(self):
        # Non-list alias values must be rejected with an AnsibleFilterError.
        apps = {
            'app1': {'server':{'domains': {'aliases': 123}}}
        }
        with self.assertRaises(AnsibleFilterError):
            self.filter_module.alias_domains_map(apps, self.primary)

    def test_alias_with_empty_domains_cfg(self):
        # An empty domains mapping is passed through unchanged.
        apps = {
            'app1': {
                'server':{
                    'domains': {}
                }
            }
        }
        expected = apps
        result = self.filter_module.alias_domains_map(apps, self.primary)
        self.assertEqual(result, expected)

    def test_alias_with_canonical_dict_not_default(self):
        # canonical may also be a dict of named domains.
        apps = {
            'app1': {
                'server':{
                    'domains': {
                        'canonical': {
                            'one': 'one.com',
                            'two': 'two.com'
                        }
                    }
                }
            }
        }
        expected = {'app1': ['app1.example.com']}
        result = self.filter_module.alias_domains_map(apps, self.primary)
        self.assertEqual(result, expected)


if __name__ == "__main__":
    unittest.main()

View File

@@ -1,63 +0,0 @@
# tests/unit/filter_plugins/test_get_application_id.py
import unittest
import os
import tempfile
import shutil
import yaml
from ansible.errors import AnsibleFilterError
from filter_plugins.get_application_id import get_application_id
class TestGetApplicationIdFilter(unittest.TestCase):
    """Unit tests for the get_application_id filter (removed together with the plugin)."""

    def setUp(self):
        # Create a temporary project directory and switch to it
        self.tmpdir = tempfile.mkdtemp()
        self.original_cwd = os.getcwd()
        os.chdir(self.tmpdir)
        # Create the roles/testrole/vars directory structure
        self.role_name = 'testrole'
        self.vars_dir = os.path.join('roles', self.role_name, 'vars')
        os.makedirs(self.vars_dir)
        self.vars_file = os.path.join(self.vars_dir, 'main.yml')

    def tearDown(self):
        # Return to original cwd and remove temp directory
        os.chdir(self.original_cwd)
        shutil.rmtree(self.tmpdir)

    def write_vars_file(self, content):
        # Serialize *content* as YAML into roles/<role>/vars/main.yml.
        with open(self.vars_file, 'w') as f:
            yaml.dump(content, f)

    def test_returns_application_id(self):
        # Given a valid vars file with application_id
        expected_id = '12345'
        self.write_vars_file({'application_id': expected_id})
        # When
        result = get_application_id(self.role_name)
        # Then
        self.assertEqual(result, expected_id)

    def test_file_not_found_raises_error(self):
        # Given no vars file for a nonexistent role
        with self.assertRaises(AnsibleFilterError) as cm:
            get_application_id('nonexistent_role')
        self.assertIn("Vars file not found", str(cm.exception))

    def test_missing_key_raises_error(self):
        # Given a vars file without application_id
        self.write_vars_file({'other_key': 'value'})
        with self.assertRaises(AnsibleFilterError) as cm:
            get_application_id(self.role_name)
        self.assertIn("Key 'application_id' not found", str(cm.exception))

    def test_invalid_yaml_raises_error(self):
        # Write invalid YAML content
        with open(self.vars_file, 'w') as f:
            f.write(":::not a yaml:::")
        with self.assertRaises(AnsibleFilterError) as cm:
            get_application_id(self.role_name)
        self.assertIn("Error reading YAML", str(cm.exception))


if __name__ == '__main__':
    unittest.main()

View File

@@ -1,122 +0,0 @@
import os
import sys
import unittest
from unittest.mock import patch, mock_open
from ansible.errors import AnsibleFilterError
# make sure our plugin is on PYTHONPATH
root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../filter_plugins'))
sys.path.insert(0, root)
import load_configuration
from load_configuration import FilterModule, _cfg_cache
class TestLoadConfigurationFilter(unittest.TestCase):
    """Unit tests for the load_configuration filter and its module-level cache
    (removed together with the plugin)."""

    def setUp(self):
        # Each test starts with a clean config cache.
        _cfg_cache.clear()
        self.f = FilterModule().filters()['load_configuration']
        self.app = 'html'
        self.nested_cfg = {
            'html': {
                'features': {'matomo': True},
                'server': {
                    'domains':{'canonical': ['html.example.com']}
                }
            }
        }
        self.flat_cfg = {
            'features': {'matomo': False},
            'server': {'domains':{'canonical': ['flat.example.com']}}
        }

    def test_invalid_key(self):
        # A None key is rejected.
        with self.assertRaises(AnsibleFilterError):
            self.f(self.app, None)

    @patch('load_configuration.os.path.isdir', return_value=False)
    def test_no_roles_dir(self, _):
        with self.assertRaises(AnsibleFilterError):
            self.f(self.app, 'features.matomo')

    @patch('load_configuration.os.listdir', return_value=['r1'])
    @patch('load_configuration.os.path.isdir', return_value=True)
    @patch('load_configuration.os.path.exists', return_value=False)
    def test_no_matching_role(self, *_):
        self.assertIsNone(self.f(self.app, 'features.matomo'))

    @patch('load_configuration.os.listdir', return_value=['r1'])
    @patch('load_configuration.os.path.isdir', return_value=True)
    @patch('load_configuration.os.path.exists')
    @patch('load_configuration.open', new_callable=mock_open)
    @patch('load_configuration.yaml.safe_load')
    def test_primary_missing_conf(self, mock_yaml, mock_file, mock_exists, *_):
        # vars/main.yml matches but config/main.yml is absent.
        mock_exists.side_effect = lambda p: p.endswith('vars/main.yml')
        mock_yaml.return_value = {'application_id': self.app}
        with self.assertRaises(AnsibleFilterError):
            self.f(self.app, 'features.matomo')

    @patch('load_configuration.os.listdir', return_value=['r1'])
    @patch('load_configuration.os.path.isdir', return_value=True)
    @patch('load_configuration.os.path.exists')
    @patch('load_configuration.open', new_callable=mock_open)
    @patch('load_configuration.yaml.safe_load')
    def test_primary_and_cache(self, mock_yaml, mock_file, mock_exists, *_):
        mock_exists.side_effect = lambda p: p.endswith('vars/main.yml') or p.endswith('config/main.yml')
        mock_yaml.side_effect = [
            {'application_id': self.app},  # main.yml
            self.nested_cfg  # config/main.yml
        ]
        # first load
        self.assertTrue(self.f(self.app, 'features.matomo'))
        self.assertIn(self.app, _cfg_cache)
        mock_yaml.reset_mock()
        # from cache
        self.assertEqual(self.f(self.app, 'server.domains.canonical'),
                         ['html.example.com'])
        mock_yaml.assert_not_called()

    @patch('load_configuration.os.listdir', return_value=['r1'])
    @patch('load_configuration.os.path.isdir', return_value=True)
    @patch('load_configuration.os.path.exists', return_value=True)
    @patch('load_configuration.open', mock_open(read_data="html: {}"))
    @patch('load_configuration.yaml.safe_load', return_value={'html': {}})
    def test_key_not_found_after_load(self, *_):
        with self.assertRaises(AnsibleFilterError):
            self.f(self.app, 'does.not.exist')

    @patch('load_configuration.os.listdir', return_value=['r2'])
    @patch('load_configuration.os.path.isdir', return_value=True)
    @patch('load_configuration.os.path.exists')
    @patch('load_configuration.open', new_callable=mock_open)
    @patch('load_configuration.yaml.safe_load')
    def test_fallback_nested(self, mock_yaml, mock_file, mock_exists, *_):
        mock_exists.side_effect = lambda p: p.endswith('config/main.yml')
        mock_yaml.return_value = self.nested_cfg
        # nested fallback must work
        self.assertTrue(self.f(self.app, 'features.matomo'))
        self.assertEqual(self.f(self.app, 'server.domains.canonical'),
                         ['html.example.com'])

    @patch('load_configuration.os.listdir', return_value=['r4'])
    @patch('load_configuration.os.path.isdir', return_value=True)
    @patch('load_configuration.os.path.exists')
    @patch('load_configuration.open', new_callable=mock_open)
    @patch('load_configuration.yaml.safe_load')
    def test_fallback_with_indexed_key(self, mock_yaml, mock_file, mock_exists, *_):
        # Testing with an indexed key like domains.canonical[0]
        mock_exists.side_effect = lambda p: p.endswith('config/main.yml')
        mock_yaml.return_value = {
            'file': {
                'server': {
                    'domains':{
                        'canonical': ['files.example.com', 'extra.example.com']
                    }
                }
            }
        }
        # should get the first element of the canonical domains list
        self.assertEqual(self.f('file', 'server.domains.canonical[0]'),
                         'files.example.com')


if __name__ == '__main__':
    unittest.main()

View File

@@ -1,56 +0,0 @@
import unittest
import sys
import os
# Ensure filter_plugins directory is on the path
sys.path.insert(
0,
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../filter_plugins'))
)
from safe_join import safe_join
class TestSafeJoinFilter(unittest.TestCase):
    """Unit tests for the safe_join filter (removed together with the plugin)."""

    def test_join_with_trailing_slashes(self):
        # Redundant slashes on either side collapse to a single separator.
        self.assertEqual(
            safe_join('http://example.com/', '/path/to'),
            'http://example.com/path/to'
        )

    def test_join_without_slashes(self):
        self.assertEqual(
            safe_join('http://example.com', 'path/to'),
            'http://example.com/path/to'
        )

    def test_base_none(self):
        with self.assertRaises(ValueError):
            safe_join(None, 'path')

    def test_tail_none(self):
        with self.assertRaises(ValueError):
            safe_join('http://example.com', None)

    def test_base_empty(self):
        self.assertEqual(safe_join('', 'path'), '/path')

    def test_tail_empty(self):
        # joining with empty tail should yield base with trailing slash
        self.assertEqual(
            safe_join('http://example.com', ''),
            'http://example.com/'
        )

    def test_numeric_base(self):
        # numeric base is cast to string
        self.assertEqual(safe_join(123, 'path'), '123/path')

    def test_exception_in_str(self):
        class Bad:
            def __str__(self):
                raise ValueError('bad')
        # on exception, safe_join returns ''
        self.assertEqual(safe_join(Bad(), 'x'), '')


if __name__ == '__main__':
    unittest.main()

View File

@@ -1,59 +0,0 @@
import unittest
import sys
import os
# Ensure filter_plugins directory is on the path
sys.path.insert(
0,
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../filter_plugins'))
)
from safe import safe_placeholders
class TestSafePlaceholdersFilter(unittest.TestCase):
    """Unit tests for the safe_placeholders filter (removed together with the plugin)."""

    def test_simple_replacement(self):
        template = "Hello, {user}!"
        mapping = {'user': 'Alice'}
        self.assertEqual(safe_placeholders(template, mapping), "Hello, Alice!")

    def test_missing_placeholder(self):
        template = "Hello, {user}!"
        # Missing placeholder should raise KeyError
        with self.assertRaises(KeyError):
            safe_placeholders(template, {})

    def test_none_template(self):
        # A None template yields an empty string rather than raising.
        self.assertEqual(safe_placeholders(None, {'user': 'Alice'}), "")

    def test_no_placeholders(self):
        template = "Just a plain string"
        mapping = {'any': 'value'}
        self.assertEqual(safe_placeholders(template, mapping), "Just a plain string")

    def test_multiple_placeholders(self):
        template = "{greet}, {user}!"
        mapping = {'greet': 'Hi', 'user': 'Bob'}
        self.assertEqual(safe_placeholders(template, mapping), "Hi, Bob!")

    def test_numeric_values(self):
        template = "Count: {n}"
        mapping = {'n': 0}
        self.assertEqual(safe_placeholders(template, mapping), "Count: 0")

    def test_extra_mapping_keys(self):
        # Extra, unused mapping keys are ignored.
        template = "Value: {a}"
        mapping = {'a': '1', 'b': '2'}
        self.assertEqual(safe_placeholders(template, mapping), "Value: 1")

    def test_malformed_template(self):
        # Unclosed placeholder should be caught and return empty string
        template = "Unclosed {key"
        mapping = {'key': 'value'}
        self.assertEqual(safe_placeholders(template, mapping), "")

    def test_mapping_none(self):
        template = "Test {x}"
        self.assertEqual(safe_placeholders(template, None), "")


if __name__ == '__main__':
    unittest.main()

View File

@@ -1,44 +0,0 @@
import unittest
import sys
import os
from jinja2 import Undefined
# Ensure filter_plugins directory is on the path
sys.path.insert(
0,
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../filter_plugins'))
)
from safe import FilterModule
class TestSafeVarFilter(unittest.TestCase):
    """Unit tests for the safe_var filter (removed together with the plugin)."""

    def setUp(self):
        # Retrieve the safe_var filter function
        self.filter = FilterModule().filters()['safe_var']

    def test_returns_non_empty_string(self):
        self.assertEqual(self.filter('hello'), 'hello')

    def test_returns_empty_string(self):
        self.assertEqual(self.filter(''), '')

    def test_returns_empty_for_none(self):
        self.assertEqual(self.filter(None), '')

    def test_returns_empty_for_jinja_undefined(self):
        # Instantiate an Undefined without arguments
        undef = Undefined()
        self.assertEqual(self.filter(undef), '')

    def test_returns_zero_for_zero(self):
        # 0 is falsey but not None or Undefined, so safe_var returns it
        self.assertEqual(self.filter(0), 0)

    def test_returns_list_and_dict_unchanged(self):
        # Containers pass through untouched.
        data = {'key': 'value'}
        self.assertEqual(self.filter(data), data)
        lst = [1, 2, 3]
        self.assertEqual(self.filter(lst), lst)


if __name__ == '__main__':
    unittest.main()

View File

@@ -23,7 +23,6 @@ class TestDockerCardsLookup(unittest.TestCase):
os.makedirs(os.path.join(self.role_dir, "meta"))
os.makedirs(os.path.join(self.role_dir, "vars"))
# Create vars/main.yml so get_application_id() can find the application_id.
vars_main = os.path.join(self.role_dir, "vars", "main.yml")
with open(vars_main, "w", encoding="utf-8") as f:
f.write("application_id: portfolio\n")

View File

@@ -77,12 +77,6 @@ class SplitPostgresConnectionsTests(unittest.TestCase):
def test_registry_contains_filters(self):
registry = self.mod.FilterModule().filters()
self.assertIn("split_postgres_connections", registry)
self.assertIn("list_postgres_roles", registry)
def test_list_postgres_roles(self):
roles = self.mod.list_postgres_roles(self.roles_dir)
self.assertIsInstance(roles, list)
self.assertSetEqual(set(roles), {"app_a", "app_b"})
def test_split_postgres_connections_division(self):
# There are 2 postgres roles -> 200 / 2 = 100

View File

@@ -1,61 +0,0 @@
import unittest
import os
import sys
# Add the path to roles/sys-ctl-bkp-docker-2-loc/filter_plugins
CURRENT_DIR = os.path.dirname(__file__)
FILTER_PLUGIN_DIR = os.path.abspath(
os.path.join(CURRENT_DIR, '../../../../../roles/sys-ctl-bkp-docker-2-loc/filter_plugins')
)
sys.path.insert(0, FILTER_PLUGIN_DIR)
from dict_to_cli_args import dict_to_cli_args
class TestDictToCliArgs(unittest.TestCase):
    """Unit tests for the dict_to_cli_args filter (removed together with the plugin)."""

    def test_simple_string_args(self):
        data = {"backup-dir": "/mnt/backups", "version-suffix": "-nightly"}
        expected = "--backup-dir=/mnt/backups --version-suffix=-nightly"
        self.assertEqual(dict_to_cli_args(data), expected)

    def test_boolean_true(self):
        # True flags render as bare switches.
        data = {"shutdown": True, "everything": True}
        expected = "--shutdown --everything"
        self.assertEqual(dict_to_cli_args(data), expected)

    def test_boolean_false(self):
        # False flags are omitted entirely.
        data = {"shutdown": False, "everything": True}
        expected = "--everything"
        self.assertEqual(dict_to_cli_args(data), expected)

    def test_list_argument(self):
        # Lists are space-joined and double-quoted.
        data = {"ignore-volumes": ["redis", "memcached"]}
        expected = '--ignore-volumes="redis memcached"'
        self.assertEqual(dict_to_cli_args(data), expected)

    def test_mixed_arguments(self):
        data = {
            "backup-dir": "/mnt/backups",
            "shutdown": True,
            "ignore-volumes": ["redis", "memcached"]
        }
        result = dict_to_cli_args(data)
        self.assertIn("--backup-dir=/mnt/backups", result)
        self.assertIn("--shutdown", result)
        self.assertIn('--ignore-volumes="redis memcached"', result)

    def test_empty_dict(self):
        self.assertEqual(dict_to_cli_args({}), "")

    def test_none_value(self):
        # None values are skipped.
        data = {"some-value": None, "other": "yes"}
        expected = "--other=yes"
        self.assertEqual(dict_to_cli_args(data), expected)

    def test_invalid_type(self):
        # Only dicts are accepted as input.
        with self.assertRaises(TypeError):
            dict_to_cli_args(["not", "a", "dict"])


if __name__ == "__main__":
    unittest.main()