Fix GPG verification runtime handling
This commit is contained in:
@@ -51,6 +51,8 @@
|
|||||||
pyPkgs.pyyaml
|
pyPkgs.pyyaml
|
||||||
pyPkgs.jinja2
|
pyPkgs.jinja2
|
||||||
pyPkgs.pip
|
pyPkgs.pip
|
||||||
|
pkgs.git
|
||||||
|
pkgs.gnupg
|
||||||
];
|
];
|
||||||
|
|
||||||
doCheck = false;
|
doCheck = false;
|
||||||
@@ -87,6 +89,7 @@
|
|||||||
buildInputs = [
|
buildInputs = [
|
||||||
pythonWithDeps
|
pythonWithDeps
|
||||||
pkgs.git
|
pkgs.git
|
||||||
|
pkgs.gnupg
|
||||||
ansiblePkg
|
ansiblePkg
|
||||||
];
|
];
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ fi
|
|||||||
pacman -S --noconfirm --needed \
|
pacman -S --noconfirm --needed \
|
||||||
base-devel \
|
base-devel \
|
||||||
git \
|
git \
|
||||||
|
gnupg \
|
||||||
rsync \
|
rsync \
|
||||||
curl \
|
curl \
|
||||||
ca-certificates \
|
ca-certificates \
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ echo "[centos/dependencies] Installing CentOS build dependencies..."
|
|||||||
dnf -y update
|
dnf -y update
|
||||||
dnf -y install \
|
dnf -y install \
|
||||||
git \
|
git \
|
||||||
|
gnupg2 \
|
||||||
rsync \
|
rsync \
|
||||||
rpm-build \
|
rpm-build \
|
||||||
make \
|
make \
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
|
|||||||
debhelper \
|
debhelper \
|
||||||
dpkg-dev \
|
dpkg-dev \
|
||||||
git \
|
git \
|
||||||
|
gnupg \
|
||||||
rsync \
|
rsync \
|
||||||
bash \
|
bash \
|
||||||
curl \
|
curl \
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ echo "[fedora/dependencies] Installing Fedora build dependencies..."
|
|||||||
dnf -y update
|
dnf -y update
|
||||||
dnf -y install \
|
dnf -y install \
|
||||||
git \
|
git \
|
||||||
|
gnupg2 \
|
||||||
rsync \
|
rsync \
|
||||||
rpm-build \
|
rpm-build \
|
||||||
make \
|
make \
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
|
|||||||
debhelper \
|
debhelper \
|
||||||
dpkg-dev \
|
dpkg-dev \
|
||||||
git \
|
git \
|
||||||
|
gnupg \
|
||||||
tzdata \
|
tzdata \
|
||||||
lsb-release \
|
lsb-release \
|
||||||
rsync \
|
rsync \
|
||||||
|
|||||||
@@ -1,13 +1,33 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from ..errors import GitQueryError, GitRunError
|
import subprocess
|
||||||
from ..run import run
|
|
||||||
|
from ..errors import GitNotRepositoryError, GitQueryError
|
||||||
|
|
||||||
|
|
||||||
class GitLatestSigningKeyQueryError(GitQueryError):
|
class GitLatestSigningKeyQueryError(GitQueryError):
|
||||||
"""Raised when querying the latest commit signing key fails."""
|
"""Raised when querying the latest commit signing key fails."""
|
||||||
|
|
||||||
|
|
||||||
|
def _is_not_repository(stderr: str) -> bool:
|
||||||
|
return "not a git repository" in (stderr or "").lower()
|
||||||
|
|
||||||
|
|
||||||
|
def _looks_like_gpg_runtime_error(stderr: str) -> bool:
|
||||||
|
lowered = (stderr or "").lower()
|
||||||
|
markers = (
|
||||||
|
"cannot run gpg",
|
||||||
|
"can't check signature",
|
||||||
|
"no public key",
|
||||||
|
"failed to create temporary file",
|
||||||
|
"can't connect to the keyboxd",
|
||||||
|
"error opening key db",
|
||||||
|
"gpg failed",
|
||||||
|
"no such file or directory",
|
||||||
|
)
|
||||||
|
return any(marker in lowered for marker in markers)
|
||||||
|
|
||||||
|
|
||||||
def get_latest_signing_key(*, cwd: str = ".") -> str:
|
def get_latest_signing_key(*, cwd: str = ".") -> str:
|
||||||
"""
|
"""
|
||||||
Return the GPG signing key ID of the latest commit, via:
|
Return the GPG signing key ID of the latest commit, via:
|
||||||
@@ -17,9 +37,46 @@ def get_latest_signing_key(*, cwd: str = ".") -> str:
|
|||||||
Returns:
|
Returns:
|
||||||
The key id string (may be empty if commit is not signed).
|
The key id string (may be empty if commit is not signed).
|
||||||
"""
|
"""
|
||||||
|
cmd = ["git", "log", "-1", "--format=%GK"]
|
||||||
try:
|
try:
|
||||||
return run(["log", "-1", "--format=%GK"], cwd=cwd).strip()
|
result = subprocess.run(
|
||||||
except GitRunError as exc:
|
cmd,
|
||||||
|
cwd=cwd,
|
||||||
|
check=False,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
text=True,
|
||||||
|
)
|
||||||
|
except OSError as exc:
|
||||||
raise GitLatestSigningKeyQueryError(
|
raise GitLatestSigningKeyQueryError(
|
||||||
"Failed to query latest signing key.",
|
"Failed to query latest signing key.\n"
|
||||||
|
f"Command: {' '.join(cmd)}\n"
|
||||||
|
f"Reason: {exc}"
|
||||||
) from exc
|
) from exc
|
||||||
|
|
||||||
|
stdout = (result.stdout or "").strip()
|
||||||
|
stderr = (result.stderr or "").strip()
|
||||||
|
|
||||||
|
if result.returncode != 0:
|
||||||
|
if _is_not_repository(stderr):
|
||||||
|
raise GitNotRepositoryError(
|
||||||
|
f"Not a git repository: {cwd!r}\n"
|
||||||
|
f"Command: {' '.join(cmd)}\n"
|
||||||
|
f"STDERR:\n{stderr}"
|
||||||
|
)
|
||||||
|
raise GitLatestSigningKeyQueryError(
|
||||||
|
"Failed to query latest signing key.\n"
|
||||||
|
f"Command: {' '.join(cmd)}\n"
|
||||||
|
f"Exit code: {result.returncode}\n"
|
||||||
|
f"STDOUT:\n{stdout}\n"
|
||||||
|
f"STDERR:\n{stderr}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if not stdout and stderr and _looks_like_gpg_runtime_error(stderr):
|
||||||
|
raise GitLatestSigningKeyQueryError(
|
||||||
|
"Failed to query latest signing key.\n"
|
||||||
|
f"Command: {' '.join(cmd)}\n"
|
||||||
|
f"STDERR:\n{stderr}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return stdout
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ def verify_repository(repo, repo_dir, mode="local", no_verification=False):
|
|||||||
|
|
||||||
commit_hash = ""
|
commit_hash = ""
|
||||||
signing_key = ""
|
signing_key = ""
|
||||||
|
signing_key_query_failed = False
|
||||||
|
|
||||||
# best-effort info collection
|
# best-effort info collection
|
||||||
try:
|
try:
|
||||||
@@ -59,6 +60,7 @@ def verify_repository(repo, repo_dir, mode="local", no_verification=False):
|
|||||||
except GitLatestSigningKeyQueryError as exc:
|
except GitLatestSigningKeyQueryError as exc:
|
||||||
error_details.append(str(exc))
|
error_details.append(str(exc))
|
||||||
signing_key = ""
|
signing_key = ""
|
||||||
|
signing_key_query_failed = True
|
||||||
|
|
||||||
commit_check_passed = True
|
commit_check_passed = True
|
||||||
gpg_check_passed = True
|
gpg_check_passed = True
|
||||||
@@ -78,9 +80,10 @@ def verify_repository(repo, repo_dir, mode="local", no_verification=False):
|
|||||||
if expected_gpg_keys:
|
if expected_gpg_keys:
|
||||||
if not signing_key:
|
if not signing_key:
|
||||||
gpg_check_passed = False
|
gpg_check_passed = False
|
||||||
error_details.append(
|
if not signing_key_query_failed:
|
||||||
f"Expected one of GPG keys: {expected_gpg_keys}, but no signing key was found."
|
error_details.append(
|
||||||
)
|
f"Expected one of GPG keys: {expected_gpg_keys}, but no signing key was found."
|
||||||
|
)
|
||||||
elif signing_key not in expected_gpg_keys:
|
elif signing_key not in expected_gpg_keys:
|
||||||
gpg_check_passed = False
|
gpg_check_passed = False
|
||||||
error_details.append(
|
error_details.append(
|
||||||
|
|||||||
@@ -0,0 +1,57 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
def _find_repo_root() -> Path:
|
||||||
|
here = Path(__file__).resolve()
|
||||||
|
for parent in here.parents:
|
||||||
|
if (parent / "pyproject.toml").is_file() and (
|
||||||
|
parent / "src" / "pkgmgr"
|
||||||
|
).is_dir():
|
||||||
|
return parent
|
||||||
|
raise RuntimeError(
|
||||||
|
"Could not determine repository root for pkgmgr integration test"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGitVerificationRuntimeDependencies(unittest.TestCase):
|
||||||
|
def test_flake_app_includes_git_and_gpg_runtime_tools(self) -> None:
|
||||||
|
repo_root = _find_repo_root()
|
||||||
|
flake_text = (repo_root / "flake.nix").read_text(encoding="utf-8")
|
||||||
|
|
||||||
|
self.assertIn("pkgs.git", flake_text)
|
||||||
|
self.assertIn("pkgs.gnupg", flake_text)
|
||||||
|
|
||||||
|
def test_distro_dependency_scripts_install_gpg_tools(self) -> None:
|
||||||
|
repo_root = _find_repo_root()
|
||||||
|
expected_packages = {
|
||||||
|
"arch": "gnupg",
|
||||||
|
"debian": "gnupg",
|
||||||
|
"ubuntu": "gnupg",
|
||||||
|
"fedora": "gnupg2",
|
||||||
|
"centos": "gnupg2",
|
||||||
|
}
|
||||||
|
|
||||||
|
missing: list[str] = []
|
||||||
|
for distro, package_name in expected_packages.items():
|
||||||
|
script_path = (
|
||||||
|
repo_root / "scripts" / "installation" / distro / "dependencies.sh"
|
||||||
|
)
|
||||||
|
content = script_path.read_text(encoding="utf-8")
|
||||||
|
if not re.search(rf"\b{re.escape(package_name)}\b", content):
|
||||||
|
missing.append(
|
||||||
|
f"{distro}: expected package {package_name} in {script_path}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if missing:
|
||||||
|
self.fail(
|
||||||
|
"Git signature verification runtime dependencies are incomplete:\n"
|
||||||
|
+ "\n".join(f" - {item}" for item in missing)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -1,7 +1,8 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
import subprocess
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
from pkgmgr.core.git.errors import GitNotRepositoryError, GitRunError
|
from pkgmgr.core.git.errors import GitNotRepositoryError
|
||||||
from pkgmgr.core.git.queries.get_latest_signing_key import (
|
from pkgmgr.core.git.queries.get_latest_signing_key import (
|
||||||
GitLatestSigningKeyQueryError,
|
GitLatestSigningKeyQueryError,
|
||||||
get_latest_signing_key,
|
get_latest_signing_key,
|
||||||
@@ -10,25 +11,53 @@ from pkgmgr.core.git.queries.get_latest_signing_key import (
|
|||||||
|
|
||||||
class TestGetLatestSigningKey(unittest.TestCase):
|
class TestGetLatestSigningKey(unittest.TestCase):
|
||||||
@patch(
|
@patch(
|
||||||
"pkgmgr.core.git.queries.get_latest_signing_key.run",
|
"pkgmgr.core.git.queries.get_latest_signing_key.subprocess.run",
|
||||||
return_value="ABCDEF1234567890\n",
|
return_value=subprocess.CompletedProcess(
|
||||||
|
args=["git", "log", "-1", "--format=%GK"],
|
||||||
|
returncode=0,
|
||||||
|
stdout="ABCDEF1234567890\n",
|
||||||
|
stderr="",
|
||||||
|
),
|
||||||
)
|
)
|
||||||
def test_strips_output(self, _mock_run) -> None:
|
def test_strips_output(self, _mock_run) -> None:
|
||||||
out = get_latest_signing_key(cwd="/tmp/repo")
|
out = get_latest_signing_key(cwd="/tmp/repo")
|
||||||
self.assertEqual(out, "ABCDEF1234567890")
|
self.assertEqual(out, "ABCDEF1234567890")
|
||||||
|
|
||||||
@patch(
|
@patch(
|
||||||
"pkgmgr.core.git.queries.get_latest_signing_key.run",
|
"pkgmgr.core.git.queries.get_latest_signing_key.subprocess.run",
|
||||||
side_effect=GitRunError("boom"),
|
return_value=subprocess.CompletedProcess(
|
||||||
|
args=["git", "log", "-1", "--format=%GK"],
|
||||||
|
returncode=1,
|
||||||
|
stdout="",
|
||||||
|
stderr="boom",
|
||||||
|
),
|
||||||
)
|
)
|
||||||
def test_wraps_git_run_error(self, _mock_run) -> None:
|
def test_wraps_git_run_error(self, _mock_run) -> None:
|
||||||
with self.assertRaises(GitLatestSigningKeyQueryError):
|
with self.assertRaisesRegex(GitLatestSigningKeyQueryError, "boom"):
|
||||||
get_latest_signing_key(cwd="/tmp/repo")
|
get_latest_signing_key(cwd="/tmp/repo")
|
||||||
|
|
||||||
@patch(
|
@patch(
|
||||||
"pkgmgr.core.git.queries.get_latest_signing_key.run",
|
"pkgmgr.core.git.queries.get_latest_signing_key.subprocess.run",
|
||||||
side_effect=GitNotRepositoryError("no repo"),
|
return_value=subprocess.CompletedProcess(
|
||||||
|
args=["git", "log", "-1", "--format=%GK"],
|
||||||
|
returncode=128,
|
||||||
|
stdout="",
|
||||||
|
stderr="fatal: not a git repository",
|
||||||
|
),
|
||||||
)
|
)
|
||||||
def test_does_not_catch_not_repository_error(self, _mock_run) -> None:
|
def test_does_not_catch_not_repository_error(self, _mock_run) -> None:
|
||||||
with self.assertRaises(GitNotRepositoryError):
|
with self.assertRaises(GitNotRepositoryError):
|
||||||
get_latest_signing_key(cwd="/tmp/no-repo")
|
get_latest_signing_key(cwd="/tmp/no-repo")
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"pkgmgr.core.git.queries.get_latest_signing_key.subprocess.run",
|
||||||
|
return_value=subprocess.CompletedProcess(
|
||||||
|
args=["git", "log", "-1", "--format=%GK"],
|
||||||
|
returncode=0,
|
||||||
|
stdout="",
|
||||||
|
stderr="error: cannot run gpg: No such file or directory",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
def test_raises_when_git_reports_gpg_runtime_error(self, _mock_run) -> None:
|
||||||
|
with self.assertRaisesRegex(GitLatestSigningKeyQueryError, "cannot run gpg"):
|
||||||
|
get_latest_signing_key(cwd="/tmp/repo")
|
||||||
|
|||||||
@@ -77,6 +77,23 @@ class TestVerifyRepository(unittest.TestCase):
|
|||||||
self.assertEqual(commit, "")
|
self.assertEqual(commit, "")
|
||||||
self.assertEqual(key, "")
|
self.assertEqual(key, "")
|
||||||
|
|
||||||
|
def test_verified_gpg_query_error_does_not_add_missing_key_fallback(self) -> None:
|
||||||
|
repo = {"verified": {"commit": None, "gpg_keys": ["ABC"]}}
|
||||||
|
with (
|
||||||
|
patch("pkgmgr.core.repository.verify.get_head_commit", return_value=""),
|
||||||
|
patch(
|
||||||
|
"pkgmgr.core.repository.verify.get_latest_signing_key",
|
||||||
|
side_effect=GitLatestSigningKeyQueryError("cannot run gpg"),
|
||||||
|
),
|
||||||
|
):
|
||||||
|
ok, errors, commit, key = verify_repository(repo, "/tmp/repo", mode="local")
|
||||||
|
|
||||||
|
self.assertFalse(ok)
|
||||||
|
self.assertIn("cannot run gpg", " ".join(errors))
|
||||||
|
self.assertFalse(any("no signing key was found" in e for e in errors))
|
||||||
|
self.assertEqual(commit, "")
|
||||||
|
self.assertEqual(key, "")
|
||||||
|
|
||||||
def test_strict_pull_collects_remote_error_message(self) -> None:
|
def test_strict_pull_collects_remote_error_message(self) -> None:
|
||||||
repo = {"verified": {"commit": "expected", "gpg_keys": None}}
|
repo = {"verified": {"commit": "expected", "gpg_keys": None}}
|
||||||
with (
|
with (
|
||||||
|
|||||||
Reference in New Issue
Block a user