#!/usr/bin/env python3
"""SwapForge: create or resize a swapfile and register it in /etc/fstab.

The swapfile lives at ``/var/swap/swapfile`` when the root filesystem is
btrfs and at ``/swapfile`` otherwise.  The script shells out to ``findmnt``,
``swapoff``, ``chattr``, ``btrfs``, ``dd``, ``fallocate``, ``chmod``,
``mkswap`` and ``swapon``.
"""
import argparse
import os
import re
import shutil
import subprocess
from pathlib import Path

FSTAB_FILE = Path("/etc/fstab")
BTRFS_SWAP_DIR = Path("/var/swap")
BTRFS_SWAP_PATH = BTRFS_SWAP_DIR / "swapfile"
DEFAULT_SWAP_PATH = Path("/swapfile")

# Fields written after the swapfile path in its fstab line.
SWAP_FSTAB_OPTIONS = "none swap sw 0 0"


def run(cmd, check=True, capture=False):
    """Run an external command given as an argument list (no shell).

    Args:
        cmd: Program and arguments, e.g. ``["swapon", "/swapfile"]``.
        check: When true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.
        capture: When true, stdout and stderr are captured as text and
            exposed on the returned object.

    Returns:
        The ``subprocess.CompletedProcess`` for the finished command.
    """
    kwargs = {"check": check}
    if capture:
        kwargs["stdout"] = subprocess.PIPE
        kwargs["stderr"] = subprocess.PIPE
        kwargs["text"] = True
    return subprocess.run(cmd, **kwargs)


def detect_root_fs() -> str:
    """Return the filesystem type of ``/`` as reported by ``findmnt``."""
    result = run(["findmnt", "-no", "FSTYPE", "/"], capture=True)
    return result.stdout.strip()


def parse_size_to_mib(size: str) -> int:
    """Convert a size such as ``'64G'`` or ``'2048M'`` into whole MiB.

    Accepted units are ``G``/``GB`` and ``M``/``MB`` in any letter case; a
    bare number is taken as MiB.  Surrounding whitespace is ignored.

    Raises:
        ValueError: If *size* is not a non-negative integer followed by an
            optional supported unit.
    """
    match = re.fullmatch(r"(\d+)([gGmM][bB]?)?", size.strip())
    if match is None:
        raise ValueError(f"Invalid size format: {size}")
    number = int(match.group(1))
    unit = (match.group(2) or "m").lower()
    # The pattern only admits g/gb/m/mb, so anything that is not "g*" is MiB.
    return number * 1024 if unit.startswith("g") else number


def get_swap_size(path: Path) -> int:
    """Return the size of *path* in whole MiB, or 0 if it does not exist."""
    try:
        size_bytes = path.stat().st_size
    except FileNotFoundError:
        return 0
    return size_bytes // (1024 * 1024)


def is_swap_entry(line: str, path: Path) -> bool:
    """Tell whether fstab *line* is the swap entry for *path*.

    The line must start with the exact path, followed by whitespace, the
    mount point ``none`` and the type ``swap``; commented-out lines and
    paths that merely share a prefix do not match.
    """
    pattern = rf"{re.escape(str(path))}\s+none\s+swap"
    return re.match(pattern, line) is not None


def fstab_append_text(existing: str, path: Path) -> str:
    """Return the text to append to fstab so it holds a swap entry for *path*.

    Args:
        existing: Current fstab contents ("" if the file is missing/empty).
        path: Swapfile path the entry should refer to.

    Returns:
        An empty string when an entry for *path* is already present;
        otherwise the new entry line, preceded by a newline when *existing*
        does not end with one so the entry never merges into the last line.
    """
    if any(is_swap_entry(line, path) for line in existing.splitlines()):
        return ""
    separator = "" if not existing or existing.endswith("\n") else "\n"
    return f"{separator}{path} {SWAP_FSTAB_OPTIONS}\n"


def remove_swap(path: Path):
    """Disable the swapfile at *path*, delete it and drop its fstab entry."""
    # Best effort: swapoff also fails when the file is simply not active.
    run(["swapoff", str(path)], check=False)
    try:
        path.unlink()
    except FileNotFoundError:
        pass

    if not FSTAB_FILE.exists():
        return
    lines = FSTAB_FILE.read_text().splitlines()
    kept = [line for line in lines if not is_swap_entry(line, path)]
    # Leave fstab untouched unless an entry was actually removed.
    if len(kept) != len(lines):
        FSTAB_FILE.write_text("".join(f"{line}\n" for line in kept))


def create_swap(path: Path, size_mib: int, fs: str):
    """Create, format and enable a swapfile and make sure fstab lists it.

    Args:
        path: Where the swapfile is created.
        size_mib: Size of the swapfile in MiB.
        fs: Filesystem type holding *path*; ``"btrfs"`` selects the btrfs
            specific preparation.

    Raises:
        subprocess.CalledProcessError: If allocating, chmod-ing, formatting
            or enabling the swapfile fails.
    """
    if fs == "btrfs":
        print(f"Creating {size_mib} MiB swapfile on btrfs at {path}")
        path.parent.mkdir(parents=True, exist_ok=True)
        # Disable copy-on-write and compression on the directory before the
        # file is created; both calls are best effort.
        run(["chattr", "+C", str(path.parent)], check=False)
        run(
            ["btrfs", "property", "set", "-ts", str(path.parent),
             "compression", "none"],
            check=False,
        )
        # dd writes every block, so the resulting file has no holes.
        run([
            "dd", "if=/dev/zero", f"of={path}", "bs=1M",
            f"count={size_mib}", "status=progress",
        ])
    else:
        print(f"Creating {size_mib} MiB swapfile on {fs} at {path}")
        run(["fallocate", "-l", f"{size_mib}M", str(path)])

    run(["chmod", "600", str(path)])
    run(["mkswap", str(path)])
    run(["swapon", str(path)])

    # Append only when needed: a stale entry may survive a manually deleted
    # swapfile, and a second line for the same file must not be added.
    existing = FSTAB_FILE.read_text() if FSTAB_FILE.exists() else ""
    addition = fstab_append_text(existing, path)
    if addition:
        with FSTAB_FILE.open("a") as f:
            f.write(addition)


def main():
    """Parse the requested size and create or recreate the swapfile."""
    parser = argparse.ArgumentParser(description="SwapForge Python edition")
    parser.add_argument("size", help="Swapfile size (e.g. 2048M or 64G)")
    args = parser.parse_args()

    # Validate the argument before touching the system so bad input yields
    # a usage error instead of a traceback or a half-finished run.
    try:
        new_size = parse_size_to_mib(args.size)
    except ValueError as exc:
        parser.error(str(exc))
    if new_size <= 0:
        parser.error("Swapfile size must be greater than zero")

    fs = detect_root_fs()
    path = BTRFS_SWAP_PATH if fs == "btrfs" else DEFAULT_SWAP_PATH
    old_size = get_swap_size(path)

    if old_size == new_size:
        print(f"Swapfile {path} already exists with correct size ({old_size} MiB). Skipping.")
        return

    if old_size > 0:
        print(f"Existing swapfile {path} has size {old_size} MiB, expected {new_size} MiB. Recreating...")
        remove_swap(path)

    create_swap(path, new_size, fs)
    print("Done.")


if __name__ == "__main__":
    main()