diff --git a/tests/integration/test_csp_configuration_consistency.py b/tests/integration/test_csp_configuration_consistency.py index a3a0fca0..f16b288d 100644 --- a/tests/integration/test_csp_configuration_consistency.py +++ b/tests/integration/test_csp_configuration_consistency.py @@ -1,51 +1,63 @@ +# tests/integration/test_csp_configuration_consistency.py import unittest import yaml from pathlib import Path from urllib.parse import urlparse + class TestCspConfigurationConsistency(unittest.TestCase): + """ + Iterate all roles; for each config/main.yml that defines 'server.csp', + assert consistent structure and values: + - csp is a dict + - whitelist/flags/hashes are dicts if present + - directives used are supported + - flags are dicts of {flag_name: bool}, flag_name in SUPPORTED_FLAGS + - whitelist entries are valid URLs/schemes/Jinja-or '*' + - hashes entries are str or list[str], non-empty + On error, include role name and file path for easier debugging. + """ + SUPPORTED_DIRECTIVES = { - 'default-src', - 'connect-src', - 'frame-ancestors', - 'frame-src', - 'script-src', - 'script-src-elem', - 'style-src', - 'style-src-elem', - 'font-src', - 'worker-src', - 'manifest-src', - 'media-src' + "default-src", + "connect-src", + "frame-ancestors", + "frame-src", + "script-src", + "script-src-elem", + "style-src", + "style-src-elem", + "font-src", + "worker-src", + "manifest-src", + "media-src", } - SUPPORTED_FLAGS = {'unsafe-eval', 'unsafe-inline'} + + SUPPORTED_FLAGS = {"unsafe-eval", "unsafe-inline"} def is_valid_whitelist_entry(self, entry: str) -> bool: """ Accept entries that are: - Jinja expressions (contain '{{' and '}}') + - '*' wildcard - Data or Blob URIs (start with 'data:' or 'blob:') - - HTTP/HTTPS/WS/WSS URLs + - HTTP/HTTPS/WS/WSS URLs (with netloc) """ - if '{{' in entry and '}}' in entry: + if not isinstance(entry, str): + return False + e = entry.strip() + if not e: + return False + if "{{" in e and "}}" in e: return True - if entry.startswith(('data:', 'blob:')): + if e == "*": return True - if entry == '*': + if e.startswith(("data:", "blob:")): return True - parsed = urlparse(entry) - return parsed.scheme in ('http', 'https','ws', 'wss') and bool(parsed.netloc) + parsed = urlparse(e) + return parsed.scheme in ("http", "https", "ws", "wss") and bool(parsed.netloc) def test_csp_configuration_structure(self): - """ - Iterate all roles; for each config/main.yml that defines 'csp', - assert that: - - csp is a dict - - its whitelist/flags/hashes keys only use supported directives - - flags for each directive are a dict of {flag_name: bool}, with flag_name in SUPPORTED_FLAGS - - whitelist entries are valid as per is_valid_whitelist_entry - - hashes entries are str or list of non-empty str - """ roles_dir = Path(__file__).resolve().parent.parent.parent / "roles" errors = [] @@ -57,70 +69,115 @@ class TestCspConfigurationConsistency(unittest.TestCase): if not cfg_file.exists(): continue + # Parse YAML (collect role + file path on error) try: cfg = yaml.safe_load(cfg_file.read_text(encoding="utf-8")) or {} except yaml.YAMLError as e: - errors.append(f"{role_path.name}: YAML parse error: {e}") + errors.append(f"{role_path.name}: YAML parse error in {cfg_file}: {e}") continue - csp = cfg.get('server',{}).get('csp') + csp = cfg.get("server", {}).get("csp") if csp is None: - continue # nothing to check + continue # No CSP section, nothing to check if not isinstance(csp, dict): - errors.append(f"{role_path.name}: 'csp' must be a dict") + errors.append(f"{role_path.name}: 'server.csp' must be a dict (found {type(csp).__name__}) in {cfg_file}") + # Can't proceed safely with sub-sections continue - # Ensure sub-sections are dicts - for section in ('whitelist', 'flags', 'hashes'): - if section in csp and not isinstance(csp[section], dict): - errors.append(f"{role_path.name}: csp.{section} must be a dict") + # ---------- Validate whitelist ---------- + wl = csp.get("whitelist", {}) + if wl is not None and not isinstance(wl, dict): + errors.append( + f"{role_path.name}: server.csp.whitelist must be a dict (found {type(wl).__name__}) in {cfg_file}" + ) + wl = {} # prevent crash; continue to scan other sections + if isinstance(wl, dict): + for directive, val in wl.items(): + if directive not in self.SUPPORTED_DIRECTIVES: + errors.append( + f"{role_path.name}: whitelist contains unsupported directive '{directive}' ({cfg_file})" + ) + # val may be str or list[str] + if isinstance(val, str): + values = [val] + elif isinstance(val, list): + values = val + else: + errors.append( + f"{role_path.name}: whitelist.{directive} must be a string or list of strings (found {type(val).__name__}) ({cfg_file})" + ) + values = [] - # Validate whitelist - wl = csp.get('whitelist', {}) - for directive, val in wl.items(): - if directive not in self.SUPPORTED_DIRECTIVES: - errors.append(f"{role_path.name}: whitelist contains unsupported directive '{directive}'") - # val may be str or list - values = [val] if isinstance(val, str) else (val if isinstance(val, list) else None) - if values is None: - errors.append(f"{role_path.name}: whitelist.{directive} must be a string or list of strings") - else: for entry in values: if not isinstance(entry, str) or not entry.strip(): - errors.append(f"{role_path.name}: whitelist.{directive} contains empty or non-string entry") + errors.append( + f"{role_path.name}: whitelist.{directive} contains empty or non-string entry ({cfg_file})" + ) elif not self.is_valid_whitelist_entry(entry): - errors.append(f"{role_path.name}: whitelist.{directive} entry '{entry}' is not a valid entry") + errors.append( + f"{role_path.name}: whitelist.{directive} entry '{entry}' is not a valid value ({cfg_file})" + ) - # Validate flags - fl = csp.get('flags', {}) - for directive, flag_dict in fl.items(): - if directive not in self.SUPPORTED_DIRECTIVES: - errors.append(f"{role_path.name}: flags contains unsupported directive '{directive}'") - if not isinstance(flag_dict, dict): - errors.append(f"{role_path.name}: flags.{directive} must be a dict of flag_name->bool") - continue - for flag_name, flag_val in flag_dict.items(): - if flag_name not in self.SUPPORTED_FLAGS: - errors.append(f"{role_path.name}: flags.{directive} has unsupported flag '{flag_name}'") - if not isinstance(flag_val, bool): - errors.append(f"{role_path.name}: flags.{directive}.{flag_name} must be a boolean") + # ---------- Validate flags ---------- + fl = csp.get("flags", {}) + if fl is not None and not isinstance(fl, dict): + errors.append( + f"{role_path.name}: server.csp.flags must be a dict (found {type(fl).__name__}) in {cfg_file}" + ) + fl = {} + if isinstance(fl, dict): + for directive, flag_dict in fl.items(): + if directive not in self.SUPPORTED_DIRECTIVES: + errors.append( + f"{role_path.name}: flags contains unsupported directive '{directive}' ({cfg_file})" + ) + if not isinstance(flag_dict, dict): + errors.append( + f"{role_path.name}: flags.{directive} must be a dict of flag_name->bool (found {type(flag_dict).__name__}) ({cfg_file})" + ) + continue + for flag_name, flag_val in flag_dict.items(): + if flag_name not in self.SUPPORTED_FLAGS: + errors.append( + f"{role_path.name}: flags.{directive} has unsupported flag '{flag_name}' ({cfg_file})" + ) + if not isinstance(flag_val, bool): + errors.append( + f"{role_path.name}: flags.{directive}.{flag_name} must be a boolean (found {type(flag_val).__name__}) ({cfg_file})" + ) + + # ---------- Validate hashes ---------- + hs = csp.get("hashes", {}) + if hs is not None and not isinstance(hs, dict): + errors.append( + f"{role_path.name}: server.csp.hashes must be a dict (found {type(hs).__name__}) in {cfg_file}" + ) + hs = {} + if isinstance(hs, dict): + for directive, snippet_val in hs.items(): + if directive not in self.SUPPORTED_DIRECTIVES: + errors.append( + f"{role_path.name}: hashes contains unsupported directive '{directive}' ({cfg_file})" + ) + if isinstance(snippet_val, str): + snippets = [snippet_val] + elif isinstance(snippet_val, list): + snippets = snippet_val + else: + errors.append( + f"{role_path.name}: hashes.{directive} must be a string or list of strings (found {type(snippet_val).__name__}) ({cfg_file})" + ) + snippets = [] - # Validate hashes - hs = csp.get('hashes', {}) - for directive, snippet_val in hs.items(): - if directive not in self.SUPPORTED_DIRECTIVES: - errors.append(f"{role_path.name}: hashes contains unsupported directive '{directive}'") - snippets = [snippet_val] if isinstance(snippet_val, str) else (snippet_val if isinstance(snippet_val, list) else None) - if snippets is None: - errors.append(f"{role_path.name}: hashes.{directive} must be a string or list of strings") - else: for snippet in snippets: if not isinstance(snippet, str) or not snippet.strip(): - errors.append(f"{role_path.name}: hashes.{directive} contains empty or non-string snippet") + errors.append( + f"{role_path.name}: hashes.{directive} contains empty or non-string snippet ({cfg_file})" + ) if errors: - self.fail("CSP configuration validation failures:\n" + "\n".join(errors)) + self.fail(f"CSP configuration validation failures ({len(errors)}):\n" + "\n".join(errors)) if __name__ == "__main__":