From e5d553f7834da3746c134ec92df2f9bd5d342009 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Wed, 15 Oct 2025 20:46:28 +0200 Subject: [PATCH] feat(wg-mtu-auto): add --prefer-wg-egress, --auto-pmtu-from-wg, and --set-wg-mtu; refine egress detection & PMTU logic Refactor helpers; allow preferring wg* as egress when default route uses WireGuard; auto-discover peer endpoints from `wg show`/showconf as PMTU targets; add explicit `--set-wg-mtu` override with clamping; improve default-route parsing and dedup of targets. Update unit tests to cover prefer-wg egress selection, auto-pmtu-from-wg, median/min policies, all-fail fallback, and explicit override behavior. Conversation context: https://chatgpt.com/share/68efc179-1a10-800f-9656-1e8731b40546 --- __pycache__/main.cpython-313.pyc | Bin 12383 -> 17752 bytes __pycache__/test.cpython-313.pyc | Bin 6833 -> 10096 bytes main.py | 225 +++++++++++++++++++++++++------ test.py | 183 ++++++++++++++++--------- 4 files changed, 299 insertions(+), 109 deletions(-) diff --git a/__pycache__/main.cpython-313.pyc b/__pycache__/main.cpython-313.pyc index 7bf40492f4a258f8e1e3a1c2bf2aed01b539dade..a142012f92f594175fa6e9b0e7957444379e78c6 100644 GIT binary patch delta 8718 zcmahuYfu}>nKRPpf!;zAAfCg^Ah38W_yIPyv9Yl+4_kwbZDGqo8nD(Nu`>cMT5rTA zx3$>2SsmAzalm#s_Hfuh$Sz?*8lVE>|`Y#BcBqsVLHoPye`B z6OH9D+Ct(*k~1^9;ao;PY-yz!1N2awl`$3)oQ=t6Od>=cQ^1(TR(=+2t_WL@?y(Q& zV)TH+BK8+DR=`m-T+GwBa?Yz`PIioAc^xt!@6mjRPBM?ZU2i3h$QPDf>y`dfUlF z+sU}=%wxGsr~DVC?zPc*_jUK1Lt6@&!iM^&ha`=5DxQ#4s7%?27|Mq7wNDl0fUp*s zbao}Pmeag?8a3+dsu$LCn&BS}=su(bS#T0L^&R96bVXmLsRwu`=A;9CRbN{$y3DS) ze|Y3w`TgVXvOjaj^H0W^p``jOnlu)nf7AcF^S2}bJCuSf%nPb9Ho{H?16(ci337gh z=ajfbu`dTB4w9=wCSwWNdcVQgOXhaKNIN|I0jOq)Eg6O0GVM4gQxrWi<}L)Tzw*Ai z5Q|pyN8Sh!UvxGRtHw^5{5_I{+7Dfsf)`p2&!=xg^);fA2*JaUr%9+~r^!~Cg*d(& z#gxTX%or{K8A0^r6NH;^$x))%LdsBQp%wj1p{93XfuPVt-(G^y!rM($5e|93L%~%- zKN8d>G2Hm0%(xjU)?$Teb^eaS6%Y@@JF>8gP>?m8d*9B2`uyBSTJAVY{2m zz?~=XI>^O!pwjEm54b3gHiZ>%P?EkwiZo5dpQYLGJet2(mH3rz5*YUaYL#0hk@w{R zxQbEW?H(`@gh!tts*;FGu%bOw06Zb}_)1|$T6ltR8PIz&YXhTnt0e9+fB@77>vVQ{ zA81oV1TwmnDcyK7q`;nhX?s$FJ*oPm`r(lPJ3Y+T*A6rF4z3Z{b5Np&-p(oN7A=p%PDD^l_#*zx4z-{Oj)lS;>z$qq2^7%c z1q$1hKu(BO$Q{Gb)Kn<=l0XJ{HT+C}YB=k4xW-OHWd?0I0Sd7ML7C&(=gx~H+mhF8n)dhd8Y)L9>D zZQrW6UHgsNcxB7#>Gi@riNZb5y`j~8xg$gJ?f%Dl!jQXV07|nbIaq2?!PPNua8yuo zY-EZH`7?G7V;OIO9nVGB3ER8CO|+5>c@6@G+JiTzSfmhP$!rqIdSDr4{mfwpXa5)*Vqc!* z4hS_d(2nG8g1pz~j|8XLhW+DVzi*sB(2#)~!&q=g`onL*4B`g=`+Xxn8+?nKpSdx! zbo?FH-E-eQw=$Ki-W9Lfy*d)Nx4qlCZafq>9s;=k;3=5+rTqs$rM141q`{xm`s0*eTr`Y6)(d5ywxc2h>Z|Ay6GKT(OISg@k zpS8G5bDZ@BSgxI}e_=0Od*ak|t79i!-oB^6A!?w)N1Xhs zCB?e7G@&isFj_aY`nkx$nI(7JP?^+Lt&j<=BThL|5vw!~v_2oP7?P2>A-XhDXkJ;b znn}edDU72rZSgS(G&mfKWjoM-G(Sl@>KDLKQ;Y(81V-6@HQfuVBlU(9ZUy?4%4SnB zYH5EmIT${qbSr_F&8U&I1U;lnj{_q5v08WXm9(?p4u@Zbh=R>tEWo}tTgT5tt{6b# zV}e**CLDWAjLxmf>dZy=N-Rcwn;d)#?5L31t#;XPpIV(}2puZ4;S&ud31RxY5CD<) z=lE|sm*O7|Q#oREtpXw%wGFBT3hgb&(PLvP>D~GK$D)U?r)cob}Rev}%8nsC+feVQa*%>zblf+_JG7~g+J$j|q~Jot3P7a6A;Z z*Ye4@p(&|po;|W*vfMN*8XlPFIqDHaWX+-m>DEmZ2~$PVR5hpAgeg;RwlA5#d~j)M z*>!7X<@g6>tsCaN1#ZzkcX-2MyVv(BWj1cvnKU}*sE1nPrY--b=PRCgIYf}3 zbN_hly=!Z>!3A=|mUnY#acJo@I=^nKPS~oGwweXmM!x;#%;LLGmus8LLA>$s5 zj0uC<%pPnsV%a`=N>d7$T*e|zkvs@vWo+m@dyy^g|DKx96#U6ldupnxkSSt{Qw*9* zw*3WD%9N!b%=9*Lrktrr_f}^0Rx#D7Ui}U@zt@~5nOerd)M3_C^`L;ADf~uf)|>{W z5ld}s%Ia%|zI%lMG*OZx(-Y{w3-i?>*}xCMO%|YVabaOSTq`_9l9JqV=7lygM5Tv| z%-}&iC7DsuRf?MB7W7}V$qZ)$PxnR*5eSqIhxhkQ@Mv55pX&+4E z{%Y(p6)oZ%jQyu&sxCYrxj7&BdI$zB*yRt94BU*|GHEOT_Yn_{W@2GlJ<5i_ zyV_(lk=45sJ8}cFTUu`zY()Q5R93c)WRF`0=fS-#@~mn92eO?^TNbiyH0>BoOXi3P z5~Xxr)C@eHEqCEV!{>^a6|p}f(LPZvG1_L{-;y~Dh`2-r=gY)7hhWb0mdR)hfalLr zP=|!hc;HXv%Mq0Y5KUkjWyWn$4vKxiMA;!xsLX{CwAUiZ03tQP3hz4IB=u~+Ly7^d zM>@n4aHUPJe@KzVTc=y`timAf;ds6qxG7Ry1|z`jS4->)66BywUPMc-ktE4+=Sa5agCk~^Tt}EL*AdKZQI5prvvNw(?8=PO#6WcxhNIFv z=(lI}9}~HNb!qN?#yFH2PQb`KMS zk4w^MGP4b6p}e$GAt3?lRD$wH;IN|e;Mgpdv+ud!1Pu=0lvuk&N1`iJpVVg|Kk3%N z*6VfaGXlVw!L5f~-v=toGP`XE@wR^&ptLSG7OGLd%|~A#y|~&8zWtyOd=NpkbMYrS|sw>>bxd zG(eY`jaldZH&JZu9nb7_JXs>X{SIY|b`A@a?GQcqDkc_~ zEkzbM`&!G&~{oDB&1E#)`jD&MwksLF{`>yY4440U@?924YMQCnrB5_?GQ8z@wHMlVp2nMqd6%0f@Pa**sp)m3Ni z->6cMrfV4PRa}kY>KeLT?Y5>S!|!O0(;47qoZM6hS$EXNqB^IOXCs&*WbdPc$HJVS zrF}F%$@+t%K|d|B8>W4elOS9;L`N>OS@OZ1hUG?mFr_v;&2k(p08jhF6O)kU4baoR z@hKMTk-LLg5p|gBbdF*gsc#1;S<$T|xl=HV_V4IbIx*Z!= zmuS?SwbCO#KFBvjjnX{NP^YW^0v&Cld3K!jV}`cPVq_|kjghBA;Rv116-A2y1^V5N zgRwmlE*dYg6V?`x40ZoM%i2yyWjpEU;mqU{{l^B{>HhF@rh>KkvE~48Etny`zcosi zI7fGuG;|6{w1p04GrN!GufV+#D+qssdDgt}w-}(=fSffIh^e8`)tY-TEF50z3u*mi zuRje61@t2K^7ZEp@|bH((FcncL1YdA4FfpvNrkKq8tK`w+4Ki6M0@q_0zrxP&X?&C6@t}W( z!znM|8B+jFdMAA|AXq>!p_sdcOmUb5nGbHxSboTY zieK8qjUp)g;M90Q4F=C*NI{n+1f;>?o<1RvFAEw#l=d`&o*f;9JpidtaRxOO4hOie zW9rkGNu?hy=xoR+C{x6F6ZFzFx=~lCO+fQKc=#zL`a#|9`qknMJHI6-6xPQCshA=k zQOY+4UK@y89jiMx>@Q|U#>q!$tInpIO4b~W_fF3aB&e&%Twj=X?m}|k#pE9E7S7C_ zmCX)3hQ#$b)LCC-n@;XNlYC(aM!e+l^vDVHQhg|P^cPbbc5eo`Pa?Z;azz*4=f~Is zX-*dUzh?mqtvzW3CuJEHYrn_G+oBlw6-fNgcU8L%m>rOS?(s z#C1)pd8_KUrfrRC7ujA{6~q5>-VfTp+ny}ng(sR?R=a-GbFXK0fBfL7xZ(7g+O?t5 zz7c&bx^O0`Dtf1P)%v5tdxh&&`x90BVZwrOnYydFqglEZulK|a!)yN0b^k=dKk>dg zB!8sRqPsi225a7IFJM_$<-`95vO7CtY5FDWsuIY4E>X8Nw>0b8%7nIZ(_XjIDsp7s zJ=gQlXx}v1;spcoQ-kZL&LvJk`Wfe*;{&7dysas?z-D^r)PP8ym2I+>tE9aK7+-wZsr$#%ieXn?^fT+&SdePWd7bc9k637 zneBV1vY>#|-Mk62dza12^5qx4-+afn+7PeqjOQL+(;fNPXqtN|^U>z(2KZK!Tl$#> zed@fKlIe}uYq5l?Vnb`3KY0BhESu++C#kIkjB#rdD6i~ZS-hz;-sysG$Lq|+1amR& z^2WUr@esEjnn{Fa;uFz$^p$w-wKd(V8>JPuuiUya*Y(x_Hm|b9vN@%=O8eeD_U_)e zp=(Wj6cSo$>q6Jf9)udc6uUwakc05OfeJoHW1r$E5ma7pAnf;gx$ohzy|`+^RUWRyk1SunEj6x0Cy8^)sf>3aZuMa& z$%*a~e=12O_q%aRbi8m61a~+R4v&kTS4{hhUXK$U+v^zc$GF0NlS`l(*K~7qq=oDnp6RVOmKh3p~rmZF!86cPCTLgZuP*27869Yw7JvCV2ru(!Zhiu*| zmXmc$00ggPfPvSRu9UPaFu_pvMA z@1Do^_?_>3=X{@6pU0n^DSB))>JU8tUi;TjbUQ+Sp&j{QHXD!c%oa^L^_hq_b0*G^ ziH0$6;mn*-YNc>VFk|gY<&x`q5b|me>a*sdJ)6QdG{_7xcFXv$Nilvk|NNk5G(7I{ zj)y~KV;6Ncn=tMR*~)e&jFUg(LVaRi&m8?)KZl%WN{wHUTKtrG5jP}*RiO&V6k5}# zzSQ)5(|rDd`%c}P&967#t6MpGYWe7?h;L|FA6U_kE$hct^b^baiKzZUguOt1$$pG2 zWLmj_Q^_UOZs+C-q>mb?0^Y~N;FX`E-6#kzhr2LPrVCdyDX7hlP!RWGSDVCaW{WqueK6LMkB}R=wB%fIih_0N$+^4~54&emq5QR> zj)Ms45qt+@UpLy)auQKkN(q)~6V|b>!;&=xnM7F3|AkfdO<0}Z2J7h<9Po|Ojldt> zU|8i=$>6+|phv~2X_WYOA=Jyto^lz;a!zW8#;uYu$TB_?@X<|`9+=;B#@0M_TN6S( z{(6{^oCwZwE5V7*UD-q6BQQpN7%4zI8dWD2e7jZ>|nsrr7+ z$!K!#v|=sEJd^*z(6nOJnl^WA_Si#f!F21I$ug(?u{M%fabq6~~BS=LoxYw9Q|&(0$6s6KDq3rj1ZYuqQTaZPKSNnTLSpI`7rl1gv3JkXZk zcCIB`W-8`tW^2g4xu)5sXi9!Gx!?;0(p9Wz%i%}9Q2#Nu!Q_Fav7{a}i3%ToQJ^Q= zX0!1$(nPi357SGE-w!>AeIzuHdTnu9_rk%Og?ALUTNX7-sY~OLw1an#M>Vd+OIj1& z#^(Z~T_rLhK~xX=!~XGrk8dQ`blY$a;w#A3Kc5LYawpywaf)=)Fol%f#!_FeC z%vuHTzAKmRYckgr(EU=NV7})lN4JxmjN(0vfi4bOQ*kjh6$>xMy%+du z+M8#Q$5elA798{@ZGAqdH%JACH8IZ{gijR?kVma zjJSHE?S0XE!cJZNz$0^ z!1?6a^eW3R48b`>!q0$Aq0dZdi*KaQN#-#5eQqj~q$3~Y`jUE8FfKiun$wVv^0O4$UJXgo zWiu`#y?B##oL;_-I0I*-EZQwGB4(6qGNvVSCYe#bo*+3|h!X@-WIKrhRxb4$0^4E& z>NGB$%iuDpHFH)kJ^e9|*<4PH!@ONqi4eDgvu%#%CXD8B`LR)B0VIRMF3j!ZinwA5 zNNwjzxKhyn-vZy|M!a>6!Zl|@y&1AOIeE+TUL#z?-AfgDE_#{9AF@!Uv~Xp$3|Nly*k~#+?v_0XbNS>{(T<$B#&S24gt&?-MO=!^ zmRbLgs?y2?RdFR%R7n*RpL1hZ0WsTmB*%HGrLpDW)E#>y?yZ~$po-*m^yGpZcBGPN zyDg(Y9*uv4icWp6oxEpHu`3eXQsY*9&AuYYN|_V8N=R2>mU;;C7;R^jWGGdT(HxM` ztc+_n4bGgsGVm1CtrnQwNT3CX9ih;9R|PSvGs)EwvqHVu{-DHItCl4oC&Iyqi||Y0k;&^S8a{c3=0V0v;}p&^sbhko@=gs z)H%3Lq78s7C7+GW1@%2kZXNZbTfc?F3_Y!@nc58KWt-sc_@*?tb%3EUK?Z`5YjW!y zaH1g=o58IK8`TlmT&s4=Mgu~<&KL$L9A;rHfQtM`ueJmY35NE|%e?G#8@{Tvw`EJb zK|aYr7WO#~^FRW4dJ{?yXgnmhTMnqpW4f*2Q)7ba7Ep79`d{DbHi9n> zLcfjnm4~PS+RCc0(k1}Nmz7GAxjXX^_VQ^CGa6lE{;Kzeds)a+m-xKD?{p8g>#pG&DyXpWH^pbznoFc#5 z+d}@ZFPC9*XQ!s@BG1k68%Ij|9o#an-b}LQ5d@B%9?GH`_cz}^dw%4bX(Jh8D zlfUSpX0-l69uk}E|jc=rc#s*LG68{vLt@acAVtktui&Q@k!ajX+fvt%080?rK-fPj;tQf0mIk#nj11aBA)koD z`~U^1>K_UT{$R|0cwC^_jo`*i{S9bP4q=!jI~!{*-x4CVlT)3`?4?y!^@GmucFr8W z&fl24HhJT@YtP*tT=GYYS|dltDaVA&(RnNXj_`KnJ9Tf>z0>qo(_KfTr!QK6D!TVH z6vYGh1T)q72q9$;BMTh4&fUqsbN0>Q*N5K>z8-wH=k7qX#vQHfUWa6N8b7ru&_*;( zK}E}5H@Vo9+X$ugOO00=Uud3c{fsruSm&~5vmdbAp;%3_ymb1?=@sM22lX8*^_|Q0 zoeyl4QCs!xuKTvV5%)7whdw5w%`M3fPMlmh;kkdp6FK93aN;caZF3gcxBr=x)SRh~ z)s)n!_EoihMV$t}Psz*sow_F~q_M7AtkY_8!RfAB&B&Z_UODn{vT4<1ozu_i=bb-E zS~Hku+84|ViiOJG%syJ zn-5FUk2AV^Dl1S?r+4Bvu2hF1sy zfvOneN30IBUu;A8FkV4zvEK%Z@kP9$!uX6t8#YuXY}zPbaN&lA#T?!+D6n-y$KtFF e1B(kjH>BX2&r5W;Vk7-9#+UJWH$$8U)BXqI^x^aX diff --git a/__pycache__/test.cpython-313.pyc b/__pycache__/test.cpython-313.pyc index 24c8d261cd369edc17f6a2d541c88dcde87706c8..953acee2e51d7556262e3c18fcdb33e57bdf26db 100644 GIT binary patch literal 10096 zcmeI2U2GFa9>8~Puf1N|alQ!PFi2F5u1p0)|1>Tc4DV__N0UG70ga1WZS}oxQmvaVx#ly{eV@^6XBvO zWN-Z$!-R_%2HF;6+ZZ|M$r0jPSdkfE2iRKYW31?|Wi@U%tVRYQ<0W!5M&yVpDQYyC zQX|Hzcz0(C^RK~}*O`k9eg%91ORN`d(6{9PtGA#fYsePjS}ZhghjEQOA9B{QW=ROS zFduSnVGd-vU+qpf+NUR{&w`x%QYpOVSWBb`71mkIIzx`4wac+oPGYBW6^BYfr8zz4 z%2pOC4|$<(Q)XV4%DSEztJt`thUCPt;!Kv+F`><8+2Kpfb@ZX&BGl8{Ei@syk{qHW|WF-29RSbsPopG$9|&jz5EsN5e;#Z|pR z(QL9h7|6UrnFmW2_S3uf;p)jOi_EUEK=~x@v|;7o0v{l#KdF zDxulJWMCw{kkA13+2J& z&~OUhyF3ucJa23WXjUjb{tc|=b9Y!xhz%r?untFt6SZ!F>rS&L`+foAX;x)a(JZl~ z#)niAOAMSiO>m8AjxcN-*b$lyMb>y&>2xz_d{|LrqMk@-UOgv;6Qfcj9FJ?onFm;q zqtfM=I#}z}Y>FK3*PIe;2D%Y6_fRr&Ny=;;%^~Rn_P7vBZchC;DRK5~ShCU|o+qoK#|JlEh@CZUBTy_4y;op}cJ` z5BT6?efZIJDzL*pJgU`TMoJ{5VW^T)imXWbZzB!G5-CMWj>u#Xz@fG0rHy}$ujA(_ zpF{F4Gh4_MZMoSt>8%;F&3KBhUi|gNk6*Ysa!Z}|?3*brz479;7eDlk1!s!HTP4#) zyJiYE-Ed!Xe^l_)$`tQ@$}t7LS%L8uUyZ*X{~$4D{f2j6^}O$yEN{A7@{n&gi@w{+ zLw>(m^p9H~@{Ql{rH}aPDZctv@9ksb)erfm@9{~^6kjvmILX(1#Wz1Kfni|W1v(VS zw4l5Ia(_}4+-bdUDGwHL_bYhFPZSBk9nOiZ4$SWqf&oW*GhO(X2ePzOq;N!yjTm%= zs+jSJAU#MSB#~T}nasQgJMcOKij_UfCirzQ(*k6`)eqOYnCZ-6rt>FargKGR3d``& z+AFNeOxqgTaEsbl@-h=lzg1}%UzLWb3=R42T8^N~As$E;ihaIeBKOP02U}6!W(y7< z7GnK$DNqb7Y+nJBK)kfSR{@BL=RxZpPlZV|i&4tQuwvj9Fd9?*dGWcwG0;WOOQ>~G zR+3Vb614~^#{@M6g(RuK+-6L+V6qjHZIHl=+c6LB3`0arsxa9JiN;GAu4x`AchPGF zSOjAtvoGlPl4^W!7bFU5ZG(DnC%dt;7RnfX;_&qhpN3o z3qzH5dIe~0q(hWP`8_1>GC!0ug=g63g0SxmguQeZAa&4WR@a}Q_^Mf}BPMdoM#e$0E|DOMKMK@S#=V>nF(}fE;-d#K>YU-Gy$8FB}D@MLd68#5E|IYfuD= zodF9_RKaqTRi#zTS(E$3oHd!Q4#{!dsyq``2hltf_Y5cFvB+pU*f6zVLxQbHd@%T9 zwC3;V#^Jhvr@3^iuk-CxIIan3`$>^xDxrdWSDLi zfH??x{BUI{2~la}kT8S7CW=Bd;BV_PAy`)csc4`F>uD`n@ zAe{0CNC1Q7OySw*jKMc2--87PHoX=QqmDGB3Q5aTQCtN~svvZ=%5rllD5ZvpiwZ`rN%Sa=97D0>b{}_WwfGhDo zhA~gpU?Y3s`?J_u@PPe>K+|`c#)io;XrQrunR_H{w8(5b`w&D)!wG$J?}N7KniXJ0 z+frUNHKg^cWP5X;dhqU9fI!pqXpbM5`YUKw^TYg&$y)60-1KpnK zgXfR*bf4%x-YOo6$94OMS`LcXAC5t|$uD++>4DB41b;4xYSQrZ!07Q4FAj$h1sYp8 zcV{D>T_sBE%o;UI$%5K+DM}Uvt8|}jm(CH4DWR=|(nAuDD-sGWf&GFZKCNvfVyyD= zJ0OYj86@vAPYMAFlO@&Tjdx4O_dW0&(n{VKJ2q2PexveQ<;Rg*FWqjOE(*+)SKPRC z?b3%szq8JiRo@Ctm(|XcZoTp9wO2pthTu~9UWoV<*JIe{Fgsh2FT`|i^0n6=@;^5n zGy@Qef=bhT(^C)h2Hj%}skk17rop7c?6l|5g3<~i|F$YvWBsfuSkB$|mIpb<{arlf zeLOs#;Dlg_Yr^Y*CllpDu-Z8xI`Dao5Io>m1Q^q~dKoZd28-5%5zD_3Bi0c7%Q_-h z$}MjY8|x3S=MSFa8K4}w9x?i2e`s(4uz`;#Ohi}m{h0l!dd%PNKi zckZZ~4O>vuc2lyacXz>yivxeu*EJ+V%Mo8e4G=5EQ<2=>NeCW8rwxHX`sEAym%VWT z_01b%?zkeh5MzqiC&!bQA)x33HxeE;2in9)JUldv5`f~tC;m0b1Px;hx>lz_f|ab+ zJi0~W@ZhLHvq4m^L1PsqzuCHnsvA17H1Y^uHEIuIW)yJN^&d>@SV7~g;r;9I3CcDw zVwl`tf|Z1n(RZT+bnn7W5MZOGB)bA8t|~cHqJ-B zhVzl9B2!!gLs2E?2Qe0u<1kI1RCNZ>=kDB&J~!kiyj5K#)`>uQmxcS2ornAv+l5Y_ z^NT%L`$a(LJmmP&B6JlxzjQmG_RA8XtIBZ)mJCEzq(pc~mL!OtNJGhJDvo)#Bw;)z zQ{$DS{uohIFpp$7&yw`8b`3~E>M=Qt$t#$g!Q^#Fv@!|)d?l>LB2rj|lR?;^aQ-C` zj84wOe})<}1W%o5g5hc9QwSpcl9{tJcGpV%uy1G@RQ*2Y+F_{hn{K;mj0wHm56^t-RehUV3}q6xXyS zfu5a!BCgRKbm;_OMYCDk!_F2$>$$zlPROLI@~gcEr?~bF%2Kw9EFGpSZL@{Yc&=fC zzq*IZe%8{Do)w^WZs!Jj-nK%|^8(b8u(5n`8)5t&R(LZ!gFgXrf>P_l^W#zAGClXf z7_!Ev5-}B#s(Fl@AJ4PtmV)qS5{~V!z((uSPl)K@5FQh0Jkm7?%23}k7jb0VOu7v8 zG>1Q^@81qG2o>lOD<>eCwX!VxE#v%_5x!---!VJBV|LHm8FteXd)&e{-+cK|rEjXz n_k_XH?3)~8bC0DTST}!TbN}kpd#7$5d0^WQ literal 6833 zcmdU!TTC3+8Gz4TXLfd%HMa$WWe*MnHf9%CgE0i`I=gmaJEjQ(byFja_A)G6FT3Q- zEDqb3qz{dow2DO%*-9eCmGXe4wnBY!A1c|!4}G+YQPK%j)b^qBQ*p2qiKqVmnZ1C$ zu*OwYbyl3~KmWOZ|38Oyw_70a41V*kDQ_Ji|G-S)vE>u%*C6pRk%(kEOQ;c>sEOsx z)C{rttmTZATF=<1O~iW3S^F7|ax85+%Tq@qIY=aH6On8a=22+p@u$yXlu+k{OR^9C zkdR0$aJQ7Ms$ou}CWAGwB|x#FJEsX|k%t zGg&RRS(PK7S-)w*u}u&h4~)e`EfZE~!Q2Ps_^_NsvP!lAGfUf{^+KAL99^bjPLQ0K zmRwDw&v?0FOl@UatK5RSatqX6M)U5*QB}E>!Z<1SAGhY&MQgmYhdL5YU$yQYQ&jDI z;+&Q}k<~H}oG|Y>*K{PA4ycMIPixr?H_K5a636U=RoK-xOuCCIT9&5e*+?p@WvSR5>{ri78{b23D2;EpcTsl1*twg{oT>ZK^x}3RT|B!XA@pCZmnS?DB3XuQrgdwCCT2%+2f~U5xC7lvf-0&iCexZiaW@Z&Y|qx8G|I7G*wsN1Tio!5GSf3z zeD6}CJ6{fs=@!U7_ywGou^Uc`ds(+gHrd=OvB8*iV0-qOs7q2pmG_ ztZv0#>O7ntcEEH#qN)njPN#K`k&+|nIT=BH!=YPMB{iu#WH?OhAnClEKXy7N8;6qB z8YftHO=n^k<^1ucxDRzpG&`wVU`15qx>$JhH@c=W(@NJxWj2|fRg`ozNiRYeiA{Bl zDl-{1sby$VQM(ebp4n(1mYFUqdnw_ELHTiTT^jHvFf*sOV?s`6!YggETrc7?`)*Xza|h*%?GKUr`}IrwtU6A zK5+lseb?K6v-UneT+I3x)cgG5Vm7d7xzG2m*@>h6OTP6U-+Fy~A-veS!uNky3woiU z4W=79v7tT(9sN~HsMd1Z>YT;r9j{?LAt`r00+klym z$ptu)#)GJoECXhStWqghV9DNPmN=uPi%9(IrnGGrKy{4U3P9KuQ0<$5YJWzc3KINE z@Wb40^5^Y40+-tcrcN*bMTl=B<9Z4i_fyDJZIQ8-g|bK5BUKmH0H7@FfZX|tQ2w-{ z#1|DMev=1#P3|3`i0jD=jVW;;ZeQ?4gYke<@BmZ*CX{tT80;4P$HbXTDjAy_h7&Ii zA2Uj0QyEoB2Lxqu5=fbxRj{^$y@qf37{_1S40Zp}?%|%pz1;%EBa!s96GG8~Z+HyU zgLzRHWgh$*&JRWJ85|g=nv5&5(TY!?9w?>gEaz(ce%61#fg>cBZpHRD+Ehnj>GXRZ>!hg(}K!uE*&OcIV33BbXnGE=6((?GZ(b@8hWm5pBWXAO?sb z+J*sI1^PS&Fq=e0hpAu`C^`Z9AmcEbs3Nz4U&q4_I-ZnlS7~SuY^|jb}ZDc?DelU9{Ak#iR&|je^USe?d?NchfR>L z>R;Rc6n5amt;6^Emy32_=w|Fb|57n)n1!$Sy2p(|HM9pEK%?O8gU)Rj?XFaZHdxy$ z?jsxO>oC^$-Qh0FwU+RH+wWV#ZtixIH#Fq9-OXeAC=bPdBs@qg4GCeveaFs0`5m_q zZg$>j;Gp)-ej(iHxYNOb%$+VFJj`uzyK$wNRjGTzs({5Xziuj67RXt_e(c5}xBU+t za=Qe-63a2SQ~ntp^sSC{c_*IV;(V8vacq$(@1Wz?vTKtqusI zLh*)$`XNZ(LVyRbbQ7@+2%94LIGT@Rv=;<*yU~k0jQ!YL4?vf;`q&0e8yHTRD{MegHoC31m0{(RVy~duKR^H=Yy=SA zt!@2tFud$O3GAz@x*P`f)ihk)H^1+**!3d|y(`t-tKP<|7w0d2IDN&kTHktITB-M~ z)-_*!b^g_lMu0co7l1=GJ;0!L|j<*;}17Wlhoi(or`3%ucz#q746^MGz>3^`ZR z106xmK^rUXlN;(FjNq3op#jTpTS6VS+a7PIfxF$tW7^L{@lt~jYI82Na8R<;A%u=N zmVz9X4+vpy8;-4k2mFM03|9&NryMhbZ*Shju?qQTU$>mEHCp>J$Cm8Z`|j9RGB00Cpf~iuDFjEIns_C@iLCs7)Kxt9BH>K z*@RH7bIHR&?UGjrwK|qW4$IqxP#<>#tP5PB$mz(mBFnlU%hQ>77F=0bcggab*+?p1 zHE1;g2vQ zKsiidfL|}_8vw4Kk-yo5%g2^&O@D#-_(I)1TL4mbo%cUj>ngF#AUm$j8L33q&&1Lm9ukldvaB!2V-E?wqS*3+eLhZvN zY62hHmO)Y3DBxyF^o??hRCfM5FXe-8&-HAeAnYtHAUjVRXufzD^sNe$1N?k=PP zc#~$Vx)nu)SO#tk-+~PhW%qfs8Q*SVZ{raR!C^F3`6O20@*5ptpH%#ivCodtQK-O+ z8}&2!Fzd diff --git a/main.py b/main.py index 2ee1927..267d2e9 100755 --- a/main.py +++ b/main.py @@ -4,72 +4,119 @@ wg_mtu_auto.py — Auto-detect egress IF, optionally probe Path MTU to one or mo compute the correct WireGuard MTU, and apply it. Examples: - sudo ./wg_mtu_auto.py - sudo ./wg_mtu_auto.py --force-egress-mtu 1452 - sudo ./wg_mtu_auto.py --pmtu-target 46.4.224.77 --pmtu-target 2a01:4f8:2201:4695::2 - sudo ./wg_mtu_auto.py --pmtu-target 46.4.224.77,2a01:4f8:2201:4695::2 --pmtu-policy min - ./wg_mtu_auto.py --dry-run + sudo ./main.py + sudo ./main.py --force-egress-mtu 1452 + sudo ./main.py --pmtu-target 46.4.224.77 --pmtu-target 2a01:4f8:2201:4695::2 + sudo ./main.py --pmtu-target 46.4.224.77,2a01:4f8:2201:4695::2 --pmtu-policy min + sudo ./main.py --prefer-wg-egress --auto-pmtu-from-wg + ./main.py --dry-run """ -import argparse, os, re, subprocess, sys, pathlib, ipaddress, statistics +import argparse +import ipaddress +import os +import pathlib +import re +import statistics +import subprocess +import sys + + +# ----------------- helpers ----------------- def run(cmd): # -> str - return subprocess.run(cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True).stdout.strip() + return subprocess.run( + cmd, check=False, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True + ).stdout.strip() + def rc(cmd): # -> int return subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode + def exists_iface(iface): # -> bool return pathlib.Path(f"/sys/class/net/{iface}").exists() -def get_default_ifaces(): # -> list[str] - devs = [] - for cmd in (["ip","-4","route","show","default"], ["ip","-6","route","show","default"]): - out = run(cmd) - for line in out.splitlines(): - m = re.search(r"\bdev\s+(\S+)", line) - if m: devs.append(m.group(1)) - if not devs: - for cmd in (["ip","route","get","1.1.1.1"], ["ip","-6","route","get","2606:4700:4700::1111"]): - out = run(cmd) - m = re.search(r"\bdev\s+(\S+)", out) - if m: devs.append(m.group(1)) - uniq = [] - for d in devs: - if not d or d == "lo" or re.match(r"^(wg|tun)\d*$", d) or not exists_iface(d): continue - if d not in uniq: uniq.append(d) - return uniq def read_mtu(iface): # -> int - with open(f"/sys/class/net/{iface}/mtu","r") as f: + with open(f"/sys/class/net/{iface}/mtu", "r") as f: return int(f.read().strip()) + def set_mtu(iface, mtu, dry): if dry: print(f"[wg-mtu] DRY-RUN: ip link set mtu {mtu} dev {iface}") else: - subprocess.run(["ip","link","set","mtu",str(mtu),"dev",iface], check=True) + subprocess.run(["ip", "link", "set", "mtu", str(mtu), "dev", iface], check=True) + def require_root(dry): if not dry and os.geteuid() != 0: print("[wg-mtu][ERROR] Please run as root (sudo) or use --dry-run.", file=sys.stderr) sys.exit(1) + def is_ipv6(addr): # -> bool try: return isinstance(ipaddress.ip_address(addr), ipaddress.IPv6Address) except ValueError: + # best-effort for hostnames (contains ':') return ":" in addr + +# ----------------- route & iface selection ----------------- + +def default_route_lines(): + lines = [] + for cmd in (["ip", "-4", "route", "show", "default"], ["ip", "-6", "route", "show", "default"]): + out = run(cmd) + if out: + lines.extend(out.splitlines()) + return lines + + +def get_default_ifaces(ignore_vpn=True): # -> list[str] + devs = [] + for line in default_route_lines(): + m = re.search(r"\bdev\s+(\S+)", line) + if m: + devs.append(m.group(1)) + # fallback via route get + if not devs: + for cmd in (["ip", "route", "get", "1.1.1.1"], ["ip", "-6", "route", "get", "2606:4700:4700::1111"]): + out = run(cmd) + m = re.search(r"\bdev\s+(\S+)", out) + if m: + devs.append(m.group(1)) + uniq = [] + for d in devs: + if not d or d == "lo" or not exists_iface(d): + continue + if ignore_vpn and re.match(r"^(wg|tun)\d*$", d): + continue + if d not in uniq: + uniq.append(d) + return uniq + + +def wg_default_is_active(wg_if: str) -> bool: + # check if any default route is via wg_if + return any(re.search(rf"\bdev\s+{re.escape(wg_if)}\b", line) for line in default_route_lines()) + + +# ----------------- PMTU probing ----------------- + def ping_ok(payload, target, timeout_s): # -> bool - base = ["ping","-M","do","-c","1","-s",str(payload),"-W",str(max(1, int(round(timeout_s))))] + base = ["ping", "-M", "do", "-c", "1", "-s", str(payload), "-W", str(max(1, int(round(timeout_s))))] if is_ipv6(target): base.insert(1, "-6") return rc(base + [target]) == 0 + def probe_pmtu(target, lo_payload=1200, hi_payload=1472, timeout=1.0): # -> int|None - """Binary-search the largest payload that passes with DF. Return Path-MTU (payload + hdr) or None.""" + """Binary-search the largest payload that passes with DF. Return Path MTU (payload + hdr) or None. + Header: +28 (IPv4), +48 (IPv6).""" hdr = 48 if is_ipv6(target) else 28 - # ensure the lower bound works; if not, try slightly smaller floors + # ensure lower bound works; if not, try smaller floors if not ping_ok(lo_payload, target, timeout): for p in (1180, 1160, 1140): if ping_ok(p, target, timeout): @@ -87,8 +134,8 @@ def probe_pmtu(target, lo_payload=1200, hi_payload=1472, timeout=1.0): # -> int hi = mid - 1 return (best + hdr) if best is not None else None + def choose_effective(pmtus, policy="min"): # -> int - """Pick an effective PMTU from a list of successful PMTUs.""" if not pmtus: raise ValueError("no PMTUs to choose from") if policy == "min": @@ -99,35 +146,103 @@ def choose_effective(pmtus, policy="min"): # -> int return int(statistics.median(sorted(pmtus))) raise ValueError(f"unknown policy {policy}") + +# ----------------- WireGuard helpers (opt-in) ----------------- + +def wg_is_active(wg_if: str) -> bool: + if not exists_iface(wg_if): + return False + return rc(["wg", "show", wg_if]) == 0 + + +def wg_peer_endpoints(wg_if: str) -> list[str]: + """Return list of peer endpoints (hostnames/IPs) – port stripped.""" + targets = [] + + # 1) Try: wg show endpoints + out = run(["wg", "show", wg_if, "endpoints"]) + for line in out.splitlines(): + # format: \t + parts = line.strip().split() + if len(parts) >= 2 and parts[-1] != "(none)": + ep = parts[-1] # host:port + host = ep.rsplit(":", 1)[0] + # IPv6 endpoint may be like [2001:db8::1]:51820 + host = host.strip("[]") + targets.append(host) + + # 2) Fallback: wg showconf (root may be required) + if not targets: + conf = run(["wg", "showconf", wg_if]) + if conf: + for m in re.finditer(r"^Endpoint\s*=\s*(.+)$", conf, flags=re.MULTILINE): + ep = m.group(1).strip() + host = ep.rsplit(":", 1)[0].strip("[]") + targets.append(host) + + # dedupe & sanity + cleaned = [] + for t in targets: + if t and t not in cleaned: + cleaned.append(t) + return cleaned + + +# ----------------- main ----------------- + def main(): ap = argparse.ArgumentParser(description="Compute/apply WireGuard MTU based on egress MTU and optional multi-target PMTU probing.") ap.add_argument("--egress-if", help="Explicit egress interface (auto-detected if omitted).") - ap.add_argument("--force-egress-mtu", type=int, help="Force this MTU on the egress interface before computing wg MTU.") - ap.add_argument("--wg-if", default=os.environ.get("WG_IF","wg0"), help="WireGuard interface name (default: wg0).") - ap.add_argument("--wg-overhead", type=int, default=int(os.environ.get("WG_OVERHEAD","80")), help="Bytes of WG overhead to subtract (default: 80).") - ap.add_argument("--wg-min", type=int, default=int(os.environ.get("WG_MIN","1280")), help="Minimum allowed WG MTU (default: 1280).") + ap.add_argument("--prefer-wg-egress", action="store_true", + help="Allow/consider wg* as egress and prefer it if default route uses wg (default: disabled).") + ap.add_argument("--auto-pmtu-from-wg", action="store_true", + help="Automatically add WireGuard peer endpoints as PMTU targets (default: disabled).") + ap.add_argument("--wg-if", default=os.environ.get("WG_IF", "wg0"), help="WireGuard interface name (default: wg0).") + ap.add_argument("--wg-overhead", type=int, default=int(os.environ.get("WG_OVERHEAD", "80")), help="Bytes of WG overhead to subtract (default: 80).") + ap.add_argument("--wg-min", type=int, default=int(os.environ.get("WG_MIN", "1280")), help="Minimum allowed WG MTU (default: 1280).") # PMTU (multi-target) ap.add_argument("--pmtu-target", action="append", help="Target hostname/IP to probe PMTU. Can be given multiple times OR comma-separated.") ap.add_argument("--pmtu-timeout", type=float, default=1.0, help="Timeout (seconds) per ping probe (default: 1.0).") ap.add_argument("--pmtu-min-payload", type=int, default=1200, help="Lower bound payload for PMTU search (default: 1200).") ap.add_argument("--pmtu-max-payload", type=int, default=1472, help="Upper bound payload for PMTU search (default: 1472 ~ 1500-28).") - ap.add_argument("--pmtu-policy", choices=["min","median","max"], default="min", + ap.add_argument("--pmtu-policy", choices=["min", "median", "max"], default="min", help="How to choose effective PMTU across multiple targets (default: min).") ap.add_argument("--dry-run", action="store_true", help="Show actions without applying changes.") + # NEW: force a specific WireGuard MTU (overrides computed value) + ap.add_argument("--set-wg-mtu", type=int, help="Force a specific MTU to apply on the WireGuard interface (overrides computed value).") + # (legacy / optional) force egress MTU + ap.add_argument("--force-egress-mtu", type=int, help="Force this MTU on the egress interface before computing wg MTU.") args = ap.parse_args() require_root(args.dry_run) - # Detect egress - egress = args.egress_if or (get_default_ifaces()[0] if get_default_ifaces() else None) + # Egress detection (wg ignored by default) + if args.egress_if: + egress = args.egress_if + else: + ignore_vpn = not args.prefer_wg_egress + cands = get_default_ifaces(ignore_vpn=ignore_vpn) + # If we allow wg and default route is via wg-if, prefer it first + if args.prefer_wg_egress and wg_is_active(args.wg_if) and wg_default_is_active(args.wg_if): + if args.wg_if in cands: + cands.remove(args.wg_if) + cands.insert(0, args.wg_if) + egress = cands[0] if cands else None + if not egress: print("[wg-mtu][ERROR] Could not detect egress interface (use --egress-if).", file=sys.stderr) sys.exit(2) if not exists_iface(egress): - print(f"[wg-mtu][ERROR] Interface {egress} does not exist.", file=sys.stderr); sys.exit(3) + print(f"[wg-mtu][ERROR] Interface {egress} does not exist.", file=sys.stderr) + sys.exit(3) print(f"[wg-mtu] Detected egress interface: {egress}") # Egress MTU + if args.prefer_wg_egress and egress == args.wg_if: + print(f"[wg-mtu] Using WireGuard interface {args.wg_if} as egress basis.") + if args.wg_if == egress and not wg_is_active(args.wg_if): + print(f"[wg-mtu][WARN] {args.wg_if} selected as egress but WireGuard is not active.", file=sys.stderr) + if args.force_egress_mtu: print(f"[wg-mtu] Forcing egress MTU {args.force_egress_mtu} on {egress}") set_mtu(egress, args.force_egress_mtu, args.dry_run) @@ -136,24 +251,37 @@ def main(): base_mtu = read_mtu(egress) print(f"[wg-mtu] Egress base MTU: {base_mtu}") - # PMTU over multiple targets - effective_mtu = base_mtu + # Build PMTU target list pmtu_targets = [] if args.pmtu_target: - # flatten comma-separated + repeated flags for item in args.pmtu_target: pmtu_targets.extend([x.strip() for x in item.split(",") if x.strip()]) + if args.auto_pmtu_from_wg: + if wg_is_active(args.wg_if): + wg_targets = wg_peer_endpoints(args.wg_if) + if wg_targets: + print(f"[wg-mtu] Auto-added WG peer endpoints as PMTU targets: {', '.join(wg_targets)}") + pmtu_targets.extend(wg_targets) + else: + print("[wg-mtu] INFO: No WG peer endpoints discovered (wg show/showconf).") + else: + print(f"[wg-mtu] INFO: {args.wg_if} is not active; skipping auto PMTU targets from WG.") + + # Deduplicate PMTU targets + if pmtu_targets: + pmtu_targets = list(dict.fromkeys(pmtu_targets)) + + # PMTU probing + effective_mtu = base_mtu if pmtu_targets: - results = {} good = [] print(f"[wg-mtu] Probing Path MTU for: {', '.join(pmtu_targets)} (policy={args.pmtu_policy})") for t in pmtu_targets: p = probe_pmtu(t, args.pmtu_min_payload, args.pmtu_max_payload, args.pmtu_timeout) - results[t] = p + print(f"[wg-mtu] - {t}: {p if p else 'probe failed'}") if p: good.append(p) - print(f"[wg-mtu] - {t}: {'%s' % p if p else 'probe failed'}") if good: chosen = choose_effective(good, args.pmtu_policy) print(f"[wg-mtu] Selected Path MTU (policy={args.pmtu_policy}): {chosen}") @@ -165,6 +293,14 @@ def main(): wg_mtu = max(args.wg_min, effective_mtu - args.wg_overhead) print(f"[wg-mtu] Computed {args.wg_if} MTU: {wg_mtu} (overhead={args.wg_overhead}, min={args.wg_min})") + # --- NEW: override with --set-wg-mtu if provided + if args.set_wg_mtu is not None: + if args.set_wg_mtu < args.wg_min: + print(f"[wg-mtu][WARN] --set-wg-mtu {args.set_wg_mtu} is below wg-min {args.wg_min}; clamping to {args.wg_min}.") + args.set_wg_mtu = args.wg_min + wg_mtu = args.set_wg_mtu + print(f"[wg-mtu] Forcing WireGuard MTU (override): {wg_mtu}") + # Apply if exists_iface(args.wg_if): set_mtu(args.wg_if, wg_mtu, args.dry_run) @@ -174,5 +310,6 @@ def main(): print(f"[wg-mtu] Done. Summary: egress={egress} mtu={base_mtu}, effective_mtu={effective_mtu}, {args.wg_if}_mtu={wg_mtu}") + if __name__ == "__main__": main() diff --git a/test.py b/test.py index 1e2a657..e2d0079 100644 --- a/test.py +++ b/test.py @@ -1,14 +1,15 @@ import io import sys import unittest -from unittest.mock import patch, call +from unittest.mock import patch from contextlib import redirect_stdout # Import the script as a module import main as automtu -class TestWgMtuAuto(unittest.TestCase): +class TestWgMtuAutoExtended(unittest.TestCase): + # ---------- Baseline behavior (unchanged) ---------- @patch("main.set_mtu") @patch("main.read_mtu", return_value=1500) @@ -16,12 +17,8 @@ class TestWgMtuAuto(unittest.TestCase): @patch("main.get_default_ifaces", return_value=["eth0"]) @patch("main.require_root", return_value=None) def test_no_pmtu_uses_egress_minus_overhead( - self, _req_root, _get_def, _exists, _read_mtu, mock_set_mtu + self, _req_root, mock_get_def, _exists, _read_mtu, mock_set_mtu ): - """ - Without PMTU probing, wg MTU should be base_mtu - overhead (clamped by min). - With base=1500, overhead=80 ⇒ wg_mtu=1420. - """ argv = ["main.py", "--dry-run"] with patch.object(sys, "argv", argv): buf = io.StringIO() @@ -32,74 +29,100 @@ class TestWgMtuAuto(unittest.TestCase): self.assertIn("Detected egress interface: eth0", out) self.assertIn("Egress base MTU: 1500", out) self.assertIn("Computed wg0 MTU: 1420", out) - - # dry-run still calls set_mtu (but prints DRY-RUN); ensure it targeted wg0 with 1420 mock_set_mtu.assert_any_call("wg0", 1420, True) + # get_default_ifaces should be called with ignore_vpn=True by default + mock_get_def.assert_called_with(ignore_vpn=True) + # ---------- prefer-wg-egress selection ---------- + + @patch("main.wg_default_is_active", return_value=True) + @patch("main.wg_is_active", return_value=True) @patch("main.set_mtu") + @patch("main.read_mtu", return_value=1420) @patch("main.exists_iface", return_value=True) - @patch("main.get_default_ifaces", return_value=["eth0"]) + @patch("main.get_default_ifaces", return_value=["eth0", "wg0"]) @patch("main.require_root", return_value=None) - def test_force_egress_mtu_and_pmtu_multiple_targets_min_policy( - self, _req_root, _get_def, _exists, mock_set_mtu + def test_prefer_wg_egress_picks_wg0_when_default_route_via_wg( + self, _req_root, mock_get_def, _exists, _read_mtu, _set_mtu, _wg_is_active, _wg_def_active ): - """ - base_mtu forced=1452; PMTU results: 1452, 1420 -> policy=min => 1420 chosen. - effective=min(1452,1420)=1420; wg_mtu=1420-80=1340 - """ - with patch("main.read_mtu", return_value=9999): # should be ignored because we force - with patch("main.probe_pmtu", side_effect=[1452, 1420]): - argv = [ - "main.py", - "--dry-run", - "--force-egress-mtu", "1452", - "--pmtu-target", "t1", - "--pmtu-target", "t2", - "--pmtu-policy", "min", - ] - with patch.object(sys, "argv", argv): - buf = io.StringIO() - with redirect_stdout(buf): - automtu.main() + argv = ["main.py", "--dry-run", "--prefer-wg-egress", "--wg-if", "wg0"] + with patch.object(sys, "argv", argv): + buf = io.StringIO() + with redirect_stdout(buf): + automtu.main() out = buf.getvalue() - self.assertIn("Forcing egress MTU 1452 on eth0", out) - self.assertIn("Probing Path MTU for: t1, t2 (policy=min)", out) - self.assertIn("Selected Path MTU (policy=min): 1420", out) + # When prefer-wg is set AND wg default route is active, wg0 should be chosen as egress + self.assertIn("Detected egress interface: wg0", out) + self.assertIn("Using WireGuard interface wg0 as egress basis.", out) + # Computed MTU: base 1420 - 80 = 1340 (clamped by min=1280) self.assertIn("Computed wg0 MTU: 1340", out) - mock_set_mtu.assert_any_call("wg0", 1340, True) + # get_default_ifaces should be called with ignore_vpn=False (because prefer-wg) + mock_get_def.assert_called_with(ignore_vpn=False) + # ---------- auto-pmtu-from-wg adds peer endpoints ---------- + + @patch("main.wg_peer_endpoints", return_value=["46.4.224.77", "2a01:db8::1"]) + @patch("main.wg_is_active", return_value=True) + @patch("main.probe_pmtu", side_effect=[1452, 1420]) # results for two peers @patch("main.set_mtu") @patch("main.read_mtu", return_value=1500) @patch("main.exists_iface", return_value=True) @patch("main.get_default_ifaces", return_value=["eth0"]) @patch("main.require_root", return_value=None) - def test_pmtu_policy_median( - self, _req_root, _get_def, _exists, _read_mtu, mock_set_mtu + def test_auto_pmtu_from_wg_adds_targets_and_uses_min_policy( + self, _req_root, _get_def, _exists, _read_mtu, _set_mtu, _probe_pmtu, _wg_active, _wg_peers ): - """ - base=1500; PMTUs: 1500, 1452, 1472 -> median=1472. - effective=min(1500,1472)=1472; wg_mtu=1472-80=1392 - """ - with patch("main.probe_pmtu", side_effect=[1500, 1452, 1472]): - argv = [ - "main.py", - "--dry-run", - "--pmtu-target", "a", - "--pmtu-target", "b", - "--pmtu-target", "c", - "--pmtu-policy", "median", - ] - with patch.object(sys, "argv", argv): - buf = io.StringIO() - with redirect_stdout(buf): - automtu.main() + argv = ["main.py", "--dry-run", "--auto-pmtu-from-wg", "--wg-if", "wg0"] + with patch.object(sys, "argv", argv): + buf = io.StringIO() + with redirect_stdout(buf): + automtu.main() out = buf.getvalue() - self.assertIn("Probing Path MTU for: a, b, c (policy=median)", out) + # Confirm WG peers were added + self.assertIn("Auto-added WG peer endpoints as PMTU targets: 46.4.224.77, 2a01:db8::1", out) + # The policy default is 'min', so chosen PMTU should be 1420 + self.assertIn("Selected Path MTU (policy=min): 1420", out) + # Computed wg0 MTU: 1420 - 80 = 1340 + self.assertIn("Computed wg0 MTU: 1340", out) + # Ensure probe was called twice (for both peers) + self.assertEqual(_probe_pmtu.call_count, 2) + + # ---------- manual PMTU still works with prefer-wg-egress ---------- + + @patch("main.wg_default_is_active", return_value=True) + @patch("main.wg_is_active", return_value=True) + @patch("main.probe_pmtu", side_effect=[1472, 1452, 1500]) + @patch("main.set_mtu") + @patch("main.read_mtu", return_value=1500) + @patch("main.exists_iface", return_value=True) + @patch("main.get_default_ifaces", return_value=["eth0"]) + @patch("main.require_root", return_value=None) + def test_prefer_wg_egress_with_manual_targets_and_median_policy( + self, _req_root, _get_def, _exists, _read_mtu, _set_mtu, _probe_pmtu, _wg_is_active, _wg_def_active + ): + argv = [ + "main.py", "--dry-run", + "--prefer-wg-egress", "--wg-if", "wg0", + "--pmtu-target", "a", "--pmtu-target", "b", "--pmtu-target", "c", + "--pmtu-policy", "median" + ] + with patch.object(sys, "argv", argv): + buf = io.StringIO() + with redirect_stdout(buf): + automtu.main() + + out = buf.getvalue() + # As default route via wg is active, wg0 should be used + self.assertIn("Detected egress interface: wg0", out) + # PMTU values: 1472, 1452, 1500 -> median = 1472 self.assertIn("Selected Path MTU (policy=median): 1472", out) + # Computed WG MTU: 1472 - 80 = 1392 self.assertIn("Computed wg0 MTU: 1392", out) - mock_set_mtu.assert_any_call("wg0", 1392, True) + self.assertEqual(_probe_pmtu.call_count, 3) + + # ---------- PMTU all fail fallback ---------- @patch("main.set_mtu") @patch("main.read_mtu", return_value=1500) @@ -109,16 +132,8 @@ class TestWgMtuAuto(unittest.TestCase): def test_pmtu_all_fail_falls_back_to_base( self, _req_root, _get_def, _exists, _read_mtu, mock_set_mtu ): - """ - If all PMTU probes fail, fall back to base MTU (1500) => wg_mtu=1420. - """ with patch("main.probe_pmtu", side_effect=[None, None]): - argv = [ - "main.py", - "--dry-run", - "--pmtu-target", "bad1", - "--pmtu-target", "bad2", - ] + argv = ["main.py", "--dry-run", "--pmtu-target", "bad1", "--pmtu-target", "bad2"] with patch.object(sys, "argv", argv): buf = io.StringIO() with redirect_stdout(buf): @@ -126,9 +141,47 @@ class TestWgMtuAuto(unittest.TestCase): out = buf.getvalue() self.assertIn("WARNING: All PMTU probes failed. Falling back to egress MTU.", out) - self.assertIn("Computed wg0 MTU: 1420", out) + self.assertIn("Computed wg0 MTU: 1420", out) # 1500 - 80 mock_set_mtu.assert_any_call("wg0", 1420, True) + # ---------- NEW: --set-wg-mtu overrides computed ---------- + + @patch("main.set_mtu") + @patch("main.read_mtu", return_value=1500) + @patch("main.exists_iface", return_value=True) + @patch("main.get_default_ifaces", return_value=["eth0"]) + @patch("main.require_root", return_value=None) + def test_force_set_wg_mtu_overrides_computed( + self, _req_root, _get_def, _exists, _read_mtu, mock_set_mtu + ): + """ + --set-wg-mtu must override the computed value. + Base=1500 -> computed 1420 (1500-80), but we force 1300. + """ + argv = ["main.py", "--dry-run", "--set-wg-mtu", "1300"] + with patch.object(sys, "argv", argv): + buf = io.StringIO() + with redirect_stdout(buf): + automtu.main() + + out = buf.getvalue() + # Computation is printed first + self.assertIn("Computed wg0 MTU: 1420", out) + # Then override message appears and applied value is 1300 + self.assertIn("Forcing WireGuard MTU (override): 1300", out) + mock_set_mtu.assert_any_call("wg0", 1300, True) + + # also test clamping below wg-min + argv2 = ["main.py", "--dry-run", "--set-wg-mtu", "1200"] # below default wg_min=1280 + with patch.object(sys, "argv", argv2): + out2 = io.StringIO() + with redirect_stdout(out2): + automtu.main() + s = out2.getvalue() + self.assertIn("[wg-mtu][WARN] --set-wg-mtu 1200 is below wg-min 1280; clamping to 1280.", s) + self.assertIn("Forcing WireGuard MTU (override): 1280", s) + mock_set_mtu.assert_any_call("wg0", 1280, True) + if __name__ == "__main__": unittest.main(verbosity=2)