mirror of
https://github.com/kevinveenbirkenbach/docker-volume-backup.git
synced 2026-01-11 17:36:53 +00:00
fix(seed): handle empty databases.csv and add unit tests
- Gracefully handle empty databases.csv by creating header columns and emitting a warning - Add _empty_df() helper for consistent DataFrame initialization - Add unit tests for baudolo-seed including empty-file regression case - Apply minor formatting fixes across backup and e2e test files https://chatgpt.com/share/69628f0b-8744-800f-b08d-2633e05167da
This commit is contained in:
@@ -72,6 +72,7 @@ def requires_stop(containers: list[str], images_no_stop_required: list[str]) ->
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def backup_mariadb_or_postgres(
|
def backup_mariadb_or_postgres(
|
||||||
*,
|
*,
|
||||||
container: str,
|
container: str,
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ def parse_args() -> argparse.Namespace:
|
|||||||
action="store_true",
|
action="store_true",
|
||||||
help="Do not restart containers after backup",
|
help="Do not restart containers after backup",
|
||||||
)
|
)
|
||||||
|
|
||||||
p.add_argument(
|
p.add_argument(
|
||||||
"--dump-only-sql",
|
"--dump-only-sql",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
|
|||||||
@@ -52,7 +52,9 @@ def _atomic_write_cmd(cmd: str, out_file: str) -> None:
|
|||||||
execute_shell_command(f"mv {tmp} {out_file}")
|
execute_shell_command(f"mv {tmp} {out_file}")
|
||||||
|
|
||||||
|
|
||||||
def fallback_pg_dumpall(container: str, username: str, password: str, out_file: str) -> None:
|
def fallback_pg_dumpall(
|
||||||
|
container: str, username: str, password: str, out_file: str
|
||||||
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Perform a full Postgres cluster dump using pg_dumpall.
|
Perform a full Postgres cluster dump using pg_dumpall.
|
||||||
"""
|
"""
|
||||||
@@ -103,9 +105,7 @@ def backup_database(
|
|||||||
"'*' is currently only supported for Postgres."
|
"'*' is currently only supported for Postgres."
|
||||||
)
|
)
|
||||||
|
|
||||||
cluster_file = os.path.join(
|
cluster_file = os.path.join(out_dir, f"{instance_name}.cluster.backup.sql")
|
||||||
out_dir, f"{instance_name}.cluster.backup.sql"
|
|
||||||
)
|
|
||||||
fallback_pg_dumpall(container, user, password, cluster_file)
|
fallback_pg_dumpall(container, user, password, cluster_file)
|
||||||
produced = True
|
produced = True
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -7,10 +7,11 @@ import re
|
|||||||
import sys
|
import sys
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
from pandas.errors import EmptyDataError
|
||||||
|
|
||||||
DB_NAME_RE = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_-]*$")
|
DB_NAME_RE = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_-]*$")
|
||||||
|
|
||||||
|
|
||||||
def _validate_database_value(value: Optional[str], *, instance: str) -> str:
|
def _validate_database_value(value: Optional[str], *, instance: str) -> str:
|
||||||
v = (value or "").strip()
|
v = (value or "").strip()
|
||||||
if v == "":
|
if v == "":
|
||||||
@@ -31,6 +32,11 @@ def _validate_database_value(value: Optional[str], *, instance: str) -> str:
|
|||||||
)
|
)
|
||||||
return v
|
return v
|
||||||
|
|
||||||
|
|
||||||
|
def _empty_df() -> pd.DataFrame:
|
||||||
|
return pd.DataFrame(columns=["instance", "database", "username", "password"])
|
||||||
|
|
||||||
|
|
||||||
def check_and_add_entry(
|
def check_and_add_entry(
|
||||||
file_path: str,
|
file_path: str,
|
||||||
instance: str,
|
instance: str,
|
||||||
@@ -48,17 +54,21 @@ def check_and_add_entry(
|
|||||||
database = _validate_database_value(database, instance=instance)
|
database = _validate_database_value(database, instance=instance)
|
||||||
|
|
||||||
if os.path.exists(file_path):
|
if os.path.exists(file_path):
|
||||||
df = pd.read_csv(
|
try:
|
||||||
file_path,
|
df = pd.read_csv(
|
||||||
sep=";",
|
file_path,
|
||||||
dtype=str,
|
sep=";",
|
||||||
keep_default_na=False,
|
dtype=str,
|
||||||
)
|
keep_default_na=False,
|
||||||
|
)
|
||||||
|
except EmptyDataError:
|
||||||
|
print(
|
||||||
|
f"WARNING: databases.csv exists but is empty: {file_path}. Creating header columns.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
df = _empty_df()
|
||||||
else:
|
else:
|
||||||
df = pd.DataFrame(
|
df = _empty_df()
|
||||||
columns=["instance", "database", "username", "password"]
|
|
||||||
)
|
|
||||||
|
|
||||||
mask = (df["instance"] == instance) & (df["database"] == database)
|
mask = (df["instance"] == instance) & (df["database"] == database)
|
||||||
|
|
||||||
if mask.any():
|
if mask.any():
|
||||||
|
|||||||
@@ -133,22 +133,28 @@ class TestE2EDumpOnlyFallbackToFiles(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def test_files_backup_exists_due_to_fallback(self) -> None:
|
def test_files_backup_exists_due_to_fallback(self) -> None:
|
||||||
p = backup_path(
|
p = (
|
||||||
self.backups_dir,
|
backup_path(
|
||||||
self.repo_name,
|
self.backups_dir,
|
||||||
self.version,
|
self.repo_name,
|
||||||
self.pg_volume,
|
self.version,
|
||||||
) / "files"
|
self.pg_volume,
|
||||||
|
)
|
||||||
|
/ "files"
|
||||||
|
)
|
||||||
self.assertTrue(p.is_dir(), f"Expected files backup dir at: {p}")
|
self.assertTrue(p.is_dir(), f"Expected files backup dir at: {p}")
|
||||||
|
|
||||||
def test_sql_dump_not_present(self) -> None:
|
def test_sql_dump_not_present(self) -> None:
|
||||||
# There should be no sql dumps because databases.csv had no matching entry.
|
# There should be no sql dumps because databases.csv had no matching entry.
|
||||||
sql_dir = backup_path(
|
sql_dir = (
|
||||||
self.backups_dir,
|
backup_path(
|
||||||
self.repo_name,
|
self.backups_dir,
|
||||||
self.version,
|
self.repo_name,
|
||||||
self.pg_volume,
|
self.version,
|
||||||
) / "sql"
|
self.pg_volume,
|
||||||
|
)
|
||||||
|
/ "sql"
|
||||||
|
)
|
||||||
# Could exist (dir created) in some edge cases, but should contain no *.sql dumps.
|
# Could exist (dir created) in some edge cases, but should contain no *.sql dumps.
|
||||||
if sql_dir.exists():
|
if sql_dir.exists():
|
||||||
dumps = list(sql_dir.glob("*.sql"))
|
dumps = list(sql_dir.glob("*.sql"))
|
||||||
|
|||||||
@@ -96,10 +96,10 @@ class TestE2EDumpOnlySqlMixedRun(unittest.TestCase):
|
|||||||
"sh",
|
"sh",
|
||||||
"-lc",
|
"-lc",
|
||||||
(
|
(
|
||||||
f'psql -U postgres -d {cls.pg_db} -c '
|
f"psql -U postgres -d {cls.pg_db} -c "
|
||||||
'"CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, v TEXT);'
|
'"CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, v TEXT);'
|
||||||
"INSERT INTO t(id,v) VALUES (1,'hello-db') "
|
"INSERT INTO t(id,v) VALUES (1,'hello-db') "
|
||||||
"ON CONFLICT (id) DO UPDATE SET v=EXCLUDED.v;\""
|
'ON CONFLICT (id) DO UPDATE SET v=EXCLUDED.v;"'
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
check=True,
|
check=True,
|
||||||
@@ -143,7 +143,9 @@ class TestE2EDumpOnlySqlMixedRun(unittest.TestCase):
|
|||||||
cleanup_docker(containers=cls.containers, volumes=cls.volumes)
|
cleanup_docker(containers=cls.containers, volumes=cls.volumes)
|
||||||
|
|
||||||
def test_db_volume_has_dump_and_no_files_dir(self) -> None:
|
def test_db_volume_has_dump_and_no_files_dir(self) -> None:
|
||||||
base = backup_path(self.backups_dir, self.repo_name, self.version, self.db_volume)
|
base = backup_path(
|
||||||
|
self.backups_dir, self.repo_name, self.version, self.db_volume
|
||||||
|
)
|
||||||
|
|
||||||
dumps = base / "sql"
|
dumps = base / "sql"
|
||||||
files = base / "files"
|
files = base / "files"
|
||||||
|
|||||||
@@ -99,10 +99,10 @@ class TestE2ESeedStarAndDbEntriesBackupPostgres(unittest.TestCase):
|
|||||||
"sh",
|
"sh",
|
||||||
"-lc",
|
"-lc",
|
||||||
(
|
(
|
||||||
f'psql -U {cls.pg_user} -d {cls.pg_db1} -c '
|
f"psql -U {cls.pg_user} -d {cls.pg_db1} -c "
|
||||||
'"CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, v TEXT);'
|
'"CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, v TEXT);'
|
||||||
"INSERT INTO t(id,v) VALUES (1,'hello-db1') "
|
"INSERT INTO t(id,v) VALUES (1,'hello-db1') "
|
||||||
"ON CONFLICT (id) DO UPDATE SET v=EXCLUDED.v;\""
|
'ON CONFLICT (id) DO UPDATE SET v=EXCLUDED.v;"'
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
check=True,
|
check=True,
|
||||||
@@ -115,10 +115,10 @@ class TestE2ESeedStarAndDbEntriesBackupPostgres(unittest.TestCase):
|
|||||||
"sh",
|
"sh",
|
||||||
"-lc",
|
"-lc",
|
||||||
(
|
(
|
||||||
f'psql -U {cls.pg_user} -d {cls.pg_db2} -c '
|
f"psql -U {cls.pg_user} -d {cls.pg_db2} -c "
|
||||||
'"CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, v TEXT);'
|
'"CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, v TEXT);'
|
||||||
"INSERT INTO t(id,v) VALUES (1,'hello-db2') "
|
"INSERT INTO t(id,v) VALUES (1,'hello-db2') "
|
||||||
"ON CONFLICT (id) DO UPDATE SET v=EXCLUDED.v;\""
|
'ON CONFLICT (id) DO UPDATE SET v=EXCLUDED.v;"'
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
check=True,
|
check=True,
|
||||||
@@ -132,7 +132,16 @@ class TestE2ESeedStarAndDbEntriesBackupPostgres(unittest.TestCase):
|
|||||||
instance = cls.pg_container
|
instance = cls.pg_container
|
||||||
|
|
||||||
# Seed star entry (pg_dumpall)
|
# Seed star entry (pg_dumpall)
|
||||||
run(["baudolo-seed", cls.databases_csv, instance, "*", cls.pg_user, cls.pg_password])
|
run(
|
||||||
|
[
|
||||||
|
"baudolo-seed",
|
||||||
|
cls.databases_csv,
|
||||||
|
instance,
|
||||||
|
"*",
|
||||||
|
cls.pg_user,
|
||||||
|
cls.pg_password,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
# Seed concrete DB entry (pg_dump)
|
# Seed concrete DB entry (pg_dump)
|
||||||
run(
|
run(
|
||||||
@@ -177,7 +186,9 @@ class TestE2ESeedStarAndDbEntriesBackupPostgres(unittest.TestCase):
|
|||||||
cleanup_docker(containers=cls.containers, volumes=cls.volumes)
|
cleanup_docker(containers=cls.containers, volumes=cls.volumes)
|
||||||
|
|
||||||
def test_db_volume_has_cluster_dump_and_concrete_db_dump_and_no_files(self) -> None:
|
def test_db_volume_has_cluster_dump_and_concrete_db_dump_and_no_files(self) -> None:
|
||||||
base = backup_path(self.backups_dir, self.repo_name, self.version, self.db_volume)
|
base = backup_path(
|
||||||
|
self.backups_dir, self.repo_name, self.version, self.db_volume
|
||||||
|
)
|
||||||
sql_dir = base / "sql"
|
sql_dir = base / "sql"
|
||||||
files_dir = base / "files"
|
files_dir = base / "files"
|
||||||
|
|
||||||
@@ -204,10 +215,14 @@ class TestE2ESeedStarAndDbEntriesBackupPostgres(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def test_non_db_volume_still_has_files_backup(self) -> None:
|
def test_non_db_volume_still_has_files_backup(self) -> None:
|
||||||
base = backup_path(self.backups_dir, self.repo_name, self.version, self.files_volume)
|
base = backup_path(
|
||||||
|
self.backups_dir, self.repo_name, self.version, self.files_volume
|
||||||
|
)
|
||||||
files_dir = base / "files"
|
files_dir = base / "files"
|
||||||
|
|
||||||
self.assertTrue(files_dir.exists(), f"Expected files dir for non-DB volume at: {files_dir}")
|
self.assertTrue(
|
||||||
|
files_dir.exists(), f"Expected files dir for non-DB volume at: {files_dir}"
|
||||||
|
)
|
||||||
|
|
||||||
marker = files_dir / "hello.txt"
|
marker = files_dir / "hello.txt"
|
||||||
self.assertTrue(marker.is_file(), f"Expected marker file at: {marker}")
|
self.assertTrue(marker.is_file(), f"Expected marker file at: {marker}")
|
||||||
|
|||||||
216
tests/unit/src/baudolo/seed/test_main.py
Normal file
216
tests/unit/src/baudolo/seed/test_main.py
Normal file
@@ -0,0 +1,216 @@
|
|||||||
|
# tests/unit/src/baudolo/seed/test_main.py
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from pandas.errors import EmptyDataError
|
||||||
|
|
||||||
|
import baudolo.seed.__main__ as seed_main
|
||||||
|
|
||||||
|
|
||||||
|
class TestSeedMain(unittest.TestCase):
|
||||||
|
@patch("baudolo.seed.__main__.pd.DataFrame")
|
||||||
|
def test_empty_df_creates_expected_columns(self, df_ctor: MagicMock) -> None:
|
||||||
|
seed_main._empty_df()
|
||||||
|
df_ctor.assert_called_once_with(
|
||||||
|
columns=["instance", "database", "username", "password"]
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_validate_database_value_rejects_empty(self) -> None:
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
seed_main._validate_database_value("", instance="x")
|
||||||
|
|
||||||
|
def test_validate_database_value_accepts_star(self) -> None:
|
||||||
|
self.assertEqual(seed_main._validate_database_value("*", instance="x"), "*")
|
||||||
|
|
||||||
|
def test_validate_database_value_rejects_nan(self) -> None:
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
seed_main._validate_database_value("nan", instance="x")
|
||||||
|
|
||||||
|
def test_validate_database_value_rejects_invalid_name(self) -> None:
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
seed_main._validate_database_value("bad name", instance="x")
|
||||||
|
|
||||||
|
@patch("baudolo.seed.__main__.os.path.exists", return_value=False)
|
||||||
|
@patch("baudolo.seed.__main__.pd.read_csv")
|
||||||
|
@patch("baudolo.seed.__main__._empty_df")
|
||||||
|
@patch("baudolo.seed.__main__.pd.concat")
|
||||||
|
def test_check_and_add_entry_file_missing_adds_entry(
|
||||||
|
self,
|
||||||
|
concat: MagicMock,
|
||||||
|
empty_df: MagicMock,
|
||||||
|
read_csv: MagicMock,
|
||||||
|
exists: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
df_existing = MagicMock(spec=pd.DataFrame)
|
||||||
|
series_mask = MagicMock()
|
||||||
|
series_mask.any.return_value = False
|
||||||
|
|
||||||
|
df_existing.__getitem__.return_value = series_mask # for df["instance"] etc.
|
||||||
|
empty_df.return_value = df_existing
|
||||||
|
|
||||||
|
df_out = MagicMock(spec=pd.DataFrame)
|
||||||
|
concat.return_value = df_out
|
||||||
|
|
||||||
|
seed_main.check_and_add_entry(
|
||||||
|
file_path="/tmp/databases.csv",
|
||||||
|
instance="inst",
|
||||||
|
database="db",
|
||||||
|
username="user",
|
||||||
|
password="pass",
|
||||||
|
)
|
||||||
|
|
||||||
|
read_csv.assert_not_called()
|
||||||
|
empty_df.assert_called_once()
|
||||||
|
concat.assert_called_once()
|
||||||
|
df_out.to_csv.assert_called_once_with(
|
||||||
|
"/tmp/databases.csv", sep=";", index=False
|
||||||
|
)
|
||||||
|
|
||||||
|
@patch("baudolo.seed.__main__.os.path.exists", return_value=True)
|
||||||
|
@patch("baudolo.seed.__main__.pd.read_csv", side_effect=EmptyDataError("empty"))
|
||||||
|
@patch("baudolo.seed.__main__._empty_df")
|
||||||
|
@patch("baudolo.seed.__main__.pd.concat")
|
||||||
|
@patch("baudolo.seed.__main__.print")
|
||||||
|
def test_check_and_add_entry_empty_file_warns_and_creates_columns_and_adds(
|
||||||
|
self,
|
||||||
|
print_: MagicMock,
|
||||||
|
concat: MagicMock,
|
||||||
|
empty_df: MagicMock,
|
||||||
|
read_csv: MagicMock,
|
||||||
|
exists: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Key regression test:
|
||||||
|
If file exists but is empty => warn, create header columns, then proceed.
|
||||||
|
"""
|
||||||
|
df_existing = MagicMock(spec=pd.DataFrame)
|
||||||
|
series_mask = MagicMock()
|
||||||
|
series_mask.any.return_value = False
|
||||||
|
|
||||||
|
# emulate df["instance"] and df["database"] usage
|
||||||
|
df_existing.__getitem__.return_value = series_mask
|
||||||
|
empty_df.return_value = df_existing
|
||||||
|
|
||||||
|
df_out = MagicMock(spec=pd.DataFrame)
|
||||||
|
concat.return_value = df_out
|
||||||
|
|
||||||
|
seed_main.check_and_add_entry(
|
||||||
|
file_path="/tmp/databases.csv",
|
||||||
|
instance="inst",
|
||||||
|
database="db",
|
||||||
|
username="user",
|
||||||
|
password="pass",
|
||||||
|
)
|
||||||
|
|
||||||
|
exists.assert_called_once_with("/tmp/databases.csv")
|
||||||
|
read_csv.assert_called_once()
|
||||||
|
empty_df.assert_called_once()
|
||||||
|
|
||||||
|
# warning was printed to stderr
|
||||||
|
self.assertTrue(print_.called)
|
||||||
|
args, kwargs = print_.call_args
|
||||||
|
self.assertIn("WARNING: databases.csv exists but is empty", args[0])
|
||||||
|
self.assertIn("file", kwargs)
|
||||||
|
self.assertEqual(kwargs["file"], seed_main.sys.stderr)
|
||||||
|
|
||||||
|
concat.assert_called_once()
|
||||||
|
df_out.to_csv.assert_called_once_with(
|
||||||
|
"/tmp/databases.csv", sep=";", index=False
|
||||||
|
)
|
||||||
|
|
||||||
|
@patch("baudolo.seed.__main__.os.path.exists", return_value=True)
|
||||||
|
@patch("baudolo.seed.__main__.pd.read_csv")
|
||||||
|
def test_check_and_add_entry_updates_existing_row(
|
||||||
|
self,
|
||||||
|
read_csv: MagicMock,
|
||||||
|
exists: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
df = MagicMock(spec=pd.DataFrame)
|
||||||
|
|
||||||
|
# mask.any() => True triggers update branch
|
||||||
|
mask = MagicMock()
|
||||||
|
mask.any.return_value = True
|
||||||
|
|
||||||
|
# df["instance"] etc => return something that supports comparisons;
|
||||||
|
# simplest: just return an object that makes mask flow work.
|
||||||
|
df.__getitem__.return_value = MagicMock()
|
||||||
|
# Force the computed mask to be our mask
|
||||||
|
# by making (df["instance"] == instance) & (df["database"] == database) return `mask`
|
||||||
|
left = MagicMock()
|
||||||
|
right = MagicMock()
|
||||||
|
left.__and__.return_value = mask
|
||||||
|
df.__getitem__.return_value.__eq__.side_effect = [left, right] # two == calls
|
||||||
|
|
||||||
|
read_csv.return_value = df
|
||||||
|
|
||||||
|
seed_main.check_and_add_entry(
|
||||||
|
file_path="/tmp/databases.csv",
|
||||||
|
instance="inst",
|
||||||
|
database="db",
|
||||||
|
username="user",
|
||||||
|
password="pass",
|
||||||
|
)
|
||||||
|
|
||||||
|
# update branch: df.loc[mask, ["username","password"]] = ...
|
||||||
|
# we can't easily assert the assignment, but we can assert .loc was accessed
|
||||||
|
self.assertTrue(hasattr(df, "loc"))
|
||||||
|
df.to_csv.assert_called_once_with("/tmp/databases.csv", sep=";", index=False)
|
||||||
|
|
||||||
|
@patch("baudolo.seed.__main__.check_and_add_entry")
|
||||||
|
@patch("baudolo.seed.__main__.argparse.ArgumentParser.parse_args")
|
||||||
|
def test_main_calls_check_and_add_entry(
|
||||||
|
self, parse_args: MagicMock, cae: MagicMock
|
||||||
|
) -> None:
|
||||||
|
ns = MagicMock()
|
||||||
|
ns.file = "/tmp/databases.csv"
|
||||||
|
ns.instance = "inst"
|
||||||
|
ns.database = "db"
|
||||||
|
ns.username = "user"
|
||||||
|
ns.password = "pass"
|
||||||
|
parse_args.return_value = ns
|
||||||
|
|
||||||
|
seed_main.main()
|
||||||
|
|
||||||
|
cae.assert_called_once_with(
|
||||||
|
file_path="/tmp/databases.csv",
|
||||||
|
instance="inst",
|
||||||
|
database="db",
|
||||||
|
username="user",
|
||||||
|
password="pass",
|
||||||
|
)
|
||||||
|
|
||||||
|
@patch("baudolo.seed.__main__.sys.exit")
|
||||||
|
@patch("baudolo.seed.__main__.print")
|
||||||
|
@patch(
|
||||||
|
"baudolo.seed.__main__.check_and_add_entry", side_effect=RuntimeError("boom")
|
||||||
|
)
|
||||||
|
@patch("baudolo.seed.__main__.argparse.ArgumentParser.parse_args")
|
||||||
|
def test_main_exits_nonzero_on_error(
|
||||||
|
self,
|
||||||
|
parse_args: MagicMock,
|
||||||
|
cae: MagicMock,
|
||||||
|
print_: MagicMock,
|
||||||
|
exit_: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
ns = MagicMock()
|
||||||
|
ns.file = "/tmp/databases.csv"
|
||||||
|
ns.instance = "inst"
|
||||||
|
ns.database = "db"
|
||||||
|
ns.username = "user"
|
||||||
|
ns.password = "pass"
|
||||||
|
parse_args.return_value = ns
|
||||||
|
|
||||||
|
seed_main.main()
|
||||||
|
|
||||||
|
# prints error to stderr and exits with 1
|
||||||
|
self.assertTrue(print_.called)
|
||||||
|
_, kwargs = print_.call_args
|
||||||
|
self.assertEqual(kwargs.get("file"), seed_main.sys.stderr)
|
||||||
|
exit_.assert_called_once_with(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user