Files
computer-playbook/tests/integration/test_csp_configuration_consistency.py

185 lines
7.8 KiB
Python

# tests/integration/test_csp_configuration_consistency.py
import unittest
import yaml
from pathlib import Path
from urllib.parse import urlparse
class TestCspConfigurationConsistency(unittest.TestCase):
"""
Iterate all roles; for each config/main.yml that defines 'server.csp',
assert consistent structure and values:
- csp is a dict
- whitelist/flags/hashes are dicts if present
- directives used are supported
- flags are dicts of {flag_name: bool}, flag_name in SUPPORTED_FLAGS
- whitelist entries are valid URLs/schemes/Jinja-or '*'
- hashes entries are str or list[str], non-empty
On error, include role name and file path for easier debugging.
"""
SUPPORTED_DIRECTIVES = {
"default-src",
"connect-src",
"frame-ancestors",
"frame-src",
"script-src",
"script-src-elem",
"style-src",
"style-src-elem",
"font-src",
"worker-src",
"manifest-src",
"media-src",
}
SUPPORTED_FLAGS = {"unsafe-eval", "unsafe-inline"}
def is_valid_whitelist_entry(self, entry: str) -> bool:
"""
Accept entries that are:
- Jinja expressions (contain '{{' and '}}')
- '*' wildcard
- Data or Blob URIs (start with 'data:' or 'blob:')
- HTTP/HTTPS/WS/WSS URLs (with netloc)
"""
if not isinstance(entry, str):
return False
e = entry.strip()
if not e:
return False
if "{{" in e and "}}" in e:
return True
if e == "*":
return True
if e.startswith(("data:", "blob:")):
return True
parsed = urlparse(e)
return parsed.scheme in ("http", "https", "ws", "wss") and bool(parsed.netloc)
def test_csp_configuration_structure(self):
roles_dir = Path(__file__).resolve().parent.parent.parent / "roles"
errors = []
for role_path in sorted(roles_dir.iterdir()):
if not role_path.is_dir():
continue
cfg_file = role_path / "config" / "main.yml"
if not cfg_file.exists():
continue
# Parse YAML (collect role + file path on error)
try:
cfg = yaml.safe_load(cfg_file.read_text(encoding="utf-8")) or {}
except yaml.YAMLError as e:
errors.append(f"{role_path.name}: YAML parse error in {cfg_file}: {e}")
continue
csp = cfg.get("server", {}).get("csp")
if csp is None:
continue # No CSP section, nothing to check
if not isinstance(csp, dict):
errors.append(f"{role_path.name}: 'server.csp' must be a dict (found {type(csp).__name__}) in {cfg_file}")
# Can't proceed safely with sub-sections
continue
# ---------- Validate whitelist ----------
wl = csp.get("whitelist", {})
if wl is not None and not isinstance(wl, dict):
errors.append(
f"{role_path.name}: server.csp.whitelist must be a dict (found {type(wl).__name__}) in {cfg_file}"
)
wl = {} # prevent crash; continue to scan other sections
if isinstance(wl, dict):
for directive, val in wl.items():
if directive not in self.SUPPORTED_DIRECTIVES:
errors.append(
f"{role_path.name}: whitelist contains unsupported directive '{directive}' ({cfg_file})"
)
# val may be str or list[str]
if isinstance(val, str):
values = [val]
elif isinstance(val, list):
values = val
else:
errors.append(
f"{role_path.name}: whitelist.{directive} must be a string or list of strings (found {type(val).__name__}) ({cfg_file})"
)
values = []
for entry in values:
if not isinstance(entry, str) or not entry.strip():
errors.append(
f"{role_path.name}: whitelist.{directive} contains empty or non-string entry ({cfg_file})"
)
elif not self.is_valid_whitelist_entry(entry):
errors.append(
f"{role_path.name}: whitelist.{directive} entry '{entry}' is not a valid value ({cfg_file})"
)
# ---------- Validate flags ----------
fl = csp.get("flags", {})
if fl is not None and not isinstance(fl, dict):
errors.append(
f"{role_path.name}: server.csp.flags must be a dict (found {type(fl).__name__}) in {cfg_file}"
)
fl = {}
if isinstance(fl, dict):
for directive, flag_dict in fl.items():
if directive not in self.SUPPORTED_DIRECTIVES:
errors.append(
f"{role_path.name}: flags contains unsupported directive '{directive}' ({cfg_file})"
)
if not isinstance(flag_dict, dict):
errors.append(
f"{role_path.name}: flags.{directive} must be a dict of flag_name->bool (found {type(flag_dict).__name__}) ({cfg_file})"
)
continue
for flag_name, flag_val in flag_dict.items():
if flag_name not in self.SUPPORTED_FLAGS:
errors.append(
f"{role_path.name}: flags.{directive} has unsupported flag '{flag_name}' ({cfg_file})"
)
if not isinstance(flag_val, bool):
errors.append(
f"{role_path.name}: flags.{directive}.{flag_name} must be a boolean (found {type(flag_val).__name__}) ({cfg_file})"
)
# ---------- Validate hashes ----------
hs = csp.get("hashes", {})
if hs is not None and not isinstance(hs, dict):
errors.append(
f"{role_path.name}: server.csp.hashes must be a dict (found {type(hs).__name__}) in {cfg_file}"
)
hs = {}
if isinstance(hs, dict):
for directive, snippet_val in hs.items():
if directive not in self.SUPPORTED_DIRECTIVES:
errors.append(
f"{role_path.name}: hashes contains unsupported directive '{directive}' ({cfg_file})"
)
if isinstance(snippet_val, str):
snippets = [snippet_val]
elif isinstance(snippet_val, list):
snippets = snippet_val
else:
errors.append(
f"{role_path.name}: hashes.{directive} must be a string or list of strings (found {type(snippet_val).__name__}) ({cfg_file})"
)
snippets = []
for snippet in snippets:
if not isinstance(snippet, str) or not snippet.strip():
errors.append(
f"{role_path.name}: hashes.{directive} contains empty or non-string snippet ({cfg_file})"
)
if errors:
self.fail(f"CSP configuration validation failures ({len(errors)}):\n" + "\n".join(errors))
if __name__ == "__main__":
unittest.main()