diff --git a/src/automtu/cli.py b/src/automtu/cli.py index cc78033..8efd48d 100644 --- a/src/automtu/cli.py +++ b/src/automtu/cli.py @@ -90,4 +90,17 @@ def build_parser() -> argparse.ArgumentParser: ap.add_argument( "--dry-run", action="store_true", help="Show actions without applying changes." ) + + # --- Machine-readable output modes --- + ap.add_argument( + "--print-mtu", + choices=["egress", "effective", "wg"], + help="Print only the selected MTU value as a number (stdout) for automation (e.g. Ansible).", + ) + ap.add_argument( + "--print-json", + action="store_true", + help="Print a JSON object with computed values (stdout) for automation.", + ) + return ap diff --git a/src/automtu/core.py b/src/automtu/core.py index 2245616..f6fb4bd 100644 --- a/src/automtu/core.py +++ b/src/automtu/core.py @@ -13,6 +13,7 @@ from .net import ( require_root, set_iface_mtu, ) +from .output import Logger, OutputMode, emit_json, emit_single_number from .pmtu import probe_pmtu from .wg import wg_is_active, wg_peer_endpoints @@ -45,7 +46,24 @@ def _choose(values: Iterable[int], policy: str) -> int: def run_automtu(args) -> int: - require_root(args.dry_run) + mode = OutputMode( + print_mtu=getattr(args, "print_mtu", None), + print_json=bool(getattr(args, "print_json", False)), + ) + err = mode.validate() + if err: + print(f"[automtu][ERROR] {err}", file=sys.stderr) + return 4 + + log = Logger(mode.machine).log + + # Root is only needed if we actually change something (without --dry-run). + needs_root = bool( + args.apply_egress_mtu + or args.apply_wg_mtu + or (args.force_egress_mtu is not None) + ) + require_root(dry=args.dry_run, needs_root=needs_root) egress = args.egress_if or detect_egress_iface(ignore_vpn=not args.prefer_wg_egress) if not egress: @@ -66,35 +84,42 @@ def run_automtu(args) -> int: and default_route_uses_iface(args.wg_if) ): egress = args.wg_if - print(f"[automtu] Using WireGuard interface {args.wg_if} as egress basis.") + log(f"[automtu] Using WireGuard interface {args.wg_if} as egress basis.") - print(f"[automtu] Detected egress interface: {egress}") + log(f"[automtu] Detected egress interface: {egress}") + # Base MTU (optionally forced) if args.force_egress_mtu: - print(f"[automtu] Forcing egress MTU {args.force_egress_mtu} on {egress}") + log(f"[automtu] Forcing egress MTU {args.force_egress_mtu} on {egress}") set_iface_mtu(egress, args.force_egress_mtu, args.dry_run) - base_mtu = args.force_egress_mtu + base_mtu = int(args.force_egress_mtu) else: - base_mtu = read_iface_mtu(egress) - print(f"[automtu] Egress base MTU: {base_mtu}") + base_mtu = int(read_iface_mtu(egress)) + log(f"[automtu] Egress base MTU: {base_mtu}") + # Targets (explicit + optional WG auto targets) targets = _split_targets(args.pmtu_target) + auto_targets_added: list[str] = [] + if args.auto_pmtu_from_wg: if wg_is_active(args.wg_if): peers = wg_peer_endpoints(args.wg_if) if peers: - print( + auto_targets_added = peers[:] + log( f"[automtu] Auto-added WG peer endpoints as PMTU targets: {', '.join(peers)}" ) targets = list(dict.fromkeys([*targets, *peers])) else: - print( - f"[automtu] INFO: {args.wg_if} not active; skipping auto PMTU targets." - ) + log(f"[automtu] INFO: {args.wg_if} not active; skipping auto PMTU targets.") + # PMTU probing effective_mtu = base_mtu + probe_results: dict[str, Optional[int]] = {} + chosen_pmtu: Optional[int] = None + if targets: - print( + log( f"[automtu] Probing Path MTU for: {', '.join(targets)} (policy={args.pmtu_policy})" ) good: list[int] = [] @@ -102,54 +127,101 @@ def run_automtu(args) -> int: p = probe_pmtu( t, args.pmtu_min_payload, args.pmtu_max_payload, args.pmtu_timeout ) - print(f"[automtu] - {t}: {p if p else 'probe failed'}") + probe_results[t] = p + log(f"[automtu] - {t}: {p if p else 'probe failed'}") if p: - good.append(p) + good.append(int(p)) if good: - chosen = _choose(good, args.pmtu_policy) - print(f"[automtu] Selected Path MTU (policy={args.pmtu_policy}): {chosen}") - effective_mtu = min(base_mtu, chosen) + chosen_pmtu = _choose(good, args.pmtu_policy) + log( + f"[automtu] Selected Path MTU (policy={args.pmtu_policy}): {chosen_pmtu}" + ) + effective_mtu = min(base_mtu, chosen_pmtu) else: - print( + log( "[automtu] WARNING: All PMTU probes failed. Falling back to egress MTU." ) + # Apply egress MTU (optional) + egress_applied = False if args.apply_egress_mtu: if egress == args.wg_if: - print( + log( f"[automtu] INFO: Skipping egress MTU apply because egress == {args.wg_if}." ) else: - print( - f"[automtu] Applying effective MTU {effective_mtu} to egress {egress}" - ) + log(f"[automtu] Applying effective MTU {effective_mtu} to egress {egress}") set_iface_mtu(egress, effective_mtu, args.dry_run) + egress_applied = True + # Compute WG MTU wg_mtu = max(int(args.wg_min), int(effective_mtu) - int(args.wg_overhead)) - print( + log( f"[automtu] Computed {args.wg_if} MTU: {wg_mtu} (overhead={args.wg_overhead}, min={args.wg_min})" ) + wg_mtu_set: Optional[int] = None + wg_mtu_clamped = False + if args.set_wg_mtu is not None: - forced = max(int(args.wg_min), int(args.set_wg_mtu)) - if forced != int(args.set_wg_mtu): - print( + wg_mtu_set = int(args.set_wg_mtu) + forced = max(int(args.wg_min), wg_mtu_set) + wg_mtu_clamped = forced != wg_mtu_set + if wg_mtu_clamped: + log( f"[automtu][WARN] --set-wg-mtu clamped to {forced} (wg-min={args.wg_min})." ) wg_mtu = forced - print(f"[automtu] Forcing WireGuard MTU (override): {wg_mtu}") + log(f"[automtu] Forcing WireGuard MTU (override): {wg_mtu}") + + # Apply WG MTU (optional) + wg_present = iface_exists(args.wg_if) + wg_active = wg_is_active(args.wg_if) if wg_present else False + wg_applied = False if args.apply_wg_mtu: - if iface_exists(args.wg_if): + if wg_present: set_iface_mtu(args.wg_if, wg_mtu, args.dry_run) - print(f"[automtu] Applied: {args.wg_if} MTU {wg_mtu}") + log(f"[automtu] Applied: {args.wg_if} MTU {wg_mtu}") + wg_applied = True else: - print( + log( f"[automtu] NOTE: {args.wg_if} not present yet. Start WireGuard first, then re-run." ) else: - print("[automtu] INFO: Not applying WireGuard MTU (use --apply-wg-mtu).") + log("[automtu] INFO: Not applying WireGuard MTU (use --apply-wg-mtu).") + + # Machine-readable outputs + if emit_single_number( + mode, base_mtu=base_mtu, effective_mtu=effective_mtu, wg_mtu=wg_mtu + ): + return 0 + + if emit_json( + mode, + egress_iface=egress, + base_mtu=base_mtu, + effective_mtu=effective_mtu, + egress_forced_mtu=int(args.force_egress_mtu) if args.force_egress_mtu else None, + egress_applied=egress_applied, + pmtu_targets=targets, + pmtu_auto_targets_added=auto_targets_added, + pmtu_policy=args.pmtu_policy, + pmtu_chosen=chosen_pmtu, + pmtu_results=probe_results, + wg_iface=args.wg_if, + wg_mtu=wg_mtu, + wg_overhead=int(args.wg_overhead), + wg_min=int(args.wg_min), + wg_set_mtu=wg_mtu_set, + wg_clamped=wg_mtu_clamped, + wg_present=wg_present, + wg_active=wg_active, + wg_applied=wg_applied, + dry_run=bool(args.dry_run), + ): + return 0 _ = Result( egress=egress, diff --git a/src/automtu/net.py b/src/automtu/net.py index b7bff7d..b7aefe2 100644 --- a/src/automtu/net.py +++ b/src/automtu/net.py @@ -29,8 +29,8 @@ def set_iface_mtu(iface: str, mtu: int, dry: bool) -> None: subprocess.run(["ip", "link", "set", "mtu", str(mtu), "dev", iface], check=True) -def require_root(dry: bool) -> None: - if not dry and os.geteuid() != 0: +def require_root(*, dry: bool, needs_root: bool) -> None: + if needs_root and (not dry) and os.geteuid() != 0: print( "[automtu][ERROR] Please run as root (sudo) or use --dry-run.", file=sys.stderr, diff --git a/src/automtu/output.py b/src/automtu/output.py new file mode 100644 index 0000000..c4390a9 --- /dev/null +++ b/src/automtu/output.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import json +import sys +from dataclasses import dataclass +from typing import Optional + + +@dataclass(frozen=True) +class OutputMode: + print_mtu: Optional[str] # "egress" | "effective" | "wg" | None + print_json: bool + + @property + def machine(self) -> bool: + return bool(self.print_mtu or self.print_json) + + def validate(self) -> Optional[str]: + if self.print_mtu and self.print_json: + return "--print-mtu and --print-json are mutually exclusive." + return None + + +class Logger: + """ + Routes logs to stderr when in machine mode, so stdout can be cleanly parsed. + """ + + def __init__(self, machine_mode: bool) -> None: + self._machine = machine_mode + + def log(self, msg: str) -> None: + if self._machine: + print(msg, file=sys.stderr) + else: + print(msg) + + +def emit_single_number( + mode: OutputMode, *, base_mtu: int, effective_mtu: int, wg_mtu: int +) -> bool: + """ + Returns True if it emitted output (and caller should return). + """ + if not mode.print_mtu: + return False + + key = mode.print_mtu + if key == "egress": + print(int(base_mtu)) + return True + if key == "effective": + print(int(effective_mtu)) + return True + if key == "wg": + print(int(wg_mtu)) + return True + + # Should never happen due to argparse choices + print("[automtu][ERROR] Invalid --print-mtu value.", file=sys.stderr) + raise SystemExit(4) + + +def emit_json( + mode: OutputMode, + *, + egress_iface: str, + base_mtu: int, + effective_mtu: int, + egress_forced_mtu: Optional[int], + egress_applied: bool, + pmtu_targets: list[str], + pmtu_auto_targets_added: list[str], + pmtu_policy: str, + pmtu_chosen: Optional[int], + pmtu_results: dict[str, Optional[int]], + wg_iface: str, + wg_mtu: int, + wg_overhead: int, + wg_min: int, + wg_set_mtu: Optional[int], + wg_clamped: bool, + wg_present: bool, + wg_active: bool, + wg_applied: bool, + dry_run: bool, +) -> bool: + """ + Returns True if it emitted output (and caller should return). + """ + if not mode.print_json: + return False + + payload = { + "egress": { + "iface": egress_iface, + "base_mtu": int(base_mtu), + "effective_mtu": int(effective_mtu), + "forced_mtu": int(egress_forced_mtu) + if egress_forced_mtu is not None + else None, + "applied": bool(egress_applied), + }, + "pmtu": { + "targets": list(pmtu_targets), + "auto_targets_added": list(pmtu_auto_targets_added), + "policy": pmtu_policy, + "chosen": int(pmtu_chosen) if pmtu_chosen is not None else None, + "results": { + k: (int(v) if v is not None else None) for k, v in pmtu_results.items() + }, + }, + "wg": { + "iface": wg_iface, + "mtu": int(wg_mtu), + "overhead": int(wg_overhead), + "min": int(wg_min), + "set_mtu": int(wg_set_mtu) if wg_set_mtu is not None else None, + "clamped": bool(wg_clamped), + "present": bool(wg_present), + "active": bool(wg_active), + "applied": bool(wg_applied), + }, + "dry_run": bool(dry_run), + } + + print(json.dumps(payload, sort_keys=True)) + return True diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index b9f8d47..80be5f1 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -37,6 +37,8 @@ class TestCli(unittest.TestCase): "--force-egress-mtu", "1452", "--dry-run", + "--print-mtu", + "wg", ] ) @@ -56,6 +58,8 @@ class TestCli(unittest.TestCase): self.assertEqual(args.set_wg_mtu, 1372) self.assertEqual(args.force_egress_mtu, 1452) self.assertTrue(args.dry_run) + self.assertEqual(args.print_mtu, "wg") + self.assertFalse(args.print_json) if __name__ == "__main__": diff --git a/tests/unit/test_output.py b/tests/unit/test_output.py new file mode 100644 index 0000000..7527ba7 --- /dev/null +++ b/tests/unit/test_output.py @@ -0,0 +1,87 @@ +import io +import json +import unittest +from contextlib import redirect_stdout, redirect_stderr + +from automtu.output import OutputMode, emit_json, emit_single_number, Logger + + +class TestOutput(unittest.TestCase): + def test_output_mode_validate_mutual_exclusive(self) -> None: + mode = OutputMode(print_mtu="effective", print_json=True) + self.assertIsNotNone(mode.validate()) + + mode_ok = OutputMode(print_mtu=None, print_json=True) + self.assertIsNone(mode_ok.validate()) + + def test_logger_routes_to_stderr_in_machine_mode(self) -> None: + log = Logger(machine_mode=True).log + + out = io.StringIO() + err = io.StringIO() + with redirect_stdout(out), redirect_stderr(err): + log("[automtu] hello") + + self.assertEqual(out.getvalue(), "") + self.assertIn("hello", err.getvalue()) + + def test_emit_single_number_effective(self) -> None: + mode = OutputMode(print_mtu="effective", print_json=False) + + out = io.StringIO() + with redirect_stdout(out): + emitted = emit_single_number( + mode, base_mtu=1500, effective_mtu=1452, wg_mtu=1372 + ) + + self.assertTrue(emitted) + self.assertEqual(out.getvalue().strip(), "1452") + + def test_emit_json_is_valid_and_contains_expected_fields(self) -> None: + mode = OutputMode(print_mtu=None, print_json=True) + + out = io.StringIO() + with redirect_stdout(out): + emitted = emit_json( + mode, + egress_iface="eth0", + base_mtu=1500, + effective_mtu=1452, + egress_forced_mtu=None, + egress_applied=False, + pmtu_targets=["1.1.1.1", "8.8.8.8"], + pmtu_auto_targets_added=[], + pmtu_policy="min", + pmtu_chosen=1452, + pmtu_results={"1.1.1.1": 1452, "8.8.8.8": None}, + wg_iface="wg0", + wg_mtu=1372, + wg_overhead=80, + wg_min=1280, + wg_set_mtu=None, + wg_clamped=False, + wg_present=True, + wg_active=False, + wg_applied=False, + dry_run=True, + ) + + self.assertTrue(emitted) + + payload = json.loads(out.getvalue()) + self.assertEqual(payload["egress"]["iface"], "eth0") + self.assertEqual(payload["egress"]["base_mtu"], 1500) + self.assertEqual(payload["egress"]["effective_mtu"], 1452) + + self.assertEqual(payload["pmtu"]["policy"], "min") + self.assertEqual(payload["pmtu"]["chosen"], 1452) + self.assertEqual(payload["pmtu"]["results"]["1.1.1.1"], 1452) + self.assertIsNone(payload["pmtu"]["results"]["8.8.8.8"]) + + self.assertEqual(payload["wg"]["iface"], "wg0") + self.assertEqual(payload["wg"]["mtu"], 1372) + self.assertTrue(payload["dry_run"]) + + +if __name__ == "__main__": + unittest.main(verbosity=2)