refactor: convert script to automtu package with CI workflow
https://chatgpt.com/share/697112b2-0410-800f-93ff-9372b603d43f
This commit is contained in:
46
.github/workflows/ci.yml
vendored
Normal file
46
.github/workflows/ci.yml
vendored
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
name: CI
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
pull_request:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
test-and-lint:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
timeout-minutes: 15
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Set up Python
|
||||||
|
uses: actions/setup-python@v5
|
||||||
|
with:
|
||||||
|
python-version: "3.12"
|
||||||
|
cache: "pip"
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
shell: bash
|
||||||
|
run: |
|
||||||
|
set -euo pipefail
|
||||||
|
python -m pip install --upgrade pip
|
||||||
|
python -m pip install -e .
|
||||||
|
python -m pip install ruff
|
||||||
|
|
||||||
|
- name: Ruff (lint)
|
||||||
|
shell: bash
|
||||||
|
run: |
|
||||||
|
set -euo pipefail
|
||||||
|
ruff check .
|
||||||
|
|
||||||
|
- name: Ruff (format check)
|
||||||
|
shell: bash
|
||||||
|
run: |
|
||||||
|
set -euo pipefail
|
||||||
|
ruff format --check .
|
||||||
|
|
||||||
|
- name: Unit tests
|
||||||
|
shell: bash
|
||||||
|
run: |
|
||||||
|
set -euo pipefail
|
||||||
|
make test
|
||||||
212
Guide.md
212
Guide.md
@@ -1,202 +1,30 @@
|
|||||||
# wg-mtu-auto — Practical Guide
|
# automtu — Practical Guide
|
||||||
|
|
||||||
This guide shows **how to *determine* and *set*** correct MTU values:
|
|
||||||
|
|
||||||
* **with WireGuard** (`wg0`), and
|
|
||||||
* **without WireGuard** (just your egress interface like `eth0`/`wlan0`).
|
|
||||||
|
|
||||||
The tool can:
|
The tool can:
|
||||||
|
- auto-detect your egress interface (e.g., eth0)
|
||||||
|
- probe Path MTU (PMTU) using `ping -M do`
|
||||||
|
- compute a safe WireGuard MTU: `effective_mtu - overhead` (default overhead=80)
|
||||||
|
- apply MTU to egress and/or WireGuard (explicit flags)
|
||||||
|
|
||||||
* auto-detect your **egress interface**
|
## Recipes
|
||||||
* optionally **probe Path MTU (PMTU)** to one or more remote targets
|
|
||||||
* compute a safe **WireGuard MTU** (`effective_mtu - 80` by default)
|
|
||||||
* **apply** MTU to `wg0` and/or your egress interface
|
|
||||||
|
|
||||||
---
|
1) Only compute/show (safe default):
|
||||||
|
automtu --dry-run
|
||||||
|
|
||||||
## TL;DR recipes
|
2) Probe PMTU and apply to egress:
|
||||||
|
sudo automtu --pmtu-target 1.1.1.1 --apply-egress-mtu
|
||||||
|
|
||||||
### 1) Just compute & set WireGuard MTU (no PMTU probing)
|
3) Auto-add WireGuard peer endpoints as PMTU targets and apply WG MTU:
|
||||||
|
sudo automtu --auto-pmtu-from-wg --apply-wg-mtu
|
||||||
|
|
||||||
```bash
|
4) Prefer WireGuard as egress basis if default route uses wg0:
|
||||||
sudo automtu
|
sudo automtu --prefer-wg-egress --auto-pmtu-from-wg --apply-wg-mtu
|
||||||
# Equivalent from repo:
|
|
||||||
# sudo python3 main.py
|
|
||||||
```
|
|
||||||
|
|
||||||
* Detects egress (e.g., `eth0`), reads its MTU (e.g., 1500)
|
5) Force WG MTU:
|
||||||
* Computes `wg0` MTU = `egress_mtu - 80` (min clamp 1280)
|
sudo automtu --set-wg-mtu 1372 --apply-wg-mtu
|
||||||
* Applies to `wg0` (if present)
|
|
||||||
|
|
||||||
Dry-run:
|
## Notes
|
||||||
|
|
||||||
```bash
|
- Applying MTU requires root (unless `--dry-run`).
|
||||||
automtu --dry-run
|
- PMTU probing can fail if ICMP is blocked; the tool then falls back to egress MTU.
|
||||||
```
|
- Runtime MTU changes are not persistent across reboots.
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### 2) Compute & apply MTU on *egress* (non-WireGuard)
|
|
||||||
|
|
||||||
Useful if you want the *link itself* (e.g., `eth0`) to use the discovered PMTU.
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --pmtu-target 1.1.1.1 --apply-egress-mtu
|
|
||||||
```
|
|
||||||
|
|
||||||
* Probes PMTU to `1.1.1.1`, applies that result to `eth0`
|
|
||||||
* Also computes a matching WireGuard MTU (`PMTU - 80`) and sets `wg0` (if present)
|
|
||||||
|
|
||||||
> If the selected egress is `wg0`, egress application is **skipped** on purpose.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### 3) With WireGuard peers: auto-add endpoints as PMTU targets
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --auto-pmtu-from-wg
|
|
||||||
```
|
|
||||||
|
|
||||||
* Reads `wg0` peer endpoints (`wg show ...` / `wg showconf`)
|
|
||||||
* Probes PMTU to those endpoints
|
|
||||||
* Picks an **effective PMTU** (policy = `min` by default)
|
|
||||||
* Applies **`wg0` MTU = effective PMTU − 80**
|
|
||||||
|
|
||||||
Add extra targets & choose different policy:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --auto-pmtu-from-wg \
|
|
||||||
--pmtu-target 46.4.224.77,1.1.1.1 \
|
|
||||||
--pmtu-policy median
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### 4) Force a specific WireGuard MTU (override)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --set-wg-mtu 1372
|
|
||||||
```
|
|
||||||
|
|
||||||
* Skips the computed value and **forces** 1372 on `wg0` (clamped to ≥1280)
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## When to use which approach?
|
|
||||||
|
|
||||||
* **You just use WireGuard** and want a safe default:
|
|
||||||
`sudo automtu` → picks `wg0 = egress_mtu - 80` (e.g., `1500 - 80 = 1420`).
|
|
||||||
|
|
||||||
* **You suspect smaller upstream MTU** (PPPoE/ISP/VPN/“somewhere in the path”):
|
|
||||||
Use PMTU probing towards stable targets (your WG peer, DNS resolvers):
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --pmtu-target 46.4.224.77 --pmtu-target 1.1.1.1
|
|
||||||
```
|
|
||||||
|
|
||||||
Then optionally apply the PMTU to your egress:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --pmtu-target 46.4.224.77 --apply-egress-mtu
|
|
||||||
```
|
|
||||||
|
|
||||||
* **You have WireGuard peers** and want the tool to discover them automatically:
|
|
||||||
`sudo automtu --auto-pmtu-from-wg`
|
|
||||||
(You can still add manual targets and change policy.)
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## How it works (short)
|
|
||||||
|
|
||||||
1. **Egress detection**
|
|
||||||
Reads default routes and picks a non-VPN interface (e.g., `eth0`).
|
|
||||||
If you want to prefer `wg0` when the default route already uses it:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --prefer-wg-egress --wg-if wg0
|
|
||||||
```
|
|
||||||
|
|
||||||
2. **PMTU probing (optional)**
|
|
||||||
Uses `ping -M do` (DF set) with a quick binary search to find the largest unfragmented payload for each target.
|
|
||||||
From the successful results, selects an **effective PMTU** using a policy:
|
|
||||||
|
|
||||||
* `--pmtu-policy min` (default, safest)
|
|
||||||
* `--pmtu-policy median`
|
|
||||||
* `--pmtu-policy max`
|
|
||||||
|
|
||||||
3. **WireGuard MTU calculation**
|
|
||||||
`wg_mtu = max(wg_min, effective_mtu - wg_overhead)`
|
|
||||||
Defaults: `wg_min=1280`, `wg_overhead=80`.
|
|
||||||
|
|
||||||
4. **Apply**
|
|
||||||
|
|
||||||
* If `--apply-egress-mtu` is set, apply **effective PMTU** to the egress (unless egress is `wg0`).
|
|
||||||
* Apply **WireGuard MTU** to `wg0` (or the iface passed via `--wg-if`).
|
|
||||||
* If `--set-wg-mtu X` is given, it **overrides** the computed value.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Examples (copy & paste)
|
|
||||||
|
|
||||||
### A) Quick WireGuard tuning with peer awareness
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --auto-pmtu-from-wg
|
|
||||||
```
|
|
||||||
|
|
||||||
### B) Manual targets, conservative (min) policy
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --pmtu-target 46.4.224.77 --pmtu-target 1.1.1.1
|
|
||||||
```
|
|
||||||
|
|
||||||
### C) Apply PMTU on egress + set matching wg0
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --pmtu-target 1.1.1.1 --apply-egress-mtu
|
|
||||||
```
|
|
||||||
|
|
||||||
### D) Prefer WireGuard as egress (if default route uses WG)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --prefer-wg-egress --wg-if wg0 --auto-pmtu-from-wg
|
|
||||||
```
|
|
||||||
|
|
||||||
### E) Force a specific wg0 MTU
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --set-wg-mtu 1372
|
|
||||||
```
|
|
||||||
|
|
||||||
### F) Dry-run any of the above
|
|
||||||
|
|
||||||
```bash
|
|
||||||
automtu --dry-run --auto-pmtu-from-wg --pmtu-target 1.1.1.1
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Persisting the value in WireGuard
|
|
||||||
|
|
||||||
Runtime changes are **not** permanent. To persist:
|
|
||||||
|
|
||||||
* Either let your automation run this tool before/after `wg-quick up wg0`, **or**
|
|
||||||
* Add a fixed value in your `wg0` config (`/etc/wireguard/wg0.conf`):
|
|
||||||
|
|
||||||
```ini
|
|
||||||
[Interface]
|
|
||||||
MTU = 1372
|
|
||||||
```
|
|
||||||
|
|
||||||
> Static MTU is fine if the path is stable. If your route/ISP changes, prefer running this tool.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Notes & Troubleshooting
|
|
||||||
|
|
||||||
* If **all PMTU probes fail**, the tool prints a warning and falls back to the egress MTU (e.g., `1500`) and sets `wg0 = egress - 80`.
|
|
||||||
Some networks block ICMP “fragmentation needed”; use multiple targets or rely on egress-only.
|
|
||||||
* You can **override defaults** via flags or environment:
|
|
||||||
|
|
||||||
* `WG_IF=wg0 WG_OVERHEAD=80 WG_MIN=1280 automtu ...`
|
|
||||||
* The tool **deduplicates targets** and understands IPv4/IPv6 endpoints (e.g., `2a01:...`).
|
|
||||||
|
|||||||
11
Makefile
11
Makefile
@@ -1,4 +1,4 @@
|
|||||||
PY ?= python3
|
PYTHON ?= python3
|
||||||
|
|
||||||
.PHONY: test install help
|
.PHONY: test install help
|
||||||
|
|
||||||
@@ -9,11 +9,4 @@ help:
|
|||||||
@echo " make help - this help"
|
@echo " make help - this help"
|
||||||
|
|
||||||
test:
|
test:
|
||||||
$(PY) -m unittest -v test.py
|
@PYTHONPATH="$(CURDIR)/src" "$(PYTHON)" -m unittest discover -s tests/unit -p "test_*.py" -v
|
||||||
|
|
||||||
install:
|
|
||||||
@echo "Installation is provided via your package manager:"
|
|
||||||
@echo " pkgmgr install automtu"
|
|
||||||
@echo ""
|
|
||||||
@echo "Alternatively, run the tool directly:"
|
|
||||||
@echo " $(PY) main.py [--options]"
|
|
||||||
|
|||||||
98
README.md
98
README.md
@@ -1,95 +1,29 @@
|
|||||||
# wg-mtu-auto
|
# automtu
|
||||||
[](https://github.com/sponsors/kevinveenbirkenbach) [](https://www.patreon.com/c/kevinveenbirkenbach) [](https://buymeacoffee.com/kevinveenbirkenbach) [](https://s.veen.world/paypaldonate)
|
[](https://github.com/sponsors/kevinveenbirkenbach) [](https://www.patreon.com/c/kevinveenbirkenbach) [](https://buymeacoffee.com/kevinveenbirkenbach) [](https://s.veen.world/paypaldonate)
|
||||||
|
|
||||||
|
Auto-detect your egress interface, optionally probe Path MTU (PMTU) using DF-ping,
|
||||||
|
compute a WireGuard MTU, and apply MTU settings.
|
||||||
|
|
||||||
Automatically detect the optimal WireGuard MTU by analyzing your local egress interface and optionally probing the Path MTU (PMTU) to one or more remote hosts.
|
## Install (editable)
|
||||||
The tool ensures stable and efficient VPN connections by preventing fragmentation and latency caused by mismatched MTU settings.
|
pip install -e .
|
||||||
|
|
||||||
---
|
## Usage
|
||||||
|
|
||||||
## ✨ Features
|
Show only (no changes):
|
||||||
|
automtu --dry-run
|
||||||
|
|
||||||
- **Automatic Egress Detection** — Finds your primary internet interface automatically.
|
Probe PMTU to targets and apply on egress:
|
||||||
- **WireGuard MTU Calculation** — Computes `wg0` MTU based on egress MTU minus overhead (default 80 bytes).
|
sudo automtu --pmtu-target registry-1.docker.io --apply-egress-mtu
|
||||||
- **Optional Path MTU Probing** — Uses ICMP “Don’t Fragment” (`ping -M do`) to find the real usable MTU across network paths.
|
|
||||||
- **Multi-Target PMTU Support** — Test multiple remote hosts and choose an effective value via policy (`min`, `median`, `max`).
|
|
||||||
- **Dry-Run Mode** — Simulate changes without applying them.
|
|
||||||
- **Safe for Automation** — Integrates well with WireGuard systemd services or Ansible setups.
|
|
||||||
|
|
||||||
---
|
Auto-add WireGuard peer endpoints as targets and apply WG MTU:
|
||||||
|
sudo automtu --auto-pmtu-from-wg --apply-wg-mtu
|
||||||
|
|
||||||
## 🚀 Installation
|
Force WG MTU:
|
||||||
|
sudo automtu --set-wg-mtu 1372 --apply-wg-mtu
|
||||||
|
|
||||||
### Option 1 — Using [pkgmgr](https://github.com/kevinveenbirkenbach/package-manager)
|
Help:
|
||||||
|
automtu --help
|
||||||
|
|
||||||
If you use Kevin Veen-Birkenbach’s package manager (`pkgmgr`):
|
|
||||||
|
|
||||||
```bash
|
|
||||||
pkgmgr install automtu
|
|
||||||
````
|
|
||||||
|
|
||||||
This will automatically fetch and install `wg-mtu-auto` system-wide.
|
|
||||||
|
|
||||||
### Option 2 — Run directly from source
|
|
||||||
|
|
||||||
Clone this repository and execute the script manually:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
git clone https://github.com/kevinveenbirkenbach/wg-mtu-auto.git
|
|
||||||
cd wg-mtu-auto
|
|
||||||
sudo python3 main.py --help
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 🧩 Usage Examples
|
|
||||||
|
|
||||||
### Basic detection (no PMTU)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu
|
|
||||||
```
|
|
||||||
|
|
||||||
### Specify egress interface and force MTU
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --egress-if eth0 --force-egress-mtu 1452
|
|
||||||
```
|
|
||||||
|
|
||||||
### Probe multiple PMTU targets (safe policy: `min`)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --pmtu-target 46.4.224.77 --pmtu-target 2a01:4f8:2201:4695::2
|
|
||||||
```
|
|
||||||
|
|
||||||
### Choose median or max policy
|
|
||||||
|
|
||||||
```bash
|
|
||||||
sudo automtu --pmtu-target 46.4.224.77,1.1.1.1 --pmtu-policy median
|
|
||||||
sudo automtu --pmtu-target 46.4.224.77,1.1.1.1 --pmtu-policy max
|
|
||||||
```
|
|
||||||
|
|
||||||
### Dry-run (no system changes)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
automtu --dry-run
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 🧪 Development
|
|
||||||
|
|
||||||
Run unit tests using:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
make test
|
|
||||||
```
|
|
||||||
|
|
||||||
To see installation guidance (does not install anything):
|
|
||||||
|
|
||||||
```bash
|
|
||||||
make install
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|||||||
326
main.py
326
main.py
@@ -1,326 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
wg_mtu_auto.py — Auto-detect egress IF, optionally probe Path MTU to one or more targets,
|
|
||||||
compute the correct WireGuard MTU, and apply it.
|
|
||||||
|
|
||||||
Examples:
|
|
||||||
sudo ./main.py
|
|
||||||
sudo ./main.py --force-egress-mtu 1452
|
|
||||||
sudo ./main.py --pmtu-target 46.4.224.77 --pmtu-target 2a01:4f8:2201:4695::2
|
|
||||||
sudo ./main.py --pmtu-target 46.4.224.77,2a01:4f8:2201:4695::2 --pmtu-policy min
|
|
||||||
sudo ./main.py --prefer-wg-egress --auto-pmtu-from-wg
|
|
||||||
./main.py --dry-run
|
|
||||||
"""
|
|
||||||
import argparse
|
|
||||||
import ipaddress
|
|
||||||
import os
|
|
||||||
import pathlib
|
|
||||||
import re
|
|
||||||
import statistics
|
|
||||||
import subprocess
|
|
||||||
import sys
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------- helpers -----------------
|
|
||||||
|
|
||||||
def run(cmd): # -> str
|
|
||||||
return subprocess.run(
|
|
||||||
cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True
|
|
||||||
).stdout.strip()
|
|
||||||
|
|
||||||
|
|
||||||
def rc(cmd): # -> int
|
|
||||||
return subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode
|
|
||||||
|
|
||||||
|
|
||||||
def exists_iface(iface): # -> bool
|
|
||||||
return pathlib.Path(f"/sys/class/net/{iface}").exists()
|
|
||||||
|
|
||||||
|
|
||||||
def read_mtu(iface): # -> int
|
|
||||||
with open(f"/sys/class/net/{iface}/mtu", "r") as f:
|
|
||||||
return int(f.read().strip())
|
|
||||||
|
|
||||||
|
|
||||||
def set_mtu(iface, mtu, dry):
|
|
||||||
if dry:
|
|
||||||
print(f"[wg-mtu] DRY-RUN: ip link set mtu {mtu} dev {iface}")
|
|
||||||
else:
|
|
||||||
subprocess.run(["ip", "link", "set", "mtu", str(mtu), "dev", iface], check=True)
|
|
||||||
|
|
||||||
|
|
||||||
def require_root(dry):
|
|
||||||
if not dry and os.geteuid() != 0:
|
|
||||||
print("[wg-mtu][ERROR] Please run as root (sudo) or use --dry-run.", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
def is_ipv6(addr): # -> bool
|
|
||||||
try:
|
|
||||||
return isinstance(ipaddress.ip_address(addr), ipaddress.IPv6Address)
|
|
||||||
except ValueError:
|
|
||||||
# best-effort for hostnames (contains ':')
|
|
||||||
return ":" in addr
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------- route & iface selection -----------------
|
|
||||||
|
|
||||||
def default_route_lines():
|
|
||||||
lines = []
|
|
||||||
for cmd in (["ip", "-4", "route", "show", "default"], ["ip", "-6", "route", "show", "default"]):
|
|
||||||
out = run(cmd)
|
|
||||||
if out:
|
|
||||||
lines.extend(out.splitlines())
|
|
||||||
return lines
|
|
||||||
|
|
||||||
|
|
||||||
def get_default_ifaces(ignore_vpn=True): # -> list[str]
|
|
||||||
devs = []
|
|
||||||
for line in default_route_lines():
|
|
||||||
m = re.search(r"\bdev\s+(\S+)", line)
|
|
||||||
if m:
|
|
||||||
devs.append(m.group(1))
|
|
||||||
# fallback via route get
|
|
||||||
if not devs:
|
|
||||||
for cmd in (["ip", "route", "get", "1.1.1.1"], ["ip", "-6", "route", "get", "2606:4700:4700::1111"]):
|
|
||||||
out = run(cmd)
|
|
||||||
m = re.search(r"\bdev\s+(\S+)", out)
|
|
||||||
if m:
|
|
||||||
devs.append(m.group(1))
|
|
||||||
uniq = []
|
|
||||||
for d in devs:
|
|
||||||
if not d or d == "lo" or not exists_iface(d):
|
|
||||||
continue
|
|
||||||
if ignore_vpn and re.match(r"^(wg|tun)\d*$", d):
|
|
||||||
continue
|
|
||||||
if d not in uniq:
|
|
||||||
uniq.append(d)
|
|
||||||
return uniq
|
|
||||||
|
|
||||||
|
|
||||||
def wg_default_is_active(wg_if: str) -> bool:
|
|
||||||
# check if any default route is via wg_if
|
|
||||||
return any(re.search(rf"\bdev\s+{re.escape(wg_if)}\b", line) for line in default_route_lines())
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------- PMTU probing -----------------
|
|
||||||
|
|
||||||
def ping_ok(payload, target, timeout_s): # -> bool
|
|
||||||
base = ["ping", "-M", "do", "-c", "1", "-s", str(payload), "-W", str(max(1, int(round(timeout_s))))]
|
|
||||||
if is_ipv6(target):
|
|
||||||
base.insert(1, "-6")
|
|
||||||
return rc(base + [target]) == 0
|
|
||||||
|
|
||||||
|
|
||||||
def probe_pmtu(target, lo_payload=1200, hi_payload=1472, timeout=1.0): # -> int|None
|
|
||||||
"""Binary-search the largest payload that passes with DF. Return Path MTU (payload + hdr) or None.
|
|
||||||
Header: +28 (IPv4), +48 (IPv6)."""
|
|
||||||
hdr = 48 if is_ipv6(target) else 28
|
|
||||||
# ensure lower bound works; if not, try smaller floors
|
|
||||||
if not ping_ok(lo_payload, target, timeout):
|
|
||||||
for p in (1180, 1160, 1140):
|
|
||||||
if ping_ok(p, target, timeout):
|
|
||||||
lo_payload = p
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
lo, hi, best = lo_payload, hi_payload, None
|
|
||||||
while lo <= hi:
|
|
||||||
mid = (lo + hi) // 2
|
|
||||||
if ping_ok(mid, target, timeout):
|
|
||||||
best = mid
|
|
||||||
lo = mid + 1
|
|
||||||
else:
|
|
||||||
hi = mid - 1
|
|
||||||
return (best + hdr) if best is not None else None
|
|
||||||
|
|
||||||
|
|
||||||
def choose_effective(pmtus, policy="min"): # -> int
|
|
||||||
if not pmtus:
|
|
||||||
raise ValueError("no PMTUs to choose from")
|
|
||||||
if policy == "min":
|
|
||||||
return min(pmtus)
|
|
||||||
if policy == "max":
|
|
||||||
return max(pmtus)
|
|
||||||
if policy == "median":
|
|
||||||
return int(statistics.median(sorted(pmtus)))
|
|
||||||
raise ValueError(f"unknown policy {policy}")
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------- WireGuard helpers (opt-in) -----------------
|
|
||||||
|
|
||||||
def wg_is_active(wg_if: str) -> bool:
|
|
||||||
if not exists_iface(wg_if):
|
|
||||||
return False
|
|
||||||
return rc(["wg", "show", wg_if]) == 0
|
|
||||||
|
|
||||||
|
|
||||||
def wg_peer_endpoints(wg_if: str) -> list[str]:
|
|
||||||
"""Return list of peer endpoints (hostnames/IPs) – port stripped."""
|
|
||||||
targets = []
|
|
||||||
|
|
||||||
# 1) Try: wg show <if> endpoints
|
|
||||||
out = run(["wg", "show", wg_if, "endpoints"])
|
|
||||||
for line in out.splitlines():
|
|
||||||
# format: <peer_public_key>\t<endpoint or (none)>
|
|
||||||
parts = line.strip().split()
|
|
||||||
if len(parts) >= 2 and parts[-1] != "(none)":
|
|
||||||
ep = parts[-1] # host:port
|
|
||||||
host = ep.rsplit(":", 1)[0]
|
|
||||||
# IPv6 endpoint may be like [2001:db8::1]:51820
|
|
||||||
host = host.strip("[]")
|
|
||||||
targets.append(host)
|
|
||||||
|
|
||||||
# 2) Fallback: wg showconf (root may be required)
|
|
||||||
if not targets:
|
|
||||||
conf = run(["wg", "showconf", wg_if])
|
|
||||||
if conf:
|
|
||||||
for m in re.finditer(r"^Endpoint\s*=\s*(.+)$", conf, flags=re.MULTILINE):
|
|
||||||
ep = m.group(1).strip()
|
|
||||||
host = ep.rsplit(":", 1)[0].strip("[]")
|
|
||||||
targets.append(host)
|
|
||||||
|
|
||||||
# dedupe & sanity
|
|
||||||
cleaned = []
|
|
||||||
for t in targets:
|
|
||||||
if t and t not in cleaned:
|
|
||||||
cleaned.append(t)
|
|
||||||
return cleaned
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------- main -----------------
|
|
||||||
|
|
||||||
def main():
|
|
||||||
ap = argparse.ArgumentParser(description="Compute/apply WireGuard MTU based on egress MTU and optional multi-target PMTU probing.")
|
|
||||||
ap.add_argument("--egress-if", help="Explicit egress interface (auto-detected if omitted).")
|
|
||||||
ap.add_argument("--prefer-wg-egress", action="store_true",
|
|
||||||
help="Allow/consider wg* as egress and prefer it if default route uses wg (default: disabled).")
|
|
||||||
ap.add_argument("--auto-pmtu-from-wg", action="store_true",
|
|
||||||
help="Automatically add WireGuard peer endpoints as PMTU targets (default: disabled).")
|
|
||||||
ap.add_argument("--wg-if", default=os.environ.get("WG_IF", "wg0"), help="WireGuard interface name (default: wg0).")
|
|
||||||
ap.add_argument("--wg-overhead", type=int, default=int(os.environ.get("WG_OVERHEAD", "80")), help="Bytes of WG overhead to subtract (default: 80).")
|
|
||||||
ap.add_argument("--wg-min", type=int, default=int(os.environ.get("WG_MIN", "1280")), help="Minimum allowed WG MTU (default: 1280).")
|
|
||||||
# PMTU (multi-target)
|
|
||||||
ap.add_argument("--pmtu-target", action="append", help="Target hostname/IP to probe PMTU. Can be given multiple times OR comma-separated.")
|
|
||||||
ap.add_argument("--pmtu-timeout", type=float, default=1.0, help="Timeout (seconds) per ping probe (default: 1.0).")
|
|
||||||
ap.add_argument("--pmtu-min-payload", type=int, default=1200, help="Lower bound payload for PMTU search (default: 1200).")
|
|
||||||
ap.add_argument("--pmtu-max-payload", type=int, default=1472, help="Upper bound payload for PMTU search (default: 1472 ~ 1500-28).")
|
|
||||||
ap.add_argument("--pmtu-policy", choices=["min", "median", "max"], default="min",
|
|
||||||
help="How to choose effective PMTU across multiple targets (default: min).")
|
|
||||||
ap.add_argument("--apply-egress-mtu", action="store_true",
|
|
||||||
help="Apply the effective Path MTU to the detected egress interface (e.g. eth0).")
|
|
||||||
|
|
||||||
ap.add_argument("--dry-run", action="store_true", help="Show actions without applying changes.")
|
|
||||||
# NEW: force a specific WireGuard MTU (overrides computed value)
|
|
||||||
ap.add_argument("--set-wg-mtu", type=int, help="Force a specific MTU to apply on the WireGuard interface (overrides computed value).")
|
|
||||||
# (legacy / optional) force egress MTU
|
|
||||||
ap.add_argument("--force-egress-mtu", type=int, help="Force this MTU on the egress interface before computing wg MTU.")
|
|
||||||
args = ap.parse_args()
|
|
||||||
|
|
||||||
require_root(args.dry_run)
|
|
||||||
|
|
||||||
# Egress detection (wg ignored by default)
|
|
||||||
if args.egress_if:
|
|
||||||
egress = args.egress_if
|
|
||||||
else:
|
|
||||||
ignore_vpn = not args.prefer_wg_egress
|
|
||||||
cands = get_default_ifaces(ignore_vpn=ignore_vpn)
|
|
||||||
# If we allow wg and default route is via wg-if, prefer it first
|
|
||||||
if args.prefer_wg_egress and wg_is_active(args.wg_if) and wg_default_is_active(args.wg_if):
|
|
||||||
if args.wg_if in cands:
|
|
||||||
cands.remove(args.wg_if)
|
|
||||||
cands.insert(0, args.wg_if)
|
|
||||||
egress = cands[0] if cands else None
|
|
||||||
|
|
||||||
if not egress:
|
|
||||||
print("[wg-mtu][ERROR] Could not detect egress interface (use --egress-if).", file=sys.stderr)
|
|
||||||
sys.exit(2)
|
|
||||||
if not exists_iface(egress):
|
|
||||||
print(f"[wg-mtu][ERROR] Interface {egress} does not exist.", file=sys.stderr)
|
|
||||||
sys.exit(3)
|
|
||||||
print(f"[wg-mtu] Detected egress interface: {egress}")
|
|
||||||
|
|
||||||
# Egress MTU
|
|
||||||
if args.prefer_wg_egress and egress == args.wg_if:
|
|
||||||
print(f"[wg-mtu] Using WireGuard interface {args.wg_if} as egress basis.")
|
|
||||||
if args.wg_if == egress and not wg_is_active(args.wg_if):
|
|
||||||
print(f"[wg-mtu][WARN] {args.wg_if} selected as egress but WireGuard is not active.", file=sys.stderr)
|
|
||||||
|
|
||||||
if args.force_egress_mtu:
|
|
||||||
print(f"[wg-mtu] Forcing egress MTU {args.force_egress_mtu} on {egress}")
|
|
||||||
set_mtu(egress, args.force_egress_mtu, args.dry_run)
|
|
||||||
base_mtu = args.force_egress_mtu
|
|
||||||
else:
|
|
||||||
base_mtu = read_mtu(egress)
|
|
||||||
print(f"[wg-mtu] Egress base MTU: {base_mtu}")
|
|
||||||
|
|
||||||
# Build PMTU target list
|
|
||||||
pmtu_targets = []
|
|
||||||
if args.pmtu_target:
|
|
||||||
for item in args.pmtu_target:
|
|
||||||
pmtu_targets.extend([x.strip() for x in item.split(",") if x.strip()])
|
|
||||||
|
|
||||||
if args.auto_pmtu_from_wg:
|
|
||||||
if wg_is_active(args.wg_if):
|
|
||||||
wg_targets = wg_peer_endpoints(args.wg_if)
|
|
||||||
if wg_targets:
|
|
||||||
print(f"[wg-mtu] Auto-added WG peer endpoints as PMTU targets: {', '.join(wg_targets)}")
|
|
||||||
pmtu_targets.extend(wg_targets)
|
|
||||||
else:
|
|
||||||
print("[wg-mtu] INFO: No WG peer endpoints discovered (wg show/showconf).")
|
|
||||||
else:
|
|
||||||
print(f"[wg-mtu] INFO: {args.wg_if} is not active; skipping auto PMTU targets from WG.")
|
|
||||||
|
|
||||||
# Deduplicate PMTU targets
|
|
||||||
if pmtu_targets:
|
|
||||||
pmtu_targets = list(dict.fromkeys(pmtu_targets))
|
|
||||||
|
|
||||||
# PMTU probing
|
|
||||||
effective_mtu = base_mtu
|
|
||||||
if pmtu_targets:
|
|
||||||
good = []
|
|
||||||
print(f"[wg-mtu] Probing Path MTU for: {', '.join(pmtu_targets)} (policy={args.pmtu_policy})")
|
|
||||||
for t in pmtu_targets:
|
|
||||||
p = probe_pmtu(t, args.pmtu_min_payload, args.pmtu_max_payload, args.pmtu_timeout)
|
|
||||||
print(f"[wg-mtu] - {t}: {p if p else 'probe failed'}")
|
|
||||||
if p:
|
|
||||||
good.append(p)
|
|
||||||
if good:
|
|
||||||
chosen = choose_effective(good, args.pmtu_policy)
|
|
||||||
print(f"[wg-mtu] Selected Path MTU (policy={args.pmtu_policy}): {chosen}")
|
|
||||||
effective_mtu = min(base_mtu, chosen)
|
|
||||||
else:
|
|
||||||
print("[wg-mtu] WARNING: All PMTU probes failed. Falling back to egress MTU.")
|
|
||||||
|
|
||||||
# Optionally apply the effective MTU to the egress interface
|
|
||||||
if args.apply_egress_mtu:
|
|
||||||
if egress == args.wg_if:
|
|
||||||
print(f"[wg-mtu] INFO: Skipping egress MTU apply because egress == {args.wg_if}.")
|
|
||||||
else:
|
|
||||||
print(f"[wg-mtu] Applying effective MTU {effective_mtu} to egress {egress}")
|
|
||||||
set_mtu(egress, effective_mtu, args.dry_run)
|
|
||||||
|
|
||||||
# Compute WG MTU
|
|
||||||
wg_mtu = max(args.wg_min, effective_mtu - args.wg_overhead)
|
|
||||||
print(f"[wg-mtu] Computed {args.wg_if} MTU: {wg_mtu} (overhead={args.wg_overhead}, min={args.wg_min})")
|
|
||||||
|
|
||||||
# --- NEW: override with --set-wg-mtu if provided
|
|
||||||
if args.set_wg_mtu is not None:
|
|
||||||
if args.set_wg_mtu < args.wg_min:
|
|
||||||
print(f"[wg-mtu][WARN] --set-wg-mtu {args.set_wg_mtu} is below wg-min {args.wg_min}; clamping to {args.wg_min}.")
|
|
||||||
args.set_wg_mtu = args.wg_min
|
|
||||||
wg_mtu = args.set_wg_mtu
|
|
||||||
print(f"[wg-mtu] Forcing WireGuard MTU (override): {wg_mtu}")
|
|
||||||
|
|
||||||
# Apply
|
|
||||||
if exists_iface(args.wg_if):
|
|
||||||
set_mtu(args.wg_if, wg_mtu, args.dry_run)
|
|
||||||
print(f"[wg-mtu] Applied: {args.wg_if} MTU {wg_mtu}")
|
|
||||||
else:
|
|
||||||
print(f"[wg-mtu] NOTE: {args.wg_if} not present yet. Start WireGuard first, then re-run this script.")
|
|
||||||
|
|
||||||
print(f"[wg-mtu] Done. Summary: egress={egress} mtu={base_mtu}, effective_mtu={effective_mtu}, {args.wg_if}_mtu={wg_mtu}")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
23
pyproject.toml
Normal file
23
pyproject.toml
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["setuptools>=69"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "automtu"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "Auto-detect egress interface, probe Path MTU, and apply MTU (WireGuard/egress)."
|
||||||
|
readme = "README.md"
|
||||||
|
requires-python = ">=3.10"
|
||||||
|
license = { text = "MIT" }
|
||||||
|
authors = [{ name = "Kevin Veen-Birkenbach" }]
|
||||||
|
keywords = ["mtu", "wireguard", "pmtu", "networking"]
|
||||||
|
dependencies = []
|
||||||
|
|
||||||
|
[project.scripts]
|
||||||
|
automtu = "automtu.__main__:main"
|
||||||
|
|
||||||
|
[tool.setuptools]
|
||||||
|
package-dir = { "" = "src" }
|
||||||
|
|
||||||
|
[tool.setuptools.packages.find]
|
||||||
|
where = ["src"]
|
||||||
2
src/automtu/__init__.py
Normal file
2
src/automtu/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
__all__ = ["__version__"]
|
||||||
|
__version__ = "0.1.0"
|
||||||
13
src/automtu/__main__.py
Normal file
13
src/automtu/__main__.py
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from .cli import build_parser
|
||||||
|
from .core import run_automtu
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
args = build_parser().parse_args()
|
||||||
|
return run_automtu(args)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise SystemExit(main())
|
||||||
93
src/automtu/cli.py
Normal file
93
src/automtu/cli.py
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def build_parser() -> argparse.ArgumentParser:
|
||||||
|
ap = argparse.ArgumentParser(
|
||||||
|
prog="automtu",
|
||||||
|
description="Probe Path MTU and compute/apply MTU for egress and/or WireGuard.",
|
||||||
|
)
|
||||||
|
|
||||||
|
ap.add_argument(
|
||||||
|
"--egress-if", help="Explicit egress interface (auto-detected if omitted)."
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--prefer-wg-egress",
|
||||||
|
action="store_true",
|
||||||
|
help="Allow wg* as egress and prefer it if default route uses wg (default: off).",
|
||||||
|
)
|
||||||
|
|
||||||
|
ap.add_argument(
|
||||||
|
"--pmtu-target",
|
||||||
|
action="append",
|
||||||
|
help="Target hostname/IP to probe PMTU. Repeatable or comma-separated.",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--pmtu-timeout",
|
||||||
|
type=float,
|
||||||
|
default=1.0,
|
||||||
|
help="Ping timeout seconds (default: 1.0).",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--pmtu-min-payload",
|
||||||
|
type=int,
|
||||||
|
default=1200,
|
||||||
|
help="PMTU lower payload bound (default: 1200).",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--pmtu-max-payload",
|
||||||
|
type=int,
|
||||||
|
default=1472,
|
||||||
|
help="PMTU upper payload bound (default: 1472).",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--pmtu-policy",
|
||||||
|
choices=["min", "median", "max"],
|
||||||
|
default="min",
|
||||||
|
help="Aggregate PMTU across targets (default: min).",
|
||||||
|
)
|
||||||
|
|
||||||
|
ap.add_argument(
|
||||||
|
"--apply-egress-mtu",
|
||||||
|
action="store_true",
|
||||||
|
help="Apply effective MTU to egress interface.",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--apply-wg-mtu", action="store_true", help="Apply MTU to WireGuard interface."
|
||||||
|
)
|
||||||
|
|
||||||
|
ap.add_argument(
|
||||||
|
"--wg-if",
|
||||||
|
default=os.environ.get("WG_IF", "wg0"),
|
||||||
|
help="WireGuard interface (default: wg0).",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--wg-overhead",
|
||||||
|
type=int,
|
||||||
|
default=int(os.environ.get("WG_OVERHEAD", "80")),
|
||||||
|
help="WG overhead.",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--wg-min",
|
||||||
|
type=int,
|
||||||
|
default=int(os.environ.get("WG_MIN", "1280")),
|
||||||
|
help="Minimum WG MTU.",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--auto-pmtu-from-wg",
|
||||||
|
action="store_true",
|
||||||
|
help="Add WG peer endpoints as PMTU targets.",
|
||||||
|
)
|
||||||
|
ap.add_argument("--set-wg-mtu", type=int, help="Force MTU for WireGuard interface.")
|
||||||
|
|
||||||
|
ap.add_argument(
|
||||||
|
"--force-egress-mtu",
|
||||||
|
type=int,
|
||||||
|
help="Force MTU on the egress interface before computing.",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--dry-run", action="store_true", help="Show actions without applying changes."
|
||||||
|
)
|
||||||
|
return ap
|
||||||
161
src/automtu/core.py
Normal file
161
src/automtu/core.py
Normal file
@@ -0,0 +1,161 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import statistics
|
||||||
|
import sys
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Iterable, Optional
|
||||||
|
|
||||||
|
from .net import (
|
||||||
|
default_route_uses_iface,
|
||||||
|
detect_egress_iface,
|
||||||
|
iface_exists,
|
||||||
|
read_iface_mtu,
|
||||||
|
require_root,
|
||||||
|
set_iface_mtu,
|
||||||
|
)
|
||||||
|
from .pmtu import probe_pmtu
|
||||||
|
from .wg import wg_is_active, wg_peer_endpoints
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Result:
|
||||||
|
egress: str
|
||||||
|
base_mtu: int
|
||||||
|
effective_mtu: int
|
||||||
|
wg_if: str
|
||||||
|
wg_mtu: int
|
||||||
|
|
||||||
|
|
||||||
|
def _split_targets(items: Optional[list[str]]) -> list[str]:
|
||||||
|
raw: list[str] = []
|
||||||
|
for item in items or []:
|
||||||
|
raw.extend([x.strip() for x in item.split(",") if x.strip()])
|
||||||
|
return list(dict.fromkeys(raw))
|
||||||
|
|
||||||
|
|
||||||
|
def _choose(values: Iterable[int], policy: str) -> int:
|
||||||
|
vals = sorted(values)
|
||||||
|
if policy == "min":
|
||||||
|
return vals[0]
|
||||||
|
if policy == "max":
|
||||||
|
return vals[-1]
|
||||||
|
if policy == "median":
|
||||||
|
return int(statistics.median(vals))
|
||||||
|
raise ValueError(f"unknown policy: {policy}")
|
||||||
|
|
||||||
|
|
||||||
|
def run_automtu(args) -> int:
|
||||||
|
require_root(args.dry_run)
|
||||||
|
|
||||||
|
egress = args.egress_if or detect_egress_iface(ignore_vpn=not args.prefer_wg_egress)
|
||||||
|
if not egress:
|
||||||
|
print(
|
||||||
|
"[automtu][ERROR] Could not detect egress interface (use --egress-if).",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
return 2
|
||||||
|
if not iface_exists(egress):
|
||||||
|
print(f"[automtu][ERROR] Interface {egress} does not exist.", file=sys.stderr)
|
||||||
|
return 3
|
||||||
|
|
||||||
|
if (
|
||||||
|
args.egress_if is None
|
||||||
|
and args.prefer_wg_egress
|
||||||
|
and iface_exists(args.wg_if)
|
||||||
|
and wg_is_active(args.wg_if)
|
||||||
|
and default_route_uses_iface(args.wg_if)
|
||||||
|
):
|
||||||
|
egress = args.wg_if
|
||||||
|
print(f"[automtu] Using WireGuard interface {args.wg_if} as egress basis.")
|
||||||
|
|
||||||
|
print(f"[automtu] Detected egress interface: {egress}")
|
||||||
|
|
||||||
|
if args.force_egress_mtu:
|
||||||
|
print(f"[automtu] Forcing egress MTU {args.force_egress_mtu} on {egress}")
|
||||||
|
set_iface_mtu(egress, args.force_egress_mtu, args.dry_run)
|
||||||
|
base_mtu = args.force_egress_mtu
|
||||||
|
else:
|
||||||
|
base_mtu = read_iface_mtu(egress)
|
||||||
|
print(f"[automtu] Egress base MTU: {base_mtu}")
|
||||||
|
|
||||||
|
targets = _split_targets(args.pmtu_target)
|
||||||
|
if args.auto_pmtu_from_wg:
|
||||||
|
if wg_is_active(args.wg_if):
|
||||||
|
peers = wg_peer_endpoints(args.wg_if)
|
||||||
|
if peers:
|
||||||
|
print(
|
||||||
|
f"[automtu] Auto-added WG peer endpoints as PMTU targets: {', '.join(peers)}"
|
||||||
|
)
|
||||||
|
targets = list(dict.fromkeys([*targets, *peers]))
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
f"[automtu] INFO: {args.wg_if} not active; skipping auto PMTU targets."
|
||||||
|
)
|
||||||
|
|
||||||
|
effective_mtu = base_mtu
|
||||||
|
if targets:
|
||||||
|
print(
|
||||||
|
f"[automtu] Probing Path MTU for: {', '.join(targets)} (policy={args.pmtu_policy})"
|
||||||
|
)
|
||||||
|
good: list[int] = []
|
||||||
|
for t in targets:
|
||||||
|
p = probe_pmtu(
|
||||||
|
t, args.pmtu_min_payload, args.pmtu_max_payload, args.pmtu_timeout
|
||||||
|
)
|
||||||
|
print(f"[automtu] - {t}: {p if p else 'probe failed'}")
|
||||||
|
if p:
|
||||||
|
good.append(p)
|
||||||
|
|
||||||
|
if good:
|
||||||
|
chosen = _choose(good, args.pmtu_policy)
|
||||||
|
print(f"[automtu] Selected Path MTU (policy={args.pmtu_policy}): {chosen}")
|
||||||
|
effective_mtu = min(base_mtu, chosen)
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
"[automtu] WARNING: All PMTU probes failed. Falling back to egress MTU."
|
||||||
|
)
|
||||||
|
|
||||||
|
if args.apply_egress_mtu:
|
||||||
|
if egress == args.wg_if:
|
||||||
|
print(
|
||||||
|
f"[automtu] INFO: Skipping egress MTU apply because egress == {args.wg_if}."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
f"[automtu] Applying effective MTU {effective_mtu} to egress {egress}"
|
||||||
|
)
|
||||||
|
set_iface_mtu(egress, effective_mtu, args.dry_run)
|
||||||
|
|
||||||
|
wg_mtu = max(int(args.wg_min), int(effective_mtu) - int(args.wg_overhead))
|
||||||
|
print(
|
||||||
|
f"[automtu] Computed {args.wg_if} MTU: {wg_mtu} (overhead={args.wg_overhead}, min={args.wg_min})"
|
||||||
|
)
|
||||||
|
|
||||||
|
if args.set_wg_mtu is not None:
|
||||||
|
forced = max(int(args.wg_min), int(args.set_wg_mtu))
|
||||||
|
if forced != int(args.set_wg_mtu):
|
||||||
|
print(
|
||||||
|
f"[automtu][WARN] --set-wg-mtu clamped to {forced} (wg-min={args.wg_min})."
|
||||||
|
)
|
||||||
|
wg_mtu = forced
|
||||||
|
print(f"[automtu] Forcing WireGuard MTU (override): {wg_mtu}")
|
||||||
|
|
||||||
|
if args.apply_wg_mtu:
|
||||||
|
if iface_exists(args.wg_if):
|
||||||
|
set_iface_mtu(args.wg_if, wg_mtu, args.dry_run)
|
||||||
|
print(f"[automtu] Applied: {args.wg_if} MTU {wg_mtu}")
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
f"[automtu] NOTE: {args.wg_if} not present yet. Start WireGuard first, then re-run."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print("[automtu] INFO: Not applying WireGuard MTU (use --apply-wg-mtu).")
|
||||||
|
|
||||||
|
_ = Result(
|
||||||
|
egress=egress,
|
||||||
|
base_mtu=base_mtu,
|
||||||
|
effective_mtu=effective_mtu,
|
||||||
|
wg_if=args.wg_if,
|
||||||
|
wg_mtu=wg_mtu,
|
||||||
|
)
|
||||||
|
return 0
|
||||||
82
src/automtu/net.py
Normal file
82
src/automtu/net.py
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import pathlib
|
||||||
|
import re
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
def _run(cmd: list[str]) -> str:
|
||||||
|
return subprocess.run(
|
||||||
|
cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True
|
||||||
|
).stdout.strip()
|
||||||
|
|
||||||
|
|
||||||
|
def iface_exists(iface: str) -> bool:
|
||||||
|
return pathlib.Path(f"/sys/class/net/{iface}").exists()
|
||||||
|
|
||||||
|
|
||||||
|
def read_iface_mtu(iface: str) -> int:
|
||||||
|
return int(pathlib.Path(f"/sys/class/net/{iface}/mtu").read_text().strip())
|
||||||
|
|
||||||
|
|
||||||
|
def set_iface_mtu(iface: str, mtu: int, dry: bool) -> None:
|
||||||
|
if dry:
|
||||||
|
print(f"[automtu] DRY-RUN: ip link set mtu {mtu} dev {iface}")
|
||||||
|
return
|
||||||
|
subprocess.run(["ip", "link", "set", "mtu", str(mtu), "dev", iface], check=True)
|
||||||
|
|
||||||
|
|
||||||
|
def require_root(dry: bool) -> None:
|
||||||
|
if not dry and os.geteuid() != 0:
|
||||||
|
print(
|
||||||
|
"[automtu][ERROR] Please run as root (sudo) or use --dry-run.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
raise SystemExit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def detect_egress_iface(ignore_vpn: bool = True) -> Optional[str]:
|
||||||
|
devs: list[str] = []
|
||||||
|
for cmd in (
|
||||||
|
["ip", "-4", "route", "show", "default"],
|
||||||
|
["ip", "-6", "route", "show", "default"],
|
||||||
|
):
|
||||||
|
for line in _run(cmd).splitlines():
|
||||||
|
m = re.search(r"\bdev\s+(\S+)", line)
|
||||||
|
if m:
|
||||||
|
devs.append(m.group(1))
|
||||||
|
|
||||||
|
if not devs:
|
||||||
|
for cmd in (
|
||||||
|
["ip", "route", "get", "1.1.1.1"],
|
||||||
|
["ip", "-6", "route", "get", "2606:4700:4700::1111"],
|
||||||
|
):
|
||||||
|
m = re.search(r"\bdev\s+(\S+)", _run(cmd))
|
||||||
|
if m:
|
||||||
|
devs.append(m.group(1))
|
||||||
|
|
||||||
|
seen: set[str] = set()
|
||||||
|
for d in devs:
|
||||||
|
if not d or d == "lo" or not iface_exists(d):
|
||||||
|
continue
|
||||||
|
if ignore_vpn and re.match(r"^(wg|tun)\d*$", d):
|
||||||
|
continue
|
||||||
|
if d in seen:
|
||||||
|
continue
|
||||||
|
seen.add(d)
|
||||||
|
return d
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def default_route_uses_iface(iface: str) -> bool:
|
||||||
|
pat = rf"\bdev\s+{re.escape(iface)}\b"
|
||||||
|
for cmd in (
|
||||||
|
["ip", "-4", "route", "show", "default"],
|
||||||
|
["ip", "-6", "route", "show", "default"],
|
||||||
|
):
|
||||||
|
if re.search(pat, _run(cmd)):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
60
src/automtu/pmtu.py
Normal file
60
src/automtu/pmtu.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import ipaddress
|
||||||
|
import subprocess
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
def _is_ipv6(target: str) -> bool:
|
||||||
|
try:
|
||||||
|
return isinstance(ipaddress.ip_address(target), ipaddress.IPv6Address)
|
||||||
|
except ValueError:
|
||||||
|
return ":" in target # best-effort for hostnames
|
||||||
|
|
||||||
|
|
||||||
|
def _rc(cmd: list[str]) -> int:
|
||||||
|
return subprocess.run(
|
||||||
|
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
|
||||||
|
).returncode
|
||||||
|
|
||||||
|
|
||||||
|
def _ping_ok(payload: int, target: str, timeout_s: float) -> bool:
|
||||||
|
cmd = [
|
||||||
|
"ping",
|
||||||
|
"-M",
|
||||||
|
"do",
|
||||||
|
"-c",
|
||||||
|
"1",
|
||||||
|
"-s",
|
||||||
|
str(payload),
|
||||||
|
"-W",
|
||||||
|
str(max(1, int(round(timeout_s)))),
|
||||||
|
]
|
||||||
|
if _is_ipv6(target):
|
||||||
|
cmd.insert(1, "-6")
|
||||||
|
return _rc(cmd + [target]) == 0
|
||||||
|
|
||||||
|
|
||||||
|
def probe_pmtu(
|
||||||
|
target: str, lo_payload: int = 1200, hi_payload: int = 1472, timeout: float = 1.0
|
||||||
|
) -> Optional[int]:
|
||||||
|
hdr = 48 if _is_ipv6(target) else 28
|
||||||
|
|
||||||
|
if not _ping_ok(lo_payload, target, timeout):
|
||||||
|
for p in (1180, 1160, 1140):
|
||||||
|
if _ping_ok(p, target, timeout):
|
||||||
|
lo_payload = p
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
lo, hi, best = lo_payload, hi_payload, None
|
||||||
|
while lo <= hi:
|
||||||
|
mid = (lo + hi) // 2
|
||||||
|
if _ping_ok(mid, target, timeout):
|
||||||
|
best = mid
|
||||||
|
lo = mid + 1
|
||||||
|
else:
|
||||||
|
hi = mid - 1
|
||||||
|
|
||||||
|
return (best + hdr) if best is not None else None
|
||||||
48
src/automtu/wg.py
Normal file
48
src/automtu/wg.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
import subprocess
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from .net import iface_exists
|
||||||
|
|
||||||
|
|
||||||
|
def _run(cmd: list[str]) -> str:
|
||||||
|
return subprocess.run(
|
||||||
|
cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True
|
||||||
|
).stdout.strip()
|
||||||
|
|
||||||
|
|
||||||
|
def _rc(cmd: list[str]) -> int:
|
||||||
|
return subprocess.run(
|
||||||
|
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
|
||||||
|
).returncode
|
||||||
|
|
||||||
|
|
||||||
|
def wg_is_active(wg_if: str) -> bool:
|
||||||
|
return iface_exists(wg_if) and _rc(["wg", "show", wg_if]) == 0
|
||||||
|
|
||||||
|
|
||||||
|
def wg_peer_endpoints(wg_if: str) -> List[str]:
|
||||||
|
targets: list[str] = []
|
||||||
|
|
||||||
|
out = _run(["wg", "show", wg_if, "endpoints"])
|
||||||
|
for line in out.splitlines():
|
||||||
|
parts = line.strip().split()
|
||||||
|
if len(parts) >= 2 and parts[-1] != "(none)":
|
||||||
|
ep = parts[-1]
|
||||||
|
host = ep.rsplit(":", 1)[0].strip("[]")
|
||||||
|
targets.append(host)
|
||||||
|
|
||||||
|
if not targets:
|
||||||
|
conf = _run(["wg", "showconf", wg_if])
|
||||||
|
for m in re.finditer(r"^Endpoint\s*=\s*(.+)$", conf, flags=re.MULTILINE):
|
||||||
|
ep = m.group(1).strip()
|
||||||
|
host = ep.rsplit(":", 1)[0].strip("[]")
|
||||||
|
targets.append(host)
|
||||||
|
|
||||||
|
dedup: list[str] = []
|
||||||
|
for t in targets:
|
||||||
|
if t and t not in dedup:
|
||||||
|
dedup.append(t)
|
||||||
|
return dedup
|
||||||
287
test.py
287
test.py
@@ -1,287 +0,0 @@
|
|||||||
import io
|
|
||||||
import sys
|
|
||||||
import unittest
|
|
||||||
from unittest.mock import patch
|
|
||||||
from contextlib import redirect_stdout
|
|
||||||
|
|
||||||
# Import the script as a module
|
|
||||||
import main as automtu
|
|
||||||
|
|
||||||
|
|
||||||
class TestWgMtuAutoExtended(unittest.TestCase):
|
|
||||||
# ---------- Baseline behavior (unchanged) ----------
|
|
||||||
|
|
||||||
@patch("main.set_mtu")
|
|
||||||
@patch("main.read_mtu", return_value=1500)
|
|
||||||
@patch("main.exists_iface", return_value=True)
|
|
||||||
@patch("main.get_default_ifaces", return_value=["eth0"])
|
|
||||||
@patch("main.require_root", return_value=None)
|
|
||||||
def test_no_pmtu_uses_egress_minus_overhead(
|
|
||||||
self, _req_root, mock_get_def, _exists, _read_mtu, mock_set_mtu
|
|
||||||
):
|
|
||||||
argv = ["main.py", "--dry-run"]
|
|
||||||
with patch.object(sys, "argv", argv):
|
|
||||||
buf = io.StringIO()
|
|
||||||
with redirect_stdout(buf):
|
|
||||||
automtu.main()
|
|
||||||
|
|
||||||
out = buf.getvalue()
|
|
||||||
self.assertIn("Detected egress interface: eth0", out)
|
|
||||||
self.assertIn("Egress base MTU: 1500", out)
|
|
||||||
self.assertIn("Computed wg0 MTU: 1420", out)
|
|
||||||
mock_set_mtu.assert_any_call("wg0", 1420, True)
|
|
||||||
# get_default_ifaces should be called with ignore_vpn=True by default
|
|
||||||
mock_get_def.assert_called_with(ignore_vpn=True)
|
|
||||||
|
|
||||||
# ---------- prefer-wg-egress selection ----------
|
|
||||||
|
|
||||||
@patch("main.wg_default_is_active", return_value=True)
|
|
||||||
@patch("main.wg_is_active", return_value=True)
|
|
||||||
@patch("main.set_mtu")
|
|
||||||
@patch("main.read_mtu", return_value=1420)
|
|
||||||
@patch("main.exists_iface", return_value=True)
|
|
||||||
@patch("main.get_default_ifaces", return_value=["eth0", "wg0"])
|
|
||||||
@patch("main.require_root", return_value=None)
|
|
||||||
def test_prefer_wg_egress_picks_wg0_when_default_route_via_wg(
|
|
||||||
self, _req_root, mock_get_def, _exists, _read_mtu, _set_mtu, _wg_is_active, _wg_def_active
|
|
||||||
):
|
|
||||||
argv = ["main.py", "--dry-run", "--prefer-wg-egress", "--wg-if", "wg0"]
|
|
||||||
with patch.object(sys, "argv", argv):
|
|
||||||
buf = io.StringIO()
|
|
||||||
with redirect_stdout(buf):
|
|
||||||
automtu.main()
|
|
||||||
|
|
||||||
out = buf.getvalue()
|
|
||||||
# When prefer-wg is set AND wg default route is active, wg0 should be chosen as egress
|
|
||||||
self.assertIn("Detected egress interface: wg0", out)
|
|
||||||
self.assertIn("Using WireGuard interface wg0 as egress basis.", out)
|
|
||||||
# Computed MTU: base 1420 - 80 = 1340 (clamped by min=1280)
|
|
||||||
self.assertIn("Computed wg0 MTU: 1340", out)
|
|
||||||
# get_default_ifaces should be called with ignore_vpn=False (because prefer-wg)
|
|
||||||
mock_get_def.assert_called_with(ignore_vpn=False)
|
|
||||||
|
|
||||||
# ---------- auto-pmtu-from-wg adds peer endpoints ----------
|
|
||||||
|
|
||||||
@patch("main.wg_peer_endpoints", return_value=["46.4.224.77", "2a01:db8::1"])
|
|
||||||
@patch("main.wg_is_active", return_value=True)
|
|
||||||
@patch("main.probe_pmtu", side_effect=[1452, 1420]) # results for two peers
|
|
||||||
@patch("main.set_mtu")
|
|
||||||
@patch("main.read_mtu", return_value=1500)
|
|
||||||
@patch("main.exists_iface", return_value=True)
|
|
||||||
@patch("main.get_default_ifaces", return_value=["eth0"])
|
|
||||||
@patch("main.require_root", return_value=None)
|
|
||||||
def test_auto_pmtu_from_wg_adds_targets_and_uses_min_policy(
|
|
||||||
self, _req_root, _get_def, _exists, _read_mtu, _set_mtu, _probe_pmtu, _wg_active, _wg_peers
|
|
||||||
):
|
|
||||||
argv = ["main.py", "--dry-run", "--auto-pmtu-from-wg", "--wg-if", "wg0"]
|
|
||||||
with patch.object(sys, "argv", argv):
|
|
||||||
buf = io.StringIO()
|
|
||||||
with redirect_stdout(buf):
|
|
||||||
automtu.main()
|
|
||||||
|
|
||||||
out = buf.getvalue()
|
|
||||||
# Confirm WG peers were added
|
|
||||||
self.assertIn("Auto-added WG peer endpoints as PMTU targets: 46.4.224.77, 2a01:db8::1", out)
|
|
||||||
# The policy default is 'min', so chosen PMTU should be 1420
|
|
||||||
self.assertIn("Selected Path MTU (policy=min): 1420", out)
|
|
||||||
# Computed wg0 MTU: 1420 - 80 = 1340
|
|
||||||
self.assertIn("Computed wg0 MTU: 1340", out)
|
|
||||||
# Ensure probe was called twice (for both peers)
|
|
||||||
self.assertEqual(_probe_pmtu.call_count, 2)
|
|
||||||
|
|
||||||
# ---------- manual PMTU still works with prefer-wg-egress ----------
|
|
||||||
|
|
||||||
@patch("main.wg_default_is_active", return_value=True)
|
|
||||||
@patch("main.wg_is_active", return_value=True)
|
|
||||||
@patch("main.probe_pmtu", side_effect=[1472, 1452, 1500])
|
|
||||||
@patch("main.set_mtu")
|
|
||||||
@patch("main.read_mtu", return_value=1500)
|
|
||||||
@patch("main.exists_iface", return_value=True)
|
|
||||||
@patch("main.get_default_ifaces", return_value=["eth0"])
|
|
||||||
@patch("main.require_root", return_value=None)
|
|
||||||
def test_prefer_wg_egress_with_manual_targets_and_median_policy(
|
|
||||||
self, _req_root, _get_def, _exists, _read_mtu, _set_mtu, _probe_pmtu, _wg_is_active, _wg_def_active
|
|
||||||
):
|
|
||||||
argv = [
|
|
||||||
"main.py", "--dry-run",
|
|
||||||
"--prefer-wg-egress", "--wg-if", "wg0",
|
|
||||||
"--pmtu-target", "a", "--pmtu-target", "b", "--pmtu-target", "c",
|
|
||||||
"--pmtu-policy", "median"
|
|
||||||
]
|
|
||||||
with patch.object(sys, "argv", argv):
|
|
||||||
buf = io.StringIO()
|
|
||||||
with redirect_stdout(buf):
|
|
||||||
automtu.main()
|
|
||||||
|
|
||||||
out = buf.getvalue()
|
|
||||||
# As default route via wg is active, wg0 should be used
|
|
||||||
self.assertIn("Detected egress interface: wg0", out)
|
|
||||||
# PMTU values: 1472, 1452, 1500 -> median = 1472
|
|
||||||
self.assertIn("Selected Path MTU (policy=median): 1472", out)
|
|
||||||
# Computed WG MTU: 1472 - 80 = 1392
|
|
||||||
self.assertIn("Computed wg0 MTU: 1392", out)
|
|
||||||
self.assertEqual(_probe_pmtu.call_count, 3)
|
|
||||||
|
|
||||||
# ---------- PMTU all fail fallback ----------
|
|
||||||
|
|
||||||
@patch("main.set_mtu")
|
|
||||||
@patch("main.read_mtu", return_value=1500)
|
|
||||||
@patch("main.exists_iface", return_value=True)
|
|
||||||
@patch("main.get_default_ifaces", return_value=["eth0"])
|
|
||||||
@patch("main.require_root", return_value=None)
|
|
||||||
def test_pmtu_all_fail_falls_back_to_base(
|
|
||||||
self, _req_root, _get_def, _exists, _read_mtu, mock_set_mtu
|
|
||||||
):
|
|
||||||
with patch("main.probe_pmtu", side_effect=[None, None]):
|
|
||||||
argv = ["main.py", "--dry-run", "--pmtu-target", "bad1", "--pmtu-target", "bad2"]
|
|
||||||
with patch.object(sys, "argv", argv):
|
|
||||||
buf = io.StringIO()
|
|
||||||
with redirect_stdout(buf):
|
|
||||||
automtu.main()
|
|
||||||
|
|
||||||
out = buf.getvalue()
|
|
||||||
self.assertIn("WARNING: All PMTU probes failed. Falling back to egress MTU.", out)
|
|
||||||
self.assertIn("Computed wg0 MTU: 1420", out) # 1500 - 80
|
|
||||||
mock_set_mtu.assert_any_call("wg0", 1420, True)
|
|
||||||
|
|
||||||
# ---------- NEW: --set-wg-mtu overrides computed ----------
|
|
||||||
|
|
||||||
@patch("main.set_mtu")
|
|
||||||
@patch("main.read_mtu", return_value=1500)
|
|
||||||
@patch("main.exists_iface", return_value=True)
|
|
||||||
@patch("main.get_default_ifaces", return_value=["eth0"])
|
|
||||||
@patch("main.require_root", return_value=None)
|
|
||||||
def test_force_set_wg_mtu_overrides_computed(
|
|
||||||
self, _req_root, _get_def, _exists, _read_mtu, mock_set_mtu
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
--set-wg-mtu must override the computed value.
|
|
||||||
Base=1500 -> computed 1420 (1500-80), but we force 1300.
|
|
||||||
"""
|
|
||||||
argv = ["main.py", "--dry-run", "--set-wg-mtu", "1300"]
|
|
||||||
with patch.object(sys, "argv", argv):
|
|
||||||
buf = io.StringIO()
|
|
||||||
with redirect_stdout(buf):
|
|
||||||
automtu.main()
|
|
||||||
|
|
||||||
out = buf.getvalue()
|
|
||||||
# Computation is printed first
|
|
||||||
self.assertIn("Computed wg0 MTU: 1420", out)
|
|
||||||
# Then override message appears and applied value is 1300
|
|
||||||
self.assertIn("Forcing WireGuard MTU (override): 1300", out)
|
|
||||||
mock_set_mtu.assert_any_call("wg0", 1300, True)
|
|
||||||
|
|
||||||
# also test clamping below wg-min
|
|
||||||
argv2 = ["main.py", "--dry-run", "--set-wg-mtu", "1200"] # below default wg_min=1280
|
|
||||||
with patch.object(sys, "argv", argv2):
|
|
||||||
out2 = io.StringIO()
|
|
||||||
with redirect_stdout(out2):
|
|
||||||
automtu.main()
|
|
||||||
s = out2.getvalue()
|
|
||||||
self.assertIn("[wg-mtu][WARN] --set-wg-mtu 1200 is below wg-min 1280; clamping to 1280.", s)
|
|
||||||
self.assertIn("Forcing WireGuard MTU (override): 1280", s)
|
|
||||||
mock_set_mtu.assert_any_call("wg0", 1280, True)
|
|
||||||
|
|
||||||
# ---------- --apply-egress-mtu: setzt egress vor wg ----------
|
|
||||||
|
|
||||||
@patch("main.set_mtu")
|
|
||||||
@patch("main.probe_pmtu", return_value=1452)
|
|
||||||
@patch("main.read_mtu", return_value=1500)
|
|
||||||
@patch("main.exists_iface", return_value=True)
|
|
||||||
@patch("main.get_default_ifaces", return_value=["eth0"])
|
|
||||||
@patch("main.require_root", return_value=None)
|
|
||||||
def test_apply_egress_mtu_sets_egress_then_wg(
|
|
||||||
self, _req_root, _get_def, _exists, _read_mtu, _probe_pmtu, mock_set_mtu
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
--apply-egress-mtu setzt egress=eth0 auf effective MTU (1452) und danach wg0 auf 1452-80=1372.
|
|
||||||
Reihenfolge der set_mtu-Aufrufe: erst eth0, dann wg0.
|
|
||||||
"""
|
|
||||||
argv = ["main.py", "--dry-run", "--apply-egress-mtu", "--pmtu-target", "46.4.224.77"]
|
|
||||||
with patch.object(sys, "argv", argv):
|
|
||||||
buf = io.StringIO()
|
|
||||||
with redirect_stdout(buf):
|
|
||||||
automtu.main()
|
|
||||||
|
|
||||||
out = buf.getvalue()
|
|
||||||
self.assertIn("Applying effective MTU 1452 to egress eth0", out)
|
|
||||||
self.assertIn("Computed wg0 MTU: 1372", out)
|
|
||||||
|
|
||||||
# Call order prüfen
|
|
||||||
calls = [
|
|
||||||
unittest.mock.call("eth0", 1452, True), # egress zuerst
|
|
||||||
unittest.mock.call("wg0", 1372, True), # dann wg0
|
|
||||||
]
|
|
||||||
mock_set_mtu.assert_has_calls(calls, any_order=False)
|
|
||||||
self.assertEqual(mock_set_mtu.call_count, 2)
|
|
||||||
|
|
||||||
# ---------- --apply-egress-mtu: egress == wg_if -> skip apply auf egress ----------
|
|
||||||
|
|
||||||
@patch("main.wg_default_is_active", return_value=True)
|
|
||||||
@patch("main.wg_is_active", return_value=True)
|
|
||||||
@patch("main.set_mtu")
|
|
||||||
@patch("main.probe_pmtu", return_value=1500)
|
|
||||||
@patch("main.read_mtu", return_value=1500)
|
|
||||||
@patch("main.exists_iface", return_value=True)
|
|
||||||
@patch("main.get_default_ifaces", return_value=["wg0"]) # wg0 wird als egress gewählt
|
|
||||||
@patch("main.require_root", return_value=None)
|
|
||||||
def test_apply_egress_mtu_skips_when_egress_is_wg(
|
|
||||||
self, _req_root, _get_def, _exists, _read_mtu, _probe, mock_set_mtu, _wg_active, _wg_def_active
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
Wenn egress == wg0, soll das Skript den egress-Apply überspringen, aber wg0 ganz normal setzen.
|
|
||||||
"""
|
|
||||||
argv = ["main.py", "--dry-run", "--apply-egress-mtu", "--prefer-wg-egress", "--wg-if", "wg0"]
|
|
||||||
with patch.object(sys, "argv", argv):
|
|
||||||
out = io.StringIO()
|
|
||||||
with redirect_stdout(out):
|
|
||||||
automtu.main()
|
|
||||||
s = out.getvalue()
|
|
||||||
|
|
||||||
self.assertIn("Detected egress interface: wg0", s)
|
|
||||||
self.assertIn("Skipping egress MTU apply because egress == wg0", s)
|
|
||||||
# wg0 wird trotzdem gesetzt (1500-80=1420)
|
|
||||||
mock_set_mtu.assert_any_call("wg0", 1420, True)
|
|
||||||
# Es darf nur ein set_mtu-Aufruf erfolgen (kein separater egress-Apply)
|
|
||||||
self.assertEqual(mock_set_mtu.call_count, 1)
|
|
||||||
|
|
||||||
# ---------- --apply-egress-mtu plus --set-wg-mtu override ----------
|
|
||||||
|
|
||||||
@patch("main.set_mtu")
|
|
||||||
@patch("main.probe_pmtu", return_value=1452)
|
|
||||||
@patch("main.read_mtu", return_value=1500)
|
|
||||||
@patch("main.exists_iface", return_value=True)
|
|
||||||
@patch("main.get_default_ifaces", return_value=["eth0"])
|
|
||||||
@patch("main.require_root", return_value=None)
|
|
||||||
def test_apply_egress_mtu_with_forced_wg_override(
|
|
||||||
self, _req_root, _get_def, _exists, _read_mtu, _probe_pmtu, mock_set_mtu
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
Egress wird auf 1452 gesetzt, aber wg0 wird mit --set-wg-mtu (z. B. 1300) überschrieben,
|
|
||||||
unabhängig vom berechneten Wert (1372).
|
|
||||||
"""
|
|
||||||
argv = [
|
|
||||||
"main.py", "--dry-run",
|
|
||||||
"--apply-egress-mtu",
|
|
||||||
"--pmtu-target", "1.1.1.1",
|
|
||||||
"--set-wg-mtu", "1300",
|
|
||||||
]
|
|
||||||
with patch.object(sys, "argv", argv):
|
|
||||||
buf = io.StringIO()
|
|
||||||
with redirect_stdout(buf):
|
|
||||||
automtu.main()
|
|
||||||
|
|
||||||
out = buf.getvalue()
|
|
||||||
self.assertIn("Applying effective MTU 1452 to egress eth0", out)
|
|
||||||
self.assertIn("Computed wg0 MTU: 1372", out) # erst berechnet
|
|
||||||
self.assertIn("Forcing WireGuard MTU (override): 1300", out) # dann überschrieben
|
|
||||||
|
|
||||||
calls = [
|
|
||||||
unittest.mock.call("eth0", 1452, True),
|
|
||||||
unittest.mock.call("wg0", 1300, True),
|
|
||||||
]
|
|
||||||
mock_set_mtu.assert_has_calls(calls, any_order=False)
|
|
||||||
self.assertEqual(mock_set_mtu.call_count, 2)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main(verbosity=2)
|
|
||||||
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
0
tests/unit/__init__.py
Normal file
0
tests/unit/__init__.py
Normal file
14
tests/unit/test___init__.py
Normal file
14
tests/unit/test___init__.py
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
import automtu
|
||||||
|
|
||||||
|
|
||||||
|
class TestInit(unittest.TestCase):
|
||||||
|
def test_version_is_exposed(self) -> None:
|
||||||
|
self.assertTrue(hasattr(automtu, "__version__"))
|
||||||
|
self.assertIsInstance(automtu.__version__, str)
|
||||||
|
self.assertRegex(automtu.__version__, r"^\d+\.\d+\.\d+$")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main(verbosity=2)
|
||||||
26
tests/unit/test___main__.py
Normal file
26
tests/unit/test___main__.py
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
import unittest
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
import automtu.__main__ as entry
|
||||||
|
|
||||||
|
|
||||||
|
class TestMain(unittest.TestCase):
|
||||||
|
def test_main_calls_parser_and_core(self) -> None:
|
||||||
|
fake_args = object()
|
||||||
|
fake_parser = Mock()
|
||||||
|
fake_parser.parse_args.return_value = fake_args
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("automtu.__main__.build_parser", return_value=fake_parser) as p_build,
|
||||||
|
patch("automtu.__main__.run_automtu", return_value=0) as p_run,
|
||||||
|
):
|
||||||
|
rc = entry.main()
|
||||||
|
|
||||||
|
self.assertEqual(rc, 0)
|
||||||
|
p_build.assert_called_once_with()
|
||||||
|
fake_parser.parse_args.assert_called_once_with()
|
||||||
|
p_run.assert_called_once_with(fake_args)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main(verbosity=2)
|
||||||
62
tests/unit/test_cli.py
Normal file
62
tests/unit/test_cli.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
from automtu.cli import build_parser
|
||||||
|
|
||||||
|
|
||||||
|
class TestCli(unittest.TestCase):
|
||||||
|
def test_build_parser_accepts_known_args(self) -> None:
|
||||||
|
p = build_parser()
|
||||||
|
|
||||||
|
# Parse only; no side effects.
|
||||||
|
args = p.parse_args(
|
||||||
|
[
|
||||||
|
"--egress-if",
|
||||||
|
"eth0",
|
||||||
|
"--prefer-wg-egress",
|
||||||
|
"--pmtu-target",
|
||||||
|
"1.1.1.1,8.8.8.8",
|
||||||
|
"--pmtu-timeout",
|
||||||
|
"2.0",
|
||||||
|
"--pmtu-min-payload",
|
||||||
|
"1200",
|
||||||
|
"--pmtu-max-payload",
|
||||||
|
"1472",
|
||||||
|
"--pmtu-policy",
|
||||||
|
"median",
|
||||||
|
"--apply-egress-mtu",
|
||||||
|
"--apply-wg-mtu",
|
||||||
|
"--wg-if",
|
||||||
|
"wg0",
|
||||||
|
"--wg-overhead",
|
||||||
|
"80",
|
||||||
|
"--wg-min",
|
||||||
|
"1280",
|
||||||
|
"--auto-pmtu-from-wg",
|
||||||
|
"--set-wg-mtu",
|
||||||
|
"1372",
|
||||||
|
"--force-egress-mtu",
|
||||||
|
"1452",
|
||||||
|
"--dry-run",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(args.egress_if, "eth0")
|
||||||
|
self.assertTrue(args.prefer_wg_egress)
|
||||||
|
self.assertEqual(args.pmtu_target, ["1.1.1.1,8.8.8.8"])
|
||||||
|
self.assertEqual(args.pmtu_timeout, 2.0)
|
||||||
|
self.assertEqual(args.pmtu_min_payload, 1200)
|
||||||
|
self.assertEqual(args.pmtu_max_payload, 1472)
|
||||||
|
self.assertEqual(args.pmtu_policy, "median")
|
||||||
|
self.assertTrue(args.apply_egress_mtu)
|
||||||
|
self.assertTrue(args.apply_wg_mtu)
|
||||||
|
self.assertEqual(args.wg_if, "wg0")
|
||||||
|
self.assertEqual(args.wg_overhead, 80)
|
||||||
|
self.assertEqual(args.wg_min, 1280)
|
||||||
|
self.assertTrue(args.auto_pmtu_from_wg)
|
||||||
|
self.assertEqual(args.set_wg_mtu, 1372)
|
||||||
|
self.assertEqual(args.force_egress_mtu, 1452)
|
||||||
|
self.assertTrue(args.dry_run)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main(verbosity=2)
|
||||||
94
tests/unit/test_core.py
Normal file
94
tests/unit/test_core.py
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
import io
|
||||||
|
import unittest
|
||||||
|
from contextlib import redirect_stdout
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from automtu.core import run_automtu
|
||||||
|
|
||||||
|
|
||||||
|
class TestCore(unittest.TestCase):
|
||||||
|
def test_run_automtu_happy_path_all_mocked(self) -> None:
|
||||||
|
args = SimpleNamespace(
|
||||||
|
dry_run=True,
|
||||||
|
egress_if=None,
|
||||||
|
prefer_wg_egress=False,
|
||||||
|
force_egress_mtu=None,
|
||||||
|
pmtu_target=["1.1.1.1,8.8.8.8"],
|
||||||
|
auto_pmtu_from_wg=False,
|
||||||
|
pmtu_min_payload=1200,
|
||||||
|
pmtu_max_payload=1472,
|
||||||
|
pmtu_timeout=1.0,
|
||||||
|
pmtu_policy="min",
|
||||||
|
apply_egress_mtu=True,
|
||||||
|
apply_wg_mtu=True,
|
||||||
|
wg_if="wg0",
|
||||||
|
wg_overhead=80,
|
||||||
|
wg_min=1280,
|
||||||
|
set_wg_mtu=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
# PMTU probes: 1452 and 1500 -> min policy => 1452, effective=min(base(1500),1452)=1452
|
||||||
|
# wg_mtu = 1452-80 = 1372
|
||||||
|
with (
|
||||||
|
patch("automtu.core.require_root", return_value=None),
|
||||||
|
patch("automtu.core.detect_egress_iface", return_value="eth0"),
|
||||||
|
patch("automtu.core.iface_exists", return_value=True),
|
||||||
|
patch("automtu.core.read_iface_mtu", return_value=1500),
|
||||||
|
patch("automtu.core.probe_pmtu", side_effect=[1452, 1500]),
|
||||||
|
patch("automtu.core.set_iface_mtu") as mock_set,
|
||||||
|
patch("automtu.core.wg_is_active", return_value=False),
|
||||||
|
patch("automtu.core.wg_peer_endpoints", return_value=[]),
|
||||||
|
patch("automtu.core.default_route_uses_iface", return_value=False),
|
||||||
|
):
|
||||||
|
buf = io.StringIO()
|
||||||
|
with redirect_stdout(buf):
|
||||||
|
rc = run_automtu(args)
|
||||||
|
|
||||||
|
self.assertEqual(rc, 0)
|
||||||
|
s = buf.getvalue()
|
||||||
|
self.assertIn("Detected egress interface: eth0", s)
|
||||||
|
self.assertIn("Egress base MTU: 1500", s)
|
||||||
|
self.assertIn("Selected Path MTU (policy=min): 1452", s)
|
||||||
|
self.assertIn("Computed wg0 MTU: 1372", s)
|
||||||
|
|
||||||
|
# apply egress and wg are both true -> two calls in order
|
||||||
|
mock_set.assert_any_call("eth0", 1452, True)
|
||||||
|
mock_set.assert_any_call("wg0", 1372, True)
|
||||||
|
|
||||||
|
def test_run_automtu_does_not_apply_wg_without_flag(self) -> None:
|
||||||
|
args = SimpleNamespace(
|
||||||
|
dry_run=True,
|
||||||
|
egress_if="eth0",
|
||||||
|
prefer_wg_egress=False,
|
||||||
|
force_egress_mtu=None,
|
||||||
|
pmtu_target=None,
|
||||||
|
auto_pmtu_from_wg=False,
|
||||||
|
pmtu_min_payload=1200,
|
||||||
|
pmtu_max_payload=1472,
|
||||||
|
pmtu_timeout=1.0,
|
||||||
|
pmtu_policy="min",
|
||||||
|
apply_egress_mtu=False,
|
||||||
|
apply_wg_mtu=False,
|
||||||
|
wg_if="wg0",
|
||||||
|
wg_overhead=80,
|
||||||
|
wg_min=1280,
|
||||||
|
set_wg_mtu=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("automtu.core.require_root", return_value=None),
|
||||||
|
patch("automtu.core.iface_exists", return_value=True),
|
||||||
|
patch("automtu.core.read_iface_mtu", return_value=1500),
|
||||||
|
patch("automtu.core.set_iface_mtu") as mock_set,
|
||||||
|
):
|
||||||
|
buf = io.StringIO()
|
||||||
|
with redirect_stdout(buf):
|
||||||
|
rc = run_automtu(args)
|
||||||
|
|
||||||
|
self.assertEqual(rc, 0)
|
||||||
|
mock_set.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main(verbosity=2)
|
||||||
49
tests/unit/test_net.py
Normal file
49
tests/unit/test_net.py
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
import unittest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import automtu.net as net
|
||||||
|
|
||||||
|
|
||||||
|
class TestNet(unittest.TestCase):
|
||||||
|
def test_detect_egress_iface_parses_default_route_and_ignores_vpn(self) -> None:
|
||||||
|
# Simulate: ip route show default -> dev wg0 first, then eth0
|
||||||
|
ip4_default = (
|
||||||
|
"default via 10.0.0.1 dev wg0 proto dhcp src 10.0.0.2 metric 100\n"
|
||||||
|
)
|
||||||
|
ip6_default = "default via fe80::1 dev eth0 proto ra metric 100\n"
|
||||||
|
|
||||||
|
def fake_run(cmd: list[str]) -> str:
|
||||||
|
if cmd[:4] == ["ip", "-4", "route", "show"]:
|
||||||
|
return ip4_default.strip()
|
||||||
|
if cmd[:4] == ["ip", "-6", "route", "show"]:
|
||||||
|
return ip6_default.strip()
|
||||||
|
return ""
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("automtu.net._run", side_effect=fake_run),
|
||||||
|
patch("automtu.net.iface_exists", return_value=True),
|
||||||
|
):
|
||||||
|
# ignore_vpn=True -> should skip wg0 and pick eth0
|
||||||
|
self.assertEqual(net.detect_egress_iface(ignore_vpn=True), "eth0")
|
||||||
|
|
||||||
|
# ignore_vpn=False -> first seen is wg0
|
||||||
|
self.assertEqual(net.detect_egress_iface(ignore_vpn=False), "wg0")
|
||||||
|
|
||||||
|
def test_default_route_uses_iface_true_false(self) -> None:
|
||||||
|
ip4_default = "default via 10.0.0.1 dev eth0\n"
|
||||||
|
ip6_default = ""
|
||||||
|
|
||||||
|
def fake_run(cmd: list[str]) -> str:
|
||||||
|
if cmd[:4] == ["ip", "-4", "route", "show"]:
|
||||||
|
return ip4_default.strip()
|
||||||
|
if cmd[:4] == ["ip", "-6", "route", "show"]:
|
||||||
|
return ip6_default.strip()
|
||||||
|
return ""
|
||||||
|
|
||||||
|
with patch("automtu.net._run", side_effect=fake_run):
|
||||||
|
self.assertTrue(net.default_route_uses_iface("eth0"))
|
||||||
|
self.assertFalse(net.default_route_uses_iface("wg0"))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main(verbosity=2)
|
||||||
36
tests/unit/test_pmtu.py
Normal file
36
tests/unit/test_pmtu.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
import unittest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import automtu.pmtu as pmtu
|
||||||
|
|
||||||
|
|
||||||
|
class TestPmtu(unittest.TestCase):
|
||||||
|
def test_probe_pmtu_binary_search_and_hdr_addition(self) -> None:
|
||||||
|
# Mock _is_ipv6 -> IPv4, so hdr = 28.
|
||||||
|
# Mock _ping_ok so that payload <= 1400 works, >1400 fails.
|
||||||
|
def fake_ping_ok(payload: int, target: str, timeout_s: float) -> bool:
|
||||||
|
return payload <= 1400
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("automtu.pmtu._is_ipv6", return_value=False),
|
||||||
|
patch("automtu.pmtu._ping_ok", side_effect=fake_ping_ok),
|
||||||
|
):
|
||||||
|
# lo=1200 works, hi=1472 partially works -> best = 1400 -> mtu = 1400+28 = 1428
|
||||||
|
mtu = pmtu.probe_pmtu(
|
||||||
|
"1.1.1.1", lo_payload=1200, hi_payload=1472, timeout=1.0
|
||||||
|
)
|
||||||
|
self.assertEqual(mtu, 1428)
|
||||||
|
|
||||||
|
def test_probe_pmtu_returns_none_if_even_floor_fails(self) -> None:
|
||||||
|
with (
|
||||||
|
patch("automtu.pmtu._is_ipv6", return_value=False),
|
||||||
|
patch("automtu.pmtu._ping_ok", return_value=False),
|
||||||
|
):
|
||||||
|
mtu = pmtu.probe_pmtu(
|
||||||
|
"1.1.1.1", lo_payload=1200, hi_payload=1472, timeout=1.0
|
||||||
|
)
|
||||||
|
self.assertIsNone(mtu)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main(verbosity=2)
|
||||||
58
tests/unit/test_wg.py
Normal file
58
tests/unit/test_wg.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
import unittest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import automtu.wg as wg
|
||||||
|
|
||||||
|
|
||||||
|
class TestWg(unittest.TestCase):
|
||||||
|
def test_wg_peer_endpoints_from_wg_show(self) -> None:
|
||||||
|
# wg show wg0 endpoints output
|
||||||
|
out = "abcde12345\t46.4.224.77:51820\nfffff00000\t[2a01:db8::1]:51820\n"
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch(
|
||||||
|
"automtu.wg._run",
|
||||||
|
side_effect=lambda cmd: out if cmd[:3] == ["wg", "show", "wg0"] else "",
|
||||||
|
),
|
||||||
|
patch("automtu.wg.iface_exists", return_value=True),
|
||||||
|
patch("automtu.wg._rc", return_value=0),
|
||||||
|
):
|
||||||
|
eps = wg.wg_peer_endpoints("wg0")
|
||||||
|
|
||||||
|
self.assertEqual(eps, ["46.4.224.77", "2a01:db8::1"])
|
||||||
|
|
||||||
|
def test_wg_peer_endpoints_fallback_showconf(self) -> None:
|
||||||
|
show_endpoints_empty = ""
|
||||||
|
showconf = (
|
||||||
|
"[Interface]\n"
|
||||||
|
"PrivateKey = x\n"
|
||||||
|
"[Peer]\n"
|
||||||
|
"Endpoint = 46.4.224.77:51820\n"
|
||||||
|
"[Peer]\n"
|
||||||
|
"Endpoint = [2a01:db8::1]:51820\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
def fake_run(cmd: list[str]) -> str:
|
||||||
|
if cmd == ["wg", "show", "wg0", "endpoints"]:
|
||||||
|
return show_endpoints_empty
|
||||||
|
if cmd == ["wg", "showconf", "wg0"]:
|
||||||
|
return showconf
|
||||||
|
return ""
|
||||||
|
|
||||||
|
with patch("automtu.wg._run", side_effect=fake_run):
|
||||||
|
eps = wg.wg_peer_endpoints("wg0")
|
||||||
|
|
||||||
|
self.assertEqual(eps, ["46.4.224.77", "2a01:db8::1"])
|
||||||
|
|
||||||
|
def test_wg_is_active_uses_iface_exists_and_wg_show_rc(self) -> None:
|
||||||
|
with (
|
||||||
|
patch("automtu.wg.iface_exists", return_value=True),
|
||||||
|
patch("automtu.wg._rc", return_value=0),
|
||||||
|
):
|
||||||
|
self.assertTrue(wg.wg_is_active("wg0"))
|
||||||
|
with patch("automtu.wg.iface_exists", return_value=False):
|
||||||
|
self.assertFalse(wg.wg_is_active("wg0"))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main(verbosity=2)
|
||||||
Reference in New Issue
Block a user