Big restructuring

This commit is contained in:
2025-05-20 00:13:45 +02:00
parent efe994a4c5
commit f748f9cef1
44 changed files with 697 additions and 469 deletions

View File

@@ -0,0 +1,83 @@
import os
import sys
import unittest
from unittest.mock import patch, mock_open
from ansible.errors import AnsibleFilterError
# ensure filter_plugins is on the path
dir_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), '../../filter_plugins')
)
sys.path.insert(0, dir_path)
from applications_if_group_and_deps import FilterModule
class TestApplicationsIfGroupAndDeps(unittest.TestCase):
def setUp(self):
self.filter = FilterModule()
# minimal applications dict
self.apps = {
'app1': {'foo': 'bar'},
'app2': {'baz': 'qux'},
'roleA': {'some': 'cfg'},
}
def test_invalid_inputs(self):
with self.assertRaises(AnsibleFilterError):
self.filter.applications_if_group_and_deps('not a dict', [])
with self.assertRaises(AnsibleFilterError):
self.filter.applications_if_group_and_deps({}, 'not a list')
def test_direct_inclusion(self):
# if an app key is directly in group_names it should be returned
groups = ['app1', 'unrelated']
result = self.filter.applications_if_group_and_deps(self.apps, groups)
self.assertEqual(set(result.keys()), {'app1'})
@patch('applications_if_group_and_deps.yaml.safe_load')
@patch('applications_if_group_and_deps.open', new_callable=mock_open)
@patch('applications_if_group_and_deps.os.path.isfile')
def test_indirect_inclusion_via_dependencies(self, mock_isfile, mock_file, mock_yaml):
"""
Simulate that group 'groupX' has a dependency on 'roleA', and that
roleA's vars/main.yml contains application_id: 'roleA'.
Then passing group_names=['groupX'] should include 'roleA'.
"""
# pretend both meta/main.yml and vars/main.yml exist
mock_isfile.return_value = True
# safe_load() calls:
# 1) groupX/meta/main.yml → dependencies ['roleA']
# 2) roleA/meta/main.yml → dependencies []
# 3) roleA/vars/main.yml → application_id 'roleA'
mock_yaml.side_effect = [
{'dependencies': ['roleA']},
{'dependencies': []},
{'application_id': 'roleA'}
]
result = self.filter.applications_if_group_and_deps(self.apps, ['groupX'])
self.assertEqual(set(result.keys()), {'roleA'})
@patch('applications_if_group_and_deps.yaml.safe_load')
@patch('applications_if_group_and_deps.open', new_callable=mock_open)
@patch('applications_if_group_and_deps.os.path.isfile')
def test_no_vars_file(self, mock_isfile, mock_file, mock_yaml):
"""
If a meta/main.yml dependency exists but vars/main.yml is missing,
that role won't contribute an application_id, so nothing is returned.
"""
# meta exists, vars does not
def isfile_side(path):
return path.endswith('meta/main.yml')
mock_isfile.side_effect = isfile_side
# meta declares dependency
mock_yaml.return_value = {'dependencies': ['roleA']}
result = self.filter.applications_if_group_and_deps(self.apps, ['groupX'])
self.assertEqual(result, {})
if __name__ == '__main__':
unittest.main()

View File

