feat(pull,push): parallel execution via --jobs flag

Adds `pkgmgr pull -j N` and `pkgmgr push -j N` for concurrent operation
across repositories (default: min(cpu_count, 8), use 1 for sequential).
Verification in pull also parallelizes; interactive prompts and the
actual git command still run on the main thread. Shared parallel-runner
and repo-resolution helpers live in a new `_parallel.py` module.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-12 22:18:31 +02:00
parent 607102e7f8
commit 37fd2192a5
6 changed files with 273 additions and 26 deletions

View File

@@ -0,0 +1,91 @@
from __future__ import annotations
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Tuple
from pkgmgr.core.repository.dir import get_repo_dir
from pkgmgr.core.repository.identifier import get_repo_identifier
Repository = Dict[str, Any]
RepoRef = Tuple[str, str]
OpResult = Tuple[bool, str]
RepoOp = Callable[[str], OpResult]
def resolve_repos(
selected_repos: List[Repository],
repositories_base_dir: str,
all_repos: List[Repository],
) -> List[RepoRef]:
"""
Resolve ``(identifier, repo_dir)`` pairs for ``selected_repos``.
Repositories whose directory does not exist on disk are reported and
skipped, matching the prior behavior of pull/push handlers.
"""
resolved: List[RepoRef] = []
for repo in selected_repos:
ident = get_repo_identifier(repo, all_repos)
rd = get_repo_dir(repositories_base_dir, repo)
if not os.path.exists(rd):
print(f"Repository directory '{rd}' not found for {ident}.")
continue
resolved.append((ident, rd))
return resolved
def run_on_repos(
repos: List[RepoRef],
op: RepoOp,
*,
jobs: int,
op_name: str,
) -> None:
"""
Run ``op(repo_dir) -> (ok, msg)`` for each repo, optionally in parallel.
- ``jobs == 1``: serial, quiet on success, prints ``msg`` on failure.
- ``jobs > 1``: parallel via ThreadPoolExecutor, prints a banner plus
``[OK]``/``[FAIL]`` per repo and a final summary.
- Exits with status 1 if any operation failed.
"""
if not repos:
return
effective_jobs = max(1, min(jobs, len(repos)))
failed: List[Tuple[str, str]] = []
if effective_jobs == 1:
for ident, rd in repos:
ok, msg = op(rd)
if not ok:
print(msg)
failed.append((ident, msg))
else:
print(
f"[{op_name.upper()}] Running {len(repos)} {op_name}(s) with up to "
f"{effective_jobs} parallel jobs..."
)
with ThreadPoolExecutor(max_workers=effective_jobs) as executor:
futures = {executor.submit(op, rd): ident for ident, rd in repos}
for future in as_completed(futures):
ident = futures[future]
ok, msg = future.result()
if ok:
print(f"[OK] {ident}")
else:
print(f"[FAIL] {ident}")
for line in msg.splitlines():
print(f" {line}")
failed.append((ident, msg))
if failed:
if effective_jobs > 1:
print(
f"\n[SUMMARY] {len(failed)} of {len(repos)} {op_name}(s) failed:"
)
for ident, _msg in failed:
print(f" - {ident}")
sys.exit(1)

View File

