from __future__ import annotations import io import sys import tempfile import textwrap import unittest from contextlib import redirect_stdout from pathlib import Path from unittest.mock import patch # Ensure we import THIS repo's src/ implementation, not an installed site-packages one. REPO_ROOT = Path(__file__).resolve().parents[1] SRC_DIR = REPO_ROOT / "src" sys.path.insert(0, str(SRC_DIR)) from p2pkg.cli import main # noqa: E402 from p2pkg.apply import migrate_one # noqa: E402 def _mk_strategy_a_tree(root: Path) -> tuple[Path, Path]: """ Build a directory structure that matches Strategy A discovery: root/ pkg/ __init__.py __main__.py tree/ x.py y.py Returns (pkg_dir, tree_dir). """ pkg = root / "pkg" tree = pkg / "tree" tree.mkdir(parents=True, exist_ok=True) # Mark pkg as a runnable package (outermost package with __main__.py) (pkg / "__init__.py").write_text("# pkg init\n", encoding="utf-8") (pkg / "__main__.py").write_text("# pkg main\n", encoding="utf-8") # Candidate files (inside pkg/* and outermost pkg has __main__.py) (tree / "x.py").write_text("X = 1\n", encoding="utf-8") (tree / "y.py").write_text("Y = 2\n", encoding="utf-8") return pkg, tree class TestMigration(unittest.TestCase): def test_migrate_one_creates_package_and_exports_public_api(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) mod = root / "roles_list.py" mod.write_text( textwrap.dedent( """\ __all__ = ["add", "PUBLIC_CONST"] PUBLIC_CONST = 123 _PRIVATE_CONST = 999 def add(a: int, b: int) -> int: return a + b def _hidden() -> int: return 1 """ ), encoding="utf-8", ) migrate_one(mod, use_git=False, repo_root=root, keep_legacy=False) pkg = root / "roles_list" self.assertTrue((pkg / "__main__.py").exists()) self.assertTrue((pkg / "__init__.py").exists()) self.assertFalse(mod.exists(), "Legacy roles_list.py should be removed by default") sys.path.insert(0, str(root)) try: import roles_list # type: ignore self.assertTrue(hasattr(roles_list, "add")) self.assertEqual(roles_list.add(2, 3), 5) self.assertTrue(hasattr(roles_list, "PUBLIC_CONST")) self.assertEqual(roles_list.PUBLIC_CONST, 123) self.assertEqual(set(roles_list.__all__), {"add", "PUBLIC_CONST"}) self.assertFalse(hasattr(roles_list, "_hidden")) finally: sys.path.remove(str(root)) sys.modules.pop("roles_list", None) def test_migrate_one_keep_legacy_writes_stub(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) mod = root / "foo.py" mod.write_text("X = 1\n", encoding="utf-8") migrate_one(mod, use_git=False, repo_root=root, keep_legacy=True) # New package exists self.assertTrue((root / "foo" / "__main__.py").exists()) self.assertTrue((root / "foo" / "__init__.py").exists()) # Legacy file exists as stub self.assertTrue((root / "foo.py").exists()) stub = (root / "foo.py").read_text(encoding="utf-8") self.assertIn("Legacy stub", stub) self.assertGreater(len(stub.strip()), 20, "Stub should not be empty") def test_main_non_recursive_default_removes_legacy_file(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) mod = root / "foo.py" mod.write_text("X = 1\n", encoding="utf-8") buf = io.StringIO() with redirect_stdout(buf), patch("builtins.input", return_value="y"): rc = main(["--no-git", "--repo-root", str(root), str(mod)]) self.assertEqual(rc, 0) self.assertFalse((root / "foo.py").exists(), "Without -k, legacy foo.py must not exist") self.assertTrue((root / "foo" / "__main__.py").exists()) self.assertTrue((root / "foo" / "__init__.py").exists()) def test_main_non_recursive_keep_preserves_stub(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) mod = root / "foo.py" mod.write_text("X = 1\n", encoding="utf-8") buf = io.StringIO() with redirect_stdout(buf), patch("builtins.input", return_value="y"): rc = main(["-k", "--no-git", "--repo-root", str(root), str(mod)]) self.assertEqual(rc, 0) self.assertTrue((root / "foo.py").exists(), "With -k, legacy foo.py stub must exist") self.assertTrue((root / "foo" / "__main__.py").exists()) self.assertTrue((root / "foo" / "__init__.py").exists()) def test_main_recursive_preview_does_not_apply_and_does_not_prompt(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) _, tree = _mk_strategy_a_tree(root) buf = io.StringIO() with redirect_stdout(buf), patch("builtins.input") as mocked_input: rc = main(["-R", "-p", "--no-git", "--repo-root", str(root), str(tree)]) self.assertEqual(rc, 0) mocked_input.assert_not_called() # Preview: nothing changed self.assertTrue((tree / "x.py").exists()) self.assertTrue((tree / "y.py").exists()) self.assertFalse((tree / "x").exists()) self.assertFalse((tree / "y").exists()) def test_main_recursive_force_applies_without_prompt_default_removes_legacy(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) _, tree = _mk_strategy_a_tree(root) buf = io.StringIO() with redirect_stdout(buf), patch("builtins.input") as mocked_input: rc = main(["-R", "-f", "--no-git", "--repo-root", str(root), str(tree)]) self.assertEqual(rc, 0) mocked_input.assert_not_called() # default: legacy removed self.assertFalse((tree / "x.py").exists()) self.assertFalse((tree / "y.py").exists()) self.assertTrue((tree / "x" / "__main__.py").exists()) self.assertTrue((tree / "y" / "__main__.py").exists()) self.assertTrue((tree / "x" / "__init__.py").exists()) self.assertTrue((tree / "y" / "__init__.py").exists()) def test_main_recursive_force_keep_creates_stubs(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) _, tree = _mk_strategy_a_tree(root) buf = io.StringIO() with redirect_stdout(buf), patch("builtins.input") as mocked_input: rc = main(["-R", "-f", "-k", "--no-git", "--repo-root", str(root), str(tree)]) self.assertEqual(rc, 0) mocked_input.assert_not_called() # keep: legacy stubs exist + packages exist self.assertTrue((tree / "x.py").exists()) self.assertTrue((tree / "y.py").exists()) self.assertTrue((tree / "x" / "__main__.py").exists()) self.assertTrue((tree / "y" / "__main__.py").exists()) stubx = (tree / "x.py").read_text(encoding="utf-8") self.assertIn("Legacy stub", stubx) def test_main_recursive_prompts_and_aborts_on_no(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) _, tree = _mk_strategy_a_tree(root) buf = io.StringIO() with redirect_stdout(buf), patch("builtins.input", return_value="n"): rc = main(["-R", "--no-git", "--repo-root", str(root), str(tree)]) self.assertEqual(rc, 1) # No changes self.assertTrue((tree / "x.py").exists()) self.assertTrue((tree / "y.py").exists()) self.assertFalse((tree / "x").exists()) self.assertFalse((tree / "y").exists()) def test_main_recursive_prompts_and_applies_on_yes_default_removes_legacy(self) -> None: with tempfile.TemporaryDirectory() as td: root = Path(td) _, tree = _mk_strategy_a_tree(root) buf = io.StringIO() with redirect_stdout(buf), patch("builtins.input", return_value="y"): rc = main(["-R", "--no-git", "--repo-root", str(root), str(tree)]) self.assertEqual(rc, 0) self.assertFalse((tree / "x.py").exists()) self.assertFalse((tree / "y.py").exists()) self.assertTrue((tree / "x" / "__main__.py").exists()) self.assertTrue((tree / "y" / "__main__.py").exists()) if __name__ == "__main__": unittest.main(verbosity=2)