@@ -5,44 +5,66 @@ import os
# Ensure filter_plugins directory is on the path
sys.path.insert(
0,
os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
os.path.abspath(os.path.join(os.path.dirname(__file__), '../filter_plugins'))
)
from generate_base_sld_domains import FilterModule
from ansible.errors import AnsibleFilterError
class TestGenerateBaseSldDomains(unittest.TestCase):
def setUp(self):
self.filter = FilterModule().generate_base_sld_domains
def test_simple_string_and_redirect(self):
domains = {'app': 'sub.example.com'}
redirects = [{'source': 'alias.example.com'}]
result = self.filter(domains, redirects)
def test_simple_list(self):
domains = [
'sub.example.com',
'alias.example.com',
'example.com'
]
result = self.filter(domains)
self.assertEqual(result, ['example.com'])
def test_without_redirect_mappings(self):
domains = {
'a': 'a.co',
'b': ['b.co', 'sub.c.co'],
'c': {'x': 'x.co'}
}
result = self.filter(domains, None)
self.assertEqual(result, ['a.co', 'b.co', 'c.co', 'x.co'])
def test_mixed_tlds_and_subdomains(self):
domains = [
'a.co', 'b.co', 'sub.b.co', 'x.co', 'www.x.co'
]
result = self.filter(domains)
self.assertEqual(result, ['a.co', 'b.co', 'x.co'])
def test_redirect_list_sources(self):
domains = {'app': 'app.domain.org'}
redirects = [{'source': ['alias.domain.org', 'deep.sub.example.net']}]
result = self.filter(domains, redirects)
self.assertEqual(result, ['domain.org', 'example.net'])
def test_invalid_non_string_raise(self):
for bad in [42, None]:
with self.assertRaises(AnsibleFilterError):
self.filter([bad])
def test_duplicate_entries_and_sorting(self):
domains = {
'x': ['one.com', 'sub.one.com'],
'y': 'two.com',
'z': {'k': 'one.com'}
}
redirects = [{'source': 'deep.two.com'}]
result = self.filter(domains, redirects)
def test_localhost_allowed(self):
domains = ['localhost']
result = self.filter(domains)
self.assertEqual(result, ['localhost'])
def test_ip_raises(self):
with self.assertRaises(AnsibleFilterError):
self.filter(['127.0.0.1'])
def test_nested_subdomains(self):
domains = ['sub.sub2.one']
result = self.filter(domains)
self.assertEqual(result, ['sub2.one'])
def test_deeply_nested_subdomains(self):
domains = ['sub3.sub2.sub1.one']
result = self.filter(domains)
self.assertEqual(result, ['sub1.one'])
def test_empty_and_malformed_raise(self):
for bad in ['', '.', '...']:
with self.assertRaises(AnsibleFilterError):
self.filter([bad])
def test_sorting_and_duplicates(self):
domains = [
'one.com', 'sub.one.com', 'two.com', 'deep.two.com', 'one.com'
]
result = self.filter(domains)
self.assertEqual(result, ['one.com', 'two.com'])
if __name__ == '__main__':

View File

@@ -1,51 +0,0 @@
import unittest
from filter_plugins.group_domain_filters import FilterModule
class TestAddDomainIfGroup(unittest.TestCase):
def setUp(self):
self.filter = FilterModule().filters()["add_domain_if_group"]
def test_add_string_value(self):
result = self.filter({}, "akaunting", "accounting.example.org", ["akaunting"])
self.assertEqual(result, {"akaunting": "accounting.example.org"})
def test_add_list_value(self):
result = self.filter({}, "mastodon", ["microblog.example.org"], ["mastodon"])
self.assertEqual(result, {"mastodon": ["microblog.example.org"]})
def test_add_dict_value(self):
result = self.filter({}, "bluesky", {"web": "bskyweb.example.org", "api": "bluesky.example.org"}, ["bluesky"])
self.assertEqual(result, {"bluesky": {"web": "bskyweb.example.org", "api": "bluesky.example.org"}})
def test_ignore_if_not_in_group(self):
result = self.filter({}, "akaunting", "accounting.example.org", ["wordpress"])
self.assertEqual(result, {})
def test_merge_with_existing(self):
initial = {"wordpress": ["blog.example.org"]}
result = self.filter(initial, "akaunting", "accounting.example.org", ["akaunting"])
self.assertEqual(result, {
"wordpress": ["blog.example.org"],
"akaunting": "accounting.example.org"
})
def test_dict_is_not_mutated(self):
base = {"keycloak": "auth.example.org"}
copy = dict(base) # make a copy for comparison
_ = self.filter(base, "akaunting", "accounting.example.org", ["akaunting"])
self.assertEqual(base, copy) # original must stay unchanged
def test_multiple_adds_accumulate(self):
result = {}
result = self.filter(result, "akaunting", "accounting.example.org", ["akaunting", "wordpress"])
result = self.filter(result, "wordpress", ["blog.example.org"], ["akaunting", "wordpress"])
result = self.filter(result, "bluesky", {"web": "bskyweb.example.org", "api": "bluesky.example.org"}, ["bluesky"])
self.assertEqual(result, {
"akaunting": "accounting.example.org",
"wordpress": ["blog.example.org"],
"bluesky": {"web": "bskyweb.example.org", "api": "bluesky.example.org"},
})
if __name__ == "__main__":
unittest.main()

