feat(backup): stricter databases.csv semantics + atomic SQL dumps

- read databases.csv with stable types (dtype=str, keep_default_na=False)
- validate database field: require '*' or concrete name (no empty/NaN)
- support Postgres cluster dumps via '*' entries (pg_dumpall)
- write SQL dumps atomically to avoid partial/empty files
- early-skip fully ignored volumes before creating backup directories
- update seed CLI to enforce new contract and update by (instance,database)
- adjust tests: sql dir naming + add E2E coverage for early-skip and '*' seeding
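
Example databases.csv under the new contract (values illustrative):

    instance;database;username;password
    bigbluebutton;*;postgres;secret
    nextcloud;nextcloud;nc_admin;secret

'*' requests a full cluster dump (Postgres only); an empty database
field is now rejected instead of silently becoming nan.backup.sql.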
2025-12-29 11:39:57 +01:00
parent c01ab55f2d
commit f3ef86a444
8 changed files with 667 additions and 109 deletions

View File

@@ -132,7 +132,12 @@ def main() -> int:
     versions_dir = os.path.join(args.backups_dir, machine_id, args.repo_name)
     version_dir = create_version_directory(versions_dir, backup_time)
-    databases_df = pandas.read_csv(args.databases_csv, sep=";")
+    # IMPORTANT:
+    # - keep_default_na=False prevents empty fields from turning into NaN
+    # - dtype=str keeps all columns stable for comparisons/validation
+    databases_df = pandas.read_csv(
+        args.databases_csv, sep=";", keep_default_na=False, dtype=str
+    )
     print("💾 Start volume backups...", flush=True)
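
Why keep_default_na matters here: with pandas defaults an empty database
field parses as float NaN, which later formats as the literal string "nan"
(hence nan.backup.sql). A minimal repro, inline CSV for illustration only:

import io
import pandas as pd

csv = "instance;database;username;password\npg;;admin;secret\n"

# default parsing: empty field -> float NaN -> formats as "nan"
print(pd.read_csv(io.StringIO(csv), sep=";")["database"][0])  # nan
# strict parsing keeps an empty string, so validation can reject it
print(repr(pd.read_csv(
    io.StringIO(csv), sep=";", dtype=str, keep_default_na=False
)["database"][0]))  # ''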
@@ -140,8 +145,16 @@ def main() -> int:
         print(f"Start backup routine for volume: {volume_name}", flush=True)
         containers = containers_using_volume(volume_name)
+
+        # EARLY SKIP: if all linked containers are ignored, do not create any dirs
+        if volume_is_fully_ignored(containers, args.images_no_backup_required):
+            print(
+                f"Skipping volume '{volume_name}' entirely (all linked containers are ignored).",
+                flush=True,
+            )
+            continue
+
         vol_dir = create_volume_directory(version_dir, volume_name)
         found_db, dumped_any = _backup_dumps_for_volume(
             containers=containers,
             vol_dir=vol_dir,
@@ -163,16 +176,6 @@ def main() -> int:
             continue

-        # skip file backup if all linked containers are ignored
-        if volume_is_fully_ignored(containers, args.images_no_backup_required):
-            print(
-                f"Skipping file backup for volume '{volume_name}' (all linked containers are ignored).",
-                flush=True,
-            )
-            continue
+        # Non-DB volume -> always do file backup (fall through)
         if args.everything:
             # "everything": always do pre-rsync, then stop + rsync again
             backup_volume(versions_dir, volume_name, vol_dir)
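
Note: volume_is_fully_ignored is referenced above but is not part of this
diff. A minimal sketch of the expected semantics, assuming exact
image-name matching and a hypothetical docker-inspect lookup:

import subprocess

def get_container_image(container: str) -> str:
    # hypothetical helper; the project may resolve images differently
    return subprocess.run(
        ["docker", "inspect", "--format", "{{.Config.Image}}", container],
        capture_output=True, text=True, check=True,
    ).stdout.strip()

def volume_is_fully_ignored(containers: list[str], images_no_backup_required: list[str]) -> bool:
    # Skip only when the volume has linked containers and every one of
    # them runs an ignored image; volumes with no containers are still backed up.
    if not containers:
        return False
    return all(get_container_image(c) in images_no_backup_required for c in containers)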

View File

@@ -4,6 +4,8 @@ import os
 import pathlib
 import re
 import logging
+from typing import Optional
+
 import pandas

 from .shell import BackupException, execute_shell_command
@@ -12,17 +14,53 @@ log = logging.getLogger(__name__)
 def get_instance(container: str, database_containers: list[str]) -> str:
     """
     Derive a stable instance name from the container name.
     """
     if container in database_containers:
         return container
     return re.split(r"(_|-)(database|db|postgres)", container)[0]


+def _validate_database_value(value: Optional[str], *, instance: str) -> str:
+    """
+    Enforce explicit database semantics:
+    - "*"      => dump ALL databases (cluster dump for Postgres)
+    - "<name>" => dump exactly this database
+    - ""       => invalid configuration (would previously result in NaN / nan.backup.sql)
+    """
+    v = (value or "").strip()
+    if v == "":
+        raise ValueError(
+            f"Invalid databases.csv entry for instance '{instance}': "
+            "column 'database' must be '*' or a concrete database name (not empty)."
+        )
+    return v
+
+
+def _atomic_write_cmd(cmd: str, out_file: str) -> None:
+    """
+    Write dump output atomically:
+    - write to <file>.tmp
+    - rename to <file> only on success
+    This prevents empty or partial dump files from being treated as valid backups.
+    """
+    tmp = f"{out_file}.tmp"
+    execute_shell_command(f"{cmd} > {tmp}")
+    execute_shell_command(f"mv {tmp} {out_file}")
+
+
 def fallback_pg_dumpall(container: str, username: str, password: str, out_file: str) -> None:
     """
     Perform a full Postgres cluster dump using pg_dumpall.
     """
     cmd = (
         f"PGPASSWORD={password} docker exec -i {container} "
-        f"pg_dumpall -U {username} -h localhost > {out_file}"
+        f"pg_dumpall -U {username} -h localhost"
     )
-    execute_shell_command(cmd)
+    _atomic_write_cmd(cmd, out_file)
@@ -34,12 +72,15 @@ def backup_database(
     database_containers: list[str],
 ) -> bool:
     """
-    Returns True if at least one dump file was produced, else False.
+    Backup databases for a given DB container.
+
+    Returns True if at least one dump was produced.
     """
     instance_name = get_instance(container, database_containers)
-    entries = databases_df.loc[databases_df["instance"] == instance_name]
+    entries = databases_df[databases_df["instance"] == instance_name]

     if entries.empty:
-        log.warning("No entry found for instance '%s' (skipping DB dump)", instance_name)
+        log.debug("No database entries for instance '%s'", instance_name)
         return False

     out_dir = os.path.join(volume_dir, "sql")
@@ -48,43 +89,56 @@ def backup_database(
     produced = False

     for row in entries.itertuples(index=False):
-        db_name = row.database
-        user = row.username
-        password = row.password
+        raw_db = getattr(row, "database", "")
+        user = (getattr(row, "username", "") or "").strip()
+        password = (getattr(row, "password", "") or "").strip()
+
+        db_value = _validate_database_value(raw_db, instance=instance_name)
+
+        # Explicit: dump ALL databases
+        if db_value == "*":
+            if db_type != "postgres":
+                raise ValueError(
+                    f"databases.csv entry for instance '{instance_name}': "
+                    "'*' is currently only supported for Postgres."
+                )
+            cluster_file = os.path.join(
+                out_dir, f"{instance_name}.cluster.backup.sql"
+            )
+            fallback_pg_dumpall(container, user, password, cluster_file)
+            produced = True
+            continue
+
+        # Concrete database dump
+        db_name = db_value
         dump_file = os.path.join(out_dir, f"{db_name}.backup.sql")

         if db_type == "mariadb":
             cmd = (
                 f"docker exec {container} /usr/bin/mariadb-dump "
-                f"-u {user} -p{password} {db_name} > {dump_file}"
+                f"-u {user} -p{password} {db_name}"
             )
-            execute_shell_command(cmd)
+            _atomic_write_cmd(cmd, dump_file)
             produced = True
             continue

         if db_type == "postgres":
-            cluster_file = os.path.join(out_dir, f"{instance_name}.cluster.backup.sql")
-            if not db_name:
-                fallback_pg_dumpall(container, user, password, cluster_file)
-                return True
             try:
                 cmd = (
                     f"PGPASSWORD={password} docker exec -i {container} "
-                    f"pg_dump -U {user} -d {db_name} -h localhost > {dump_file}"
+                    f"pg_dump -U {user} -d {db_name} -h localhost"
                 )
-                execute_shell_command(cmd)
+                _atomic_write_cmd(cmd, dump_file)
                 produced = True
             except BackupException as e:
-                print(f"pg_dump failed: {e}", flush=True)
-                print(
-                    f"Falling back to pg_dumpall for instance '{instance_name}'",
-                    flush=True,
-                )
-                fallback_pg_dumpall(container, user, password, cluster_file)
-                produced = True
+                # Explicit DB dump failed -> hard error
+                raise BackupException(
+                    f"Postgres dump failed for instance '{instance_name}', "
+                    f"database '{db_name}'. This database was explicitly configured "
+                    "and therefore must succeed.\n"
+                    f"{e}"
+                )
             continue

     return produced
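
The atomic-write pattern above relies on execute_shell_command raising
BackupException on a nonzero exit status, so the mv never promotes a
failed dump. The same idea as a self-contained sketch, using os.replace
instead of mv:

import os
import subprocess

def atomic_dump(cmd: str, out_file: str) -> None:
    tmp = f"{out_file}.tmp"
    with open(tmp, "w") as fh:
        # raises CalledProcessError on failure, leaving only <file>.tmp behind
        subprocess.run(cmd, shell=True, stdout=fh, check=True)
    os.replace(tmp, out_file)  # atomic rename on the same filesystem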

View File

@@ -1,67 +1,106 @@
-import pandas as pd
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+import os
+import re
+import sys
+
+import pandas as pd
+from typing import Optional
+
+DB_NAME_RE = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_-]*$")


-def check_and_add_entry(file_path, instance, database, username, password):
-    # Check if the file exists and is not empty
-    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
-        # Read the existing CSV file with header
-        df = pd.read_csv(file_path, sep=";")
-    else:
-        # Create a new DataFrame with columns if file does not exist
-        df = pd.DataFrame(columns=["instance", "database", "username", "password"])
-
-    # Check if the entry exists and remove it
-    mask = (
-        (df["instance"] == instance)
-        & (
-            (df["database"] == database)
-            | (((df["database"].isna()) | (df["database"] == "")) & (database == ""))
-        )
-        & (df["username"] == username)
-    )
-    if not df[mask].empty:
-        print("Replacing existing entry.")
-        df = df[~mask]
-
-    # Create a new DataFrame for the new entry
-    new_entry = pd.DataFrame(
-        [
-            {
-                "instance": instance,
-                "database": database,
-                "username": username,
-                "password": password,
-            }
-        ]
-    )
-
-    # Add (or replace) the entry using concat
-    df = pd.concat([df, new_entry], ignore_index=True)
+def _validate_database_value(value: Optional[str], *, instance: str) -> str:
+    v = (value or "").strip()
+    if v == "":
+        raise ValueError(
+            f"Invalid databases.csv entry for instance '{instance}': "
+            "column 'database' must be '*' or a concrete database name (not empty)."
+        )
+    if v == "*":
+        return "*"
+    if v.lower() == "nan":
+        raise ValueError(
+            f"Invalid databases.csv entry for instance '{instance}': database must not be 'nan'."
+        )
+    if not DB_NAME_RE.match(v):
+        raise ValueError(
+            f"Invalid databases.csv entry for instance '{instance}': "
+            f"invalid database name '{v}'. Allowed: letters, numbers, '_' and '-'."
+        )
+    return v
+
+
+def check_and_add_entry(
+    file_path: str,
+    instance: str,
+    database: Optional[str],
+    username: str,
+    password: str,
+) -> None:
+    """
+    Add or update an entry in databases.csv.
+
+    The function enforces strict validation:
+    - database MUST be set
+    - database MUST be '*' or a valid database name
+    """
+    database = _validate_database_value(database, instance=instance)
+
+    if os.path.exists(file_path):
+        df = pd.read_csv(
+            file_path,
+            sep=";",
+            dtype=str,
+            keep_default_na=False,
+        )
+    else:
+        df = pd.DataFrame(
+            columns=["instance", "database", "username", "password"]
+        )
+
+    mask = (df["instance"] == instance) & (df["database"] == database)
+    if mask.any():
+        print("Updating existing entry.")
+        df.loc[mask, ["username", "password"]] = [username, password]
+    else:
+        print("Adding new entry.")
+        new_entry = pd.DataFrame(
+            [[instance, database, username, password]],
+            columns=["instance", "database", "username", "password"],
+        )
+        df = pd.concat([df, new_entry], ignore_index=True)

-    # Save the updated CSV file
     df.to_csv(file_path, sep=";", index=False)


-def main():
+def main() -> None:
     parser = argparse.ArgumentParser(
-        description="Check and replace (or add) a database entry in a CSV file."
+        description="Seed or update databases.csv for backup configuration."
     )
-    parser.add_argument("file_path", help="Path to the CSV file")
-    parser.add_argument("instance", help="Database instance")
-    parser.add_argument("database", help="Database name")
-    parser.add_argument("username", help="Username")
-    parser.add_argument("password", nargs="?", default="", help="Password (optional)")
+    parser.add_argument("file", help="Path to databases.csv")
+    parser.add_argument("instance", help="Instance name (e.g. bigbluebutton)")
+    parser.add_argument(
+        "database",
+        help="Database name or '*' to dump all databases",
+    )
+    parser.add_argument("username", help="Database username")
+    parser.add_argument("password", help="Database password")
     args = parser.parse_args()
-    check_and_add_entry(
-        args.file_path, args.instance, args.database, args.username, args.password
-    )
+
+    try:
+        check_and_add_entry(
+            file_path=args.file,
+            instance=args.instance,
+            database=args.database,
+            username=args.username,
+            password=args.password,
+        )
+    except Exception as exc:
+        print(f"ERROR: {exc}", file=sys.stderr)
+        sys.exit(1)


 if __name__ == "__main__":
     main()
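
For reference, a seeding call under the new contract (values illustrative):

check_and_add_entry(
    file_path="databases.csv",
    instance="bigbluebutton",
    database="*",       # '*' => full cluster dump (Postgres only)
    username="postgres",
    password="secret",
)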