Refactor web health checker & domain expectations (filter-based)

- Move all domain→expected-status mapping to filter `web_health_expectations`.
- Require explicit app selection via non-empty `group_names`; only those apps are included.
- Add `www_enabled` flag (wired via `WWW_REDIRECT_ENABLED`) to generate/force www.* → 301.
- Support `redirect_maps` to include manual redirects (sources forced to 301), independent of app selection.
- Aliases always 301; canonicals use per-key override or `server.status_codes.default`, else [200,302,301].
- Remove legacy fallbacks (`server.status_codes.home` / `landingpage`).
- Wire filter output into systemd ExecStart script as JSON expectations.
- Normalize various templates to use `to_json` and minor spacing fixes.
- Update app configs (e.g., YOURLS default=301; Confluence default=302; Bluesky web=405; MediaWiki/Confluence canonical/aliases).
- Constructor now uses `WWW_REDIRECT_ENABLED` for domain generation.

Tests:
- Add comprehensive unit tests for filter: selection by group, keyed/default codes, aliases, www handling, redirect_maps, input sanitization.
- Add unit tests for the standalone checker script (JSON parsing, OK/mismatch counting, sanitization).

See conversation: https://chatgpt.com/share/68c2b93e-de58-800f-8c16-ea05755ba776
This commit is contained in:
2025-09-11 13:58:16 +02:00
parent 6418a462ec
commit cbfb096cdb
35 changed files with 717 additions and 106 deletions

View File