View File

@@ -1,79 +0,0 @@
import os
import tempfile
import shutil
import yaml
import unittest
# Import the filter module
import filter_plugins.group_domain_filters as gdf_module
class TestAddDomainIfGroupRecursive(unittest.TestCase):
def setUp(self):
# Create a temporary project structure
self.tempdir = tempfile.mkdtemp()
fp_dir = os.path.join(self.tempdir, 'filter_plugins')
roles_dir = os.path.join(self.tempdir, 'roles')
os.makedirs(fp_dir, exist_ok=True)
os.makedirs(roles_dir, exist_ok=True)
# Point module __file__ so plugin_dir resolves correctly
gdf_module.__file__ = os.path.join(fp_dir, 'group_domain_filters.py')
self.roles_dir = roles_dir
def tearDown(self):
shutil.rmtree(self.tempdir)
def write_role(self, role_name, dependencies, application_id):
"""
Helper: write a role directory with meta/main.yml and vars/main.yml
"""
meta_dir = os.path.join(self.roles_dir, role_name, 'meta')
vars_dir = os.path.join(self.roles_dir, role_name, 'vars')
os.makedirs(meta_dir, exist_ok=True)
os.makedirs(vars_dir, exist_ok=True)
# Write meta/main.yml
with open(os.path.join(meta_dir, 'main.yml'), 'w') as f:
yaml.safe_dump({'dependencies': dependencies}, f)
# Write vars/main.yml
with open(os.path.join(vars_dir, 'main.yml'), 'w') as f:
yaml.safe_dump({'application_id': application_id}, f)
def test_direct_application_id_in_group_names(self):
# If domain_key (application_id) is directly in group_names
result = gdf_module.FilterModule.add_domain_if_group({}, 'app1', 'domain1', ['app1'])
self.assertEqual(result, {'app1': 'domain1'})
def test_indirect_dependency_application_id(self):
# roleA depends on roleB; roleB has application_id 'appB'
self.write_role('roleA', ['roleB'], 'appA')
self.write_role('roleB', [], 'appB')
# group_names includes roleA, so appB should be reachable
result = gdf_module.FilterModule.add_domain_if_group({}, 'appB', 'domainB', ['roleA'])
self.assertEqual(result, {'appB': 'domainB'})
def test_multi_level_dependency_application_id(self):
# roleX -> roleY -> roleZ; roleZ id is 'appZ'
self.write_role('roleX', ['roleY'], 'appX')
self.write_role('roleY', ['roleZ'], 'appY')
self.write_role('roleZ', [], 'appZ')
# Starting from roleX, appZ reachable
result = gdf_module.FilterModule.add_domain_if_group({}, 'appZ', 'domainZ', ['roleX'])
self.assertEqual(result, {'appZ': 'domainZ'})
def test_domain_key_for_parent_role(self):
# roleParent has app 'appP', and depends on roleChild('appC')
self.write_role('roleParent', ['roleChild'], 'appP')
self.write_role('roleChild', [], 'appC')
# Even appP reachable via deps of roleParent (including itself)
result = gdf_module.FilterModule.add_domain_if_group({}, 'appP', 'domainP', ['roleParent'])
self.assertEqual(result, {'appP': 'domainP'})
def test_no_inclusion_for_unrelated(self):
# Unrelated roles
self.write_role('roleC', ['roleD'], 'appC')
self.write_role('roleD', [], 'appD')
# group_names does not include 'roleC' or 'roleD'
result = gdf_module.FilterModule.add_domain_if_group({}, 'appC', 'domainC', ['otherRole'])
self.assertEqual(result, {})
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,118 @@
import os
import sys
import unittest
from unittest.mock import patch, mock_open
from ansible.errors import AnsibleFilterError
# make sure our plugin is on PYTHONPATH
root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../filter_plugins'))
sys.path.insert(0, root)
import load_configuration
from load_configuration import FilterModule, _cfg_cache
class TestLoadConfigurationFilter(unittest.TestCase):
def setUp(self):
_cfg_cache.clear()
self.f = FilterModule().filters()['load_configuration']
self.app = 'html_server'
self.nested_cfg = {
'html_server': {
'features': {'matomo': True},
'domains': {'canonical': ['html.example.com']}
}
}
self.flat_cfg = {
'features': {'matomo': False},
'domains': {'canonical': ['flat.example.com']}
}
def test_invalid_key(self):
with self.assertRaises(AnsibleFilterError):
self.f(self.app, None)
@patch('load_configuration.os.path.isdir', return_value=False)
def test_no_roles_dir(self, _):
with self.assertRaises(AnsibleFilterError):
self.f(self.app, 'features.matomo')
@patch('load_configuration.os.listdir', return_value=['r1'])
@patch('load_configuration.os.path.isdir', return_value=True)
@patch('load_configuration.os.path.exists', return_value=False)
def test_no_matching_role(self, *_):
self.assertIsNone(self.f(self.app, 'features.matomo'))
@patch('load_configuration.os.listdir', return_value=['r1'])
@patch('load_configuration.os.path.isdir', return_value=True)
@patch('load_configuration.os.path.exists')
@patch('load_configuration.open', new_callable=mock_open)
@patch('load_configuration.yaml.safe_load')
def test_primary_missing_conf(self, mock_yaml, mock_file, mock_exists, *_):
mock_exists.side_effect = lambda p: p.endswith('vars/main.yml')
mock_yaml.return_value = {'application_id': self.app}
with self.assertRaises(AnsibleFilterError):
self.f(self.app, 'features.matomo')
@patch('load_configuration.os.listdir', return_value=['r1'])
@patch('load_configuration.os.path.isdir', return_value=True)
@patch('load_configuration.os.path.exists')
@patch('load_configuration.open', new_callable=mock_open)
@patch('load_configuration.yaml.safe_load')
def test_primary_and_cache(self, mock_yaml, mock_file, mock_exists, *_):
mock_exists.side_effect = lambda p: p.endswith('vars/main.yml') or p.endswith('vars/configuration.yml')
mock_yaml.side_effect = [
{'application_id': self.app}, # main.yml
self.nested_cfg # configuration.yml
]
# first load
self.assertTrue(self.f(self.app, 'features.matomo'))
self.assertIn(self.app, _cfg_cache)
mock_yaml.reset_mock()
# from cache
self.assertEqual(self.f(self.app, 'domains.canonical'),
['html.example.com'])
mock_yaml.assert_not_called()
@patch('load_configuration.os.listdir', return_value=['r1'])
@patch('load_configuration.os.path.isdir', return_value=True)
@patch('load_configuration.os.path.exists', return_value=True)
@patch('load_configuration.open', mock_open(read_data="html_server: {}"))
@patch('load_configuration.yaml.safe_load', return_value={'html_server': {}})
def test_key_not_found_after_load(self, *_):
with self.assertRaises(AnsibleFilterError):
self.f(self.app, 'does.not.exist')
@patch('load_configuration.os.listdir', return_value=['r2'])
@patch('load_configuration.os.path.isdir', return_value=True)
@patch('load_configuration.os.path.exists')
@patch('load_configuration.open', new_callable=mock_open)
@patch('load_configuration.yaml.safe_load')
def test_fallback_nested(self, mock_yaml, mock_file, mock_exists, *_):
mock_exists.side_effect = lambda p: p.endswith('vars/configuration.yml')
mock_yaml.return_value = self.nested_cfg
# nested fallback must work
self.assertTrue(self.f(self.app, 'features.matomo'))
self.assertEqual(self.f(self.app, 'domains.canonical'),
['html.example.com'])
@patch('load_configuration.os.listdir', return_value=['r4'])
@patch('load_configuration.os.path.isdir', return_value=True)
@patch('load_configuration.os.path.exists')
@patch('load_configuration.open', new_callable=mock_open)
@patch('load_configuration.yaml.safe_load')
def test_fallback_with_indexed_key(self, mock_yaml, mock_file, mock_exists, *_):
# Testing with an indexed key like domains.canonical[0]
mock_exists.side_effect = lambda p: p.endswith('vars/configuration.yml')
mock_yaml.return_value = {
'file-server': {
'domains': {
'canonical': ['files.example.com', 'extra.example.com']
}
}
}
# should get the first element of the canonical domains list
self.assertEqual(self.f('file-server', 'domains.canonical[0]'),
'files.example.com')
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,45 @@
import unittest
from filter_plugins.merge_mapping import merge_mapping
from ansible.errors import AnsibleFilterError
class TestMergeMappingFilter(unittest.TestCase):
def test_basic_merge_overwrites_and_adds(self):
list1 = [
{'source': 'a', 'target': 1},
{'source': 'b', 'target': 2},
]
list2 = [
{'source': 'b', 'target': 3},
{'source': 'c', 'target': 4},
]
result = merge_mapping(list1, list2, 'source')
result_dict = {item['source']: item['target'] for item in result}
self.assertEqual(result_dict, {'a': 1, 'b': 3, 'c': 4})
def test_merge_preserves_and_overwrites_fields(self):
list1 = [{'source': 'x', 'value': 100, 'flag': True}]
list2 = [{'source': 'x', 'value': 200, 'note': 'updated'}]
result = merge_mapping(list1, list2, 'source')
self.assertEqual(len(result), 1)
merged = result[0]
self.assertEqual(merged['value'], 200)
self.assertTrue(merged['flag'])
self.assertEqual(merged['note'], 'updated')
def test_empty_lists_return_empty(self):
self.assertEqual(merge_mapping([], [], 'source'), [])
def test_missing_key_raises_error(self):
list1 = [{'target': 'no_source'}]
list2 = []
with self.assertRaises(AnsibleFilterError):
merge_mapping(list1, list2, 'source')
def test_non_list_inputs_raise_error(self):
with self.assertRaises(AnsibleFilterError):
merge_mapping("not a list", [], 'source')
with self.assertRaises(AnsibleFilterError):
merge_mapping([], "not a list", 'source')
if __name__ == '__main__':
unittest.main()

