6 Commits
v1.1.0 ... main

SHA1 Message Date
e4bc075474 Release version 1.2.0 2025-12-29 11:46:39 +01:00
f3ef86a444 feat(backup): stricter databases.csv semantics + atomic SQL dumps
- read databases.csv with stable types (dtype=str, keep_default_na=False)
- validate database field: require '*' or concrete name (no empty/NaN)
- support Postgres cluster dumps via '*' entries (pg_dumpall)
- write SQL dumps atomically to avoid partial/empty files
- early-skip fully ignored volumes before creating backup directories
- update seed CLI to enforce new contract and update by (instance,database)
- adjust tests: sql dir naming + add E2E coverage for early-skip and '*' seeding
2025-12-29 11:39:57 +01:00
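
In practice, the stricter contract means every databases.csv row must state its target explicitly. A minimal illustrative sample (instance names and credentials are hypothetical):

    instance;database;username;password
    central-postgres;*;postgres;secret
    app-mariadb;appdb;app;secret

A '*' row requests a full cluster dump (Postgres only), a concrete name dumps exactly that database, and an empty field is rejected instead of silently becoming NaN.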
c01ab55f2d test(e2e): add dump-only-sql mixed-run + CLI contract coverage
- rename dump-only flag to --dump-only-sql across docs and tests
- update backup logic: skip files/ only for DB volumes when dumps succeed; fall back to files when dumps fail
- extend e2e helpers to support dump_only_sql
- add e2e mixed-run regression test (DB dump => no files/, non-DB => files/)
- add e2e CLI/argparse contract test (--dump-only-sql present, --dump-only rejected)
- fix e2e files test to expect file backups for non-DB volumes in dump-only-sql mode and verify restore
- update changelog + README flag table

https://chatgpt.com/share/69522d9c-ce08-800f-9070-71df3bd779ae
2025-12-29 08:28:23 +01:00
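
A minimal invocation sketch with the renamed flag, using only flags that appear in the tests below (all paths and container names are placeholders):

    import subprocess

    # DB volumes whose dumps succeed skip the rsync file backup;
    # non-DB volumes are still backed up as files.
    subprocess.run([
        "baudolo",
        "--databases-csv", "/tmp/demo/databases.csv",
        "--database-containers", "demo-pg",
        "--backups-dir", "/tmp/demo/Backups",
        "--repo-name", "demo",
        "--dump-only-sql",
    ], check=True)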
e3cdfd6fc4 Release version 1.1.1 2025-12-28 22:52:31 +01:00
df32671cec fix(backup): fall back to file backup in dump-only mode when no DB dump is possible
- Change DB backup helpers to return whether a dump was actually produced
- Detect DB containers without successful dumps in --dump-only mode
- Fall back to file backups with a warning instead of skipping silently
- Refactor DB dump logic to return boolean status
- Add E2E test covering dump-only fallback when databases.csv entry is missing

https://chatgpt.com/share/6951a659-2b0c-800f-aafa-3e89ae1eb697
2025-12-28 22:51:12 +01:00
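
Condensed, the per-volume control flow this fix introduces looks as follows (names taken from the diff below; not the verbatim code):

    found_db, dumped_any = _backup_dumps_for_volume(...)
    if dump_only and found_db and dumped_any:
        continue  # DB volume fully dumped -> no file backup needed
    # otherwise fall through to the file backup,
    # warning first if a dump was expected but not produced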
d563dce20f Ignored dist 2025-12-28 22:19:19 +01:00
20 changed files with 1162 additions and 140 deletions

.gitignore

@@ -1,3 +1,4 @@
 __pycache__
 artifacts/
 *.egg-info
+dist/

CHANGELOG.md

@@ -1,3 +1,18 @@
+## [1.2.0] - 2025-12-29
+
+* Introduced **`--dump-only-sql`** mode for reliable, SQL-only database backups (replaces `--dump-only`).
+* Database configuration in `databases.csv` is now **strict and explicit** (`*` or a concrete database name only).
+* **PostgreSQL cluster backups** are supported via `*`.
+* SQL dumps are written **atomically** to avoid corrupted or empty files.
+* Backups are **smarter and faster**: ignored volumes are skipped early, file backups run only when needed.
+* Improved reliability through expanded end-to-end tests and safer defaults.
+
+## [1.1.1] - 2025-12-28
+
+* **Backup:** In `--dump-only-sql` mode, fall back to file backups with a warning when no database dump can be produced (e.g. missing `databases.csv` entry).
+
 ## [1.1.0] - 2025-12-28
 
 * **Backup:** Log a warning and skip database dumps when no databases.csv entry is present instead of raising an exception; introduce module-level logging and apply formatting cleanups across backup/restore code and tests.

README.md

@@ -134,7 +134,7 @@ baudolo \
 | Flag              | Description                                  |
 | ----------------- | -------------------------------------------- |
 | `--everything`    | Always stop containers and re-run rsync      |
-| `--dump-only`     | Only create SQL dumps, skip file backups     |
+| `--dump-only-sql` | Skip file backups for DB volumes when dumps succeed; non-DB volumes are still backed up; falls back to files if no dump is produced. |
 | `--shutdown`      | Do not restart containers after backup       |
 | `--backups-dir`   | Backup root directory (default: `/Backups`)  |
 | `--repo-name`     | Backup namespace under machine hash          |

Binary file not shown.

pyproject.toml

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "backup-docker-to-local"
-version = "1.1.0"
+version = "1.2.0"
 description = "Backup Docker volumes to local with rsync and optional DB dumps."
 readme = "README.md"
 requires-python = ">=3.9"

@@ -72,28 +72,27 @@ def requires_stop(containers: list[str], images_no_stop_required: list[str]) ->
         return True
     return False
 
 
 def backup_mariadb_or_postgres(
     *,
     container: str,
     volume_dir: str,
     databases_df: "pandas.DataFrame",
     database_containers: list[str],
-) -> bool:
+) -> tuple[bool, bool]:
     """
-    Returns True if the container is a DB container we handled.
+    Returns (is_db_container, dumped_any).
     """
     for img in ["mariadb", "postgres"]:
         if has_image(container, img):
-            backup_database(
+            dumped = backup_database(
                 container=container,
                 volume_dir=volume_dir,
                 db_type=img,
                 databases_df=databases_df,
                 database_containers=database_containers,
             )
-            return True
-    return False
+            return True, dumped
+    return False, False
 
 
 def _backup_dumps_for_volume(
@@ -102,21 +101,26 @@ def _backup_dumps_for_volume(
     vol_dir: str,
     databases_df: "pandas.DataFrame",
     database_containers: list[str],
-) -> bool:
+) -> tuple[bool, bool]:
     """
-    Create DB dumps for any mariadb/postgres containers attached to this volume.
-    Returns True if at least one dump was produced.
+    Returns (found_db_container, dumped_any).
     """
+    found_db = False
     dumped_any = False
     for c in containers:
-        if backup_mariadb_or_postgres(
+        is_db, dumped = backup_mariadb_or_postgres(
             container=c,
             volume_dir=vol_dir,
             databases_df=databases_df,
             database_containers=database_containers,
-        ):
+        )
+        if is_db:
+            found_db = True
+        if dumped:
             dumped_any = True
-    return dumped_any
+    return found_db, dumped_any
 
 
 def main() -> int:
@@ -128,7 +132,12 @@ def main() -> int:
     versions_dir = os.path.join(args.backups_dir, machine_id, args.repo_name)
     version_dir = create_version_directory(versions_dir, backup_time)
 
-    databases_df = pandas.read_csv(args.databases_csv, sep=";")
+    # IMPORTANT:
+    # - keep_default_na=False prevents empty fields from turning into NaN
+    # - dtype=str keeps all columns stable for comparisons/validation
+    databases_df = pandas.read_csv(
+        args.databases_csv, sep=";", keep_default_na=False, dtype=str
+    )
 
     print("💾 Start volume backups...", flush=True)
@@ -136,27 +145,36 @@ def main() -> int:
         print(f"Start backup routine for volume: {volume_name}", flush=True)
         containers = containers_using_volume(volume_name)
 
+        # EARLY SKIP: if all linked containers are ignored, do not create any dirs
+        if volume_is_fully_ignored(containers, args.images_no_backup_required):
+            print(
+                f"Skipping volume '{volume_name}' entirely (all linked containers are ignored).",
+                flush=True,
+            )
+            continue
+
         vol_dir = create_volume_directory(version_dir, volume_name)
 
-        # Old behavior: DB dumps are additional to file backups.
-        _backup_dumps_for_volume(
+        found_db, dumped_any = _backup_dumps_for_volume(
            containers=containers,
             vol_dir=vol_dir,
             databases_df=databases_df,
             database_containers=args.database_containers,
         )
 
-        # dump-only: skip ALL file rsync backups
-        if args.dump_only:
-            continue
-
-        # skip file backup if all linked containers are ignored
-        if volume_is_fully_ignored(containers, args.images_no_backup_required):
-            print(
-                f"Skipping file backup for volume '{volume_name}' (all linked containers are ignored).",
-                flush=True,
-            )
-            continue
+        # dump-only-sql logic:
+        if args.dump_only_sql:
+            if found_db:
+                if not dumped_any:
+                    print(
+                        f"WARNING: dump-only-sql requested but no DB dump was produced for DB volume '{volume_name}'. Falling back to file backup.",
+                        flush=True,
+                    )
+                    # fall through to file backup below
+                else:
+                    # DB volume successfully dumped -> skip file backup
+                    continue
+            # Non-DB volume -> always do file backup (fall through)
 
         if args.everything:
             # "everything": always do pre-rsync, then stop + rsync again

@@ -68,10 +68,15 @@ def parse_args() -> argparse.Namespace:
         action="store_true",
         help="Do not restart containers after backup",
     )
     p.add_argument(
-        "--dump-only",
+        "--dump-only-sql",
         action="store_true",
-        help="Only create DB dumps (skip ALL file rsync backups)",
+        help=(
+            "Create database dumps only for DB volumes. "
+            "File backups are skipped for DB volumes if a dump succeeds, "
+            "but non-DB volumes are still backed up. "
+            "If a DB dump cannot be produced, baudolo falls back to a file backup."
+        ),
     )
     return p.parse_args()

@@ -3,9 +3,10 @@ from __future__ import annotations
 import os
 import pathlib
 import re
+import logging
+from typing import Optional
 
 import pandas
-import logging
 
 from .shell import BackupException, execute_shell_command
@@ -13,19 +14,53 @@ log = logging.getLogger(__name__)
 def get_instance(container: str, database_containers: list[str]) -> str:
+    """
+    Derive a stable instance name from the container name.
+    """
     if container in database_containers:
         return container
     return re.split(r"(_|-)(database|db|postgres)", container)[0]
 
 
-def fallback_pg_dumpall(
-    container: str, username: str, password: str, out_file: str
-) -> None:
+def _validate_database_value(value: Optional[str], *, instance: str) -> str:
+    """
+    Enforce explicit database semantics:
+    - "*"      => dump ALL databases (cluster dump for Postgres)
+    - "<name>" => dump exactly this database
+    - ""       => invalid configuration (would previously result in NaN / nan.backup.sql)
+    """
+    v = (value or "").strip()
+    if v == "":
+        raise ValueError(
+            f"Invalid databases.csv entry for instance '{instance}': "
+            "column 'database' must be '*' or a concrete database name (not empty)."
+        )
+    return v
+
+
+def _atomic_write_cmd(cmd: str, out_file: str) -> None:
+    """
+    Write dump output atomically:
+    - write to <file>.tmp
+    - rename to <file> only on success
+    This prevents empty or partial dump files from being treated as valid backups.
+    """
+    tmp = f"{out_file}.tmp"
+    execute_shell_command(f"{cmd} > {tmp}")
+    execute_shell_command(f"mv {tmp} {out_file}")
+
+
+def fallback_pg_dumpall(container: str, username: str, password: str, out_file: str) -> None:
+    """
+    Perform a full Postgres cluster dump using pg_dumpall.
+    """
     cmd = (
         f"PGPASSWORD={password} docker exec -i {container} "
-        f"pg_dumpall -U {username} -h localhost > {out_file}"
+        f"pg_dumpall -U {username} -h localhost"
     )
-    execute_shell_command(cmd)
+    _atomic_write_cmd(cmd, out_file)
 
 
 def backup_database(
@@ -35,48 +70,75 @@ def backup_database(
     db_type: str,
     databases_df: "pandas.DataFrame",
     database_containers: list[str],
-) -> None:
+) -> bool:
+    """
+    Backup databases for a given DB container.
+    Returns True if at least one dump was produced.
+    """
     instance_name = get_instance(container, database_containers)
-    entries = databases_df.loc[databases_df["instance"] == instance_name]
+    entries = databases_df[databases_df["instance"] == instance_name]
     if entries.empty:
-        log.warning("No entry found for instance '%s'", instance_name)
-        return
+        log.debug("No database entries for instance '%s'", instance_name)
+        return False
 
     out_dir = os.path.join(volume_dir, "sql")
     pathlib.Path(out_dir).mkdir(parents=True, exist_ok=True)
 
-    for row in entries.iloc:
-        db_name = row["database"]
-        user = row["username"]
-        password = row["password"]
+    produced = False
+
+    for row in entries.itertuples(index=False):
+        raw_db = getattr(row, "database", "")
+        user = (getattr(row, "username", "") or "").strip()
+        password = (getattr(row, "password", "") or "").strip()
+
+        db_value = _validate_database_value(raw_db, instance=instance_name)
+
+        # Explicit: dump ALL databases
+        if db_value == "*":
+            if db_type != "postgres":
+                raise ValueError(
+                    f"databases.csv entry for instance '{instance_name}': "
+                    "'*' is currently only supported for Postgres."
+                )
+            cluster_file = os.path.join(
+                out_dir, f"{instance_name}.cluster.backup.sql"
+            )
+            fallback_pg_dumpall(container, user, password, cluster_file)
+            produced = True
+            continue
 
+        # Concrete database dump
+        db_name = db_value
         dump_file = os.path.join(out_dir, f"{db_name}.backup.sql")
 
         if db_type == "mariadb":
             cmd = (
                 f"docker exec {container} /usr/bin/mariadb-dump "
-                f"-u {user} -p{password} {db_name} > {dump_file}"
+                f"-u {user} -p{password} {db_name}"
             )
-            execute_shell_command(cmd)
+            _atomic_write_cmd(cmd, dump_file)
+            produced = True
             continue
 
         if db_type == "postgres":
-            cluster_file = os.path.join(out_dir, f"{instance_name}.cluster.backup.sql")
-            if not db_name:
-                fallback_pg_dumpall(container, user, password, cluster_file)
-                return
             try:
                 cmd = (
                     f"PGPASSWORD={password} docker exec -i {container} "
-                    f"pg_dump -U {user} -d {db_name} -h localhost > {dump_file}"
+                    f"pg_dump -U {user} -d {db_name} -h localhost"
                 )
-                execute_shell_command(cmd)
+                _atomic_write_cmd(cmd, dump_file)
+                produced = True
             except BackupException as e:
-                print(f"pg_dump failed: {e}", flush=True)
-                print(
-                    f"Falling back to pg_dumpall for instance '{instance_name}'",
-                    flush=True,
-                )
-                fallback_pg_dumpall(container, user, password, cluster_file)
+                # Explicit DB dump failed -> hard error
+                raise BackupException(
+                    f"Postgres dump failed for instance '{instance_name}', "
+                    f"database '{db_name}'. This database was explicitly configured "
+                    "and therefore must succeed.\n"
+                    f"{e}"
+                )
             continue
+
+    return produced
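
The `mv`-based publish step relies on the temp file sharing a filesystem with the final path, where a rename is atomic. The same pattern in pure Python, shown only as an illustration (hypothetical helper, not part of this diff):

    import os
    import subprocess
    import tempfile

    def atomic_dump(cmd: str, out_file: str) -> None:
        # Stream the dump into a temp file, then publish it with an atomic
        # rename, so a failed dump never leaves a truncated .sql behind.
        fd, tmp = tempfile.mkstemp(dir=os.path.dirname(out_file) or ".", suffix=".tmp")
        try:
            with os.fdopen(fd, "wb") as fh:
                subprocess.run(cmd, shell=True, stdout=fh, check=True)
            os.replace(tmp, out_file)  # atomic within one filesystem on POSIX
        except BaseException:
            os.unlink(tmp)
            raise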

baudolo/seed.py

@@ -1,67 +1,106 @@
-import pandas as pd
-import argparse
-import os
-
-
-def check_and_add_entry(file_path, instance, database, username, password):
-    # Check if the file exists and is not empty
-    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
-        # Read the existing CSV file with header
-        df = pd.read_csv(file_path, sep=";")
-    else:
-        # Create a new DataFrame with columns if file does not exist
-        df = pd.DataFrame(columns=["instance", "database", "username", "password"])
-
-    # Check if the entry exists and remove it
-    mask = (
-        (df["instance"] == instance)
-        & (
-            (df["database"] == database)
-            | (((df["database"].isna()) | (df["database"] == "")) & (database == ""))
-        )
-        & (df["username"] == username)
-    )
-    if not df[mask].empty:
-        print("Replacing existing entry.")
-        df = df[~mask]
-    else:
-        print("Adding new entry.")
-
-    # Create a new DataFrame for the new entry
-    new_entry = pd.DataFrame(
-        [
-            {
-                "instance": instance,
-                "database": database,
-                "username": username,
-                "password": password,
-            }
-        ]
-    )
-
-    # Add (or replace) the entry using concat
-    df = pd.concat([df, new_entry], ignore_index=True)
-
-    # Save the updated CSV file
-    df.to_csv(file_path, sep=";", index=False)
-
-
-def main():
-    parser = argparse.ArgumentParser(
-        description="Check and replace (or add) a database entry in a CSV file."
-    )
-    parser.add_argument("file_path", help="Path to the CSV file")
-    parser.add_argument("instance", help="Database instance")
-    parser.add_argument("database", help="Database name")
-    parser.add_argument("username", help="Username")
-    parser.add_argument("password", nargs="?", default="", help="Password (optional)")
-
-    args = parser.parse_args()
-    check_and_add_entry(
-        args.file_path, args.instance, args.database, args.username, args.password
-    )
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+import os
+import re
+import sys
+from typing import Optional
+
+import pandas as pd
+
+DB_NAME_RE = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_-]*$")
+
+
+def _validate_database_value(value: Optional[str], *, instance: str) -> str:
+    v = (value or "").strip()
+    if v == "":
+        raise ValueError(
+            f"Invalid databases.csv entry for instance '{instance}': "
+            "column 'database' must be '*' or a concrete database name (not empty)."
+        )
+    if v == "*":
+        return "*"
+    if v.lower() == "nan":
+        raise ValueError(
+            f"Invalid databases.csv entry for instance '{instance}': database must not be 'nan'."
+        )
+    if not DB_NAME_RE.match(v):
+        raise ValueError(
+            f"Invalid databases.csv entry for instance '{instance}': "
+            f"invalid database name '{v}'. Allowed: letters, numbers, '_' and '-'."
+        )
+    return v
+
+
+def check_and_add_entry(
+    file_path: str,
+    instance: str,
+    database: Optional[str],
+    username: str,
+    password: str,
+) -> None:
+    """
+    Add or update an entry in databases.csv.
+
+    The function enforces strict validation:
+    - database MUST be set
+    - database MUST be '*' or a valid database name
+    """
+    database = _validate_database_value(database, instance=instance)
+
+    if os.path.exists(file_path):
+        df = pd.read_csv(
+            file_path,
+            sep=";",
+            dtype=str,
+            keep_default_na=False,
+        )
+    else:
+        df = pd.DataFrame(
+            columns=["instance", "database", "username", "password"]
+        )
+
+    mask = (df["instance"] == instance) & (df["database"] == database)
+
+    if mask.any():
+        print("Updating existing entry.")
+        df.loc[mask, ["username", "password"]] = [username, password]
+    else:
+        print("Adding new entry.")
+        new_entry = pd.DataFrame(
+            [[instance, database, username, password]],
+            columns=["instance", "database", "username", "password"],
+        )
+        df = pd.concat([df, new_entry], ignore_index=True)
+
+    df.to_csv(file_path, sep=";", index=False)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(
+        description="Seed or update databases.csv for backup configuration."
+    )
+    parser.add_argument("file", help="Path to databases.csv")
+    parser.add_argument("instance", help="Instance name (e.g. bigbluebutton)")
+    parser.add_argument(
+        "database",
+        help="Database name or '*' to dump all databases",
+    )
+    parser.add_argument("username", help="Database username")
+    parser.add_argument("password", help="Database password")
+
+    args = parser.parse_args()
+    try:
+        check_and_add_entry(
+            file_path=args.file,
+            instance=args.instance,
+            database=args.database,
+            username=args.username,
+            password=args.password,
+        )
+    except Exception as exc:
+        print(f"ERROR: {exc}", file=sys.stderr)
+        sys.exit(1)
 
 
 if __name__ == "__main__":
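
Typical seeding calls, matching how the e2e tests below drive the CLI (instance names and credentials are illustrative):

    baudolo-seed databases.csv central-postgres '*' postgres secret   # cluster dump entry
    baudolo-seed databases.csv central-postgres appdb postgres secret # concrete DB entry

Re-running with the same (instance, database) pair updates username and password in place rather than appending a duplicate row.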

tests/e2e/helpers.py

@@ -166,7 +166,7 @@ def backup_run(
     database_containers: list[str],
     images_no_stop_required: list[str],
     images_no_backup_required: list[str] | None = None,
-    dump_only: bool = False,
+    dump_only_sql: bool = False,
 ) -> None:
     cmd = [
         "baudolo",
@@ -187,8 +187,8 @@ def backup_run(
     ]
     if images_no_backup_required:
         cmd += ["--images-no-backup-required", *images_no_backup_required]
-    if dump_only:
-        cmd += ["--dump-only"]
+    if dump_only_sql:
+        cmd += ["--dump-only-sql"]
     try:
         run(cmd, capture=True, check=True)
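
A representative, abbreviated call site after the rename (values are placeholders; other required arguments omitted):

    backup_run(
        backups_dir="/tmp/demo/Backups",
        repo_name="demo",
        databases_csv="/tmp/demo/databases.csv",
        database_containers=["demo-pg"],
        images_no_stop_required=["postgres", "alpine"],
        dump_only_sql=True,  # was dump_only=True before this change
    )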

@@ -0,0 +1,29 @@
import unittest

from .helpers import run


class TestE2ECLIContractDumpOnlySql(unittest.TestCase):
    def test_help_mentions_new_flag(self) -> None:
        cp = run(["baudolo", "--help"], capture=True, check=True)
        out = (cp.stdout or "") + "\n" + (cp.stderr or "")
        self.assertIn(
            "--dump-only-sql",
            out,
            f"Expected '--dump-only-sql' to appear in --help output. Output:\n{out}",
        )

    def test_old_flag_is_rejected(self) -> None:
        cp = run(["baudolo", "--dump-only"], capture=True, check=False)
        self.assertEqual(
            cp.returncode,
            2,
            f"Expected exitcode 2 for unknown args, got {cp.returncode}\n"
            f"STDOUT={cp.stdout}\nSTDERR={cp.stderr}",
        )
        err = (cp.stderr or "") + "\n" + (cp.stdout or "")
        # Argparse typically prints "unrecognized arguments"
        self.assertTrue(
            ("unrecognized arguments" in err) or ("usage:" in err.lower()),
            f"Expected argparse-style error output. Output:\n{err}",
        )

@@ -0,0 +1,175 @@
# tests/e2e/test_e2e_dump_only_fallback_to_files.py
import unittest

from .helpers import (
    backup_path,
    cleanup_docker,
    create_minimal_compose_dir,
    ensure_empty_dir,
    latest_version_dir,
    require_docker,
    run,
    unique,
    write_databases_csv,
    wait_for_postgres,
)


class TestE2EDumpOnlyFallbackToFiles(unittest.TestCase):
    @classmethod
    def setUpClass(cls) -> None:
        require_docker()

        cls.prefix = unique("baudolo-e2e-dump-only-sql-fallback")
        cls.backups_dir = f"/tmp/{cls.prefix}/Backups"
        ensure_empty_dir(cls.backups_dir)
        cls.compose_dir = create_minimal_compose_dir(f"/tmp/{cls.prefix}")
        cls.repo_name = cls.prefix

        cls.pg_container = f"{cls.prefix}-pg"
        cls.pg_volume = f"{cls.prefix}-pg-vol"
        cls.restore_volume = f"{cls.prefix}-restore-vol"

        cls.containers = [cls.pg_container]
        cls.volumes = [cls.pg_volume, cls.restore_volume]

        run(["docker", "volume", "create", cls.pg_volume])

        # Start Postgres (creates a real DB volume)
        run(
            [
                "docker",
                "run",
                "-d",
                "--name",
                cls.pg_container,
                "-e",
                "POSTGRES_PASSWORD=pgpw",
                "-e",
                "POSTGRES_DB=appdb",
                "-e",
                "POSTGRES_USER=postgres",
                "-v",
                f"{cls.pg_volume}:/var/lib/postgresql/data",
                "postgres:16",
            ]
        )
        wait_for_postgres(cls.pg_container, user="postgres", timeout_s=90)

        # Add a deterministic marker file into the volume
        cls.marker = "dump-only-sql-fallback-marker"
        run(
            [
                "docker",
                "exec",
                cls.pg_container,
                "sh",
                "-lc",
                f"echo '{cls.marker}' > /var/lib/postgresql/data/marker.txt",
            ]
        )

        # databases.csv WITHOUT matching entry for this instance -> should skip dump
        cls.databases_csv = f"/tmp/{cls.prefix}/databases.csv"
        write_databases_csv(cls.databases_csv, [])  # empty except header

        # Run baudolo with --dump-only-sql and a DB container present:
        # Expected: WARNING + FALLBACK to file backup (files/ must exist)
        cmd = [
            "baudolo",
            "--compose-dir",
            cls.compose_dir,
            "--docker-compose-hard-restart-required",
            "mailu",
            "--repo-name",
            cls.repo_name,
            "--databases-csv",
            cls.databases_csv,
            "--backups-dir",
            cls.backups_dir,
            "--database-containers",
            cls.pg_container,
            "--images-no-stop-required",
            "postgres",
            "mariadb",
            "mysql",
            "alpine",
            "--dump-only-sql",
        ]
        cp = run(cmd, capture=True, check=True)
        cls.stdout = cp.stdout or ""

        cls.hash, cls.version = latest_version_dir(cls.backups_dir, cls.repo_name)

        # Restore files into a fresh volume to prove file backup happened
        run(["docker", "volume", "create", cls.restore_volume])
        run(
            [
                "baudolo-restore",
                "files",
                cls.restore_volume,
                cls.hash,
                cls.version,
                "--backups-dir",
                cls.backups_dir,
                "--repo-name",
                cls.repo_name,
                "--source-volume",
                cls.pg_volume,
                "--rsync-image",
                "ghcr.io/kevinveenbirkenbach/alpine-rsync",
            ]
        )

    @classmethod
    def tearDownClass(cls) -> None:
        cleanup_docker(containers=cls.containers, volumes=cls.volumes)

    def test_warns_about_missing_dump_in_dump_only_mode(self) -> None:
        self.assertIn(
            "WARNING: dump-only-sql requested but no DB dump was produced",
            self.stdout,
            f"Expected warning in baudolo output. STDOUT:\n{self.stdout}",
        )

    def test_files_backup_exists_due_to_fallback(self) -> None:
        p = backup_path(
            self.backups_dir,
            self.repo_name,
            self.version,
            self.pg_volume,
        ) / "files"
        self.assertTrue(p.is_dir(), f"Expected files backup dir at: {p}")

    def test_sql_dump_not_present(self) -> None:
        # There should be no sql dumps because databases.csv had no matching entry.
        sql_dir = backup_path(
            self.backups_dir,
            self.repo_name,
            self.version,
            self.pg_volume,
        ) / "sql"
        # Could exist (dir created) in some edge cases, but should contain no *.sql dumps.
        if sql_dir.exists():
            dumps = list(sql_dir.glob("*.sql"))
            self.assertEqual(
                len(dumps),
                0,
                f"Did not expect SQL dump files, found: {dumps}",
            )

    def test_restored_files_contain_marker(self) -> None:
        p = run(
            [
                "docker",
                "run",
                "--rm",
                "-v",
                f"{self.restore_volume}:/data",
                "alpine:3.20",
                "sh",
                "-lc",
                "cat /data/marker.txt",
            ]
        )
        self.assertEqual((p.stdout or "").strip(), self.marker)

@@ -0,0 +1,182 @@
import unittest

from .helpers import (
    backup_path,
    cleanup_docker,
    create_minimal_compose_dir,
    ensure_empty_dir,
    latest_version_dir,
    require_docker,
    run,
    unique,
    wait_for_postgres,
    write_databases_csv,
)


class TestE2EDumpOnlySqlMixedRun(unittest.TestCase):
    @classmethod
    def setUpClass(cls) -> None:
        require_docker()

        cls.prefix = unique("baudolo-e2e-dump-only-sql-mixed-run")
        cls.backups_dir = f"/tmp/{cls.prefix}/Backups"
        ensure_empty_dir(cls.backups_dir)
        cls.compose_dir = create_minimal_compose_dir(f"/tmp/{cls.prefix}")
        cls.repo_name = cls.prefix

        # --- Volumes ---
        cls.db_volume = f"{cls.prefix}-vol-db"
        cls.files_volume = f"{cls.prefix}-vol-files"

        # Track for cleanup
        cls.containers: list[str] = []
        cls.volumes = [cls.db_volume, cls.files_volume]

        # Create volumes
        run(["docker", "volume", "create", cls.db_volume])
        run(["docker", "volume", "create", cls.files_volume])

        # Put a marker into the non-db volume
        run(
            [
                "docker",
                "run",
                "--rm",
                "-v",
                f"{cls.files_volume}:/data",
                "alpine:3.20",
                "sh",
                "-lc",
                "echo 'hello-non-db' > /data/hello.txt",
            ]
        )

        # --- Start Postgres container using the DB volume ---
        cls.pg_container = f"{cls.prefix}-pg"
        cls.containers.append(cls.pg_container)

        cls.pg_password = "postgres"
        cls.pg_db = "testdb"
        cls.pg_user = "postgres"

        run(
            [
                "docker",
                "run",
                "-d",
                "--name",
                cls.pg_container,
                "-e",
                f"POSTGRES_PASSWORD={cls.pg_password}",
                "-v",
                f"{cls.db_volume}:/var/lib/postgresql/data",
                "postgres:16-alpine",
            ]
        )
        wait_for_postgres(cls.pg_container, user="postgres", timeout_s=90)

        # Create deterministic content in DB so dump is non-empty
        run(
            [
                "docker",
                "exec",
                cls.pg_container,
                "sh",
                "-lc",
                f'psql -U postgres -c "CREATE DATABASE {cls.pg_db};" || true',
            ],
            check=True,
        )
        run(
            [
                "docker",
                "exec",
                cls.pg_container,
                "sh",
                "-lc",
                (
                    f'psql -U postgres -d {cls.pg_db} -c '
                    '"CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, v TEXT);'
                    "INSERT INTO t(id,v) VALUES (1,'hello-db') "
                    "ON CONFLICT (id) DO UPDATE SET v=EXCLUDED.v;\""
                ),
            ],
            check=True,
        )

        # databases.csv with an entry => dump should succeed
        cls.databases_csv = f"/tmp/{cls.prefix}/databases.csv"
        write_databases_csv(
            cls.databases_csv,
            [(cls.pg_container, cls.pg_db, cls.pg_user, cls.pg_password)],
        )

        # Run baudolo with dump-only-sql
        cmd = [
            "baudolo",
            "--compose-dir",
            cls.compose_dir,
            "--databases-csv",
            cls.databases_csv,
            "--database-containers",
            cls.pg_container,
            "--images-no-stop-required",
            "alpine",
            "postgres",
            "mariadb",
            "mysql",
            "--dump-only-sql",
            "--backups-dir",
            cls.backups_dir,
            "--repo-name",
            cls.repo_name,
        ]
        cp = run(cmd, capture=True, check=True)
        cls.stdout = cp.stdout
        cls.stderr = cp.stderr

        cls.hash, cls.version = latest_version_dir(cls.backups_dir, cls.repo_name)

    @classmethod
    def tearDownClass(cls) -> None:
        cleanup_docker(containers=cls.containers, volumes=cls.volumes)

    def test_db_volume_has_dump_and_no_files_dir(self) -> None:
        base = backup_path(self.backups_dir, self.repo_name, self.version, self.db_volume)
        dumps = base / "sql"
        files = base / "files"

        self.assertTrue(dumps.exists(), f"Expected dumps dir for DB volume at: {dumps}")
        self.assertFalse(
            files.exists(),
            f"Did not expect files dir for DB volume when dump succeeded at: {files}",
        )

        # Optional: at least one dump file exists
        dump_files = list(dumps.glob("*.sql")) + list(dumps.glob("*.sql.gz"))
        self.assertTrue(
            dump_files,
            f"Expected at least one SQL dump file in {dumps}, found none.",
        )

    def test_non_db_volume_has_files_dir(self) -> None:
        base = backup_path(
            self.backups_dir, self.repo_name, self.version, self.files_volume
        )
        files = base / "files"
        self.assertTrue(
            files.exists(),
            f"Expected files dir for non-DB volume at: {files}",
        )

    def test_dump_only_sql_does_not_disable_non_db_files_backup(self) -> None:
        # Regression guard: even with --dump-only-sql, non-DB volumes must still be backed up as files
        base = backup_path(
            self.backups_dir, self.repo_name, self.version, self.files_volume
        )
        self.assertTrue(
            (base / "files").exists(),
            f"Expected non-DB volume files backup to exist at: {base / 'files'}",
        )

@@ -26,10 +26,10 @@ class TestE2EFilesNoCopy(unittest.TestCase):
         cls.repo_name = cls.prefix
         cls.volume_src = f"{cls.prefix}-vol-src"
-        cls.volume_dst = f"{cls.prefix}-vol-dst"
-        cls.containers = []
-        cls.volumes = [cls.volume_src, cls.volume_dst]
+        cls.containers: list[str] = []
+        cls.volumes = [cls.volume_src]
 
+        # Create source volume and write a marker file
         run(["docker", "volume", "create", cls.volume_src])
         run(
             [
@@ -48,7 +48,7 @@ class TestE2EFilesNoCopy(unittest.TestCase):
         cls.databases_csv = f"/tmp/{cls.prefix}/databases.csv"
         write_databases_csv(cls.databases_csv, [])
 
-        # dump-only => NO file rsync backups
+        # dump-only-sql => non-DB volumes are STILL backed up as files
         backup_run(
             backups_dir=cls.backups_dir,
             repo_name=cls.repo_name,
@@ -56,28 +56,32 @@ class TestE2EFilesNoCopy(unittest.TestCase):
             databases_csv=cls.databases_csv,
             database_containers=["dummy-db"],
             images_no_stop_required=["alpine", "postgres", "mariadb", "mysql"],
-            dump_only=True,
+            dump_only_sql=True,
         )
 
         cls.hash, cls.version = latest_version_dir(cls.backups_dir, cls.repo_name)
 
+        # Wipe the volume to ensure restore actually restores something
+        run(["docker", "volume", "rm", "-f", cls.volume_src])
+        run(["docker", "volume", "create", cls.volume_src])
+
     @classmethod
     def tearDownClass(cls) -> None:
         cleanup_docker(containers=cls.containers, volumes=cls.volumes)
 
-    def test_files_backup_not_present(self) -> None:
+    def test_files_backup_present_for_non_db_volume(self) -> None:
         p = (
             backup_path(self.backups_dir, self.repo_name, self.version, self.volume_src)
             / "files"
         )
-        self.assertFalse(p.exists(), f"Did not expect files backup dir at: {p}")
+        self.assertTrue(p.exists(), f"Expected files backup dir at: {p}")
 
-    def test_restore_files_fails_expected(self) -> None:
+    def test_restore_files_succeeds_and_restores_content(self) -> None:
         p = run(
             [
                 "baudolo-restore",
                 "files",
-                self.volume_dst,
+                self.volume_src,
                 self.hash,
                 self.version,
                 "--backups-dir",
@@ -89,6 +93,27 @@ class TestE2EFilesNoCopy(unittest.TestCase):
         )
         self.assertEqual(
             p.returncode,
-            2,
-            f"Expected exitcode 2, got {p.returncode}\nSTDOUT={p.stdout}\nSTDERR={p.stderr}",
+            0,
+            f"Expected exitcode 0, got {p.returncode}\nSTDOUT={p.stdout}\nSTDERR={p.stderr}",
         )
+
+        cp = run(
+            [
+                "docker",
+                "run",
+                "--rm",
+                "-v",
+                f"{self.volume_src}:/data",
+                "alpine:3.20",
+                "sh",
+                "-lc",
+                "cat /data/hello.txt",
+            ],
+            capture=True,
+            check=True,
+        )
+        self.assertEqual(
+            cp.stdout.strip(),
+            "hello",
+            f"Unexpected restored content. STDOUT={cp.stdout}\nSTDERR={cp.stderr}",
+        )

@@ -0,0 +1,131 @@
# tests/e2e/test_e2e_images_no_backup_required_early_skip.py
import unittest

from .helpers import (
    backup_path,
    cleanup_docker,
    create_minimal_compose_dir,
    ensure_empty_dir,
    latest_version_dir,
    require_docker,
    run,
    unique,
    write_databases_csv,
)


class TestE2EImagesNoBackupRequiredEarlySkip(unittest.TestCase):
    @classmethod
    def setUpClass(cls) -> None:
        require_docker()

        cls.prefix = unique("baudolo-e2e-early-skip-no-backup-required")
        cls.backups_dir = f"/tmp/{cls.prefix}/Backups"
        ensure_empty_dir(cls.backups_dir)
        cls.compose_dir = create_minimal_compose_dir(f"/tmp/{cls.prefix}")
        cls.repo_name = cls.prefix

        # --- Docker resources ---
        cls.redis_container = f"{cls.prefix}-redis"
        cls.ignored_volume = f"{cls.prefix}-redis-vol"
        cls.normal_volume = f"{cls.prefix}-files-vol"

        cls.containers = [cls.redis_container]
        cls.volumes = [cls.ignored_volume, cls.normal_volume]

        # Create volumes
        run(["docker", "volume", "create", cls.ignored_volume])
        run(["docker", "volume", "create", cls.normal_volume])

        # Start redis container using the ignored volume
        run(
            [
                "docker",
                "run",
                "-d",
                "--name",
                cls.redis_container,
                "-v",
                f"{cls.ignored_volume}:/data",
                "redis:alpine",
            ]
        )

        # Put deterministic content into the normal volume
        run(
            [
                "docker",
                "run",
                "--rm",
                "-v",
                f"{cls.normal_volume}:/data",
                "alpine:3.20",
                "sh",
                "-lc",
                "mkdir -p /data && echo 'hello' > /data/hello.txt",
            ]
        )

        # databases.csv required by CLI (can be empty)
        cls.databases_csv = f"/tmp/{cls.prefix}/databases.csv"
        write_databases_csv(cls.databases_csv, [])

        # Run baudolo with images-no-backup-required redis
        cmd = [
            "baudolo",
            "--compose-dir",
            cls.compose_dir,
            "--docker-compose-hard-restart-required",
            "mailu",
            "--repo-name",
            cls.repo_name,
            "--databases-csv",
            cls.databases_csv,
            "--backups-dir",
            cls.backups_dir,
            "--database-containers",
            "dummy-db",
            "--images-no-stop-required",
            "alpine",
            "redis",
            "postgres",
            "mariadb",
            "mysql",
            "--images-no-backup-required",
            "redis",
        ]
        cp = run(cmd, capture=True, check=True)
        cls.stdout = cp.stdout or ""
        cls.stderr = cp.stderr or ""

        cls.hash, cls.version = latest_version_dir(cls.backups_dir, cls.repo_name)

    @classmethod
    def tearDownClass(cls) -> None:
        cleanup_docker(containers=cls.containers, volumes=cls.volumes)

    def test_ignored_volume_has_no_backup_directory_at_all(self) -> None:
        p = backup_path(
            self.backups_dir,
            self.repo_name,
            self.version,
            self.ignored_volume,
        )
        self.assertFalse(
            p.exists(),
            f"Expected NO backup directory to be created for ignored volume, but found: {p}",
        )

    def test_normal_volume_is_still_backed_up(self) -> None:
        p = (
            backup_path(
                self.backups_dir,
                self.repo_name,
                self.version,
                self.normal_volume,
            )
            / "files"
            / "hello.txt"
        )
        self.assertTrue(p.is_file(), f"Expected backed up file at: {p}")

@@ -87,7 +87,7 @@ class TestE2EMariaDBNoCopy(unittest.TestCase):
             [(cls.db_container, cls.db_name, cls.db_user, cls.db_password)],
         )
 
-        # dump-only => no files
+        # dump-only-sql => no files
         backup_run(
             backups_dir=cls.backups_dir,
             repo_name=cls.repo_name,
@@ -95,7 +95,7 @@ class TestE2EMariaDBNoCopy(unittest.TestCase):
             databases_csv=cls.databases_csv,
             database_containers=[cls.db_container],
             images_no_stop_required=["mariadb", "mysql", "alpine", "postgres"],
-            dump_only=True,
+            dump_only_sql=True,
         )
 
         cls.hash, cls.version = latest_version_dir(cls.backups_dir, cls.repo_name)

@@ -75,7 +75,7 @@ class TestE2EPostgresNoCopy(unittest.TestCase):
             databases_csv=cls.databases_csv,
             database_containers=[cls.pg_container],
             images_no_stop_required=["postgres", "mariadb", "mysql", "alpine"],
-            dump_only=True,
+            dump_only_sql=True,
         )
 
         cls.hash, cls.version = latest_version_dir(cls.backups_dir, cls.repo_name)

@@ -0,0 +1,217 @@
import unittest

from .helpers import (
    backup_path,
    cleanup_docker,
    create_minimal_compose_dir,
    ensure_empty_dir,
    latest_version_dir,
    require_docker,
    run,
    unique,
    wait_for_postgres,
)


class TestE2ESeedStarAndDbEntriesBackupPostgres(unittest.TestCase):
    @classmethod
    def setUpClass(cls) -> None:
        require_docker()

        cls.prefix = unique("baudolo-e2e-seed-star-and-db")
        cls.backups_dir = f"/tmp/{cls.prefix}/Backups"
        ensure_empty_dir(cls.backups_dir)
        cls.compose_dir = create_minimal_compose_dir(f"/tmp/{cls.prefix}")
        cls.repo_name = cls.prefix

        # --- Volumes ---
        cls.db_volume = f"{cls.prefix}-vol-db"
        cls.files_volume = f"{cls.prefix}-vol-files"
        cls.volumes = [cls.db_volume, cls.files_volume]

        run(["docker", "volume", "create", cls.db_volume])
        run(["docker", "volume", "create", cls.files_volume])

        # Put a marker into the non-db volume
        cls.marker = "hello-non-db-seed-star"
        run(
            [
                "docker",
                "run",
                "--rm",
                "-v",
                f"{cls.files_volume}:/data",
                "alpine:3.20",
                "sh",
                "-lc",
                f"echo '{cls.marker}' > /data/hello.txt",
            ]
        )

        # --- Start Postgres container using the DB volume ---
        cls.pg_container = f"{cls.prefix}-pg"
        cls.containers = [cls.pg_container]

        cls.pg_password = "postgres"
        cls.pg_user = "postgres"

        run(
            [
                "docker",
                "run",
                "-d",
                "--name",
                cls.pg_container,
                "-e",
                f"POSTGRES_PASSWORD={cls.pg_password}",
                "-v",
                f"{cls.db_volume}:/var/lib/postgresql/data",
                "postgres:16-alpine",
            ]
        )
        wait_for_postgres(cls.pg_container, user="postgres", timeout_s=90)

        # Create two DBs and deterministic content, so pg_dumpall is meaningful
        cls.pg_db1 = "testdb1"
        cls.pg_db2 = "testdb2"
        run(
            [
                "docker",
                "exec",
                cls.pg_container,
                "sh",
                "-lc",
                (
                    f'psql -U {cls.pg_user} -c "CREATE DATABASE {cls.pg_db1};" || true; '
                    f'psql -U {cls.pg_user} -c "CREATE DATABASE {cls.pg_db2};" || true; '
                ),
            ],
            check=True,
        )
        run(
            [
                "docker",
                "exec",
                cls.pg_container,
                "sh",
                "-lc",
                (
                    f'psql -U {cls.pg_user} -d {cls.pg_db1} -c '
                    '"CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, v TEXT);'
                    "INSERT INTO t(id,v) VALUES (1,'hello-db1') "
                    "ON CONFLICT (id) DO UPDATE SET v=EXCLUDED.v;\""
                ),
            ],
            check=True,
        )
        run(
            [
                "docker",
                "exec",
                cls.pg_container,
                "sh",
                "-lc",
                (
                    f'psql -U {cls.pg_user} -d {cls.pg_db2} -c '
                    '"CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, v TEXT);'
                    "INSERT INTO t(id,v) VALUES (1,'hello-db2') "
                    "ON CONFLICT (id) DO UPDATE SET v=EXCLUDED.v;\""
                ),
            ],
            check=True,
        )

        # --- Seed databases.csv using CLI (star + concrete db) ---
        cls.databases_csv = f"/tmp/{cls.prefix}/databases.csv"

        # IMPORTANT: because we pass --database-containers <container>,
        # get_instance() will use the container name as instance key.
        instance = cls.pg_container

        # Seed star entry (pg_dumpall)
        run(["baudolo-seed", cls.databases_csv, instance, "*", cls.pg_user, cls.pg_password])

        # Seed concrete DB entry (pg_dump)
        run(
            [
                "baudolo-seed",
                cls.databases_csv,
                instance,
                cls.pg_db1,
                cls.pg_user,
                cls.pg_password,
            ]
        )

        # --- Run baudolo with dump-only-sql ---
        cmd = [
            "baudolo",
            "--compose-dir",
            cls.compose_dir,
            "--databases-csv",
            cls.databases_csv,
            "--database-containers",
            cls.pg_container,
            "--images-no-stop-required",
            "alpine",
            "postgres",
            "mariadb",
            "mysql",
            "--dump-only-sql",
            "--backups-dir",
            cls.backups_dir,
            "--repo-name",
            cls.repo_name,
        ]
        cp = run(cmd, capture=True, check=True)
        cls.stdout = cp.stdout or ""
        cls.stderr = cp.stderr or ""

        cls.hash, cls.version = latest_version_dir(cls.backups_dir, cls.repo_name)

    @classmethod
    def tearDownClass(cls) -> None:
        cleanup_docker(containers=cls.containers, volumes=cls.volumes)

    def test_db_volume_has_cluster_dump_and_concrete_db_dump_and_no_files(self) -> None:
        base = backup_path(self.backups_dir, self.repo_name, self.version, self.db_volume)
        sql_dir = base / "sql"
        files_dir = base / "files"

        self.assertTrue(sql_dir.exists(), f"Expected sql dir at: {sql_dir}")
        self.assertFalse(
            files_dir.exists(),
            f"Did not expect files dir for DB volume when dump-only-sql succeeded: {files_dir}",
        )

        # Cluster dump file produced by '*' entry
        cluster = sql_dir / f"{self.pg_container}.cluster.backup.sql"
        self.assertTrue(cluster.is_file(), f"Expected cluster dump file at: {cluster}")

        # Concrete DB dump produced by normal entry
        db1 = sql_dir / f"{self.pg_db1}.backup.sql"
        self.assertTrue(db1.is_file(), f"Expected db dump file at: {db1}")

        # Basic sanity: cluster dump usually contains CREATE DATABASE statements
        txt = cluster.read_text(encoding="utf-8", errors="ignore")
        self.assertIn(
            "CREATE DATABASE",
            txt,
            "Expected cluster dump to contain CREATE DATABASE statements",
        )

    def test_non_db_volume_still_has_files_backup(self) -> None:
        base = backup_path(self.backups_dir, self.repo_name, self.version, self.files_volume)
        files_dir = base / "files"
        self.assertTrue(files_dir.exists(), f"Expected files dir for non-DB volume at: {files_dir}")

        marker = files_dir / "hello.txt"
        self.assertTrue(marker.is_file(), f"Expected marker file at: {marker}")
        self.assertEqual(
            marker.read_text(encoding="utf-8").strip(),
            self.marker,
        )

@@ -7,9 +7,47 @@ from pathlib import Path
 def run_seed(
-    csv_path: Path, instance: str, database: str, username: str, password: str = ""
+    csv_path: Path, instance: str, database: str, username: str, password: str
 ) -> subprocess.CompletedProcess:
-    # Run the real CLI module (integration-style).
+    """
+    Run the real CLI module (E2E-style) using subprocess.
+
+    Seed contract (current):
+    - database must be "*" or a valid name (non-empty, matches allowed charset)
+    - password is required
+    - entry is keyed by (instance, database); username/password get updated
+    """
+    cp = subprocess.run(
+        [
+            sys.executable,
+            "-m",
+            "baudolo.seed",
+            str(csv_path),
+            instance,
+            database,
+            username,
+            password,
+        ],
+        text=True,
+        capture_output=True,
+        check=False,
+    )
+    if cp.returncode != 0:
+        raise AssertionError(
+            "seed command failed unexpectedly.\n"
+            f"returncode: {cp.returncode}\n"
+            f"stdout:\n{cp.stdout}\n"
+            f"stderr:\n{cp.stderr}\n"
+        )
+    return cp
+
+
+def run_seed_expect_fail(
+    csv_path: Path, instance: str, database: str, username: str, password: str
+) -> subprocess.CompletedProcess:
+    """
+    Same as run_seed, but expects non-zero exit. Returns CompletedProcess for inspection.
+    """
     return subprocess.run(
         [
             sys.executable,
@@ -23,7 +61,7 @@ def run_seed(
         ],
         text=True,
         capture_output=True,
-        check=True,
+        check=False,
     )
@@ -33,6 +71,10 @@ def read_csv_semicolon(path: Path) -> list[dict]:
     return list(reader)
 
 
+def read_text(path: Path) -> str:
+    return path.read_text(encoding="utf-8")
+
+
 class TestSeedIntegration(unittest.TestCase):
     def test_creates_file_and_adds_entry_when_missing(self) -> None:
         with tempfile.TemporaryDirectory() as td:
@@ -41,7 +83,7 @@ class TestSeedIntegration(unittest.TestCase):
             cp = run_seed(p, "docker.test", "appdb", "alice", "secret")
-            self.assertEqual(cp.returncode, 0, cp.stderr)
+            self.assertEqual(cp.returncode, 0)
 
             self.assertTrue(p.exists())
             rows = read_csv_semicolon(p)
@@ -51,40 +93,121 @@ class TestSeedIntegration(unittest.TestCase):
             self.assertEqual(rows[0]["username"], "alice")
             self.assertEqual(rows[0]["password"], "secret")
 
-    def test_replaces_existing_entry_same_keys(self) -> None:
-        with tempfile.TemporaryDirectory() as td:
-            p = Path(td) / "databases.csv"
-
-            # First add
-            run_seed(p, "docker.test", "appdb", "alice", "oldpw")
-            rows = read_csv_semicolon(p)
-            self.assertEqual(len(rows), 1)
-            self.assertEqual(rows[0]["password"], "oldpw")
-
-            # Replace (same instance+database+username)
-            run_seed(p, "docker.test", "appdb", "alice", "newpw")
-
-            rows = read_csv_semicolon(p)
-            self.assertEqual(len(rows), 1, "Expected replacement, not a duplicate row")
-            self.assertEqual(rows[0]["instance"], "docker.test")
-            self.assertEqual(rows[0]["database"], "appdb")
-            self.assertEqual(rows[0]["username"], "alice")
-            self.assertEqual(rows[0]["password"], "newpw")
-
-    def test_database_empty_string_matches_existing_empty_database(self) -> None:
-        with tempfile.TemporaryDirectory() as td:
-            p = Path(td) / "databases.csv"
-
-            # Add with empty database
-            run_seed(p, "docker.test", "", "alice", "pw1")
-            rows = read_csv_semicolon(p)
-            self.assertEqual(len(rows), 1)
-            self.assertEqual(rows[0]["database"], "")
-
-            # Replace with empty database again
-            run_seed(p, "docker.test", "", "alice", "pw2")
-            rows = read_csv_semicolon(p)
-            self.assertEqual(len(rows), 1)
-            self.assertEqual(rows[0]["database"], "")
-            self.assertEqual(rows[0]["password"], "pw2")
+    def test_replaces_existing_entry_same_instance_and_database_updates_username_and_password(
+        self,
+    ) -> None:
+        """
+        Replacement semantics:
+        - Key is (instance, database)
+        - username/password are updated in-place
+        """
+        with tempfile.TemporaryDirectory() as td:
+            p = Path(td) / "databases.csv"
+
+            run_seed(p, "docker.test", "appdb", "alice", "oldpw")
+            rows = read_csv_semicolon(p)
+            self.assertEqual(len(rows), 1)
+            self.assertEqual(rows[0]["username"], "alice")
+            self.assertEqual(rows[0]["password"], "oldpw")
+
+            run_seed(p, "docker.test", "appdb", "bob", "newpw")
+
+            rows = read_csv_semicolon(p)
+            self.assertEqual(len(rows), 1, "Expected replacement, not a duplicate row")
+            self.assertEqual(rows[0]["instance"], "docker.test")
+            self.assertEqual(rows[0]["database"], "appdb")
+            self.assertEqual(rows[0]["username"], "bob")
+            self.assertEqual(rows[0]["password"], "newpw")
+
+    def test_allows_star_database_for_dump_all(self) -> None:
+        with tempfile.TemporaryDirectory() as td:
+            p = Path(td) / "databases.csv"
+            cp = run_seed(p, "bigbluebutton", "*", "postgres", "pw")
+            self.assertEqual(cp.returncode, 0)
+            rows = read_csv_semicolon(p)
+            self.assertEqual(len(rows), 1)
+            self.assertEqual(rows[0]["instance"], "bigbluebutton")
+            self.assertEqual(rows[0]["database"], "*")
+            self.assertEqual(rows[0]["username"], "postgres")
+            self.assertEqual(rows[0]["password"], "pw")
+
+    def test_replaces_existing_star_entry(self) -> None:
+        with tempfile.TemporaryDirectory() as td:
+            p = Path(td) / "databases.csv"
+            run_seed(p, "bigbluebutton", "*", "postgres", "pw1")
+            run_seed(p, "bigbluebutton", "*", "postgres", "pw2")
+            rows = read_csv_semicolon(p)
+            self.assertEqual(len(rows), 1)
+            self.assertEqual(rows[0]["database"], "*")
+            self.assertEqual(rows[0]["password"], "pw2")
+
+    def test_rejects_empty_database_value(self) -> None:
+        with tempfile.TemporaryDirectory() as td:
+            p = Path(td) / "databases.csv"
+            cp = run_seed_expect_fail(p, "docker.test", "", "alice", "pw")
+            self.assertNotEqual(cp.returncode, 0)
+            combined = ((cp.stdout or "") + "\n" + (cp.stderr or "")).lower()
+            self.assertIn("error:", combined)
+            self.assertIn("database", combined)
+            self.assertIn("not empty", combined)
+            self.assertFalse(p.exists(), "Should not create file on invalid input")
+
+    def test_rejects_invalid_database_name_characters(self) -> None:
+        with tempfile.TemporaryDirectory() as td:
+            p = Path(td) / "databases.csv"
+            cp = run_seed_expect_fail(p, "docker.test", "app db", "alice", "pw")
+            self.assertNotEqual(cp.returncode, 0)
+            combined = ((cp.stdout or "") + "\n" + (cp.stderr or "")).lower()
+            self.assertIn("error:", combined)
+            self.assertIn("invalid database name", combined)
+            self.assertFalse(p.exists(), "Should not create file on invalid input")
+
+    def test_rejects_nan_database_name(self) -> None:
+        with tempfile.TemporaryDirectory() as td:
+            p = Path(td) / "databases.csv"
+            cp = run_seed_expect_fail(p, "docker.test", "nan", "alice", "pw")
+            self.assertNotEqual(cp.returncode, 0)
+            combined = ((cp.stdout or "") + "\n" + (cp.stderr or "")).lower()
+            self.assertIn("error:", combined)
+            self.assertIn("must not be 'nan'", combined)
+            self.assertFalse(p.exists(), "Should not create file on invalid input")
+
+    def test_accepts_hyphen_and_underscore_database_names(self) -> None:
+        with tempfile.TemporaryDirectory() as td:
+            p = Path(td) / "databases.csv"
+            run_seed(p, "docker.test", "my_db-1", "alice", "pw")
+            rows = read_csv_semicolon(p)
+            self.assertEqual(len(rows), 1)
+            self.assertEqual(rows[0]["database"], "my_db-1")
+
+    def test_file_is_semicolon_delimited_and_has_header(self) -> None:
+        with tempfile.TemporaryDirectory() as td:
+            p = Path(td) / "databases.csv"
+            run_seed(p, "docker.test", "appdb", "alice", "pw")
+            txt = read_text(p)
+            self.assertTrue(
+                txt.startswith("instance;database;username;password"),
+                f"Unexpected header / delimiter in file:\n{txt}",
+            )
+            self.assertIn(";", txt)
+
+
+if __name__ == "__main__":
+    unittest.main()