View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""
Ultra-thin checker: consume a JSON mapping of {domain: [expected_status_codes]}
and verify HTTP HEAD responses. All mapping logic is done in the filter
`web_health_expectations`.
"""
import argparse
import json
import sys
from typing import Dict, List
import requests
def parse_args(argv=None):
p = argparse.ArgumentParser(description="Web health checker (expects precomputed domain→codes mapping).")
p.add_argument("--web-protocol", default="https", choices=["http", "https"], help="Protocol to use")
p.add_argument("--expectations", required=True, help="JSON STRING: {\"domain\": [codes], ...}")
return p.parse_args(argv)
def _parse_json_mapping(name: str, value: str) -> Dict[str, List[int]]:
try:
obj = json.loads(value)
except json.JSONDecodeError as e:
raise SystemExit(f"--{name} must be a valid JSON string: {e}")
if not isinstance(obj, dict):
raise SystemExit(f"--{name} must be a JSON object (mapping)")
# sanitize list-of-ints shape
clean = {}
for k, v in obj.items():
if isinstance(v, list):
try:
clean[k] = [int(x) for x in v]
except Exception:
clean[k] = []
else:
clean[k] = []
return clean
def main(argv=None) -> int:
args = parse_args(argv)
expectations = _parse_json_mapping("expectations", args.expectations)
errors = 0
for domain in sorted(expectations.keys()):
expected = expectations[domain] or []
url = f"{args.web_protocol}://{domain}"
try:
r = requests.head(url, allow_redirects=False, timeout=10)
if expected and r.status_code in expected:
print(f"{domain}: OK")
elif not expected:
# If somehow empty list slipped through, treat as failure to be explicit
print(f"{domain}: ERROR: No expectations provided. Got {r.status_code}.")
errors += 1
else:
print(f"{domain}: ERROR: Expected {expected}. Got {r.status_code}.")
errors += 1
except requests.RequestException as e:
print(f"{domain}: error due to {e}")
errors += 1
if errors:
print(f"Warning: {errors} domains responded with an unexpected https status code.")
return errors
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,186 @@
# roles/sys-ctl-hlth-webserver/filter_plugins/web_health_expectations.py
import os
import sys
from collections.abc import Mapping
# Make repo-level module_utils importable (go up three levels from this file)
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')))
from module_utils.config_utils import get_app_conf # reuse existing helper
DEFAULT_OK = [200, 302, 301]
def _to_list(x, *, allow_mapping: bool = True):
"""Normalize into a flat list of **strings only**."""
if x is None:
return []
if isinstance(x, bytes):
try:
return [x.decode("utf-8")]
except Exception:
return []
if isinstance(x, str):
return [x]
if isinstance(x, (list, tuple, set)):
out = []
for v in x:
if isinstance(v, (list, tuple, set)):
out.extend(_to_list(v, allow_mapping=False))
elif isinstance(v, bytes):
try:
out.append(v.decode("utf-8"))
except Exception:
pass
elif isinstance(v, str):
out.append(v)
elif isinstance(v, Mapping):
continue
return out
if isinstance(x, Mapping) and allow_mapping:
out = []
for v in x.values():
out.extend(_to_list(v, allow_mapping=True))
return out
return []
def _valid_http_code(x):
"""Return int(x) if 100 <= code <= 599 else None."""
try:
v = int(x)
except (TypeError, ValueError):
return None
return v if 100 <= v <= 599 else None
def _extract_redirect_sources(redirect_maps):
"""Extract a set of source domains from redirect maps."""
sources = set()
if not redirect_maps:
return sources
def _add_one(obj):
if isinstance(obj, str) and obj:
sources.add(obj)
elif isinstance(obj, Mapping):
s = obj.get("source")
if isinstance(s, str) and s:
sources.add(s)
if isinstance(redirect_maps, (list, tuple, set)):
for item in redirect_maps:
_add_one(item)
else:
_add_one(redirect_maps)
return sources
def _normalize_selection(group_names):
"""Return a non-empty set of group names, or raise ValueError."""
if isinstance(group_names, (list, set, tuple)):
sel = {str(x) for x in group_names if str(x)}
elif isinstance(group_names, str):
sel = {g.strip() for g in group_names.split(",") if g.strip()}
else:
sel = set()
if not sel:
raise ValueError("web_health_expectations: 'group_names' must be provided and non-empty")
return sel
def web_health_expectations(applications, www_enabled: bool = False, group_names=None, redirect_maps=None):
"""Produce a **flat mapping**: domain -> [expected_status_codes].
Selection (REQUIRED):
- `group_names` must be provided and non-empty.
- Only include applications whose key is in `group_names`.
Rules:
- Canonical domains (dict-key overrides, else default, else DEFAULT_OK).
- Flat canonical (default, else DEFAULT_OK).
- Aliases always [301].
- No legacy fallbacks (ignore 'home'/'landingpage').
- `redirect_maps`: force <source> -> [301] and override app-derived entries.
- If `www_enabled`: add and/or force www.* -> [301] for all domains.
"""
if not isinstance(applications, Mapping):
return {}
selection = _normalize_selection(group_names)
expectations = {}
for app_id in applications.keys():
if app_id not in selection:
continue
canonical_raw = get_app_conf(
applications, app_id, 'server.domains.canonical',
strict=False, default=[]
)
aliases_raw = get_app_conf(
applications, app_id, 'server.domains.aliases',
strict=False, default=[]
)
aliases = _to_list(aliases_raw, allow_mapping=True)
sc_raw = get_app_conf(
applications, app_id, 'server.status_codes',
strict=False, default={}
)
sc_map = {}
if isinstance(sc_raw, Mapping):
for k, v in sc_raw.items():
code = _valid_http_code(v)
if code is not None:
sc_map[str(k)] = code
if isinstance(canonical_raw, Mapping) and canonical_raw:
for key, domains in canonical_raw.items():
domains_list = _to_list(domains, allow_mapping=False)
code = _valid_http_code(sc_map.get(key))
if code is None:
code = _valid_http_code(sc_map.get("default"))
expected = [code] if code is not None else list(DEFAULT_OK)
for d in domains_list:
if d:
expectations[d] = expected
else:
for d in _to_list(canonical_raw, allow_mapping=True):
if not d:
continue
code = _valid_http_code(sc_map.get("default"))
expectations[d] = [code] if code is not None else list(DEFAULT_OK)
for d in aliases:
if d:
expectations[d] = [301]
for src in _extract_redirect_sources(redirect_maps):
expectations[src] = [301]
if www_enabled:
add = {}
for d in expectations.keys():
if not d.startswith("www."):
add[f"www.{d}"] = [301]
expectations.update(add)
for d in list(expectations.keys()):
if d.startswith("www."):
expectations[d] = [301]
return expectations
class FilterModule(object):
def filters(self):
return {
'web_health_expectations': web_health_expectations,
}

View File

@@ -20,3 +20,7 @@
system_service_timer_enabled: true
system_service_tpl_on_failure: "{{ SYS_SERVICE_ON_FAILURE_COMPOSE }}"
system_service_tpl_timeout_start_sec: "{{ CURRENT_PLAY_DOMAINS_ALL | timeout_start_sec_for_domains }}"
system_service_tpl_exec_start: >
{{ system_service_script_exec }}
--web-protocol {{ WEB_PROTOCOL }}
--expectations '{{ applications | web_health_expectations(www_enabled=WWW_REDIRECT_ENABLED, group_names=group_names) | to_json }}'

View File

@@ -1,69 +0,0 @@
import os
import requests
import sys
import re
def get_expected_statuses(domain: str, parts: list[str], redirected_domains: set[str]) -> list[int]:
"""
Determine the expected HTTP status codes based on the domain name.
Args:
domain: The full domain string (e.g. 'example.com').
parts: The domain split into its subcomponents (e.g. ['www', 'example', 'com']).
redirected_domains: A set of domains that should trigger a redirect.
Returns:
A list of expected HTTP status codes.
"""
if domain == '{{ domains | get_domain('web-app-listmonk') }}':
return [404]
if (parts and parts[0] == 'www') or (domain in redirected_domains):
return [301]
if domain == '{{ domains | get_domain('web-app-yourls') }}':
return [{{ applications | get_app_conf('web-app-yourls', 'server.status_codes.landingpage') }}]
return [200, 302, 301]
# file in which fqdn server configs are deposit
config_path = '{{ NGINX.DIRECTORIES.HTTP.SERVERS }}'
# Initialize the error counter
error_counter = 0
# Regex pattern to match domain.tld or *.domain.tld
pattern = re.compile(r"^(?:[\w-]+\.)*[\w-]+\.[\w-]+\.conf$")
# Iterate over each file in the configuration directory
for filename in os.listdir(config_path):
if filename.endswith('.conf') and pattern.match(filename):
# Extract the domain and subdomain from the filename
domain = filename.replace('.conf', '')
parts = domain.split('.')
# Prepare the URL and expected status codes
url = f"{{ WEB_PROTOCOL }}://{domain}"
redirected_domains = [domain['source'] for domain in {{ redirect_domain_mappings }}]
redirected_domains.append("{{domains | get_domain('web-app-mailu') }}")
expected_statuses = get_expected_statuses(domain, parts, redirected_domains)
try:
# Send a HEAD request to get only the response header
response = requests.head(url)
# Check if the status code matches the expected statuses
if response.status_code in expected_statuses:
print(f"{domain}: OK")
else:
print(f"{domain}: ERROR: Expected {expected_statuses}. Got {response.status_code}.")
error_counter += 1
except requests.RequestException as e:
# Handle exceptions for requests like connection errors
print(f"{domain}: error due to {e}")
error_counter += 1
if error_counter > 0:
print(f"Warning: {error_counter} domains responded with an unexpected https status code.")
# Exit the script with the number of errors as the exit code
sys.exit(error_counter)