From 37fd2192a5971682586e5d60acda4eadd4c26936 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Tue, 12 May 2026 22:18:31 +0200 Subject: [PATCH] feat(pull,push): parallel execution via --jobs flag Adds `pkgmgr pull -j N` and `pkgmgr push -j N` for concurrent operation across repositories (default: min(cpu_count, 8), use 1 for sequential). Verification in pull also parallelizes; interactive prompts and the actual git command still run on the main thread. Shared parallel-runner and repo-resolution helpers live in a new `_parallel.py` module. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/pkgmgr/actions/repository/_parallel.py | 91 ++++++++++++++++++ src/pkgmgr/actions/repository/pull.py | 105 ++++++++++++++++----- src/pkgmgr/actions/repository/push.py | 39 ++++++++ src/pkgmgr/cli/proxy.py | 22 +++++ src/pkgmgr/core/git/commands/__init__.py | 3 + src/pkgmgr/core/git/commands/push_args.py | 39 ++++++++ 6 files changed, 273 insertions(+), 26 deletions(-) create mode 100644 src/pkgmgr/actions/repository/_parallel.py create mode 100644 src/pkgmgr/actions/repository/push.py create mode 100644 src/pkgmgr/core/git/commands/push_args.py diff --git a/src/pkgmgr/actions/repository/_parallel.py b/src/pkgmgr/actions/repository/_parallel.py new file mode 100644 index 0000000..6d0e847 --- /dev/null +++ b/src/pkgmgr/actions/repository/_parallel.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import os +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any, Callable, Dict, List, Tuple + +from pkgmgr.core.repository.dir import get_repo_dir +from pkgmgr.core.repository.identifier import get_repo_identifier + +Repository = Dict[str, Any] +RepoRef = Tuple[str, str] +OpResult = Tuple[bool, str] +RepoOp = Callable[[str], OpResult] + + +def resolve_repos( + selected_repos: List[Repository], + repositories_base_dir: str, + all_repos: List[Repository], +) -> List[RepoRef]: + """ + Resolve ``(identifier, repo_dir)`` pairs for ``selected_repos``. + + Repositories whose directory does not exist on disk are reported and + skipped, matching the prior behavior of pull/push handlers. + """ + resolved: List[RepoRef] = [] + for repo in selected_repos: + ident = get_repo_identifier(repo, all_repos) + rd = get_repo_dir(repositories_base_dir, repo) + if not os.path.exists(rd): + print(f"Repository directory '{rd}' not found for {ident}.") + continue + resolved.append((ident, rd)) + return resolved + + +def run_on_repos( + repos: List[RepoRef], + op: RepoOp, + *, + jobs: int, + op_name: str, +) -> None: + """ + Run ``op(repo_dir) -> (ok, msg)`` for each repo, optionally in parallel. + + - ``jobs == 1``: serial, quiet on success, prints ``msg`` on failure. + - ``jobs > 1``: parallel via ThreadPoolExecutor, prints a banner plus + ``[OK]``/``[FAIL]`` per repo and a final summary. + - Exits with status 1 if any operation failed. + """ + if not repos: + return + + effective_jobs = max(1, min(jobs, len(repos))) + failed: List[Tuple[str, str]] = [] + + if effective_jobs == 1: + for ident, rd in repos: + ok, msg = op(rd) + if not ok: + print(msg) + failed.append((ident, msg)) + else: + print( + f"[{op_name.upper()}] Running {len(repos)} {op_name}(s) with up to " + f"{effective_jobs} parallel jobs..." + ) + with ThreadPoolExecutor(max_workers=effective_jobs) as executor: + futures = {executor.submit(op, rd): ident for ident, rd in repos} + for future in as_completed(futures): + ident = futures[future] + ok, msg = future.result() + if ok: + print(f"[OK] {ident}") + else: + print(f"[FAIL] {ident}") + for line in msg.splitlines(): + print(f" {line}") + failed.append((ident, msg)) + + if failed: + if effective_jobs > 1: + print( + f"\n[SUMMARY] {len(failed)} of {len(repos)} {op_name}(s) failed:" + ) + for ident, _msg in failed: + print(f" - {ident}") + sys.exit(1) diff --git a/src/pkgmgr/actions/repository/pull.py b/src/pkgmgr/actions/repository/pull.py index cc35a3a..c61249f 100644 --- a/src/pkgmgr/actions/repository/pull.py +++ b/src/pkgmgr/actions/repository/pull.py @@ -1,17 +1,66 @@ from __future__ import annotations import os -import sys -from typing import List, Dict, Any +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Dict, List, Tuple +from pkgmgr.actions.repository._parallel import RepoRef, run_on_repos from pkgmgr.core.git.commands import pull_args, GitPullArgsError -from pkgmgr.core.repository.dir import get_repo_dir from pkgmgr.core.repository.identifier import get_repo_identifier +from pkgmgr.core.repository.dir import get_repo_dir from pkgmgr.core.repository.verify import verify_repository Repository = Dict[str, Any] +def _pull_one(repo_dir: str, extra_args: List[str], preview: bool) -> Tuple[bool, str]: + try: + pull_args(extra_args, cwd=repo_dir, preview=preview) + return (True, "") + except GitPullArgsError as exc: + return (False, str(exc)) + + +def _verify_one( + repo: Repository, + repo_dir: str, + no_verification: bool, +) -> Tuple[bool, bool, List[str]]: + """Returns (has_verified_info, verified_ok, errors).""" + verified_ok, errors, _commit, _key = verify_repository( + repo, repo_dir, mode="pull", no_verification=no_verification, + ) + return (bool(repo.get("verified")), verified_ok, errors) + + +def _verify_all( + candidates: List[Tuple[Repository, str, str]], + no_verification: bool, + jobs: int, +) -> List[Tuple[str, str, bool, bool, List[str]]]: + """ + Verify all candidates (parallel if ``jobs > 1``), preserving input order. + + Returns one tuple per candidate: ``(ident, repo_dir, has_verified_info, + verified_ok, errors)``. + """ + verify_jobs = max(1, min(jobs, len(candidates))) + if verify_jobs == 1: + return [ + (ident, rd, *_verify_one(repo, rd, no_verification)) + for repo, ident, rd in candidates + ] + with ThreadPoolExecutor(max_workers=verify_jobs) as executor: + futures = [ + executor.submit(_verify_one, repo, rd, no_verification) + for repo, _ident, rd in candidates + ] + results = [f.result() for f in futures] + return [ + (ident, rd, *res) for (_repo, ident, rd), res in zip(candidates, results) + ] + + def pull_with_verification( selected_repos: List[Repository], repositories_base_dir: str, @@ -19,41 +68,45 @@ def pull_with_verification( extra_args: List[str], no_verification: bool, preview: bool, + jobs: int = 1, ) -> None: """ Execute `git pull` for each repository with verification. - - If verification fails and verification is enabled, prompt user to continue. - - Uses core.git.commands.pull_args() (no raw subprocess usage). + - Verification (I/O-bound) runs in parallel when ``jobs > 1``. + - Interactive prompts for failed verifications are handled serially on the + main thread after parallel verification completes. + - Approved repos are then pulled in parallel when ``jobs > 1``. + - On any pull failure, prints a summary and exits with status 1. """ + candidates: List[Tuple[Repository, str, str]] = [] for repo in selected_repos: - repo_identifier = get_repo_identifier(repo, all_repos) - repo_dir = get_repo_dir(repositories_base_dir, repo) - - if not os.path.exists(repo_dir): - print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.") + ident = get_repo_identifier(repo, all_repos) + rd = get_repo_dir(repositories_base_dir, repo) + if not os.path.exists(rd): + print(f"Repository directory '{rd}' not found for {ident}.") continue + candidates.append((repo, ident, rd)) - verified_info = repo.get("verified") - verified_ok, errors, _commit_hash, _signing_key = verify_repository( - repo, - repo_dir, - mode="pull", - no_verification=no_verification, - ) + if not candidates: + return - if not preview and not no_verification and verified_info and not verified_ok: - print(f"Warning: Verification failed for {repo_identifier}:") + verify_results = _verify_all(candidates, no_verification, jobs) + + approved: List[RepoRef] = [] + for ident, rd, has_verified_info, verified_ok, errors in verify_results: + if not preview and not no_verification and has_verified_info and not verified_ok: + print(f"Warning: Verification failed for {ident}:") for err in errors: print(f" - {err}") choice = input("Proceed with 'git pull'? (y/N): ").strip().lower() if choice != "y": continue + approved.append((ident, rd)) - try: - pull_args(extra_args, cwd=repo_dir, preview=preview) - except GitPullArgsError as exc: - # Keep behavior consistent with previous implementation: - # stop on first failure and propagate return code as generic failure. - print(str(exc)) - sys.exit(1) + run_on_repos( + approved, + lambda rd: _pull_one(rd, extra_args, preview), + jobs=jobs, + op_name="pull", + ) diff --git a/src/pkgmgr/actions/repository/push.py b/src/pkgmgr/actions/repository/push.py new file mode 100644 index 0000000..0a42d1f --- /dev/null +++ b/src/pkgmgr/actions/repository/push.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from typing import Any, Dict, List, Tuple + +from pkgmgr.actions.repository._parallel import ( + resolve_repos, + run_on_repos, +) +from pkgmgr.core.git.commands import push_args, GitPushArgsError + +Repository = Dict[str, Any] + + +def _push_one(repo_dir: str, extra_args: List[str], preview: bool) -> Tuple[bool, str]: + try: + push_args(extra_args, cwd=repo_dir, preview=preview) + return (True, "") + except GitPushArgsError as exc: + return (False, str(exc)) + + +def push_in_parallel( + selected_repos: List[Repository], + repositories_base_dir: str, + all_repos: List[Repository], + extra_args: List[str], + preview: bool, + jobs: int = 1, +) -> None: + """ + Execute `git push` for each repository, optionally in parallel. + """ + repos = resolve_repos(selected_repos, repositories_base_dir, all_repos) + run_on_repos( + repos, + lambda rd: _push_one(rd, extra_args, preview), + jobs=jobs, + op_name="push", + ) diff --git a/src/pkgmgr/cli/proxy.py b/src/pkgmgr/cli/proxy.py index d218c4b..a4d1244 100644 --- a/src/pkgmgr/cli/proxy.py +++ b/src/pkgmgr/cli/proxy.py @@ -12,6 +12,7 @@ from pkgmgr.cli.context import CLIContext from pkgmgr.actions.repository.clone import clone_repos from pkgmgr.actions.proxy import exec_proxy_command from pkgmgr.actions.repository.pull import pull_with_verification +from pkgmgr.actions.repository.push import push_in_parallel from pkgmgr.core.repository.selected import get_selected_repos from pkgmgr.core.repository.dir import get_repo_dir @@ -177,6 +178,17 @@ def register_proxy_commands( default=False, help="Disable verification via commit/gpg", ) + if subcommand in ("pull", "push"): + parser.add_argument( + "-j", + "--jobs", + type=int, + default=min(os.cpu_count() or 4, 8), + help=( + f"Number of parallel {subcommand}s " + "(default: min(cpu_count, 8)). Use 1 for sequential." + ), + ) if subcommand == "clone": parser.add_argument( "--clone-mode", @@ -234,6 +246,16 @@ def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool: args.extra_args, args.no_verification, args.preview, + jobs=args.jobs, + ) + elif args.command == "push": + push_in_parallel( + selected, + ctx.repositories_base_dir, + ctx.all_repositories, + args.extra_args, + args.preview, + jobs=args.jobs, ) else: exec_proxy_command( diff --git a/src/pkgmgr/core/git/commands/__init__.py b/src/pkgmgr/core/git/commands/__init__.py index 94d6b56..16bc55d 100644 --- a/src/pkgmgr/core/git/commands/__init__.py +++ b/src/pkgmgr/core/git/commands/__init__.py @@ -19,6 +19,7 @@ from .pull import GitPullError, pull from .pull_args import GitPullArgsError, pull_args from .pull_ff_only import GitPullFfOnlyError, pull_ff_only from .push import GitPushError, push +from .push_args import GitPushArgsError, push_args from .push_upstream import GitPushUpstreamError, push_upstream from .set_remote_url import GitSetRemoteUrlError, set_remote_url from .tag_annotated import GitTagAnnotatedError, tag_annotated @@ -34,6 +35,7 @@ __all__ = [ "pull_ff_only", "merge_no_ff", "push", + "push_args", "commit", "delete_local_branch", "delete_remote_branch", @@ -56,6 +58,7 @@ __all__ = [ "GitPullFfOnlyError", "GitMergeError", "GitPushError", + "GitPushArgsError", "GitCommitError", "GitDeleteLocalBranchError", "GitDeleteRemoteBranchError", diff --git a/src/pkgmgr/core/git/commands/push_args.py b/src/pkgmgr/core/git/commands/push_args.py new file mode 100644 index 0000000..d7a897a --- /dev/null +++ b/src/pkgmgr/core/git/commands/push_args.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from typing import List + +from ..errors import GitRunError, GitCommandError +from ..run import run + + +class GitPushArgsError(GitCommandError): + """Raised when `git push` with arbitrary args fails.""" + + +def push_args( + args: List[str] | None = None, + *, + cwd: str = ".", + preview: bool = False, +) -> None: + """ + Execute `git push` with caller-provided arguments. + + Examples: + [] -> git push + ["--force"] -> git push --force + ["origin", "main"] -> git push origin main + ["-u", "origin", "feature"] -> git push -u origin feature + """ + extra = args or [] + try: + run(["push", *extra], cwd=cwd, preview=preview) + except GitRunError as exc: + details = getattr(exc, "output", None) or getattr(exc, "stderr", None) or "" + raise GitPushArgsError( + ( + f"Failed to run `git push` with args={extra!r} " + f"in cwd={cwd!r}.\n{details}" + ).rstrip(), + cwd=cwd, + ) from exc