refactor(p2pkg): split monolith into atomic modules; add keep-legacy stubs; align tests with Strategy A discovery
Some checks failed
CI (tests + ruff) and stable tag / unittest (py3.10) (push) Has been cancelled
CI (tests + ruff) and stable tag / unittest (py3.11) (push) Has been cancelled
CI (tests + ruff) and stable tag / unittest (py3.12) (push) Has been cancelled
CI (tests + ruff) and stable tag / unittest (py3.13) (push) Has been cancelled
CI (tests + ruff) and stable tag / ruff (py3.12) (push) Has been cancelled
CI (tests + ruff) and stable tag / Tag stable (if version commit) (push) Has been cancelled
Some checks failed
CI (tests + ruff) and stable tag / unittest (py3.10) (push) Has been cancelled
CI (tests + ruff) and stable tag / unittest (py3.11) (push) Has been cancelled
CI (tests + ruff) and stable tag / unittest (py3.12) (push) Has been cancelled
CI (tests + ruff) and stable tag / unittest (py3.13) (push) Has been cancelled
CI (tests + ruff) and stable tag / ruff (py3.12) (push) Has been cancelled
CI (tests + ruff) and stable tag / Tag stable (if version commit) (push) Has been cancelled
- Move CLI + migration logic out of src/p2pkg/__main__.py into dedicated modules: apply.py, cli.py, discovery.py, gitutils.py, plans.py, templates.py - Add -k/--keep option to preserve legacy *.py files as forwarding stubs (default removes legacy) - Implement Strategy A recursive discovery: only migrate *.py inside a runnable outermost package (__init__.py + __main__.py) - Update tests to import new modules and build a Strategy A compatible package tree for recursive runs https://chatgpt.com/share/69468609-0584-800f-a3e0-9d58210fb0e8
This commit is contained in:
@@ -1,290 +1,9 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
from p2pkg.cli import main
|
||||||
import pathlib
|
from p2pkg.apply import migrate_one
|
||||||
import subprocess
|
|
||||||
from dataclasses import dataclass
|
|
||||||
|
|
||||||
|
|
||||||
INIT_TEMPLATE = """\
|
|
||||||
\"\"\"Compatibility wrapper.
|
|
||||||
|
|
||||||
This package was migrated from a flat module ({name}.py) to a package layout:
|
|
||||||
{name}/__main__.py contains the original implementation.
|
|
||||||
|
|
||||||
We re-export the public API so existing imports keep working.
|
|
||||||
\"\"\"
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from . import __main__ as _main
|
|
||||||
from .__main__ import * # noqa: F401,F403
|
|
||||||
|
|
||||||
# Prefer explicit __all__ if the original module defined it.
|
|
||||||
__all__ = getattr(_main, "__all__", [n for n in dir(_main) if not n.startswith("_")])
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
|
||||||
class MigrationPlan:
|
|
||||||
source: pathlib.Path
|
|
||||||
target_main: pathlib.Path
|
|
||||||
target_init: pathlib.Path
|
|
||||||
|
|
||||||
|
|
||||||
def _run(cmd: list[str], cwd: pathlib.Path | None = None) -> None:
|
|
||||||
subprocess.run(cmd, check=True, cwd=str(cwd) if cwd else None)
|
|
||||||
|
|
||||||
|
|
||||||
def _have_git_repo(root: pathlib.Path) -> bool:
|
|
||||||
try:
|
|
||||||
subprocess.run(
|
|
||||||
["git", "rev-parse", "--is-inside-work-tree"],
|
|
||||||
cwd=str(root),
|
|
||||||
stdout=subprocess.DEVNULL,
|
|
||||||
stderr=subprocess.DEVNULL,
|
|
||||||
check=True,
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
except Exception:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def _git_mv(src: pathlib.Path, dst: pathlib.Path, cwd: pathlib.Path) -> None:
|
|
||||||
dst.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
_run(["git", "mv", str(src), str(dst)], cwd=cwd)
|
|
||||||
|
|
||||||
|
|
||||||
def _fs_mv(src: pathlib.Path, dst: pathlib.Path) -> None:
|
|
||||||
dst.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
src.rename(dst)
|
|
||||||
|
|
||||||
|
|
||||||
def _is_candidate_py_file(path: pathlib.Path) -> bool:
|
|
||||||
"""
|
|
||||||
Decide if a file should be migrated.
|
|
||||||
|
|
||||||
We skip:
|
|
||||||
- non-.py files
|
|
||||||
- __init__.py and __main__.py
|
|
||||||
- files that are inside a package which already has a __main__.py
|
|
||||||
(i.e. that package is already runnable via `python -m package`)
|
|
||||||
"""
|
|
||||||
if path.suffix != ".py":
|
|
||||||
return False
|
|
||||||
if path.name in {"__init__.py", "__main__.py"}:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Find the nearest (innermost) package dir this file belongs to, if any.
|
|
||||||
pkg_dir: pathlib.Path | None = None
|
|
||||||
for parent in [path.parent, *path.parents]:
|
|
||||||
if (parent / "__init__.py").exists():
|
|
||||||
pkg_dir = parent
|
|
||||||
break
|
|
||||||
|
|
||||||
# If file is inside a package and that package already has __main__.py -> skip
|
|
||||||
if pkg_dir is not None and (pkg_dir / "__main__.py").exists():
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _build_plan(py_file: pathlib.Path) -> MigrationPlan:
|
|
||||||
py_file = py_file.resolve()
|
|
||||||
name = py_file.stem
|
|
||||||
pkg_dir = py_file.parent / name
|
|
||||||
target_main = pkg_dir / "__main__.py"
|
|
||||||
target_init = pkg_dir / "__init__.py"
|
|
||||||
return MigrationPlan(source=py_file, target_main=target_main, target_init=target_init)
|
|
||||||
|
|
||||||
|
|
||||||
def _discover_py_files_recursive(root: pathlib.Path) -> list[pathlib.Path]:
|
|
||||||
root = root.resolve()
|
|
||||||
files: list[pathlib.Path] = []
|
|
||||||
for p in root.rglob("*.py"):
|
|
||||||
if p.is_file() and _is_candidate_py_file(p):
|
|
||||||
files.append(p.resolve())
|
|
||||||
return sorted(set(files))
|
|
||||||
|
|
||||||
|
|
||||||
def _rel(path: pathlib.Path, repo_root: pathlib.Path) -> str:
|
|
||||||
try:
|
|
||||||
return str(path.relative_to(repo_root))
|
|
||||||
except Exception:
|
|
||||||
return str(path)
|
|
||||||
|
|
||||||
|
|
||||||
def _print_plan(plans: list[MigrationPlan], repo_root: pathlib.Path) -> None:
|
|
||||||
if not plans:
|
|
||||||
print("No candidates found.")
|
|
||||||
return
|
|
||||||
|
|
||||||
print("Planned migrations:")
|
|
||||||
for plan in plans:
|
|
||||||
print(f" - {_rel(plan.source, repo_root)} -> {_rel(plan.target_main, repo_root)}")
|
|
||||||
|
|
||||||
|
|
||||||
def _confirm_apply() -> bool:
|
|
||||||
"""
|
|
||||||
Ask user for confirmation. Default is NO.
|
|
||||||
Accepts: y, yes (case-insensitive).
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
ans = input("Apply these changes? [y/N] ").strip().lower()
|
|
||||||
except EOFError:
|
|
||||||
return False
|
|
||||||
return ans in {"y", "yes"}
|
|
||||||
|
|
||||||
|
|
||||||
def migrate_one(py_file: pathlib.Path, use_git: bool, repo_root: pathlib.Path) -> None:
|
|
||||||
"""Migrate a single flat module file (foo.py) into a package (foo/__main__.py)."""
|
|
||||||
plan = _build_plan(py_file)
|
|
||||||
|
|
||||||
if plan.source.suffix != ".py":
|
|
||||||
raise ValueError(f"Not a .py file: {plan.source}")
|
|
||||||
|
|
||||||
if plan.target_main.exists():
|
|
||||||
raise RuntimeError(f"Refusing to overwrite existing: {plan.target_main}")
|
|
||||||
|
|
||||||
if plan.target_main.parent.exists() and not plan.target_main.parent.is_dir():
|
|
||||||
raise RuntimeError(f"Target exists but is not a directory: {plan.target_main.parent}")
|
|
||||||
|
|
||||||
if use_git:
|
|
||||||
_git_mv(plan.source, plan.target_main, cwd=repo_root)
|
|
||||||
else:
|
|
||||||
_fs_mv(plan.source, plan.target_main)
|
|
||||||
|
|
||||||
if not plan.target_init.exists():
|
|
||||||
plan.target_init.write_text(INIT_TEMPLATE.format(name=plan.source.stem), encoding="utf-8")
|
|
||||||
|
|
||||||
|
|
||||||
def _apply_plans(plans: list[MigrationPlan], use_git: bool, repo_root: pathlib.Path) -> None:
|
|
||||||
# Preflight checks: fail fast before doing partial work
|
|
||||||
for plan in plans:
|
|
||||||
if plan.target_main.exists():
|
|
||||||
raise RuntimeError(f"Refusing to overwrite existing: {plan.target_main}")
|
|
||||||
if plan.target_main.parent.exists() and not plan.target_main.parent.is_dir():
|
|
||||||
raise RuntimeError(f"Target exists but is not a directory: {plan.target_main.parent}")
|
|
||||||
|
|
||||||
for plan in plans:
|
|
||||||
if use_git:
|
|
||||||
_git_mv(plan.source, plan.target_main, cwd=repo_root)
|
|
||||||
else:
|
|
||||||
_fs_mv(plan.source, plan.target_main)
|
|
||||||
|
|
||||||
if not plan.target_init.exists():
|
|
||||||
plan.target_init.write_text(INIT_TEMPLATE.format(name=plan.source.stem), encoding="utf-8")
|
|
||||||
|
|
||||||
|
|
||||||
def main(argv: list[str] | None = None) -> int:
|
|
||||||
parser = argparse.ArgumentParser(
|
|
||||||
prog="p2pkg",
|
|
||||||
description="Migrate foo.py -> foo/__main__.py and generate foo/__init__.py that re-exports public API.",
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.add_argument(
|
|
||||||
"paths",
|
|
||||||
nargs="+",
|
|
||||||
help="Python module files OR directories (with -R) to migrate (e.g. roles_list.py src/).",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-R",
|
|
||||||
"--recursive",
|
|
||||||
action="store_true",
|
|
||||||
help="Treat given paths as directories and discover *.py recursively.",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-p",
|
|
||||||
"--preview",
|
|
||||||
action="store_true",
|
|
||||||
help="Preview only (do not change anything).",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-f",
|
|
||||||
"--force",
|
|
||||||
action="store_true",
|
|
||||||
help="Apply without asking for confirmation.",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--no-git",
|
|
||||||
action="store_true",
|
|
||||||
help="Do not use `git mv` even if inside a git repo.",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--repo-root",
|
|
||||||
default=".",
|
|
||||||
help="Repository root used for git operations (default: current working directory).",
|
|
||||||
)
|
|
||||||
|
|
||||||
ns = parser.parse_args(argv)
|
|
||||||
repo_root = pathlib.Path(ns.repo_root).resolve()
|
|
||||||
use_git = (not ns.no_git) and _have_git_repo(repo_root)
|
|
||||||
|
|
||||||
if ns.recursive:
|
|
||||||
dirs: list[pathlib.Path] = []
|
|
||||||
for raw in ns.paths:
|
|
||||||
p = pathlib.Path(raw)
|
|
||||||
if not p.is_absolute():
|
|
||||||
p = (repo_root / p).resolve()
|
|
||||||
dirs.append(p)
|
|
||||||
|
|
||||||
for d in dirs:
|
|
||||||
if not d.exists():
|
|
||||||
raise FileNotFoundError(str(d))
|
|
||||||
if not d.is_dir():
|
|
||||||
raise NotADirectoryError(str(d))
|
|
||||||
|
|
||||||
candidates: list[pathlib.Path] = []
|
|
||||||
for d in dirs:
|
|
||||||
candidates.extend(_discover_py_files_recursive(d))
|
|
||||||
|
|
||||||
candidates = sorted(set(candidates))
|
|
||||||
plans = [_build_plan(p) for p in candidates]
|
|
||||||
|
|
||||||
_print_plan(plans, repo_root=repo_root)
|
|
||||||
|
|
||||||
if ns.preview:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
if not plans:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
if ns.force or _confirm_apply():
|
|
||||||
_apply_plans(plans, use_git=use_git, repo_root=repo_root)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
print("Aborted.")
|
|
||||||
return 1
|
|
||||||
|
|
||||||
# Non-recursive: treat paths as files (existing behavior)
|
|
||||||
files: list[pathlib.Path] = []
|
|
||||||
for raw in ns.paths:
|
|
||||||
path = pathlib.Path(raw)
|
|
||||||
if not path.is_absolute():
|
|
||||||
path = (repo_root / path).resolve()
|
|
||||||
if not path.exists():
|
|
||||||
raise FileNotFoundError(str(path))
|
|
||||||
if path.is_dir():
|
|
||||||
raise IsADirectoryError(
|
|
||||||
f"{path} is a directory. Use -R/--recursive to migrate directories recursively."
|
|
||||||
)
|
|
||||||
files.append(path)
|
|
||||||
|
|
||||||
plans = [_build_plan(p) for p in files]
|
|
||||||
_print_plan(plans, repo_root=repo_root)
|
|
||||||
|
|
||||||
if ns.preview:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
if not plans:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
if ns.force or _confirm_apply():
|
|
||||||
_apply_plans(plans, use_git=use_git, repo_root=repo_root)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
print("Aborted.")
|
|
||||||
return 1
|
|
||||||
|
|
||||||
|
__all__ = ["main", "migrate_one"]
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
raise SystemExit(main())
|
raise SystemExit(main())
|
||||||
|
|||||||
110
src/p2pkg/apply.py
Normal file
110
src/p2pkg/apply.py
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pathlib
|
||||||
|
|
||||||
|
from p2pkg import gitutils
|
||||||
|
from p2pkg.plans import MigrationPlan, build_plan
|
||||||
|
from p2pkg.templates import INIT_TEMPLATE, STUB_TEMPLATE
|
||||||
|
|
||||||
|
|
||||||
|
def _fs_mv(src: pathlib.Path, dst: pathlib.Path) -> None:
|
||||||
|
dst.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
src.rename(dst)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_init(plan: MigrationPlan) -> None:
|
||||||
|
if not plan.target_init.exists():
|
||||||
|
plan.target_init.write_text(
|
||||||
|
INIT_TEMPLATE.format(name=plan.source.stem),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_stub(plan: MigrationPlan, overwrite: bool) -> None:
|
||||||
|
p = plan.legacy_stub
|
||||||
|
|
||||||
|
# After moving, p should be free. But be defensive:
|
||||||
|
if p.exists() and not p.is_file():
|
||||||
|
raise RuntimeError(f"Legacy stub target exists but is not a file: {p}")
|
||||||
|
|
||||||
|
if p.exists() and not overwrite:
|
||||||
|
raise RuntimeError(f"Refusing to overwrite existing legacy stub: {p}")
|
||||||
|
|
||||||
|
p.write_text(
|
||||||
|
STUB_TEMPLATE.format(
|
||||||
|
new_main_name=plan.new_main_module_name,
|
||||||
|
new_main_rel=plan.new_main_rel,
|
||||||
|
),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def migrate_one(
|
||||||
|
py_file: pathlib.Path,
|
||||||
|
use_git: bool,
|
||||||
|
repo_root: pathlib.Path,
|
||||||
|
keep_legacy: bool = False,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Migrate a single flat module file (foo.py) into a package (foo/__main__.py).
|
||||||
|
|
||||||
|
Default behavior (keep_legacy=False):
|
||||||
|
- Move foo.py -> foo/__main__.py
|
||||||
|
- Create foo/__init__.py (compat re-exports)
|
||||||
|
- Remove legacy foo.py (no stub)
|
||||||
|
|
||||||
|
If keep_legacy=True:
|
||||||
|
- Additionally recreate foo.py as a stub that re-exports and forwards execution.
|
||||||
|
"""
|
||||||
|
plan = build_plan(py_file, repo_root=repo_root)
|
||||||
|
|
||||||
|
if plan.source.suffix != ".py":
|
||||||
|
raise ValueError(f"Not a .py file: {plan.source}")
|
||||||
|
|
||||||
|
if plan.target_main.exists():
|
||||||
|
raise RuntimeError(f"Refusing to overwrite existing: {plan.target_main}")
|
||||||
|
|
||||||
|
if plan.target_main.parent.exists() and not plan.target_main.parent.is_dir():
|
||||||
|
raise RuntimeError(f"Target exists but is not a directory: {plan.target_main.parent}")
|
||||||
|
|
||||||
|
if use_git:
|
||||||
|
gitutils.git_mv(plan.source, plan.target_main, cwd=repo_root)
|
||||||
|
else:
|
||||||
|
_fs_mv(plan.source, plan.target_main)
|
||||||
|
|
||||||
|
_write_init(plan)
|
||||||
|
|
||||||
|
if keep_legacy:
|
||||||
|
# After move: old path is free -> create stub
|
||||||
|
_write_stub(plan, overwrite=True)
|
||||||
|
|
||||||
|
|
||||||
|
def apply_plans(
|
||||||
|
plans: list[MigrationPlan],
|
||||||
|
use_git: bool,
|
||||||
|
repo_root: pathlib.Path,
|
||||||
|
keep_legacy: bool,
|
||||||
|
) -> None:
|
||||||
|
# Preflight checks
|
||||||
|
for plan in plans:
|
||||||
|
if plan.target_main.exists():
|
||||||
|
raise RuntimeError(f"Refusing to overwrite existing: {plan.target_main}")
|
||||||
|
if plan.target_main.parent.exists() and not plan.target_main.parent.is_dir():
|
||||||
|
raise RuntimeError(f"Target exists but is not a directory: {plan.target_main.parent}")
|
||||||
|
|
||||||
|
# If keep_legacy, ensure the stub target is either absent (expected) or a file.
|
||||||
|
# Note: Before move, legacy_stub == source exists and is a file, which is fine.
|
||||||
|
if keep_legacy and plan.legacy_stub.exists() and not plan.legacy_stub.is_file():
|
||||||
|
raise RuntimeError(f"Legacy stub target exists but is not a file: {plan.legacy_stub}")
|
||||||
|
|
||||||
|
# Apply: move first, then generate init + optional stub
|
||||||
|
for plan in plans:
|
||||||
|
if use_git:
|
||||||
|
gitutils.git_mv(plan.source, plan.target_main, cwd=repo_root)
|
||||||
|
else:
|
||||||
|
_fs_mv(plan.source, plan.target_main)
|
||||||
|
|
||||||
|
_write_init(plan)
|
||||||
|
|
||||||
|
if keep_legacy:
|
||||||
|
_write_stub(plan, overwrite=True)
|
||||||
146
src/p2pkg/cli.py
Normal file
146
src/p2pkg/cli.py
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import pathlib
|
||||||
|
|
||||||
|
from p2pkg.apply import apply_plans
|
||||||
|
from p2pkg.discovery import discover_py_files_recursive
|
||||||
|
from p2pkg.gitutils import have_git_repo
|
||||||
|
from p2pkg.plans import MigrationPlan, build_plan, rel
|
||||||
|
|
||||||
|
|
||||||
|
def _print_plan(plans: list[MigrationPlan], repo_root: pathlib.Path, keep_legacy: bool) -> None:
|
||||||
|
if not plans:
|
||||||
|
print("No candidates found.")
|
||||||
|
return
|
||||||
|
|
||||||
|
print("Planned migrations:")
|
||||||
|
for plan in plans:
|
||||||
|
print(f" - {rel(plan.source, repo_root)} -> {rel(plan.target_main, repo_root)}")
|
||||||
|
if keep_legacy:
|
||||||
|
print(f" (keep legacy stub) {rel(plan.legacy_stub, repo_root)}")
|
||||||
|
|
||||||
|
|
||||||
|
def _confirm_apply() -> bool:
|
||||||
|
try:
|
||||||
|
ans = input("Apply these changes? [y/N] ").strip().lower()
|
||||||
|
except EOFError:
|
||||||
|
return False
|
||||||
|
return ans in {"y", "yes"}
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv: list[str] | None = None) -> int:
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
prog="p2pkg",
|
||||||
|
description="Migrate foo.py -> foo/__main__.py and generate foo/__init__.py that re-exports public API.",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"paths",
|
||||||
|
nargs="+",
|
||||||
|
help="Python module files OR directories (with -R) to migrate (e.g. cli/setup/users.py cli/).",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-R",
|
||||||
|
"--recursive",
|
||||||
|
action="store_true",
|
||||||
|
help="Treat given paths as directories and discover *.py recursively.",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-p",
|
||||||
|
"--preview",
|
||||||
|
action="store_true",
|
||||||
|
help="Preview only (do not change anything).",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-f",
|
||||||
|
"--force",
|
||||||
|
action="store_true",
|
||||||
|
help="Apply without asking for confirmation.",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--no-git",
|
||||||
|
action="store_true",
|
||||||
|
help="Do not use `git mv` even if inside a git repo.",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--repo-root",
|
||||||
|
default=".",
|
||||||
|
help="Repository root used for git operations (default: current working directory).",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-k",
|
||||||
|
"--keep",
|
||||||
|
action="store_true",
|
||||||
|
help="Keep legacy foo.py as a stub (default: remove legacy file).",
|
||||||
|
)
|
||||||
|
|
||||||
|
ns = parser.parse_args(argv)
|
||||||
|
repo_root = pathlib.Path(ns.repo_root).resolve()
|
||||||
|
|
||||||
|
keep_legacy = bool(ns.keep)
|
||||||
|
use_git = (not ns.no_git) and have_git_repo(repo_root)
|
||||||
|
|
||||||
|
if ns.recursive:
|
||||||
|
dirs: list[pathlib.Path] = []
|
||||||
|
for raw in ns.paths:
|
||||||
|
p = pathlib.Path(raw)
|
||||||
|
if not p.is_absolute():
|
||||||
|
p = (repo_root / p).resolve()
|
||||||
|
dirs.append(p)
|
||||||
|
|
||||||
|
for d in dirs:
|
||||||
|
if not d.exists():
|
||||||
|
raise FileNotFoundError(str(d))
|
||||||
|
if not d.is_dir():
|
||||||
|
raise NotADirectoryError(str(d))
|
||||||
|
|
||||||
|
candidates: list[pathlib.Path] = []
|
||||||
|
for d in dirs:
|
||||||
|
candidates.extend(discover_py_files_recursive(d))
|
||||||
|
|
||||||
|
candidates = sorted(set(candidates))
|
||||||
|
plans = [build_plan(p, repo_root=repo_root) for p in candidates]
|
||||||
|
|
||||||
|
_print_plan(plans, repo_root=repo_root, keep_legacy=keep_legacy)
|
||||||
|
|
||||||
|
if ns.preview:
|
||||||
|
return 0
|
||||||
|
if not plans:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if ns.force or _confirm_apply():
|
||||||
|
apply_plans(plans, use_git=use_git, repo_root=repo_root, keep_legacy=keep_legacy)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
print("Aborted.")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
# Non-recursive: treat paths as files
|
||||||
|
files: list[pathlib.Path] = []
|
||||||
|
for raw in ns.paths:
|
||||||
|
path = pathlib.Path(raw)
|
||||||
|
if not path.is_absolute():
|
||||||
|
path = (repo_root / path).resolve()
|
||||||
|
if not path.exists():
|
||||||
|
raise FileNotFoundError(str(path))
|
||||||
|
if path.is_dir():
|
||||||
|
raise IsADirectoryError(
|
||||||
|
f"{path} is a directory. Use -R/--recursive to migrate directories recursively."
|
||||||
|
)
|
||||||
|
files.append(path)
|
||||||
|
|
||||||
|
plans = [build_plan(p, repo_root=repo_root) for p in files]
|
||||||
|
_print_plan(plans, repo_root=repo_root, keep_legacy=keep_legacy)
|
||||||
|
|
||||||
|
if ns.preview:
|
||||||
|
return 0
|
||||||
|
if not plans:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if ns.force or _confirm_apply():
|
||||||
|
apply_plans(plans, use_git=use_git, repo_root=repo_root, keep_legacy=keep_legacy)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
print("Aborted.")
|
||||||
|
return 1
|
||||||
42
src/p2pkg/discovery.py
Normal file
42
src/p2pkg/discovery.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pathlib
|
||||||
|
|
||||||
|
|
||||||
|
def is_candidate_py_file(path: pathlib.Path) -> bool:
|
||||||
|
"""
|
||||||
|
Strategy A candidate selection:
|
||||||
|
|
||||||
|
- Only consider *.py files (excluding __init__.py / __main__.py).
|
||||||
|
- Only migrate files that are inside a package tree whose TOP package already has __main__.py
|
||||||
|
(i.e., there is some parent dir with __init__.py, and the outermost such dir also has __main__.py).
|
||||||
|
|
||||||
|
This matches your requirement: "only those that lie in a package and have a __main__.py".
|
||||||
|
"""
|
||||||
|
if path.suffix != ".py":
|
||||||
|
return False
|
||||||
|
if path.name in {"__init__.py", "__main__.py"}:
|
||||||
|
return False
|
||||||
|
|
||||||
|
pkg_ancestors: list[pathlib.Path] = []
|
||||||
|
for parent in [path.parent, *path.parents]:
|
||||||
|
if (parent / "__init__.py").exists():
|
||||||
|
pkg_ancestors.append(parent)
|
||||||
|
|
||||||
|
if not pkg_ancestors:
|
||||||
|
return False
|
||||||
|
|
||||||
|
outermost_pkg = pkg_ancestors[-1]
|
||||||
|
if not (outermost_pkg / "__main__.py").exists():
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def discover_py_files_recursive(root: pathlib.Path) -> list[pathlib.Path]:
|
||||||
|
root = root.resolve()
|
||||||
|
files: list[pathlib.Path] = []
|
||||||
|
for p in root.rglob("*.py"):
|
||||||
|
if p.is_file() and is_candidate_py_file(p):
|
||||||
|
files.append(p.resolve())
|
||||||
|
return sorted(set(files))
|
||||||
27
src/p2pkg/gitutils.py
Normal file
27
src/p2pkg/gitutils.py
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pathlib
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
|
def run(cmd: list[str], cwd: pathlib.Path | None = None) -> None:
|
||||||
|
subprocess.run(cmd, check=True, cwd=str(cwd) if cwd else None)
|
||||||
|
|
||||||
|
|
||||||
|
def have_git_repo(root: pathlib.Path) -> bool:
|
||||||
|
try:
|
||||||
|
subprocess.run(
|
||||||
|
["git", "rev-parse", "--is-inside-work-tree"],
|
||||||
|
cwd=str(root),
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
check=True,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def git_mv(src: pathlib.Path, dst: pathlib.Path, cwd: pathlib.Path) -> None:
|
||||||
|
dst.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
run(["git", "mv", str(src), str(dst)], cwd=cwd)
|
||||||
60
src/p2pkg/plans.py
Normal file
60
src/p2pkg/plans.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pathlib
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class MigrationPlan:
|
||||||
|
source: pathlib.Path
|
||||||
|
target_main: pathlib.Path
|
||||||
|
target_init: pathlib.Path
|
||||||
|
legacy_stub: pathlib.Path
|
||||||
|
legacy_module_name: str
|
||||||
|
new_main_module_name: str
|
||||||
|
new_main_rel: str
|
||||||
|
|
||||||
|
|
||||||
|
def module_name_from_path(py_file: pathlib.Path, repo_root: pathlib.Path) -> str:
|
||||||
|
"""
|
||||||
|
Convert a file path to a dotted module name relative to repo_root.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
repo_root=/repo
|
||||||
|
py_file=/repo/cli/setup/users.py -> "cli.setup.users"
|
||||||
|
"""
|
||||||
|
rel = py_file.resolve().relative_to(repo_root.resolve())
|
||||||
|
if rel.suffix != ".py":
|
||||||
|
raise ValueError(f"Not a .py file: {py_file}")
|
||||||
|
parts = list(rel.with_suffix("").parts)
|
||||||
|
return ".".join(parts)
|
||||||
|
|
||||||
|
|
||||||
|
def rel(path: pathlib.Path, repo_root: pathlib.Path) -> str:
|
||||||
|
try:
|
||||||
|
return str(path.resolve().relative_to(repo_root.resolve()))
|
||||||
|
except Exception:
|
||||||
|
return str(path)
|
||||||
|
|
||||||
|
|
||||||
|
def build_plan(py_file: pathlib.Path, repo_root: pathlib.Path) -> MigrationPlan:
|
||||||
|
py_file = py_file.resolve()
|
||||||
|
name = py_file.stem
|
||||||
|
pkg_dir = py_file.parent / name
|
||||||
|
|
||||||
|
target_main = pkg_dir / "__main__.py"
|
||||||
|
target_init = pkg_dir / "__init__.py"
|
||||||
|
|
||||||
|
legacy_module_name = module_name_from_path(py_file, repo_root=repo_root)
|
||||||
|
new_main_module_name = f"{legacy_module_name}.__main__"
|
||||||
|
new_main_rel = rel(target_main, repo_root=repo_root)
|
||||||
|
|
||||||
|
return MigrationPlan(
|
||||||
|
source=py_file,
|
||||||
|
target_main=target_main,
|
||||||
|
target_init=target_init,
|
||||||
|
legacy_stub=py_file,
|
||||||
|
legacy_module_name=legacy_module_name,
|
||||||
|
new_main_module_name=new_main_module_name,
|
||||||
|
new_main_rel=new_main_rel,
|
||||||
|
)
|
||||||
63
src/p2pkg/templates.py
Normal file
63
src/p2pkg/templates.py
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
INIT_TEMPLATE = """\
|
||||||
|
\"\"\"Compatibility wrapper.
|
||||||
|
|
||||||
|
This package was migrated from a flat module ({name}.py) to a package layout:
|
||||||
|
{name}/__main__.py contains the original implementation.
|
||||||
|
|
||||||
|
We re-export the public API so existing imports keep working.
|
||||||
|
\"\"\"
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from . import __main__ as _main
|
||||||
|
from .__main__ import * # noqa: F401,F403
|
||||||
|
|
||||||
|
# Prefer explicit __all__ if the original module defined it.
|
||||||
|
__all__ = getattr(_main, "__all__", [n for n in dir(_main) if not n.startswith("_")])
|
||||||
|
"""
|
||||||
|
|
||||||
|
STUB_TEMPLATE = """\
|
||||||
|
\"\"\"Legacy stub for backwards compatibility.
|
||||||
|
|
||||||
|
This file was migrated to a package:
|
||||||
|
{new_main_rel}
|
||||||
|
|
||||||
|
- Importers should prefer importing the package module path.
|
||||||
|
- Direct execution is forwarded to the new package entrypoint if available.
|
||||||
|
\"\"\"
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import importlib
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
def _load():
|
||||||
|
return importlib.import_module({new_main_name!r})
|
||||||
|
|
||||||
|
|
||||||
|
# Re-export public API from the new implementation module.
|
||||||
|
_mod = _load()
|
||||||
|
for _name in getattr(_mod, "__all__", [n for n in dir(_mod) if not n.startswith("_")]):
|
||||||
|
globals()[_name] = getattr(_mod, _name)
|
||||||
|
|
||||||
|
__all__ = getattr(_mod, "__all__", [n for n in dir(_mod) if not n.startswith("_")])
|
||||||
|
|
||||||
|
|
||||||
|
def _main() -> int:
|
||||||
|
# Forward to "main(argv)" if present, otherwise do nothing.
|
||||||
|
main = getattr(_mod, "main", None)
|
||||||
|
if callable(main):
|
||||||
|
try:
|
||||||
|
return int(main()) # type: ignore[arg-type]
|
||||||
|
except TypeError:
|
||||||
|
# main(argv) signature
|
||||||
|
return int(main(sys.argv[1:])) # type: ignore[misc]
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise SystemExit(_main())
|
||||||
|
"""
|
||||||
@@ -9,13 +9,41 @@ from contextlib import redirect_stdout
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
# Ensure we import THIS repo's src/ implementation, not an installed site-packages one.
|
# Ensure we import THIS repo's src/ implementation, not an installed site-packages one.
|
||||||
REPO_ROOT = Path(__file__).resolve().parents[1]
|
REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||||
SRC_DIR = REPO_ROOT / "src"
|
SRC_DIR = REPO_ROOT / "src"
|
||||||
sys.path.insert(0, str(SRC_DIR))
|
sys.path.insert(0, str(SRC_DIR))
|
||||||
|
|
||||||
from p2pkg.__main__ import main, migrate_one # noqa: E402
|
from p2pkg.cli import main # noqa: E402
|
||||||
|
from p2pkg.apply import migrate_one # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
def _mk_strategy_a_tree(root: Path) -> tuple[Path, Path]:
|
||||||
|
"""
|
||||||
|
Build a directory structure that matches Strategy A discovery:
|
||||||
|
|
||||||
|
root/
|
||||||
|
pkg/
|
||||||
|
__init__.py
|
||||||
|
__main__.py
|
||||||
|
tree/
|
||||||
|
x.py
|
||||||
|
y.py
|
||||||
|
|
||||||
|
Returns (pkg_dir, tree_dir).
|
||||||
|
"""
|
||||||
|
pkg = root / "pkg"
|
||||||
|
tree = pkg / "tree"
|
||||||
|
tree.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# Mark pkg as a runnable package (outermost package with __main__.py)
|
||||||
|
(pkg / "__init__.py").write_text("# pkg init\n", encoding="utf-8")
|
||||||
|
(pkg / "__main__.py").write_text("# pkg main\n", encoding="utf-8")
|
||||||
|
|
||||||
|
# Candidate files (inside pkg/* and outermost pkg has __main__.py)
|
||||||
|
(tree / "x.py").write_text("X = 1\n", encoding="utf-8")
|
||||||
|
(tree / "y.py").write_text("Y = 2\n", encoding="utf-8")
|
||||||
|
return pkg, tree
|
||||||
|
|
||||||
|
|
||||||
class TestMigration(unittest.TestCase):
|
class TestMigration(unittest.TestCase):
|
||||||
@@ -40,12 +68,12 @@ class TestMigration(unittest.TestCase):
|
|||||||
encoding="utf-8",
|
encoding="utf-8",
|
||||||
)
|
)
|
||||||
|
|
||||||
migrate_one(mod, use_git=False, repo_root=root)
|
migrate_one(mod, use_git=False, repo_root=root, keep_legacy=False)
|
||||||
|
|
||||||
pkg = root / "roles_list"
|
pkg = root / "roles_list"
|
||||||
self.assertTrue((pkg / "__main__.py").exists())
|
self.assertTrue((pkg / "__main__.py").exists())
|
||||||
self.assertTrue((pkg / "__init__.py").exists())
|
self.assertTrue((pkg / "__init__.py").exists())
|
||||||
self.assertFalse(mod.exists())
|
self.assertFalse(mod.exists(), "Legacy roles_list.py should be removed by default")
|
||||||
|
|
||||||
sys.path.insert(0, str(root))
|
sys.path.insert(0, str(root))
|
||||||
try:
|
try:
|
||||||
@@ -62,7 +90,25 @@ class TestMigration(unittest.TestCase):
|
|||||||
sys.path.remove(str(root))
|
sys.path.remove(str(root))
|
||||||
sys.modules.pop("roles_list", None)
|
sys.modules.pop("roles_list", None)
|
||||||
|
|
||||||
def test_main_non_recursive_prompts_and_applies_on_yes(self) -> None:
|
def test_migrate_one_keep_legacy_writes_stub(self) -> None:
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
root = Path(td)
|
||||||
|
mod = root / "foo.py"
|
||||||
|
mod.write_text("X = 1\n", encoding="utf-8")
|
||||||
|
|
||||||
|
migrate_one(mod, use_git=False, repo_root=root, keep_legacy=True)
|
||||||
|
|
||||||
|
# New package exists
|
||||||
|
self.assertTrue((root / "foo" / "__main__.py").exists())
|
||||||
|
self.assertTrue((root / "foo" / "__init__.py").exists())
|
||||||
|
|
||||||
|
# Legacy file exists as stub
|
||||||
|
self.assertTrue((root / "foo.py").exists())
|
||||||
|
stub = (root / "foo.py").read_text(encoding="utf-8")
|
||||||
|
self.assertIn("Legacy stub", stub)
|
||||||
|
self.assertGreater(len(stub.strip()), 20, "Stub should not be empty")
|
||||||
|
|
||||||
|
def test_main_non_recursive_default_removes_legacy_file(self) -> None:
|
||||||
with tempfile.TemporaryDirectory() as td:
|
with tempfile.TemporaryDirectory() as td:
|
||||||
root = Path(td)
|
root = Path(td)
|
||||||
mod = root / "foo.py"
|
mod = root / "foo.py"
|
||||||
@@ -73,85 +119,115 @@ class TestMigration(unittest.TestCase):
|
|||||||
rc = main(["--no-git", "--repo-root", str(root), str(mod)])
|
rc = main(["--no-git", "--repo-root", str(root), str(mod)])
|
||||||
|
|
||||||
self.assertEqual(rc, 0)
|
self.assertEqual(rc, 0)
|
||||||
self.assertFalse(mod.exists())
|
self.assertFalse((root / "foo.py").exists(), "Without -k, legacy foo.py must not exist")
|
||||||
self.assertTrue((root / "foo" / "__main__.py").exists())
|
self.assertTrue((root / "foo" / "__main__.py").exists())
|
||||||
self.assertTrue((root / "foo" / "__init__.py").exists())
|
self.assertTrue((root / "foo" / "__init__.py").exists())
|
||||||
|
|
||||||
|
def test_main_non_recursive_keep_preserves_stub(self) -> None:
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
root = Path(td)
|
||||||
|
mod = root / "foo.py"
|
||||||
|
mod.write_text("X = 1\n", encoding="utf-8")
|
||||||
|
|
||||||
|
buf = io.StringIO()
|
||||||
|
with redirect_stdout(buf), patch("builtins.input", return_value="y"):
|
||||||
|
rc = main(["-k", "--no-git", "--repo-root", str(root), str(mod)])
|
||||||
|
|
||||||
|
self.assertEqual(rc, 0)
|
||||||
|
self.assertTrue((root / "foo.py").exists(), "With -k, legacy foo.py stub must exist")
|
||||||
|
self.assertTrue((root / "foo" / "__main__.py").exists())
|
||||||
|
self.assertTrue((root / "foo" / "__init__.py").exists())
|
||||||
|
|
||||||
def test_main_recursive_preview_does_not_apply_and_does_not_prompt(self) -> None:
|
def test_main_recursive_preview_does_not_apply_and_does_not_prompt(self) -> None:
|
||||||
with tempfile.TemporaryDirectory() as td:
|
with tempfile.TemporaryDirectory() as td:
|
||||||
root = Path(td)
|
root = Path(td)
|
||||||
target = root / "tree"
|
_, tree = _mk_strategy_a_tree(root)
|
||||||
target.mkdir()
|
|
||||||
|
|
||||||
(target / "x.py").write_text("X = 1\n", encoding="utf-8")
|
|
||||||
(target / "y.py").write_text("Y = 2\n", encoding="utf-8")
|
|
||||||
|
|
||||||
buf = io.StringIO()
|
buf = io.StringIO()
|
||||||
with redirect_stdout(buf), patch("builtins.input") as mocked_input:
|
with redirect_stdout(buf), patch("builtins.input") as mocked_input:
|
||||||
rc = main(["-R", "-p", "--no-git", "--repo-root", str(root), str(target)])
|
rc = main(["-R", "-p", "--no-git", "--repo-root", str(root), str(tree)])
|
||||||
|
|
||||||
self.assertEqual(rc, 0)
|
self.assertEqual(rc, 0)
|
||||||
mocked_input.assert_not_called()
|
mocked_input.assert_not_called()
|
||||||
self.assertTrue((target / "x.py").exists())
|
|
||||||
self.assertTrue((target / "y.py").exists())
|
|
||||||
self.assertFalse((target / "x").exists())
|
|
||||||
self.assertFalse((target / "y").exists())
|
|
||||||
|
|
||||||
def test_main_recursive_force_applies_without_prompt(self) -> None:
|
# Preview: nothing changed
|
||||||
|
self.assertTrue((tree / "x.py").exists())
|
||||||
|
self.assertTrue((tree / "y.py").exists())
|
||||||
|
self.assertFalse((tree / "x").exists())
|
||||||
|
self.assertFalse((tree / "y").exists())
|
||||||
|
|
||||||
|
def test_main_recursive_force_applies_without_prompt_default_removes_legacy(self) -> None:
|
||||||
with tempfile.TemporaryDirectory() as td:
|
with tempfile.TemporaryDirectory() as td:
|
||||||
root = Path(td)
|
root = Path(td)
|
||||||
target = root / "tree"
|
_, tree = _mk_strategy_a_tree(root)
|
||||||
target.mkdir()
|
|
||||||
|
|
||||||
(target / "x.py").write_text("X = 1\n", encoding="utf-8")
|
|
||||||
(target / "y.py").write_text("Y = 2\n", encoding="utf-8")
|
|
||||||
|
|
||||||
buf = io.StringIO()
|
buf = io.StringIO()
|
||||||
with redirect_stdout(buf), patch("builtins.input") as mocked_input:
|
with redirect_stdout(buf), patch("builtins.input") as mocked_input:
|
||||||
rc = main(["-R", "-f", "--no-git", "--repo-root", str(root), str(target)])
|
rc = main(["-R", "-f", "--no-git", "--repo-root", str(root), str(tree)])
|
||||||
|
|
||||||
self.assertEqual(rc, 0)
|
self.assertEqual(rc, 0)
|
||||||
mocked_input.assert_not_called()
|
mocked_input.assert_not_called()
|
||||||
self.assertFalse((target / "x.py").exists())
|
|
||||||
self.assertFalse((target / "y.py").exists())
|
# default: legacy removed
|
||||||
self.assertTrue((target / "x" / "__main__.py").exists())
|
self.assertFalse((tree / "x.py").exists())
|
||||||
self.assertTrue((target / "y" / "__main__.py").exists())
|
self.assertFalse((tree / "y.py").exists())
|
||||||
|
self.assertTrue((tree / "x" / "__main__.py").exists())
|
||||||
|
self.assertTrue((tree / "y" / "__main__.py").exists())
|
||||||
|
self.assertTrue((tree / "x" / "__init__.py").exists())
|
||||||
|
self.assertTrue((tree / "y" / "__init__.py").exists())
|
||||||
|
|
||||||
|
def test_main_recursive_force_keep_creates_stubs(self) -> None:
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
root = Path(td)
|
||||||
|
_, tree = _mk_strategy_a_tree(root)
|
||||||
|
|
||||||
|
buf = io.StringIO()
|
||||||
|
with redirect_stdout(buf), patch("builtins.input") as mocked_input:
|
||||||
|
rc = main(["-R", "-f", "-k", "--no-git", "--repo-root", str(root), str(tree)])
|
||||||
|
|
||||||
|
self.assertEqual(rc, 0)
|
||||||
|
mocked_input.assert_not_called()
|
||||||
|
|
||||||
|
# keep: legacy stubs exist + packages exist
|
||||||
|
self.assertTrue((tree / "x.py").exists())
|
||||||
|
self.assertTrue((tree / "y.py").exists())
|
||||||
|
self.assertTrue((tree / "x" / "__main__.py").exists())
|
||||||
|
self.assertTrue((tree / "y" / "__main__.py").exists())
|
||||||
|
|
||||||
|
stubx = (tree / "x.py").read_text(encoding="utf-8")
|
||||||
|
self.assertIn("Legacy stub", stubx)
|
||||||
|
|
||||||
def test_main_recursive_prompts_and_aborts_on_no(self) -> None:
|
def test_main_recursive_prompts_and_aborts_on_no(self) -> None:
|
||||||
with tempfile.TemporaryDirectory() as td:
|
with tempfile.TemporaryDirectory() as td:
|
||||||
root = Path(td)
|
root = Path(td)
|
||||||
target = root / "tree"
|
_, tree = _mk_strategy_a_tree(root)
|
||||||
target.mkdir()
|
|
||||||
|
|
||||||
(target / "x.py").write_text("X = 1\n", encoding="utf-8")
|
|
||||||
|
|
||||||
buf = io.StringIO()
|
buf = io.StringIO()
|
||||||
with redirect_stdout(buf), patch("builtins.input", return_value="n"):
|
with redirect_stdout(buf), patch("builtins.input", return_value="n"):
|
||||||
rc = main(["-R", "--no-git", "--repo-root", str(root), str(target)])
|
rc = main(["-R", "--no-git", "--repo-root", str(root), str(tree)])
|
||||||
|
|
||||||
self.assertEqual(rc, 1)
|
self.assertEqual(rc, 1)
|
||||||
self.assertTrue((target / "x.py").exists())
|
|
||||||
self.assertFalse((target / "x").exists())
|
|
||||||
|
|
||||||
def test_main_recursive_prompts_and_applies_on_yes(self) -> None:
|
# No changes
|
||||||
|
self.assertTrue((tree / "x.py").exists())
|
||||||
|
self.assertTrue((tree / "y.py").exists())
|
||||||
|
self.assertFalse((tree / "x").exists())
|
||||||
|
self.assertFalse((tree / "y").exists())
|
||||||
|
|
||||||
|
def test_main_recursive_prompts_and_applies_on_yes_default_removes_legacy(self) -> None:
|
||||||
with tempfile.TemporaryDirectory() as td:
|
with tempfile.TemporaryDirectory() as td:
|
||||||
root = Path(td)
|
root = Path(td)
|
||||||
target = root / "tree"
|
_, tree = _mk_strategy_a_tree(root)
|
||||||
target.mkdir()
|
|
||||||
|
|
||||||
(target / "x.py").write_text("X = 1\n", encoding="utf-8")
|
|
||||||
(target / "y.py").write_text("Y = 2\n", encoding="utf-8")
|
|
||||||
|
|
||||||
buf = io.StringIO()
|
buf = io.StringIO()
|
||||||
with redirect_stdout(buf), patch("builtins.input", return_value="y"):
|
with redirect_stdout(buf), patch("builtins.input", return_value="y"):
|
||||||
rc = main(["-R", "--no-git", "--repo-root", str(root), str(target)])
|
rc = main(["-R", "--no-git", "--repo-root", str(root), str(tree)])
|
||||||
|
|
||||||
self.assertEqual(rc, 0)
|
self.assertEqual(rc, 0)
|
||||||
self.assertFalse((target / "x.py").exists())
|
self.assertFalse((tree / "x.py").exists())
|
||||||
self.assertFalse((target / "y.py").exists())
|
self.assertFalse((tree / "y.py").exists())
|
||||||
self.assertTrue((target / "x" / "__main__.py").exists())
|
self.assertTrue((tree / "x" / "__main__.py").exists())
|
||||||
self.assertTrue((target / "y" / "__main__.py").exists())
|
self.assertTrue((tree / "y" / "__main__.py").exists())
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user