@@ -1,17 +1,66 @@
from __future__ import annotations
import os
import sys
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Tuple
from pkgmgr.actions.repository._parallel import RepoRef, run_on_repos
from pkgmgr.core.git.commands import pull_args, GitPullArgsError
from pkgmgr.core.repository.dir import get_repo_dir
from pkgmgr.core.repository.identifier import get_repo_identifier
from pkgmgr.core.repository.dir import get_repo_dir
from pkgmgr.core.repository.verify import verify_repository
Repository = Dict[str, Any]
def _pull_one(repo_dir: str, extra_args: List[str], preview: bool) -> Tuple[bool, str]:
try:
pull_args(extra_args, cwd=repo_dir, preview=preview)
return (True, "")
except GitPullArgsError as exc:
return (False, str(exc))
def _verify_one(
repo: Repository,
repo_dir: str,
no_verification: bool,
) -> Tuple[bool, bool, List[str]]:
"""Returns (has_verified_info, verified_ok, errors)."""
verified_ok, errors, _commit, _key = verify_repository(
repo, repo_dir, mode="pull", no_verification=no_verification,
)
return (bool(repo.get("verified")), verified_ok, errors)
def _verify_all(
candidates: List[Tuple[Repository, str, str]],
no_verification: bool,
jobs: int,
) -> List[Tuple[str, str, bool, bool, List[str]]]:
"""
Verify all candidates (parallel if ``jobs > 1``), preserving input order.
Returns one tuple per candidate: ``(ident, repo_dir, has_verified_info,
verified_ok, errors)``.
"""
verify_jobs = max(1, min(jobs, len(candidates)))
if verify_jobs == 1:
return [
(ident, rd, *_verify_one(repo, rd, no_verification))
for repo, ident, rd in candidates
]
with ThreadPoolExecutor(max_workers=verify_jobs) as executor:
futures = [
executor.submit(_verify_one, repo, rd, no_verification)
for repo, _ident, rd in candidates
]
results = [f.result() for f in futures]
return [
(ident, rd, *res) for (_repo, ident, rd), res in zip(candidates, results)
]
def pull_with_verification(
selected_repos: List[Repository],
repositories_base_dir: str,
@@ -19,41 +68,45 @@ def pull_with_verification(
extra_args: List[str],
no_verification: bool,
preview: bool,
jobs: int = 1,
) -> None:
"""
Execute `git pull` for each repository with verification.
- If verification fails and verification is enabled, prompt user to continue.
- Uses core.git.commands.pull_args() (no raw subprocess usage).
- Verification (I/O-bound) runs in parallel when ``jobs > 1``.
- Interactive prompts for failed verifications are handled serially on the
main thread after parallel verification completes.
- Approved repos are then pulled in parallel when ``jobs > 1``.
- On any pull failure, prints a summary and exits with status 1.
"""
candidates: List[Tuple[Repository, str, str]] = []
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir, repo)
if not os.path.exists(repo_dir):
print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.")
ident = get_repo_identifier(repo, all_repos)
rd = get_repo_dir(repositories_base_dir, repo)
if not os.path.exists(rd):
print(f"Repository directory '{rd}' not found for {ident}.")
continue
candidates.append((repo, ident, rd))
verified_info = repo.get("verified")
verified_ok, errors, _commit_hash, _signing_key = verify_repository(
repo,
repo_dir,
mode="pull",
no_verification=no_verification,
)
if not candidates:
return
if not preview and not no_verification and verified_info and not verified_ok:
print(f"Warning: Verification failed for {repo_identifier}:")
verify_results = _verify_all(candidates, no_verification, jobs)
approved: List[RepoRef] = []
for ident, rd, has_verified_info, verified_ok, errors in verify_results:
if not preview and not no_verification and has_verified_info and not verified_ok:
print(f"Warning: Verification failed for {ident}:")
for err in errors:
print(f" - {err}")
choice = input("Proceed with 'git pull'? (y/N): ").strip().lower()
if choice != "y":
continue
approved.append((ident, rd))
try:
pull_args(extra_args, cwd=repo_dir, preview=preview)
except GitPullArgsError as exc:
# Keep behavior consistent with previous implementation:
# stop on first failure and propagate return code as generic failure.
print(str(exc))
sys.exit(1)
run_on_repos(
approved,
lambda rd: _pull_one(rd, extra_args, preview),
jobs=jobs,
op_name="pull",
)

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from typing import Any, Dict, List, Tuple
from pkgmgr.actions.repository._parallel import (
resolve_repos,
run_on_repos,
)
from pkgmgr.core.git.commands import push_args, GitPushArgsError
Repository = Dict[str, Any]
def _push_one(repo_dir: str, extra_args: List[str], preview: bool) -> Tuple[bool, str]:
try:
push_args(extra_args, cwd=repo_dir, preview=preview)
return (True, "")
except GitPushArgsError as exc:
return (False, str(exc))
def push_in_parallel(
selected_repos: List[Repository],
repositories_base_dir: str,
all_repos: List[Repository],
extra_args: List[str],
preview: bool,
jobs: int = 1,
) -> None:
"""
Execute `git push` for each repository, optionally in parallel.
"""
repos = resolve_repos(selected_repos, repositories_base_dir, all_repos)
run_on_repos(
repos,
lambda rd: _push_one(rd, extra_args, preview),
jobs=jobs,
op_name="push",
)

