From dcc7a68973f9832a83e62c643a99213cf4fd8dc9 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Wed, 21 Jan 2026 18:53:44 +0100 Subject: [PATCH] refactor: convert script to automtu package with CI workflow https://chatgpt.com/share/697112b2-0410-800f-93ff-9372b603d43f --- .github/workflows/ci.yml | 46 +++++ Guide.md | 212 +++-------------------- Makefile | 11 +- README.md | 98 ++--------- main.py | 326 ------------------------------------ pyproject.toml | 23 +++ src/automtu/__init__.py | 2 + src/automtu/__main__.py | 13 ++ src/automtu/cli.py | 93 ++++++++++ src/automtu/core.py | 161 ++++++++++++++++++ src/automtu/net.py | 82 +++++++++ src/automtu/pmtu.py | 60 +++++++ src/automtu/wg.py | 48 ++++++ test.py | 287 ------------------------------- tests/__init__.py | 0 tests/unit/__init__.py | 0 tests/unit/test___init__.py | 14 ++ tests/unit/test___main__.py | 26 +++ tests/unit/test_cli.py | 62 +++++++ tests/unit/test_core.py | 94 +++++++++++ tests/unit/test_net.py | 49 ++++++ tests/unit/test_pmtu.py | 36 ++++ tests/unit/test_wg.py | 58 +++++++ 23 files changed, 905 insertions(+), 896 deletions(-) create mode 100644 .github/workflows/ci.yml delete mode 100755 main.py create mode 100644 pyproject.toml create mode 100644 src/automtu/__init__.py create mode 100644 src/automtu/__main__.py create mode 100644 src/automtu/cli.py create mode 100644 src/automtu/core.py create mode 100644 src/automtu/net.py create mode 100644 src/automtu/pmtu.py create mode 100644 src/automtu/wg.py delete mode 100644 test.py create mode 100644 tests/__init__.py create mode 100644 tests/unit/__init__.py create mode 100644 tests/unit/test___init__.py create mode 100644 tests/unit/test___main__.py create mode 100644 tests/unit/test_cli.py create mode 100644 tests/unit/test_core.py create mode 100644 tests/unit/test_net.py create mode 100644 tests/unit/test_pmtu.py create mode 100644 tests/unit/test_wg.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..be4f101 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,46 @@ +name: CI + +on: + push: + pull_request: + +jobs: + test-and-lint: + runs-on: ubuntu-latest + timeout-minutes: 15 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: "pip" + + - name: Install dependencies + shell: bash + run: | + set -euo pipefail + python -m pip install --upgrade pip + python -m pip install -e . + python -m pip install ruff + + - name: Ruff (lint) + shell: bash + run: | + set -euo pipefail + ruff check . + + - name: Ruff (format check) + shell: bash + run: | + set -euo pipefail + ruff format --check . + + - name: Unit tests + shell: bash + run: | + set -euo pipefail + make test diff --git a/Guide.md b/Guide.md index 29cf1cd..4fbacf5 100644 --- a/Guide.md +++ b/Guide.md @@ -1,202 +1,30 @@ -# wg-mtu-auto — Practical Guide - -This guide shows **how to *determine* and *set*** correct MTU values: - -* **with WireGuard** (`wg0`), and -* **without WireGuard** (just your egress interface like `eth0`/`wlan0`). +# automtu — Practical Guide The tool can: +- auto-detect your egress interface (e.g., eth0) +- probe Path MTU (PMTU) using `ping -M do` +- compute a safe WireGuard MTU: `effective_mtu - overhead` (default overhead=80) +- apply MTU to egress and/or WireGuard (explicit flags) -* auto-detect your **egress interface** -* optionally **probe Path MTU (PMTU)** to one or more remote targets -* compute a safe **WireGuard MTU** (`effective_mtu - 80` by default) -* **apply** MTU to `wg0` and/or your egress interface +## Recipes ---- +1) Only compute/show (safe default): + automtu --dry-run -## TL;DR recipes +2) Probe PMTU and apply to egress: + sudo automtu --pmtu-target 1.1.1.1 --apply-egress-mtu -### 1) Just compute & set WireGuard MTU (no PMTU probing) +3) Auto-add WireGuard peer endpoints as PMTU targets and apply WG MTU: + sudo automtu --auto-pmtu-from-wg --apply-wg-mtu -```bash -sudo automtu -# Equivalent from repo: -# sudo python3 main.py -``` +4) Prefer WireGuard as egress basis if default route uses wg0: + sudo automtu --prefer-wg-egress --auto-pmtu-from-wg --apply-wg-mtu -* Detects egress (e.g., `eth0`), reads its MTU (e.g., 1500) -* Computes `wg0` MTU = `egress_mtu - 80` (min clamp 1280) -* Applies to `wg0` (if present) +5) Force WG MTU: + sudo automtu --set-wg-mtu 1372 --apply-wg-mtu -Dry-run: +## Notes -```bash -automtu --dry-run -``` - ---- - -### 2) Compute & apply MTU on *egress* (non-WireGuard) - -Useful if you want the *link itself* (e.g., `eth0`) to use the discovered PMTU. - -```bash -sudo automtu --pmtu-target 1.1.1.1 --apply-egress-mtu -``` - -* Probes PMTU to `1.1.1.1`, applies that result to `eth0` -* Also computes a matching WireGuard MTU (`PMTU - 80`) and sets `wg0` (if present) - -> If the selected egress is `wg0`, egress application is **skipped** on purpose. - ---- - -### 3) With WireGuard peers: auto-add endpoints as PMTU targets - -```bash -sudo automtu --auto-pmtu-from-wg -``` - -* Reads `wg0` peer endpoints (`wg show ...` / `wg showconf`) -* Probes PMTU to those endpoints -* Picks an **effective PMTU** (policy = `min` by default) -* Applies **`wg0` MTU = effective PMTU − 80** - -Add extra targets & choose different policy: - -```bash -sudo automtu --auto-pmtu-from-wg \ - --pmtu-target 46.4.224.77,1.1.1.1 \ - --pmtu-policy median -``` - ---- - -### 4) Force a specific WireGuard MTU (override) - -```bash -sudo automtu --set-wg-mtu 1372 -``` - -* Skips the computed value and **forces** 1372 on `wg0` (clamped to ≥1280) - ---- - -## When to use which approach? - -* **You just use WireGuard** and want a safe default: - `sudo automtu` → picks `wg0 = egress_mtu - 80` (e.g., `1500 - 80 = 1420`). - -* **You suspect smaller upstream MTU** (PPPoE/ISP/VPN/“somewhere in the path”): - Use PMTU probing towards stable targets (your WG peer, DNS resolvers): - - ```bash - sudo automtu --pmtu-target 46.4.224.77 --pmtu-target 1.1.1.1 - ``` - - Then optionally apply the PMTU to your egress: - - ```bash - sudo automtu --pmtu-target 46.4.224.77 --apply-egress-mtu - ``` - -* **You have WireGuard peers** and want the tool to discover them automatically: - `sudo automtu --auto-pmtu-from-wg` - (You can still add manual targets and change policy.) - ---- - -## How it works (short) - -1. **Egress detection** - Reads default routes and picks a non-VPN interface (e.g., `eth0`). - If you want to prefer `wg0` when the default route already uses it: - - ```bash - sudo automtu --prefer-wg-egress --wg-if wg0 - ``` - -2. **PMTU probing (optional)** - Uses `ping -M do` (DF set) with a quick binary search to find the largest unfragmented payload for each target. - From the successful results, selects an **effective PMTU** using a policy: - - * `--pmtu-policy min` (default, safest) - * `--pmtu-policy median` - * `--pmtu-policy max` - -3. **WireGuard MTU calculation** - `wg_mtu = max(wg_min, effective_mtu - wg_overhead)` - Defaults: `wg_min=1280`, `wg_overhead=80`. - -4. **Apply** - - * If `--apply-egress-mtu` is set, apply **effective PMTU** to the egress (unless egress is `wg0`). - * Apply **WireGuard MTU** to `wg0` (or the iface passed via `--wg-if`). - * If `--set-wg-mtu X` is given, it **overrides** the computed value. - ---- - -## Examples (copy & paste) - -### A) Quick WireGuard tuning with peer awareness - -```bash -sudo automtu --auto-pmtu-from-wg -``` - -### B) Manual targets, conservative (min) policy - -```bash -sudo automtu --pmtu-target 46.4.224.77 --pmtu-target 1.1.1.1 -``` - -### C) Apply PMTU on egress + set matching wg0 - -```bash -sudo automtu --pmtu-target 1.1.1.1 --apply-egress-mtu -``` - -### D) Prefer WireGuard as egress (if default route uses WG) - -```bash -sudo automtu --prefer-wg-egress --wg-if wg0 --auto-pmtu-from-wg -``` - -### E) Force a specific wg0 MTU - -```bash -sudo automtu --set-wg-mtu 1372 -``` - -### F) Dry-run any of the above - -```bash -automtu --dry-run --auto-pmtu-from-wg --pmtu-target 1.1.1.1 -``` - ---- - -## Persisting the value in WireGuard - -Runtime changes are **not** permanent. To persist: - -* Either let your automation run this tool before/after `wg-quick up wg0`, **or** -* Add a fixed value in your `wg0` config (`/etc/wireguard/wg0.conf`): - - ```ini - [Interface] - MTU = 1372 - ``` - - > Static MTU is fine if the path is stable. If your route/ISP changes, prefer running this tool. - ---- - -## Notes & Troubleshooting - -* If **all PMTU probes fail**, the tool prints a warning and falls back to the egress MTU (e.g., `1500`) and sets `wg0 = egress - 80`. - Some networks block ICMP “fragmentation needed”; use multiple targets or rely on egress-only. -* You can **override defaults** via flags or environment: - - * `WG_IF=wg0 WG_OVERHEAD=80 WG_MIN=1280 automtu ...` -* The tool **deduplicates targets** and understands IPv4/IPv6 endpoints (e.g., `2a01:...`). +- Applying MTU requires root (unless `--dry-run`). +- PMTU probing can fail if ICMP is blocked; the tool then falls back to egress MTU. +- Runtime MTU changes are not persistent across reboots. diff --git a/Makefile b/Makefile index b1cfa35..589a23c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -PY ?= python3 +PYTHON ?= python3 .PHONY: test install help @@ -9,11 +9,4 @@ help: @echo " make help - this help" test: - $(PY) -m unittest -v test.py - -install: - @echo "Installation is provided via your package manager:" - @echo " pkgmgr install automtu" - @echo "" - @echo "Alternatively, run the tool directly:" - @echo " $(PY) main.py [--options]" + @PYTHONPATH="$(CURDIR)/src" "$(PYTHON)" -m unittest discover -s tests/unit -p "test_*.py" -v diff --git a/README.md b/README.md index dd39c6c..79cc8eb 100644 --- a/README.md +++ b/README.md @@ -1,95 +1,29 @@ -# wg-mtu-auto +# automtu [![GitHub Sponsors](https://img.shields.io/badge/Sponsor-GitHub%20Sponsors-blue?logo=github)](https://github.com/sponsors/kevinveenbirkenbach) [![Patreon](https://img.shields.io/badge/Support-Patreon-orange?logo=patreon)](https://www.patreon.com/c/kevinveenbirkenbach) [![Buy Me a Coffee](https://img.shields.io/badge/Buy%20me%20a%20Coffee-Funding-yellow?logo=buymeacoffee)](https://buymeacoffee.com/kevinveenbirkenbach) [![PayPal](https://img.shields.io/badge/Donate-PayPal-blue?logo=paypal)](https://s.veen.world/paypaldonate) +Auto-detect your egress interface, optionally probe Path MTU (PMTU) using DF-ping, +compute a WireGuard MTU, and apply MTU settings. -Automatically detect the optimal WireGuard MTU by analyzing your local egress interface and optionally probing the Path MTU (PMTU) to one or more remote hosts. -The tool ensures stable and efficient VPN connections by preventing fragmentation and latency caused by mismatched MTU settings. +## Install (editable) +pip install -e . ---- +## Usage -## ✨ Features +Show only (no changes): + automtu --dry-run -- **Automatic Egress Detection** — Finds your primary internet interface automatically. -- **WireGuard MTU Calculation** — Computes `wg0` MTU based on egress MTU minus overhead (default 80 bytes). -- **Optional Path MTU Probing** — Uses ICMP “Don’t Fragment” (`ping -M do`) to find the real usable MTU across network paths. -- **Multi-Target PMTU Support** — Test multiple remote hosts and choose an effective value via policy (`min`, `median`, `max`). -- **Dry-Run Mode** — Simulate changes without applying them. -- **Safe for Automation** — Integrates well with WireGuard systemd services or Ansible setups. +Probe PMTU to targets and apply on egress: + sudo automtu --pmtu-target registry-1.docker.io --apply-egress-mtu ---- +Auto-add WireGuard peer endpoints as targets and apply WG MTU: + sudo automtu --auto-pmtu-from-wg --apply-wg-mtu -## 🚀 Installation +Force WG MTU: + sudo automtu --set-wg-mtu 1372 --apply-wg-mtu -### Option 1 — Using [pkgmgr](https://github.com/kevinveenbirkenbach/package-manager) +Help: + automtu --help -If you use Kevin Veen-Birkenbach’s package manager (`pkgmgr`): - -```bash -pkgmgr install automtu -```` - -This will automatically fetch and install `wg-mtu-auto` system-wide. - -### Option 2 — Run directly from source - -Clone this repository and execute the script manually: - -```bash -git clone https://github.com/kevinveenbirkenbach/wg-mtu-auto.git -cd wg-mtu-auto -sudo python3 main.py --help -``` - ---- - -## 🧩 Usage Examples - -### Basic detection (no PMTU) - -```bash -sudo automtu -``` - -### Specify egress interface and force MTU - -```bash -sudo automtu --egress-if eth0 --force-egress-mtu 1452 -``` - -### Probe multiple PMTU targets (safe policy: `min`) - -```bash -sudo automtu --pmtu-target 46.4.224.77 --pmtu-target 2a01:4f8:2201:4695::2 -``` - -### Choose median or max policy - -```bash -sudo automtu --pmtu-target 46.4.224.77,1.1.1.1 --pmtu-policy median -sudo automtu --pmtu-target 46.4.224.77,1.1.1.1 --pmtu-policy max -``` - -### Dry-run (no system changes) - -```bash -automtu --dry-run -``` - ---- - -## 🧪 Development - -Run unit tests using: - -```bash -make test -``` - -To see installation guidance (does not install anything): - -```bash -make install -``` --- diff --git a/main.py b/main.py deleted file mode 100755 index f9f56fe..0000000 --- a/main.py +++ /dev/null @@ -1,326 +0,0 @@ -#!/usr/bin/env python3 -""" -wg_mtu_auto.py — Auto-detect egress IF, optionally probe Path MTU to one or more targets, -compute the correct WireGuard MTU, and apply it. - -Examples: - sudo ./main.py - sudo ./main.py --force-egress-mtu 1452 - sudo ./main.py --pmtu-target 46.4.224.77 --pmtu-target 2a01:4f8:2201:4695::2 - sudo ./main.py --pmtu-target 46.4.224.77,2a01:4f8:2201:4695::2 --pmtu-policy min - sudo ./main.py --prefer-wg-egress --auto-pmtu-from-wg - ./main.py --dry-run -""" -import argparse -import ipaddress -import os -import pathlib -import re -import statistics -import subprocess -import sys - - -# ----------------- helpers ----------------- - -def run(cmd): # -> str - return subprocess.run( - cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True - ).stdout.strip() - - -def rc(cmd): # -> int - return subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode - - -def exists_iface(iface): # -> bool - return pathlib.Path(f"/sys/class/net/{iface}").exists() - - -def read_mtu(iface): # -> int - with open(f"/sys/class/net/{iface}/mtu", "r") as f: - return int(f.read().strip()) - - -def set_mtu(iface, mtu, dry): - if dry: - print(f"[wg-mtu] DRY-RUN: ip link set mtu {mtu} dev {iface}") - else: - subprocess.run(["ip", "link", "set", "mtu", str(mtu), "dev", iface], check=True) - - -def require_root(dry): - if not dry and os.geteuid() != 0: - print("[wg-mtu][ERROR] Please run as root (sudo) or use --dry-run.", file=sys.stderr) - sys.exit(1) - - -def is_ipv6(addr): # -> bool - try: - return isinstance(ipaddress.ip_address(addr), ipaddress.IPv6Address) - except ValueError: - # best-effort for hostnames (contains ':') - return ":" in addr - - -# ----------------- route & iface selection ----------------- - -def default_route_lines(): - lines = [] - for cmd in (["ip", "-4", "route", "show", "default"], ["ip", "-6", "route", "show", "default"]): - out = run(cmd) - if out: - lines.extend(out.splitlines()) - return lines - - -def get_default_ifaces(ignore_vpn=True): # -> list[str] - devs = [] - for line in default_route_lines(): - m = re.search(r"\bdev\s+(\S+)", line) - if m: - devs.append(m.group(1)) - # fallback via route get - if not devs: - for cmd in (["ip", "route", "get", "1.1.1.1"], ["ip", "-6", "route", "get", "2606:4700:4700::1111"]): - out = run(cmd) - m = re.search(r"\bdev\s+(\S+)", out) - if m: - devs.append(m.group(1)) - uniq = [] - for d in devs: - if not d or d == "lo" or not exists_iface(d): - continue - if ignore_vpn and re.match(r"^(wg|tun)\d*$", d): - continue - if d not in uniq: - uniq.append(d) - return uniq - - -def wg_default_is_active(wg_if: str) -> bool: - # check if any default route is via wg_if - return any(re.search(rf"\bdev\s+{re.escape(wg_if)}\b", line) for line in default_route_lines()) - - -# ----------------- PMTU probing ----------------- - -def ping_ok(payload, target, timeout_s): # -> bool - base = ["ping", "-M", "do", "-c", "1", "-s", str(payload), "-W", str(max(1, int(round(timeout_s))))] - if is_ipv6(target): - base.insert(1, "-6") - return rc(base + [target]) == 0 - - -def probe_pmtu(target, lo_payload=1200, hi_payload=1472, timeout=1.0): # -> int|None - """Binary-search the largest payload that passes with DF. Return Path MTU (payload + hdr) or None. - Header: +28 (IPv4), +48 (IPv6).""" - hdr = 48 if is_ipv6(target) else 28 - # ensure lower bound works; if not, try smaller floors - if not ping_ok(lo_payload, target, timeout): - for p in (1180, 1160, 1140): - if ping_ok(p, target, timeout): - lo_payload = p - break - else: - return None - lo, hi, best = lo_payload, hi_payload, None - while lo <= hi: - mid = (lo + hi) // 2 - if ping_ok(mid, target, timeout): - best = mid - lo = mid + 1 - else: - hi = mid - 1 - return (best + hdr) if best is not None else None - - -def choose_effective(pmtus, policy="min"): # -> int - if not pmtus: - raise ValueError("no PMTUs to choose from") - if policy == "min": - return min(pmtus) - if policy == "max": - return max(pmtus) - if policy == "median": - return int(statistics.median(sorted(pmtus))) - raise ValueError(f"unknown policy {policy}") - - -# ----------------- WireGuard helpers (opt-in) ----------------- - -def wg_is_active(wg_if: str) -> bool: - if not exists_iface(wg_if): - return False - return rc(["wg", "show", wg_if]) == 0 - - -def wg_peer_endpoints(wg_if: str) -> list[str]: - """Return list of peer endpoints (hostnames/IPs) – port stripped.""" - targets = [] - - # 1) Try: wg show endpoints - out = run(["wg", "show", wg_if, "endpoints"]) - for line in out.splitlines(): - # format: \t - parts = line.strip().split() - if len(parts) >= 2 and parts[-1] != "(none)": - ep = parts[-1] # host:port - host = ep.rsplit(":", 1)[0] - # IPv6 endpoint may be like [2001:db8::1]:51820 - host = host.strip("[]") - targets.append(host) - - # 2) Fallback: wg showconf (root may be required) - if not targets: - conf = run(["wg", "showconf", wg_if]) - if conf: - for m in re.finditer(r"^Endpoint\s*=\s*(.+)$", conf, flags=re.MULTILINE): - ep = m.group(1).strip() - host = ep.rsplit(":", 1)[0].strip("[]") - targets.append(host) - - # dedupe & sanity - cleaned = [] - for t in targets: - if t and t not in cleaned: - cleaned.append(t) - return cleaned - - -# ----------------- main ----------------- - -def main(): - ap = argparse.ArgumentParser(description="Compute/apply WireGuard MTU based on egress MTU and optional multi-target PMTU probing.") - ap.add_argument("--egress-if", help="Explicit egress interface (auto-detected if omitted).") - ap.add_argument("--prefer-wg-egress", action="store_true", - help="Allow/consider wg* as egress and prefer it if default route uses wg (default: disabled).") - ap.add_argument("--auto-pmtu-from-wg", action="store_true", - help="Automatically add WireGuard peer endpoints as PMTU targets (default: disabled).") - ap.add_argument("--wg-if", default=os.environ.get("WG_IF", "wg0"), help="WireGuard interface name (default: wg0).") - ap.add_argument("--wg-overhead", type=int, default=int(os.environ.get("WG_OVERHEAD", "80")), help="Bytes of WG overhead to subtract (default: 80).") - ap.add_argument("--wg-min", type=int, default=int(os.environ.get("WG_MIN", "1280")), help="Minimum allowed WG MTU (default: 1280).") - # PMTU (multi-target) - ap.add_argument("--pmtu-target", action="append", help="Target hostname/IP to probe PMTU. Can be given multiple times OR comma-separated.") - ap.add_argument("--pmtu-timeout", type=float, default=1.0, help="Timeout (seconds) per ping probe (default: 1.0).") - ap.add_argument("--pmtu-min-payload", type=int, default=1200, help="Lower bound payload for PMTU search (default: 1200).") - ap.add_argument("--pmtu-max-payload", type=int, default=1472, help="Upper bound payload for PMTU search (default: 1472 ~ 1500-28).") - ap.add_argument("--pmtu-policy", choices=["min", "median", "max"], default="min", - help="How to choose effective PMTU across multiple targets (default: min).") - ap.add_argument("--apply-egress-mtu", action="store_true", - help="Apply the effective Path MTU to the detected egress interface (e.g. eth0).") - - ap.add_argument("--dry-run", action="store_true", help="Show actions without applying changes.") - # NEW: force a specific WireGuard MTU (overrides computed value) - ap.add_argument("--set-wg-mtu", type=int, help="Force a specific MTU to apply on the WireGuard interface (overrides computed value).") - # (legacy / optional) force egress MTU - ap.add_argument("--force-egress-mtu", type=int, help="Force this MTU on the egress interface before computing wg MTU.") - args = ap.parse_args() - - require_root(args.dry_run) - - # Egress detection (wg ignored by default) - if args.egress_if: - egress = args.egress_if - else: - ignore_vpn = not args.prefer_wg_egress - cands = get_default_ifaces(ignore_vpn=ignore_vpn) - # If we allow wg and default route is via wg-if, prefer it first - if args.prefer_wg_egress and wg_is_active(args.wg_if) and wg_default_is_active(args.wg_if): - if args.wg_if in cands: - cands.remove(args.wg_if) - cands.insert(0, args.wg_if) - egress = cands[0] if cands else None - - if not egress: - print("[wg-mtu][ERROR] Could not detect egress interface (use --egress-if).", file=sys.stderr) - sys.exit(2) - if not exists_iface(egress): - print(f"[wg-mtu][ERROR] Interface {egress} does not exist.", file=sys.stderr) - sys.exit(3) - print(f"[wg-mtu] Detected egress interface: {egress}") - - # Egress MTU - if args.prefer_wg_egress and egress == args.wg_if: - print(f"[wg-mtu] Using WireGuard interface {args.wg_if} as egress basis.") - if args.wg_if == egress and not wg_is_active(args.wg_if): - print(f"[wg-mtu][WARN] {args.wg_if} selected as egress but WireGuard is not active.", file=sys.stderr) - - if args.force_egress_mtu: - print(f"[wg-mtu] Forcing egress MTU {args.force_egress_mtu} on {egress}") - set_mtu(egress, args.force_egress_mtu, args.dry_run) - base_mtu = args.force_egress_mtu - else: - base_mtu = read_mtu(egress) - print(f"[wg-mtu] Egress base MTU: {base_mtu}") - - # Build PMTU target list - pmtu_targets = [] - if args.pmtu_target: - for item in args.pmtu_target: - pmtu_targets.extend([x.strip() for x in item.split(",") if x.strip()]) - - if args.auto_pmtu_from_wg: - if wg_is_active(args.wg_if): - wg_targets = wg_peer_endpoints(args.wg_if) - if wg_targets: - print(f"[wg-mtu] Auto-added WG peer endpoints as PMTU targets: {', '.join(wg_targets)}") - pmtu_targets.extend(wg_targets) - else: - print("[wg-mtu] INFO: No WG peer endpoints discovered (wg show/showconf).") - else: - print(f"[wg-mtu] INFO: {args.wg_if} is not active; skipping auto PMTU targets from WG.") - - # Deduplicate PMTU targets - if pmtu_targets: - pmtu_targets = list(dict.fromkeys(pmtu_targets)) - - # PMTU probing - effective_mtu = base_mtu - if pmtu_targets: - good = [] - print(f"[wg-mtu] Probing Path MTU for: {', '.join(pmtu_targets)} (policy={args.pmtu_policy})") - for t in pmtu_targets: - p = probe_pmtu(t, args.pmtu_min_payload, args.pmtu_max_payload, args.pmtu_timeout) - print(f"[wg-mtu] - {t}: {p if p else 'probe failed'}") - if p: - good.append(p) - if good: - chosen = choose_effective(good, args.pmtu_policy) - print(f"[wg-mtu] Selected Path MTU (policy={args.pmtu_policy}): {chosen}") - effective_mtu = min(base_mtu, chosen) - else: - print("[wg-mtu] WARNING: All PMTU probes failed. Falling back to egress MTU.") - - # Optionally apply the effective MTU to the egress interface - if args.apply_egress_mtu: - if egress == args.wg_if: - print(f"[wg-mtu] INFO: Skipping egress MTU apply because egress == {args.wg_if}.") - else: - print(f"[wg-mtu] Applying effective MTU {effective_mtu} to egress {egress}") - set_mtu(egress, effective_mtu, args.dry_run) - - # Compute WG MTU - wg_mtu = max(args.wg_min, effective_mtu - args.wg_overhead) - print(f"[wg-mtu] Computed {args.wg_if} MTU: {wg_mtu} (overhead={args.wg_overhead}, min={args.wg_min})") - - # --- NEW: override with --set-wg-mtu if provided - if args.set_wg_mtu is not None: - if args.set_wg_mtu < args.wg_min: - print(f"[wg-mtu][WARN] --set-wg-mtu {args.set_wg_mtu} is below wg-min {args.wg_min}; clamping to {args.wg_min}.") - args.set_wg_mtu = args.wg_min - wg_mtu = args.set_wg_mtu - print(f"[wg-mtu] Forcing WireGuard MTU (override): {wg_mtu}") - - # Apply - if exists_iface(args.wg_if): - set_mtu(args.wg_if, wg_mtu, args.dry_run) - print(f"[wg-mtu] Applied: {args.wg_if} MTU {wg_mtu}") - else: - print(f"[wg-mtu] NOTE: {args.wg_if} not present yet. Start WireGuard first, then re-run this script.") - - print(f"[wg-mtu] Done. Summary: egress={egress} mtu={base_mtu}, effective_mtu={effective_mtu}, {args.wg_if}_mtu={wg_mtu}") - - -if __name__ == "__main__": - main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..8f953e7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,23 @@ +[build-system] +requires = ["setuptools>=69"] +build-backend = "setuptools.build_meta" + +[project] +name = "automtu" +version = "0.1.0" +description = "Auto-detect egress interface, probe Path MTU, and apply MTU (WireGuard/egress)." +readme = "README.md" +requires-python = ">=3.10" +license = { text = "MIT" } +authors = [{ name = "Kevin Veen-Birkenbach" }] +keywords = ["mtu", "wireguard", "pmtu", "networking"] +dependencies = [] + +[project.scripts] +automtu = "automtu.__main__:main" + +[tool.setuptools] +package-dir = { "" = "src" } + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/src/automtu/__init__.py b/src/automtu/__init__.py new file mode 100644 index 0000000..07c5de9 --- /dev/null +++ b/src/automtu/__init__.py @@ -0,0 +1,2 @@ +__all__ = ["__version__"] +__version__ = "0.1.0" diff --git a/src/automtu/__main__.py b/src/automtu/__main__.py new file mode 100644 index 0000000..038971c --- /dev/null +++ b/src/automtu/__main__.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from .cli import build_parser +from .core import run_automtu + + +def main() -> int: + args = build_parser().parse_args() + return run_automtu(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/automtu/cli.py b/src/automtu/cli.py new file mode 100644 index 0000000..cc78033 --- /dev/null +++ b/src/automtu/cli.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +import argparse +import os + + +def build_parser() -> argparse.ArgumentParser: + ap = argparse.ArgumentParser( + prog="automtu", + description="Probe Path MTU and compute/apply MTU for egress and/or WireGuard.", + ) + + ap.add_argument( + "--egress-if", help="Explicit egress interface (auto-detected if omitted)." + ) + ap.add_argument( + "--prefer-wg-egress", + action="store_true", + help="Allow wg* as egress and prefer it if default route uses wg (default: off).", + ) + + ap.add_argument( + "--pmtu-target", + action="append", + help="Target hostname/IP to probe PMTU. Repeatable or comma-separated.", + ) + ap.add_argument( + "--pmtu-timeout", + type=float, + default=1.0, + help="Ping timeout seconds (default: 1.0).", + ) + ap.add_argument( + "--pmtu-min-payload", + type=int, + default=1200, + help="PMTU lower payload bound (default: 1200).", + ) + ap.add_argument( + "--pmtu-max-payload", + type=int, + default=1472, + help="PMTU upper payload bound (default: 1472).", + ) + ap.add_argument( + "--pmtu-policy", + choices=["min", "median", "max"], + default="min", + help="Aggregate PMTU across targets (default: min).", + ) + + ap.add_argument( + "--apply-egress-mtu", + action="store_true", + help="Apply effective MTU to egress interface.", + ) + ap.add_argument( + "--apply-wg-mtu", action="store_true", help="Apply MTU to WireGuard interface." + ) + + ap.add_argument( + "--wg-if", + default=os.environ.get("WG_IF", "wg0"), + help="WireGuard interface (default: wg0).", + ) + ap.add_argument( + "--wg-overhead", + type=int, + default=int(os.environ.get("WG_OVERHEAD", "80")), + help="WG overhead.", + ) + ap.add_argument( + "--wg-min", + type=int, + default=int(os.environ.get("WG_MIN", "1280")), + help="Minimum WG MTU.", + ) + ap.add_argument( + "--auto-pmtu-from-wg", + action="store_true", + help="Add WG peer endpoints as PMTU targets.", + ) + ap.add_argument("--set-wg-mtu", type=int, help="Force MTU for WireGuard interface.") + + ap.add_argument( + "--force-egress-mtu", + type=int, + help="Force MTU on the egress interface before computing.", + ) + ap.add_argument( + "--dry-run", action="store_true", help="Show actions without applying changes." + ) + return ap diff --git a/src/automtu/core.py b/src/automtu/core.py new file mode 100644 index 0000000..2245616 --- /dev/null +++ b/src/automtu/core.py @@ -0,0 +1,161 @@ +from __future__ import annotations + +import statistics +import sys +from dataclasses import dataclass +from typing import Iterable, Optional + +from .net import ( + default_route_uses_iface, + detect_egress_iface, + iface_exists, + read_iface_mtu, + require_root, + set_iface_mtu, +) +from .pmtu import probe_pmtu +from .wg import wg_is_active, wg_peer_endpoints + + +@dataclass(frozen=True) +class Result: + egress: str + base_mtu: int + effective_mtu: int + wg_if: str + wg_mtu: int + + +def _split_targets(items: Optional[list[str]]) -> list[str]: + raw: list[str] = [] + for item in items or []: + raw.extend([x.strip() for x in item.split(",") if x.strip()]) + return list(dict.fromkeys(raw)) + + +def _choose(values: Iterable[int], policy: str) -> int: + vals = sorted(values) + if policy == "min": + return vals[0] + if policy == "max": + return vals[-1] + if policy == "median": + return int(statistics.median(vals)) + raise ValueError(f"unknown policy: {policy}") + + +def run_automtu(args) -> int: + require_root(args.dry_run) + + egress = args.egress_if or detect_egress_iface(ignore_vpn=not args.prefer_wg_egress) + if not egress: + print( + "[automtu][ERROR] Could not detect egress interface (use --egress-if).", + file=sys.stderr, + ) + return 2 + if not iface_exists(egress): + print(f"[automtu][ERROR] Interface {egress} does not exist.", file=sys.stderr) + return 3 + + if ( + args.egress_if is None + and args.prefer_wg_egress + and iface_exists(args.wg_if) + and wg_is_active(args.wg_if) + and default_route_uses_iface(args.wg_if) + ): + egress = args.wg_if + print(f"[automtu] Using WireGuard interface {args.wg_if} as egress basis.") + + print(f"[automtu] Detected egress interface: {egress}") + + if args.force_egress_mtu: + print(f"[automtu] Forcing egress MTU {args.force_egress_mtu} on {egress}") + set_iface_mtu(egress, args.force_egress_mtu, args.dry_run) + base_mtu = args.force_egress_mtu + else: + base_mtu = read_iface_mtu(egress) + print(f"[automtu] Egress base MTU: {base_mtu}") + + targets = _split_targets(args.pmtu_target) + if args.auto_pmtu_from_wg: + if wg_is_active(args.wg_if): + peers = wg_peer_endpoints(args.wg_if) + if peers: + print( + f"[automtu] Auto-added WG peer endpoints as PMTU targets: {', '.join(peers)}" + ) + targets = list(dict.fromkeys([*targets, *peers])) + else: + print( + f"[automtu] INFO: {args.wg_if} not active; skipping auto PMTU targets." + ) + + effective_mtu = base_mtu + if targets: + print( + f"[automtu] Probing Path MTU for: {', '.join(targets)} (policy={args.pmtu_policy})" + ) + good: list[int] = [] + for t in targets: + p = probe_pmtu( + t, args.pmtu_min_payload, args.pmtu_max_payload, args.pmtu_timeout + ) + print(f"[automtu] - {t}: {p if p else 'probe failed'}") + if p: + good.append(p) + + if good: + chosen = _choose(good, args.pmtu_policy) + print(f"[automtu] Selected Path MTU (policy={args.pmtu_policy}): {chosen}") + effective_mtu = min(base_mtu, chosen) + else: + print( + "[automtu] WARNING: All PMTU probes failed. Falling back to egress MTU." + ) + + if args.apply_egress_mtu: + if egress == args.wg_if: + print( + f"[automtu] INFO: Skipping egress MTU apply because egress == {args.wg_if}." + ) + else: + print( + f"[automtu] Applying effective MTU {effective_mtu} to egress {egress}" + ) + set_iface_mtu(egress, effective_mtu, args.dry_run) + + wg_mtu = max(int(args.wg_min), int(effective_mtu) - int(args.wg_overhead)) + print( + f"[automtu] Computed {args.wg_if} MTU: {wg_mtu} (overhead={args.wg_overhead}, min={args.wg_min})" + ) + + if args.set_wg_mtu is not None: + forced = max(int(args.wg_min), int(args.set_wg_mtu)) + if forced != int(args.set_wg_mtu): + print( + f"[automtu][WARN] --set-wg-mtu clamped to {forced} (wg-min={args.wg_min})." + ) + wg_mtu = forced + print(f"[automtu] Forcing WireGuard MTU (override): {wg_mtu}") + + if args.apply_wg_mtu: + if iface_exists(args.wg_if): + set_iface_mtu(args.wg_if, wg_mtu, args.dry_run) + print(f"[automtu] Applied: {args.wg_if} MTU {wg_mtu}") + else: + print( + f"[automtu] NOTE: {args.wg_if} not present yet. Start WireGuard first, then re-run." + ) + else: + print("[automtu] INFO: Not applying WireGuard MTU (use --apply-wg-mtu).") + + _ = Result( + egress=egress, + base_mtu=base_mtu, + effective_mtu=effective_mtu, + wg_if=args.wg_if, + wg_mtu=wg_mtu, + ) + return 0 diff --git a/src/automtu/net.py b/src/automtu/net.py new file mode 100644 index 0000000..b7bff7d --- /dev/null +++ b/src/automtu/net.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +import os +import pathlib +import re +import subprocess +import sys +from typing import Optional + + +def _run(cmd: list[str]) -> str: + return subprocess.run( + cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True + ).stdout.strip() + + +def iface_exists(iface: str) -> bool: + return pathlib.Path(f"/sys/class/net/{iface}").exists() + + +def read_iface_mtu(iface: str) -> int: + return int(pathlib.Path(f"/sys/class/net/{iface}/mtu").read_text().strip()) + + +def set_iface_mtu(iface: str, mtu: int, dry: bool) -> None: + if dry: + print(f"[automtu] DRY-RUN: ip link set mtu {mtu} dev {iface}") + return + subprocess.run(["ip", "link", "set", "mtu", str(mtu), "dev", iface], check=True) + + +def require_root(dry: bool) -> None: + if not dry and os.geteuid() != 0: + print( + "[automtu][ERROR] Please run as root (sudo) or use --dry-run.", + file=sys.stderr, + ) + raise SystemExit(1) + + +def detect_egress_iface(ignore_vpn: bool = True) -> Optional[str]: + devs: list[str] = [] + for cmd in ( + ["ip", "-4", "route", "show", "default"], + ["ip", "-6", "route", "show", "default"], + ): + for line in _run(cmd).splitlines(): + m = re.search(r"\bdev\s+(\S+)", line) + if m: + devs.append(m.group(1)) + + if not devs: + for cmd in ( + ["ip", "route", "get", "1.1.1.1"], + ["ip", "-6", "route", "get", "2606:4700:4700::1111"], + ): + m = re.search(r"\bdev\s+(\S+)", _run(cmd)) + if m: + devs.append(m.group(1)) + + seen: set[str] = set() + for d in devs: + if not d or d == "lo" or not iface_exists(d): + continue + if ignore_vpn and re.match(r"^(wg|tun)\d*$", d): + continue + if d in seen: + continue + seen.add(d) + return d + return None + + +def default_route_uses_iface(iface: str) -> bool: + pat = rf"\bdev\s+{re.escape(iface)}\b" + for cmd in ( + ["ip", "-4", "route", "show", "default"], + ["ip", "-6", "route", "show", "default"], + ): + if re.search(pat, _run(cmd)): + return True + return False diff --git a/src/automtu/pmtu.py b/src/automtu/pmtu.py new file mode 100644 index 0000000..6a365cc --- /dev/null +++ b/src/automtu/pmtu.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +import ipaddress +import subprocess +from typing import Optional + + +def _is_ipv6(target: str) -> bool: + try: + return isinstance(ipaddress.ip_address(target), ipaddress.IPv6Address) + except ValueError: + return ":" in target # best-effort for hostnames + + +def _rc(cmd: list[str]) -> int: + return subprocess.run( + cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ).returncode + + +def _ping_ok(payload: int, target: str, timeout_s: float) -> bool: + cmd = [ + "ping", + "-M", + "do", + "-c", + "1", + "-s", + str(payload), + "-W", + str(max(1, int(round(timeout_s)))), + ] + if _is_ipv6(target): + cmd.insert(1, "-6") + return _rc(cmd + [target]) == 0 + + +def probe_pmtu( + target: str, lo_payload: int = 1200, hi_payload: int = 1472, timeout: float = 1.0 +) -> Optional[int]: + hdr = 48 if _is_ipv6(target) else 28 + + if not _ping_ok(lo_payload, target, timeout): + for p in (1180, 1160, 1140): + if _ping_ok(p, target, timeout): + lo_payload = p + break + else: + return None + + lo, hi, best = lo_payload, hi_payload, None + while lo <= hi: + mid = (lo + hi) // 2 + if _ping_ok(mid, target, timeout): + best = mid + lo = mid + 1 + else: + hi = mid - 1 + + return (best + hdr) if best is not None else None diff --git a/src/automtu/wg.py b/src/automtu/wg.py new file mode 100644 index 0000000..61f8374 --- /dev/null +++ b/src/automtu/wg.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import re +import subprocess +from typing import List + +from .net import iface_exists + + +def _run(cmd: list[str]) -> str: + return subprocess.run( + cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True + ).stdout.strip() + + +def _rc(cmd: list[str]) -> int: + return subprocess.run( + cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ).returncode + + +def wg_is_active(wg_if: str) -> bool: + return iface_exists(wg_if) and _rc(["wg", "show", wg_if]) == 0 + + +def wg_peer_endpoints(wg_if: str) -> List[str]: + targets: list[str] = [] + + out = _run(["wg", "show", wg_if, "endpoints"]) + for line in out.splitlines(): + parts = line.strip().split() + if len(parts) >= 2 and parts[-1] != "(none)": + ep = parts[-1] + host = ep.rsplit(":", 1)[0].strip("[]") + targets.append(host) + + if not targets: + conf = _run(["wg", "showconf", wg_if]) + for m in re.finditer(r"^Endpoint\s*=\s*(.+)$", conf, flags=re.MULTILINE): + ep = m.group(1).strip() + host = ep.rsplit(":", 1)[0].strip("[]") + targets.append(host) + + dedup: list[str] = [] + for t in targets: + if t and t not in dedup: + dedup.append(t) + return dedup diff --git a/test.py b/test.py deleted file mode 100644 index 1bf6288..0000000 --- a/test.py +++ /dev/null @@ -1,287 +0,0 @@ -import io -import sys -import unittest -from unittest.mock import patch -from contextlib import redirect_stdout - -# Import the script as a module -import main as automtu - - -class TestWgMtuAutoExtended(unittest.TestCase): - # ---------- Baseline behavior (unchanged) ---------- - - @patch("main.set_mtu") - @patch("main.read_mtu", return_value=1500) - @patch("main.exists_iface", return_value=True) - @patch("main.get_default_ifaces", return_value=["eth0"]) - @patch("main.require_root", return_value=None) - def test_no_pmtu_uses_egress_minus_overhead( - self, _req_root, mock_get_def, _exists, _read_mtu, mock_set_mtu - ): - argv = ["main.py", "--dry-run"] - with patch.object(sys, "argv", argv): - buf = io.StringIO() - with redirect_stdout(buf): - automtu.main() - - out = buf.getvalue() - self.assertIn("Detected egress interface: eth0", out) - self.assertIn("Egress base MTU: 1500", out) - self.assertIn("Computed wg0 MTU: 1420", out) - mock_set_mtu.assert_any_call("wg0", 1420, True) - # get_default_ifaces should be called with ignore_vpn=True by default - mock_get_def.assert_called_with(ignore_vpn=True) - - # ---------- prefer-wg-egress selection ---------- - - @patch("main.wg_default_is_active", return_value=True) - @patch("main.wg_is_active", return_value=True) - @patch("main.set_mtu") - @patch("main.read_mtu", return_value=1420) - @patch("main.exists_iface", return_value=True) - @patch("main.get_default_ifaces", return_value=["eth0", "wg0"]) - @patch("main.require_root", return_value=None) - def test_prefer_wg_egress_picks_wg0_when_default_route_via_wg( - self, _req_root, mock_get_def, _exists, _read_mtu, _set_mtu, _wg_is_active, _wg_def_active - ): - argv = ["main.py", "--dry-run", "--prefer-wg-egress", "--wg-if", "wg0"] - with patch.object(sys, "argv", argv): - buf = io.StringIO() - with redirect_stdout(buf): - automtu.main() - - out = buf.getvalue() - # When prefer-wg is set AND wg default route is active, wg0 should be chosen as egress - self.assertIn("Detected egress interface: wg0", out) - self.assertIn("Using WireGuard interface wg0 as egress basis.", out) - # Computed MTU: base 1420 - 80 = 1340 (clamped by min=1280) - self.assertIn("Computed wg0 MTU: 1340", out) - # get_default_ifaces should be called with ignore_vpn=False (because prefer-wg) - mock_get_def.assert_called_with(ignore_vpn=False) - - # ---------- auto-pmtu-from-wg adds peer endpoints ---------- - - @patch("main.wg_peer_endpoints", return_value=["46.4.224.77", "2a01:db8::1"]) - @patch("main.wg_is_active", return_value=True) - @patch("main.probe_pmtu", side_effect=[1452, 1420]) # results for two peers - @patch("main.set_mtu") - @patch("main.read_mtu", return_value=1500) - @patch("main.exists_iface", return_value=True) - @patch("main.get_default_ifaces", return_value=["eth0"]) - @patch("main.require_root", return_value=None) - def test_auto_pmtu_from_wg_adds_targets_and_uses_min_policy( - self, _req_root, _get_def, _exists, _read_mtu, _set_mtu, _probe_pmtu, _wg_active, _wg_peers - ): - argv = ["main.py", "--dry-run", "--auto-pmtu-from-wg", "--wg-if", "wg0"] - with patch.object(sys, "argv", argv): - buf = io.StringIO() - with redirect_stdout(buf): - automtu.main() - - out = buf.getvalue() - # Confirm WG peers were added - self.assertIn("Auto-added WG peer endpoints as PMTU targets: 46.4.224.77, 2a01:db8::1", out) - # The policy default is 'min', so chosen PMTU should be 1420 - self.assertIn("Selected Path MTU (policy=min): 1420", out) - # Computed wg0 MTU: 1420 - 80 = 1340 - self.assertIn("Computed wg0 MTU: 1340", out) - # Ensure probe was called twice (for both peers) - self.assertEqual(_probe_pmtu.call_count, 2) - - # ---------- manual PMTU still works with prefer-wg-egress ---------- - - @patch("main.wg_default_is_active", return_value=True) - @patch("main.wg_is_active", return_value=True) - @patch("main.probe_pmtu", side_effect=[1472, 1452, 1500]) - @patch("main.set_mtu") - @patch("main.read_mtu", return_value=1500) - @patch("main.exists_iface", return_value=True) - @patch("main.get_default_ifaces", return_value=["eth0"]) - @patch("main.require_root", return_value=None) - def test_prefer_wg_egress_with_manual_targets_and_median_policy( - self, _req_root, _get_def, _exists, _read_mtu, _set_mtu, _probe_pmtu, _wg_is_active, _wg_def_active - ): - argv = [ - "main.py", "--dry-run", - "--prefer-wg-egress", "--wg-if", "wg0", - "--pmtu-target", "a", "--pmtu-target", "b", "--pmtu-target", "c", - "--pmtu-policy", "median" - ] - with patch.object(sys, "argv", argv): - buf = io.StringIO() - with redirect_stdout(buf): - automtu.main() - - out = buf.getvalue() - # As default route via wg is active, wg0 should be used - self.assertIn("Detected egress interface: wg0", out) - # PMTU values: 1472, 1452, 1500 -> median = 1472 - self.assertIn("Selected Path MTU (policy=median): 1472", out) - # Computed WG MTU: 1472 - 80 = 1392 - self.assertIn("Computed wg0 MTU: 1392", out) - self.assertEqual(_probe_pmtu.call_count, 3) - - # ---------- PMTU all fail fallback ---------- - - @patch("main.set_mtu") - @patch("main.read_mtu", return_value=1500) - @patch("main.exists_iface", return_value=True) - @patch("main.get_default_ifaces", return_value=["eth0"]) - @patch("main.require_root", return_value=None) - def test_pmtu_all_fail_falls_back_to_base( - self, _req_root, _get_def, _exists, _read_mtu, mock_set_mtu - ): - with patch("main.probe_pmtu", side_effect=[None, None]): - argv = ["main.py", "--dry-run", "--pmtu-target", "bad1", "--pmtu-target", "bad2"] - with patch.object(sys, "argv", argv): - buf = io.StringIO() - with redirect_stdout(buf): - automtu.main() - - out = buf.getvalue() - self.assertIn("WARNING: All PMTU probes failed. Falling back to egress MTU.", out) - self.assertIn("Computed wg0 MTU: 1420", out) # 1500 - 80 - mock_set_mtu.assert_any_call("wg0", 1420, True) - - # ---------- NEW: --set-wg-mtu overrides computed ---------- - - @patch("main.set_mtu") - @patch("main.read_mtu", return_value=1500) - @patch("main.exists_iface", return_value=True) - @patch("main.get_default_ifaces", return_value=["eth0"]) - @patch("main.require_root", return_value=None) - def test_force_set_wg_mtu_overrides_computed( - self, _req_root, _get_def, _exists, _read_mtu, mock_set_mtu - ): - """ - --set-wg-mtu must override the computed value. - Base=1500 -> computed 1420 (1500-80), but we force 1300. - """ - argv = ["main.py", "--dry-run", "--set-wg-mtu", "1300"] - with patch.object(sys, "argv", argv): - buf = io.StringIO() - with redirect_stdout(buf): - automtu.main() - - out = buf.getvalue() - # Computation is printed first - self.assertIn("Computed wg0 MTU: 1420", out) - # Then override message appears and applied value is 1300 - self.assertIn("Forcing WireGuard MTU (override): 1300", out) - mock_set_mtu.assert_any_call("wg0", 1300, True) - - # also test clamping below wg-min - argv2 = ["main.py", "--dry-run", "--set-wg-mtu", "1200"] # below default wg_min=1280 - with patch.object(sys, "argv", argv2): - out2 = io.StringIO() - with redirect_stdout(out2): - automtu.main() - s = out2.getvalue() - self.assertIn("[wg-mtu][WARN] --set-wg-mtu 1200 is below wg-min 1280; clamping to 1280.", s) - self.assertIn("Forcing WireGuard MTU (override): 1280", s) - mock_set_mtu.assert_any_call("wg0", 1280, True) - - # ---------- --apply-egress-mtu: setzt egress vor wg ---------- - - @patch("main.set_mtu") - @patch("main.probe_pmtu", return_value=1452) - @patch("main.read_mtu", return_value=1500) - @patch("main.exists_iface", return_value=True) - @patch("main.get_default_ifaces", return_value=["eth0"]) - @patch("main.require_root", return_value=None) - def test_apply_egress_mtu_sets_egress_then_wg( - self, _req_root, _get_def, _exists, _read_mtu, _probe_pmtu, mock_set_mtu - ): - """ - --apply-egress-mtu setzt egress=eth0 auf effective MTU (1452) und danach wg0 auf 1452-80=1372. - Reihenfolge der set_mtu-Aufrufe: erst eth0, dann wg0. - """ - argv = ["main.py", "--dry-run", "--apply-egress-mtu", "--pmtu-target", "46.4.224.77"] - with patch.object(sys, "argv", argv): - buf = io.StringIO() - with redirect_stdout(buf): - automtu.main() - - out = buf.getvalue() - self.assertIn("Applying effective MTU 1452 to egress eth0", out) - self.assertIn("Computed wg0 MTU: 1372", out) - - # Call order prüfen - calls = [ - unittest.mock.call("eth0", 1452, True), # egress zuerst - unittest.mock.call("wg0", 1372, True), # dann wg0 - ] - mock_set_mtu.assert_has_calls(calls, any_order=False) - self.assertEqual(mock_set_mtu.call_count, 2) - - # ---------- --apply-egress-mtu: egress == wg_if -> skip apply auf egress ---------- - - @patch("main.wg_default_is_active", return_value=True) - @patch("main.wg_is_active", return_value=True) - @patch("main.set_mtu") - @patch("main.probe_pmtu", return_value=1500) - @patch("main.read_mtu", return_value=1500) - @patch("main.exists_iface", return_value=True) - @patch("main.get_default_ifaces", return_value=["wg0"]) # wg0 wird als egress gewählt - @patch("main.require_root", return_value=None) - def test_apply_egress_mtu_skips_when_egress_is_wg( - self, _req_root, _get_def, _exists, _read_mtu, _probe, mock_set_mtu, _wg_active, _wg_def_active - ): - """ - Wenn egress == wg0, soll das Skript den egress-Apply überspringen, aber wg0 ganz normal setzen. - """ - argv = ["main.py", "--dry-run", "--apply-egress-mtu", "--prefer-wg-egress", "--wg-if", "wg0"] - with patch.object(sys, "argv", argv): - out = io.StringIO() - with redirect_stdout(out): - automtu.main() - s = out.getvalue() - - self.assertIn("Detected egress interface: wg0", s) - self.assertIn("Skipping egress MTU apply because egress == wg0", s) - # wg0 wird trotzdem gesetzt (1500-80=1420) - mock_set_mtu.assert_any_call("wg0", 1420, True) - # Es darf nur ein set_mtu-Aufruf erfolgen (kein separater egress-Apply) - self.assertEqual(mock_set_mtu.call_count, 1) - - # ---------- --apply-egress-mtu plus --set-wg-mtu override ---------- - - @patch("main.set_mtu") - @patch("main.probe_pmtu", return_value=1452) - @patch("main.read_mtu", return_value=1500) - @patch("main.exists_iface", return_value=True) - @patch("main.get_default_ifaces", return_value=["eth0"]) - @patch("main.require_root", return_value=None) - def test_apply_egress_mtu_with_forced_wg_override( - self, _req_root, _get_def, _exists, _read_mtu, _probe_pmtu, mock_set_mtu - ): - """ - Egress wird auf 1452 gesetzt, aber wg0 wird mit --set-wg-mtu (z. B. 1300) überschrieben, - unabhängig vom berechneten Wert (1372). - """ - argv = [ - "main.py", "--dry-run", - "--apply-egress-mtu", - "--pmtu-target", "1.1.1.1", - "--set-wg-mtu", "1300", - ] - with patch.object(sys, "argv", argv): - buf = io.StringIO() - with redirect_stdout(buf): - automtu.main() - - out = buf.getvalue() - self.assertIn("Applying effective MTU 1452 to egress eth0", out) - self.assertIn("Computed wg0 MTU: 1372", out) # erst berechnet - self.assertIn("Forcing WireGuard MTU (override): 1300", out) # dann überschrieben - - calls = [ - unittest.mock.call("eth0", 1452, True), - unittest.mock.call("wg0", 1300, True), - ] - mock_set_mtu.assert_has_calls(calls, any_order=False) - self.assertEqual(mock_set_mtu.call_count, 2) - -if __name__ == "__main__": - unittest.main(verbosity=2) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/test___init__.py b/tests/unit/test___init__.py new file mode 100644 index 0000000..d476ab0 --- /dev/null +++ b/tests/unit/test___init__.py @@ -0,0 +1,14 @@ +import unittest + +import automtu + + +class TestInit(unittest.TestCase): + def test_version_is_exposed(self) -> None: + self.assertTrue(hasattr(automtu, "__version__")) + self.assertIsInstance(automtu.__version__, str) + self.assertRegex(automtu.__version__, r"^\d+\.\d+\.\d+$") + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/unit/test___main__.py b/tests/unit/test___main__.py new file mode 100644 index 0000000..2c60383 --- /dev/null +++ b/tests/unit/test___main__.py @@ -0,0 +1,26 @@ +import unittest +from unittest.mock import Mock, patch + +import automtu.__main__ as entry + + +class TestMain(unittest.TestCase): + def test_main_calls_parser_and_core(self) -> None: + fake_args = object() + fake_parser = Mock() + fake_parser.parse_args.return_value = fake_args + + with ( + patch("automtu.__main__.build_parser", return_value=fake_parser) as p_build, + patch("automtu.__main__.run_automtu", return_value=0) as p_run, + ): + rc = entry.main() + + self.assertEqual(rc, 0) + p_build.assert_called_once_with() + fake_parser.parse_args.assert_called_once_with() + p_run.assert_called_once_with(fake_args) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py new file mode 100644 index 0000000..b9f8d47 --- /dev/null +++ b/tests/unit/test_cli.py @@ -0,0 +1,62 @@ +import unittest + +from automtu.cli import build_parser + + +class TestCli(unittest.TestCase): + def test_build_parser_accepts_known_args(self) -> None: + p = build_parser() + + # Parse only; no side effects. + args = p.parse_args( + [ + "--egress-if", + "eth0", + "--prefer-wg-egress", + "--pmtu-target", + "1.1.1.1,8.8.8.8", + "--pmtu-timeout", + "2.0", + "--pmtu-min-payload", + "1200", + "--pmtu-max-payload", + "1472", + "--pmtu-policy", + "median", + "--apply-egress-mtu", + "--apply-wg-mtu", + "--wg-if", + "wg0", + "--wg-overhead", + "80", + "--wg-min", + "1280", + "--auto-pmtu-from-wg", + "--set-wg-mtu", + "1372", + "--force-egress-mtu", + "1452", + "--dry-run", + ] + ) + + self.assertEqual(args.egress_if, "eth0") + self.assertTrue(args.prefer_wg_egress) + self.assertEqual(args.pmtu_target, ["1.1.1.1,8.8.8.8"]) + self.assertEqual(args.pmtu_timeout, 2.0) + self.assertEqual(args.pmtu_min_payload, 1200) + self.assertEqual(args.pmtu_max_payload, 1472) + self.assertEqual(args.pmtu_policy, "median") + self.assertTrue(args.apply_egress_mtu) + self.assertTrue(args.apply_wg_mtu) + self.assertEqual(args.wg_if, "wg0") + self.assertEqual(args.wg_overhead, 80) + self.assertEqual(args.wg_min, 1280) + self.assertTrue(args.auto_pmtu_from_wg) + self.assertEqual(args.set_wg_mtu, 1372) + self.assertEqual(args.force_egress_mtu, 1452) + self.assertTrue(args.dry_run) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py new file mode 100644 index 0000000..af5e03b --- /dev/null +++ b/tests/unit/test_core.py @@ -0,0 +1,94 @@ +import io +import unittest +from contextlib import redirect_stdout +from types import SimpleNamespace +from unittest.mock import patch + +from automtu.core import run_automtu + + +class TestCore(unittest.TestCase): + def test_run_automtu_happy_path_all_mocked(self) -> None: + args = SimpleNamespace( + dry_run=True, + egress_if=None, + prefer_wg_egress=False, + force_egress_mtu=None, + pmtu_target=["1.1.1.1,8.8.8.8"], + auto_pmtu_from_wg=False, + pmtu_min_payload=1200, + pmtu_max_payload=1472, + pmtu_timeout=1.0, + pmtu_policy="min", + apply_egress_mtu=True, + apply_wg_mtu=True, + wg_if="wg0", + wg_overhead=80, + wg_min=1280, + set_wg_mtu=None, + ) + + # PMTU probes: 1452 and 1500 -> min policy => 1452, effective=min(base(1500),1452)=1452 + # wg_mtu = 1452-80 = 1372 + with ( + patch("automtu.core.require_root", return_value=None), + patch("automtu.core.detect_egress_iface", return_value="eth0"), + patch("automtu.core.iface_exists", return_value=True), + patch("automtu.core.read_iface_mtu", return_value=1500), + patch("automtu.core.probe_pmtu", side_effect=[1452, 1500]), + patch("automtu.core.set_iface_mtu") as mock_set, + patch("automtu.core.wg_is_active", return_value=False), + patch("automtu.core.wg_peer_endpoints", return_value=[]), + patch("automtu.core.default_route_uses_iface", return_value=False), + ): + buf = io.StringIO() + with redirect_stdout(buf): + rc = run_automtu(args) + + self.assertEqual(rc, 0) + s = buf.getvalue() + self.assertIn("Detected egress interface: eth0", s) + self.assertIn("Egress base MTU: 1500", s) + self.assertIn("Selected Path MTU (policy=min): 1452", s) + self.assertIn("Computed wg0 MTU: 1372", s) + + # apply egress and wg are both true -> two calls in order + mock_set.assert_any_call("eth0", 1452, True) + mock_set.assert_any_call("wg0", 1372, True) + + def test_run_automtu_does_not_apply_wg_without_flag(self) -> None: + args = SimpleNamespace( + dry_run=True, + egress_if="eth0", + prefer_wg_egress=False, + force_egress_mtu=None, + pmtu_target=None, + auto_pmtu_from_wg=False, + pmtu_min_payload=1200, + pmtu_max_payload=1472, + pmtu_timeout=1.0, + pmtu_policy="min", + apply_egress_mtu=False, + apply_wg_mtu=False, + wg_if="wg0", + wg_overhead=80, + wg_min=1280, + set_wg_mtu=None, + ) + + with ( + patch("automtu.core.require_root", return_value=None), + patch("automtu.core.iface_exists", return_value=True), + patch("automtu.core.read_iface_mtu", return_value=1500), + patch("automtu.core.set_iface_mtu") as mock_set, + ): + buf = io.StringIO() + with redirect_stdout(buf): + rc = run_automtu(args) + + self.assertEqual(rc, 0) + mock_set.assert_not_called() + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/unit/test_net.py b/tests/unit/test_net.py new file mode 100644 index 0000000..8575094 --- /dev/null +++ b/tests/unit/test_net.py @@ -0,0 +1,49 @@ +import unittest +from unittest.mock import patch + +import automtu.net as net + + +class TestNet(unittest.TestCase): + def test_detect_egress_iface_parses_default_route_and_ignores_vpn(self) -> None: + # Simulate: ip route show default -> dev wg0 first, then eth0 + ip4_default = ( + "default via 10.0.0.1 dev wg0 proto dhcp src 10.0.0.2 metric 100\n" + ) + ip6_default = "default via fe80::1 dev eth0 proto ra metric 100\n" + + def fake_run(cmd: list[str]) -> str: + if cmd[:4] == ["ip", "-4", "route", "show"]: + return ip4_default.strip() + if cmd[:4] == ["ip", "-6", "route", "show"]: + return ip6_default.strip() + return "" + + with ( + patch("automtu.net._run", side_effect=fake_run), + patch("automtu.net.iface_exists", return_value=True), + ): + # ignore_vpn=True -> should skip wg0 and pick eth0 + self.assertEqual(net.detect_egress_iface(ignore_vpn=True), "eth0") + + # ignore_vpn=False -> first seen is wg0 + self.assertEqual(net.detect_egress_iface(ignore_vpn=False), "wg0") + + def test_default_route_uses_iface_true_false(self) -> None: + ip4_default = "default via 10.0.0.1 dev eth0\n" + ip6_default = "" + + def fake_run(cmd: list[str]) -> str: + if cmd[:4] == ["ip", "-4", "route", "show"]: + return ip4_default.strip() + if cmd[:4] == ["ip", "-6", "route", "show"]: + return ip6_default.strip() + return "" + + with patch("automtu.net._run", side_effect=fake_run): + self.assertTrue(net.default_route_uses_iface("eth0")) + self.assertFalse(net.default_route_uses_iface("wg0")) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/unit/test_pmtu.py b/tests/unit/test_pmtu.py new file mode 100644 index 0000000..ddafc05 --- /dev/null +++ b/tests/unit/test_pmtu.py @@ -0,0 +1,36 @@ +import unittest +from unittest.mock import patch + +import automtu.pmtu as pmtu + + +class TestPmtu(unittest.TestCase): + def test_probe_pmtu_binary_search_and_hdr_addition(self) -> None: + # Mock _is_ipv6 -> IPv4, so hdr = 28. + # Mock _ping_ok so that payload <= 1400 works, >1400 fails. + def fake_ping_ok(payload: int, target: str, timeout_s: float) -> bool: + return payload <= 1400 + + with ( + patch("automtu.pmtu._is_ipv6", return_value=False), + patch("automtu.pmtu._ping_ok", side_effect=fake_ping_ok), + ): + # lo=1200 works, hi=1472 partially works -> best = 1400 -> mtu = 1400+28 = 1428 + mtu = pmtu.probe_pmtu( + "1.1.1.1", lo_payload=1200, hi_payload=1472, timeout=1.0 + ) + self.assertEqual(mtu, 1428) + + def test_probe_pmtu_returns_none_if_even_floor_fails(self) -> None: + with ( + patch("automtu.pmtu._is_ipv6", return_value=False), + patch("automtu.pmtu._ping_ok", return_value=False), + ): + mtu = pmtu.probe_pmtu( + "1.1.1.1", lo_payload=1200, hi_payload=1472, timeout=1.0 + ) + self.assertIsNone(mtu) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/unit/test_wg.py b/tests/unit/test_wg.py new file mode 100644 index 0000000..d858bf5 --- /dev/null +++ b/tests/unit/test_wg.py @@ -0,0 +1,58 @@ +import unittest +from unittest.mock import patch + +import automtu.wg as wg + + +class TestWg(unittest.TestCase): + def test_wg_peer_endpoints_from_wg_show(self) -> None: + # wg show wg0 endpoints output + out = "abcde12345\t46.4.224.77:51820\nfffff00000\t[2a01:db8::1]:51820\n" + + with ( + patch( + "automtu.wg._run", + side_effect=lambda cmd: out if cmd[:3] == ["wg", "show", "wg0"] else "", + ), + patch("automtu.wg.iface_exists", return_value=True), + patch("automtu.wg._rc", return_value=0), + ): + eps = wg.wg_peer_endpoints("wg0") + + self.assertEqual(eps, ["46.4.224.77", "2a01:db8::1"]) + + def test_wg_peer_endpoints_fallback_showconf(self) -> None: + show_endpoints_empty = "" + showconf = ( + "[Interface]\n" + "PrivateKey = x\n" + "[Peer]\n" + "Endpoint = 46.4.224.77:51820\n" + "[Peer]\n" + "Endpoint = [2a01:db8::1]:51820\n" + ) + + def fake_run(cmd: list[str]) -> str: + if cmd == ["wg", "show", "wg0", "endpoints"]: + return show_endpoints_empty + if cmd == ["wg", "showconf", "wg0"]: + return showconf + return "" + + with patch("automtu.wg._run", side_effect=fake_run): + eps = wg.wg_peer_endpoints("wg0") + + self.assertEqual(eps, ["46.4.224.77", "2a01:db8::1"]) + + def test_wg_is_active_uses_iface_exists_and_wg_show_rc(self) -> None: + with ( + patch("automtu.wg.iface_exists", return_value=True), + patch("automtu.wg._rc", return_value=0), + ): + self.assertTrue(wg.wg_is_active("wg0")) + with patch("automtu.wg.iface_exists", return_value=False): + self.assertFalse(wg.wg_is_active("wg0")) + + +if __name__ == "__main__": + unittest.main(verbosity=2)