mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-07-18 06:24:25 +02:00
Improved performance
This commit is contained in:
parent
882cf47c20
commit
9142eeba3c
@ -4,160 +4,152 @@ import unittest
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import yaml # requires PyYAML
|
import yaml # requires PyYAML
|
||||||
# Import the get_role filter directly
|
|
||||||
from filter_plugins.get_role import get_role
|
from filter_plugins.get_role import get_role
|
||||||
|
|
||||||
|
|
||||||
class TestGetAppConfPaths(unittest.TestCase):
|
class TestGetAppConfPaths(unittest.TestCase):
|
||||||
@classmethod
|
@classmethod
|
||||||
def setUpClass(cls):
|
def setUpClass(cls):
|
||||||
# Locate project root and load YAML configs
|
# Setup paths
|
||||||
root = Path(__file__).resolve().parents[2]
|
root = Path(__file__).resolve().parents[2]
|
||||||
cls.root = root
|
cls.root = root
|
||||||
cls.app_config_path = root / "group_vars" / "all" / "04_applications.yml"
|
cls.app_config_path = root / 'group_vars' / 'all' / '04_applications.yml'
|
||||||
with cls.app_config_path.open(encoding="utf-8") as f:
|
cls.user_config_path = root / 'group_vars' / 'all' / '03_users.yml'
|
||||||
app_cfg = yaml.safe_load(f)
|
|
||||||
cls.defaults_app = app_cfg.get("defaults_applications", {})
|
|
||||||
|
|
||||||
cls.user_config_path = root / "group_vars" / "all" / "03_users.yml"
|
# Load application defaults
|
||||||
with cls.user_config_path.open(encoding="utf-8") as f:
|
with cls.app_config_path.open(encoding='utf-8') as f:
|
||||||
user_cfg = yaml.safe_load(f)
|
app_cfg = yaml.safe_load(f) or {}
|
||||||
cls.defaults_users = user_cfg.get("default_users", {})
|
cls.defaults_app = app_cfg.get('defaults_applications', {})
|
||||||
|
|
||||||
# Regex to match get_app_conf(application_id, 'some.path', ...)
|
# Load default users
|
||||||
cls.pattern = re.compile(
|
with cls.user_config_path.open(encoding='utf-8') as f:
|
||||||
r"get_app_conf\(\s*([^\),]+)\s*,\s*['\"]([^'\"]+)['\"]\s*,\s*[^\)]*\)"
|
user_cfg = yaml.safe_load(f) or {}
|
||||||
)
|
cls.defaults_users = user_cfg.get('default_users', {})
|
||||||
|
|
||||||
# Store occurrences: literal_ids -> {path: [(file, line), ...]}, variable paths likewise
|
# Preload role schemas: map application_id -> schema dict
|
||||||
cls.literal_paths_by_id = {}
|
cls.role_schemas = {}
|
||||||
cls.variable_paths = {}
|
cls.role_for_app = {}
|
||||||
|
roles_path = str(root / 'roles')
|
||||||
|
for app_id in cls.defaults_app:
|
||||||
|
try:
|
||||||
|
role = get_role(app_id, roles_path)
|
||||||
|
cls.role_for_app[app_id] = role
|
||||||
|
schema_file = root / 'roles' / role / 'schema' / 'main.yml'
|
||||||
|
with schema_file.open(encoding='utf-8') as sf:
|
||||||
|
schema = yaml.safe_load(sf) or {}
|
||||||
|
cls.role_schemas[app_id] = schema
|
||||||
|
except Exception:
|
||||||
|
# skip apps without schema or role
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Regex to find get_app_conf calls
|
||||||
|
cls.pattern = re.compile(r"get_app_conf\(\s*([^\),]+)\s*,\s*['\"]([^'\"]+)['\"]")
|
||||||
|
|
||||||
|
# Scan files once
|
||||||
|
cls.literal_paths = {} # app_id -> {path: [(file,line)...]}
|
||||||
|
cls.variable_paths = {} # path -> [(file,line)...]
|
||||||
|
|
||||||
# Scan all files except tests/
|
|
||||||
for dirpath, dirs, files in os.walk(root):
|
for dirpath, dirs, files in os.walk(root):
|
||||||
if "tests" in Path(dirpath).parts:
|
# skip descending into the tests directory
|
||||||
|
if 'tests' in dirs:
|
||||||
|
dirs.remove('tests')
|
||||||
|
if 'tests' in Path(dirpath).parts:
|
||||||
continue
|
continue
|
||||||
for fname in files:
|
for fname in files:
|
||||||
file_path = Path(dirpath) / fname
|
file_path = Path(dirpath) / fname
|
||||||
try:
|
try:
|
||||||
text = file_path.read_text(encoding="utf-8")
|
text = file_path.read_text(encoding='utf-8')
|
||||||
except (UnicodeDecodeError, PermissionError):
|
except Exception:
|
||||||
continue
|
continue
|
||||||
for m in cls.pattern.finditer(text):
|
for m in cls.pattern.finditer(text):
|
||||||
lineno = text.count("\n", 0, m.start()) + 1
|
lineno = text.count('\n', 0, m.start()) + 1
|
||||||
app_arg = m.group(1).strip()
|
app_arg, path_arg = m.group(1).strip(), m.group(2).strip()
|
||||||
path_arg = m.group(2).strip()
|
if (app_arg.startswith("'") and app_arg.endswith("'")) or (app_arg.startswith('"') and app_arg.endswith('"')):
|
||||||
if (app_arg.startswith("'") and app_arg.endswith("'")) or (
|
|
||||||
app_arg.startswith('"') and app_arg.endswith('"')
|
|
||||||
):
|
|
||||||
app_id = app_arg.strip("'\"")
|
app_id = app_arg.strip("'\"")
|
||||||
cls.literal_paths_by_id.setdefault(app_id, {}).setdefault(path_arg, []).append(
|
cls.literal_paths.setdefault(app_id, {}).setdefault(path_arg, []).append((file_path, lineno))
|
||||||
(file_path, lineno)
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
cls.variable_paths.setdefault(path_arg, []).append(
|
cls.variable_paths.setdefault(path_arg, []).append((file_path, lineno))
|
||||||
(file_path, lineno)
|
|
||||||
)
|
|
||||||
|
|
||||||
def _assert_nested_key(self, mapping, dotted_path, context):
|
def assertNested(self, mapping, dotted, context):
|
||||||
keys = dotted_path.split('.')
|
keys = dotted.split('.')
|
||||||
current = mapping
|
cur = mapping
|
||||||
for key in keys:
|
for k in keys:
|
||||||
self.assertIsInstance(current, dict,
|
assert isinstance(cur, dict), f"{context}: expected dict at {k}"
|
||||||
f"Expected dict at '{'.'.join(keys[:keys.index(key)])}' in {context}")
|
assert k in cur, f"{context}: missing '{k}' in '{dotted}'"
|
||||||
self.assertIn(key, current,
|
cur = cur[k]
|
||||||
f"Missing '{key}' in path '{dotted_path}' under {context}")
|
|
||||||
current = current[key]
|
|
||||||
|
|
||||||
def _validate_credentials(self, application_id, key, occs):
|
|
||||||
# Delegate to get_role filter to find role name
|
|
||||||
role = get_role(application_id, roles_path=str(self.root / 'roles'))
|
|
||||||
schema_file = self.root / 'roles' / role / 'schema' / 'main.yml'
|
|
||||||
self.assertTrue(schema_file.is_file(), f"Schema file not found: {schema_file}")
|
|
||||||
with schema_file.open(encoding="utf-8") as sf:
|
|
||||||
schema = yaml.safe_load(sf) or {}
|
|
||||||
creds = schema.get('credentials')
|
|
||||||
self.assertIsInstance(creds, dict,
|
|
||||||
f"'credentials' missing or not dict in {schema_file}")
|
|
||||||
self.assertIn(key, creds,
|
|
||||||
f"Missing credential '{key}' in {schema_file} for role '{role}'")
|
|
||||||
|
|
||||||
def _validate_path(self, app_id, dotted_path, occs):
|
|
||||||
entry_app = self.defaults_app.get(app_id, {})
|
|
||||||
try:
|
|
||||||
self._assert_nested_key(entry_app, dotted_path, app_id)
|
|
||||||
return
|
|
||||||
except AssertionError:
|
|
||||||
# users.* fallback
|
|
||||||
if dotted_path.startswith('users.'):
|
|
||||||
subpath = dotted_path.split('.', 1)[1]
|
|
||||||
try:
|
|
||||||
self._assert_nested_key(self.defaults_users, subpath, 'default_users')
|
|
||||||
return
|
|
||||||
except AssertionError:
|
|
||||||
pass
|
|
||||||
# credentials.* fallback via get_role
|
|
||||||
if dotted_path.startswith('credentials.'):
|
|
||||||
key = dotted_path.split('.', 1)[1]
|
|
||||||
self._validate_credentials(app_id, key, occs)
|
|
||||||
return
|
|
||||||
# images.* only require top-level images dict
|
|
||||||
if dotted_path.startswith('images.'):
|
|
||||||
if any('images' in cfg and isinstance(cfg['images'], dict)
|
|
||||||
for cfg in self.defaults_app.values()):
|
|
||||||
return
|
|
||||||
file_path, lineno = occs[0]
|
|
||||||
self.fail(f"'{dotted_path}' not found for '{app_id}'; called at {file_path}:{lineno}")
|
|
||||||
|
|
||||||
def test_literal_paths(self):
|
def test_literal_paths(self):
|
||||||
for app_id, paths in sorted(self.literal_paths_by_id.items()):
|
# Check each literal path exists
|
||||||
with self.subTest(app_id=app_id):
|
for app_id, paths in self.literal_paths.items():
|
||||||
self.assertIn(app_id, self.defaults_app,
|
with self.subTest(app=app_id):
|
||||||
f"App '{app_id}' not in defaults_applications")
|
self.assertIn(app_id, self.defaults_app, f"App '{app_id}' missing in defaults_applications")
|
||||||
for dotted_path, occs in sorted(paths.items()):
|
for dotted, occs in paths.items():
|
||||||
with self.subTest(path=dotted_path):
|
with self.subTest(path=dotted):
|
||||||
self._validate_path(app_id, dotted_path, occs)
|
self._validate(app_id, dotted, occs)
|
||||||
|
|
||||||
def test_variable_paths(self):
|
def test_variable_paths(self):
|
||||||
|
# dynamic paths: must exist somewhere
|
||||||
if not self.variable_paths:
|
if not self.variable_paths:
|
||||||
self.skipTest("No dynamic calls found.")
|
self.skipTest('No dynamic get_app_conf calls')
|
||||||
for dotted_path, occs in sorted(self.variable_paths.items()):
|
for dotted, occs in self.variable_paths.items():
|
||||||
with self.subTest(path=dotted_path):
|
with self.subTest(path=dotted):
|
||||||
valid = False
|
found = False
|
||||||
# credentials.* dynamic
|
# credentials.*: check preloaded schemas
|
||||||
if dotted_path.startswith('credentials.'):
|
if dotted.startswith('credentials.'):
|
||||||
key = dotted_path.split('.',1)[1]
|
key = dotted.split('.',1)[1]
|
||||||
for aid in self.defaults_app:
|
for aid, schema in self.role_schemas.items():
|
||||||
try:
|
creds = schema.get('credentials', {})
|
||||||
self._validate_credentials(aid, key, occs)
|
if isinstance(creds, dict) and key in creds:
|
||||||
valid = True; break
|
found = True; break
|
||||||
except AssertionError:
|
if found: continue
|
||||||
pass
|
# images.*: any images dict
|
||||||
if valid: continue
|
if dotted.startswith('images.'):
|
||||||
# images.* dynamic
|
if any(isinstance(cfg.get('images'), dict) for cfg in self.defaults_app.values()):
|
||||||
if dotted_path.startswith('images.'):
|
|
||||||
if any('images' in cfg and isinstance(cfg['images'], dict)
|
|
||||||
for cfg in self.defaults_app.values()):
|
|
||||||
continue
|
continue
|
||||||
# check app defaults
|
# users.*: default_users fallback
|
||||||
|
if dotted.startswith('users.'):
|
||||||
|
sub = dotted.split('.',1)[1]
|
||||||
|
if sub in self.defaults_users:
|
||||||
|
continue
|
||||||
|
# application defaults
|
||||||
for aid, cfg in self.defaults_app.items():
|
for aid, cfg in self.defaults_app.items():
|
||||||
try:
|
try:
|
||||||
self._assert_nested_key(cfg, dotted_path, aid)
|
self.assertNested(cfg, dotted, aid)
|
||||||
valid = True; break
|
found = True; break
|
||||||
except AssertionError:
|
except AssertionError:
|
||||||
pass
|
pass
|
||||||
# users.* fallback
|
if not found:
|
||||||
if not valid and dotted_path.startswith('users.'):
|
|
||||||
subpath = dotted_path.split('.',1)[1]
|
|
||||||
try:
|
|
||||||
self._assert_nested_key(self.defaults_users, subpath, 'default_users')
|
|
||||||
valid = True
|
|
||||||
except AssertionError:
|
|
||||||
pass
|
|
||||||
if not valid:
|
|
||||||
file_path, lineno = occs[0]
|
file_path, lineno = occs[0]
|
||||||
self.fail(f"No entry for '{dotted_path}'; called at {file_path}:{lineno}")
|
self.fail(f"No entry for '{dotted}'; called at {file_path}:{lineno}")
|
||||||
|
|
||||||
|
def _validate(self, app_id, dotted, occs):
|
||||||
|
# try app defaults
|
||||||
|
cfg = self.defaults_app.get(app_id, {})
|
||||||
|
try:
|
||||||
|
self.assertNested(cfg, dotted, app_id)
|
||||||
|
return
|
||||||
|
except AssertionError:
|
||||||
|
pass
|
||||||
|
# users.* fallback
|
||||||
|
if dotted.startswith('users.'):
|
||||||
|
sub = dotted.split('.',1)[1]
|
||||||
|
if sub in self.defaults_users:
|
||||||
|
return
|
||||||
|
# credentials.* fallback
|
||||||
|
if dotted.startswith('credentials.'):
|
||||||
|
key = dotted.split('.',1)[1]
|
||||||
|
schema = self.role_schemas.get(app_id, {})
|
||||||
|
creds = schema.get('credentials', {})
|
||||||
|
self.assertIn(key, creds, f"Credential '{key}' missing for app '{app_id}'")
|
||||||
|
return
|
||||||
|
# images.* fallback
|
||||||
|
if dotted.startswith('images.'):
|
||||||
|
if isinstance(cfg.get('images'), dict):
|
||||||
|
return
|
||||||
|
# final fail
|
||||||
|
file_path, lineno = occs[0]
|
||||||
|
self.fail(f"'{dotted}' not found for '{app_id}'; called at {file_path}:{lineno}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user