View File

@@ -1,57 +0,0 @@
import os
import sys
import unittest
sys.path.insert(
0,
os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../")
),
)
from filter_plugins.redirect_filters import FilterModule
class TestAddRedirectIfGroup(unittest.TestCase):
"""Unit-tests for the add_redirect_if_group filter."""
def setUp(self):
# Obtain the callable once for reuse
self.add_redirect = FilterModule().filters()["add_redirect_if_group"]
def test_appends_redirect_when_group_present(self):
original = [{"source": "a", "target": "b"}]
result = self.add_redirect(
original,
group="lam",
source="ldap.example.com",
target="lam.example.com",
group_names=["lam", "other"],
)
# Original list must stay unchanged
self.assertEqual(len(original), 1)
# Result list must contain the extra entry
self.assertEqual(len(result), 2)
self.assertIn(
{"source": "ldap.example.com", "target": "lam.example.com"}, result
)
def test_keeps_list_unchanged_when_group_absent(self):
original = [{"source": "a", "target": "b"}]
result = self.add_redirect(
original,
group="lam",
source="ldap.example.com",
target="lam.example.com",
group_names=["unrelated"],
)
# No new entries
self.assertEqual(result, original)
# But ensure a new list object was returned (no in-place mutation)
self.assertIsNot(result, original)
if __name__ == "__main__":
unittest.main()