View File

@@ -12,6 +12,7 @@ from pkgmgr.cli.context import CLIContext
from pkgmgr.actions.repository.clone import clone_repos
from pkgmgr.actions.proxy import exec_proxy_command
from pkgmgr.actions.repository.pull import pull_with_verification
from pkgmgr.actions.repository.push import push_in_parallel
from pkgmgr.core.repository.selected import get_selected_repos
from pkgmgr.core.repository.dir import get_repo_dir
@@ -177,6 +178,17 @@ def register_proxy_commands(
default=False,
help="Disable verification via commit/gpg",
)
if subcommand in ("pull", "push"):
parser.add_argument(
"-j",
"--jobs",
type=int,
default=min(os.cpu_count() or 4, 8),
help=(
f"Number of parallel {subcommand}s "
"(default: min(cpu_count, 8)). Use 1 for sequential."
),
)
if subcommand == "clone":
parser.add_argument(
"--clone-mode",
@@ -234,6 +246,16 @@ def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool:
args.extra_args,
args.no_verification,
args.preview,
jobs=args.jobs,
)
elif args.command == "push":
push_in_parallel(
selected,
ctx.repositories_base_dir,
ctx.all_repositories,
args.extra_args,
args.preview,
jobs=args.jobs,
)
else:
exec_proxy_command(

View File

@@ -19,6 +19,7 @@ from .pull import GitPullError, pull
from .pull_args import GitPullArgsError, pull_args
from .pull_ff_only import GitPullFfOnlyError, pull_ff_only
from .push import GitPushError, push
from .push_args import GitPushArgsError, push_args
from .push_upstream import GitPushUpstreamError, push_upstream
from .set_remote_url import GitSetRemoteUrlError, set_remote_url
from .tag_annotated import GitTagAnnotatedError, tag_annotated
@@ -34,6 +35,7 @@ __all__ = [
"pull_ff_only",
"merge_no_ff",
"push",
"push_args",
"commit",
"delete_local_branch",
"delete_remote_branch",
@@ -56,6 +58,7 @@ __all__ = [
"GitPullFfOnlyError",
"GitMergeError",
"GitPushError",
"GitPushArgsError",
"GitCommitError",
"GitDeleteLocalBranchError",
"GitDeleteRemoteBranchError",

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from typing import List
from ..errors import GitRunError, GitCommandError
from ..run import run
class GitPushArgsError(GitCommandError):
"""Raised when `git push` with arbitrary args fails."""
def push_args(
args: List[str] | None = None,
*,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Execute `git push` with caller-provided arguments.
Examples:
[] -> git push
["--force"] -> git push --force
["origin", "main"] -> git push origin main
["-u", "origin", "feature"] -> git push -u origin feature
"""
extra = args or []
try:
run(["push", *extra], cwd=cwd, preview=preview)
except GitRunError as exc:
details = getattr(exc, "output", None) or getattr(exc, "stderr", None) or ""
raise GitPushArgsError(
(
f"Failed to run `git push` with args={extra!r} "
f"in cwd={cwd!r}.\n{details}"
).rstrip(),
cwd=cwd,
) from exc