Compare commits

15 Commits

10 changed files with 520 additions and 161 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
databases.csv
__pycache__

4
Makefile Normal file
View File

@@ -0,0 +1,4 @@
.PHONY: test
test:
python -m unittest discover -s tests/unit -p "test_*.py"

2
Todo.md Normal file
View File

@@ -0,0 +1,2 @@
# Todo
- Verify that restore backup is correct implemented

0
__init__.py Normal file
View File

View File

@@ -16,10 +16,19 @@ class BackupException(Exception):
def execute_shell_command(command):
"""Execute a shell command and return its output."""
print(command)
process = subprocess.Popen([command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
process = subprocess.Popen(
[command],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True
)
out, err = process.communicate()
if process.returncode != 0:
raise BackupException(f"Error in command: {command}\nOutput: {out}\nError: {err}\nExit code: {process.returncode}")
raise BackupException(
f"Error in command: {command}\n"
f"Output: {out}\nError: {err}\n"
f"Exit code: {process.returncode}"
)
return [line.decode("utf-8") for line in out.splitlines()]
def create_version_directory():
@@ -34,29 +43,18 @@ def get_machine_id():
### GLOBAL CONFIGURATION ###
# Container names treated as special instances for database backups
DATABASE_CONTAINERS = ['central-mariadb', 'central-postgres']
# Images which do not require container stop for file backups
IMAGES_NO_STOP_REQUIRED = []
# Images to skip entirely
IMAGES_NO_BACKUP_REQUIRED = []
# Compose dirs requiring hard restart
DOCKER_COMPOSE_HARD_RESTART_REQUIRED = ['mailu']
IMAGES_NO_STOP_REQUIRED = [
'akaunting',
'baserow',
'discourse',
'element',
'gitea',
'listmonk',
'mastodon',
'matomo',
'nextcloud',
'openproject',
'peertube',
'pixelfed',
'wordpress'
]
IMAGES_NO_BACKUP_REQUIRED = [
'redis',
'memcached'
]
# DEFINE CONSTANTS
DIRNAME = os.path.dirname(__file__)
SCRIPTS_DIRECTORY = pathlib.Path(os.path.realpath(__file__)).parent.parent
@@ -69,32 +67,20 @@ BACKUP_TIME = datetime.now().strftime("%Y%m%d%H%M%S")
VERSION_DIR = create_version_directory()
def get_instance(container):
# The function is defined to take one parameter, 'container',
# which is expected to be a string.
# This line uses regular expressions to split the 'container' string.
# 're.split' is a method that divides a string into a list, based on the occurrences of a pattern.
if container in ['central-mariadb', 'central-postgres']:
"""Extract the database instance name based on container name."""
if container in DATABASE_CONTAINERS:
instance_name = container
else:
instance_name = re.split("(_|-)(database|db|postgres)", container)[0]
# The pattern "(_|-)(database|db|postgres)" is explained as follows:
# - "(_|-)": Matches an underscore '_' or a hyphen '-'.
# - "(database|db|postgres)": Matches one of the strings "database", "db", or "postgres".
# So, this pattern will match segments like "_database", "-db", "_postgres", etc.
# For example, in "central-db", it matches "-db".
# After splitting, [0] is used to select the first element of the list resulting from the split.
# This element is the string portion before the matched pattern.
# For "central-db", the split results in ["central", "db"], and [0] selects "central".
print(f"Extracted instance name: {instance_name}")
return instance_name
def stamp_directory():
"""Stamp a directory using directory-validator."""
stamp_command = f"python {SCRIPTS_DIRECTORY}/directory-validator/directory-validator.py --stamp {VERSION_DIR}"
stamp_command = (
f"python {SCRIPTS_DIRECTORY}/directory-validator/"
f"directory-validator.py --stamp {VERSION_DIR}"
)
try:
execute_shell_command(stamp_command)
print(f"Successfully stamped directory: {VERSION_DIR}")
@@ -106,66 +92,80 @@ def backup_database(container, volume_dir, db_type):
"""Backup database (MariaDB or PostgreSQL) if applicable."""
print(f"Starting database backup for {container} using {db_type}...")
instance_name = get_instance(container)
# Filter the DataFrame for the given instance_name
database_entries = DATABASES.loc[DATABASES['instance'] == instance_name]
# Check if there is no entry
if database_entries.empty:
raise BackupException(f"No entry found for instance '{instance_name}'")
# Get the first (and only) entry
for database_entry in database_entries.iloc:
database_name = database_entry['database']
database_username = database_entry['username']
database_password = database_entry['password']
backup_destination_dir = os.path.join(volume_dir, "sql")
pathlib.Path(backup_destination_dir).mkdir(parents=True, exist_ok=True)
backup_destination_file = os.path.join(backup_destination_dir, f"{database_name}.backup.sql")
backup_destination_file = os.path.join(
backup_destination_dir,
f"{database_name}.backup.sql"
)
if db_type == 'mariadb':
backup_command = f"docker exec {container} /usr/bin/mariadb-dump -u {database_username} -p{database_password} {database_name} > {backup_destination_file}"
execute_shell_command(backup_command)
cmd = (
f"docker exec {container} "
f"/usr/bin/mariadb-dump -u {database_username} "
f"-p{database_password} {database_name} > {backup_destination_file}"
)
execute_shell_command(cmd)
if db_type == 'postgres':
cluster_file = os.path.join(backup_destination_dir, f"{instance_name}.cluster.backup.sql")
cluster_file = os.path.join(
backup_destination_dir,
f"{instance_name}.cluster.backup.sql"
)
if not database_name:
fallback_pg_dumpall(container, database_username, database_password, cluster_file)
fallback_pg_dumpall(
container,
database_username,
database_password,
cluster_file
)
return
try:
if database_password:
backup_command = (
cmd = (
f"PGPASSWORD={database_password} docker exec -i {container} "
f"pg_dump -U {database_username} -d {database_name} "
f"-h localhost > {backup_destination_file}"
)
else:
backup_command = (
cmd = (
f"docker exec -i {container} pg_dump -U {database_username} "
f"-d {database_name} -h localhost --no-password "
f"> {backup_destination_file}"
)
execute_shell_command(backup_command)
execute_shell_command(cmd)
except BackupException as e:
print(f"pg_dump failed: {e}")
print(f"Falling back to pg_dumpall for instance '{instance_name}'")
fallback_pg_dumpall(container, database_username, database_password, cluster_file)
fallback_pg_dumpall(
container,
database_username,
database_password,
cluster_file
)
print(f"Database backup for database {container} completed.")
def get_last_backup_dir(volume_name, current_backup_dir):
"""Get the most recent backup directory for the specified volume."""
versions = sorted(os.listdir(VERSIONS_DIR), reverse=True)
for version in versions:
backup_dir = os.path.join(VERSIONS_DIR, version, volume_name, "files", "")
# Ignore current backup dir
if backup_dir != current_backup_dir:
if os.path.isdir(backup_dir):
return backup_dir
backup_dir = os.path.join(
VERSIONS_DIR, version, volume_name, "files", ""
)
if backup_dir != current_backup_dir and os.path.isdir(backup_dir):
return backup_dir
print(f"No previous backups available for volume: {volume_name}")
return None
def getStoragePath(volume_name):
path = execute_shell_command(f"docker volume inspect --format '{{{{ .Mountpoint }}}}' {volume_name}")[0]
path = execute_shell_command(
f"docker volume inspect --format '{{{{ .Mountpoint }}}}' {volume_name}"
)[0]
return f"{path}/"
def getFileRsyncDestinationPath(volume_dir):
@@ -175,133 +175,132 @@ def getFileRsyncDestinationPath(volume_dir):
def fallback_pg_dumpall(container, username, password, backup_destination_file):
"""Fallback function to run pg_dumpall if pg_dump fails or no DB is defined."""
print(f"Running pg_dumpall for container '{container}'...")
command = (
cmd = (
f"PGPASSWORD={password} docker exec -i {container} "
f"pg_dumpall -U {username} -h localhost > {backup_destination_file}"
)
execute_shell_command(command)
execute_shell_command(cmd)
def backup_volume(volume_name, volume_dir):
"""Perform incremental file backup of a Docker volume."""
try:
"""Backup files of a volume with incremental backups."""
print(f"Starting backup routine for volume: {volume_name}")
files_rsync_destination_path = getFileRsyncDestinationPath(volume_dir)
pathlib.Path(files_rsync_destination_path).mkdir(parents=True, exist_ok=True)
last_backup_dir = get_last_backup_dir(volume_name, files_rsync_destination_path)
link_dest_option = f"--link-dest='{last_backup_dir}'" if last_backup_dir else ""
source_dir = getStoragePath(volume_name)
rsync_command = f"rsync -abP --delete --delete-excluded {link_dest_option} {source_dir} {files_rsync_destination_path}"
execute_shell_command(rsync_command)
dest = getFileRsyncDestinationPath(volume_dir)
pathlib.Path(dest).mkdir(parents=True, exist_ok=True)
last = get_last_backup_dir(volume_name, dest)
link_dest = f"--link-dest='{last}'" if last else ""
source = getStoragePath(volume_name)
cmd = (
f"rsync -abP --delete --delete-excluded "
f"{link_dest} {source} {dest}"
)
execute_shell_command(cmd)
except BackupException as e:
if "file has vanished" in e.args[0]:
if "file has vanished" in str(e):
print("Warning: Some files vanished before transfer. Continuing.")
else:
raise
print(f"Backup routine for volume: {volume_name} completed.")
def get_image_info(container):
return execute_shell_command(f"docker inspect --format '{{{{.Config.Image}}}}' {container}")
return execute_shell_command(
f"docker inspect --format '{{{{.Config.Image}}}}' {container}"
)
def has_image(container,image):
def has_image(container, image):
"""Check if the container is using the image"""
image_info = get_image_info(container)
return image in image_info[0]
info = get_image_info(container)[0]
return image in info
def change_containers_status(containers,status):
"""Stop a list of containers."""
def change_containers_status(containers, status):
"""Stop or start a list of containers."""
if containers:
container_list = ' '.join(containers)
print(f"{status} containers {container_list}...")
execute_shell_command(f"docker {status} {container_list}")
names = ' '.join(containers)
print(f"{status.capitalize()} containers: {names}...")
execute_shell_command(f"docker {status} {names}")
else:
print(f"No containers to {status}.")
def get_container_with_image(containers,image):
for container in containers:
if has_image(container,image):
return container
return False
print(f"No containers to {status}.")
def is_image_whitelisted(container, images):
"""Check if the container's image is one of the whitelisted images."""
image_info = get_image_info(container)
container_image = image_info[0]
"""
Return True if the container's image matches any of the whitelist patterns.
Also prints out the image name and the match result.
"""
# fetch the image (e.g. "nextcloud:23-fpm-alpine")
info = get_image_info(container)[0]
for image in images:
if image in container_image:
return True
return False
# check against each pattern
whitelisted = any(pattern in info for pattern in images)
# log the result
print(f"Container {container!r} → image {info!r} → whitelisted? {whitelisted}", flush=True)
return whitelisted
def is_container_stop_required(containers):
"""Check if any of the containers are using images that are not whitelisted."""
return any(not is_image_whitelisted(container, IMAGES_NO_STOP_REQUIRED) for container in containers)
"""
Check if any of the containers are using images that are not whitelisted.
If so, print them out and return True; otherwise return False.
"""
# Find all containers whose image isnt on the whitelist
not_whitelisted = [
c for c in containers
if not is_image_whitelisted(c, IMAGES_NO_STOP_REQUIRED)
]
if not_whitelisted:
print(f"Containers requiring stop because they are not whitelisted: {', '.join(not_whitelisted)}")
return True
return False
def create_volume_directory(volume_name):
"""Create necessary directories for backup."""
volume_dir = os.path.join(VERSION_DIR, volume_name)
pathlib.Path(volume_dir).mkdir(parents=True, exist_ok=True)
return volume_dir
path = os.path.join(VERSION_DIR, volume_name)
pathlib.Path(path).mkdir(parents=True, exist_ok=True)
return path
def is_image_ignored(container):
"""Check if the container's image is one of the ignored images."""
for image in IMAGES_NO_BACKUP_REQUIRED:
if has_image(container, image):
return True
return False
return any(has_image(container, img) for img in IMAGES_NO_BACKUP_REQUIRED)
def backup_with_containers_paused(volume_name, volume_dir, containers, shutdown):
change_containers_status(containers,'stop')
change_containers_status(containers, 'stop')
backup_volume(volume_name, volume_dir)
# Just restart containers if shutdown is false
if not shutdown:
change_containers_status(containers,'start')
change_containers_status(containers, 'start')
def backup_mariadb_or_postgres(container, volume_dir):
'''Performs database image specific backup procedures'''
for image in ['mariadb','postgres']:
if has_image(container, image):
backup_database(container, volume_dir, image)
"""Performs database image specific backup procedures"""
for img in ['mariadb', 'postgres']:
if has_image(container, img):
backup_database(container, volume_dir, img)
return True
return False
def default_backup_routine_for_volume(volume_name, containers, shutdown):
"""Perform backup routine for a given volume."""
volume_dir=""
for container in containers:
# Skip ignored images
if is_image_ignored(container):
print(f"Ignoring volume '{volume_name}' linked to container '{container}' with ignored image.")
continue
# Directory which contains files and sqls
volume_dir = create_volume_directory(volume_name)
# Execute Database backup and exit if successfull
if backup_mariadb_or_postgres(container, volume_dir):
vol_dir = ""
for c in containers:
if is_image_ignored(c):
print(f"Ignoring volume '{volume_name}' linked to container '{c}'.")
continue
vol_dir = create_volume_directory(volume_name)
if backup_mariadb_or_postgres(c, vol_dir):
return
# Execute backup if image is not ignored
if volume_dir:
backup_volume(volume_name, volume_dir)
if vol_dir:
backup_volume(volume_name, vol_dir)
if is_container_stop_required(containers):
backup_with_containers_paused(volume_name, volume_dir, containers, shutdown)
backup_with_containers_paused(volume_name, vol_dir, containers, shutdown)
def backup_everything(volume_name, containers, shutdown):
"""Perform file backup routine for a given volume."""
volume_dir=create_volume_directory(volume_name)
# Execute sql dumps
for container in containers:
backup_mariadb_or_postgres(container, volume_dir)
vol_dir = create_volume_directory(volume_name)
for c in containers:
backup_mariadb_or_postgres(c, vol_dir)
backup_volume(volume_name, vol_dir)
backup_with_containers_paused(volume_name, vol_dir, containers, shutdown)
# Execute file backups
backup_volume(volume_name, volume_dir)
backup_with_containers_paused(volume_name, volume_dir, containers, shutdown)
def hard_restart_docker_services(dir_path):
"""Perform a hard restart of docker-compose services in the given directory."""
try:
@@ -315,18 +314,16 @@ def hard_restart_docker_services(dir_path):
def handle_docker_compose_services(parent_directory):
"""Iterate through directories and restart or hard restart services as needed."""
for dir_entry in os.scandir(parent_directory):
if dir_entry.is_dir():
dir_path = dir_entry.path
dir_name = os.path.basename(dir_path)
for entry in os.scandir(parent_directory):
if entry.is_dir():
dir_path = entry.path
name = os.path.basename(dir_path)
print(f"Checking directory: {dir_path}")
docker_compose_file = os.path.join(dir_path, "docker-compose.yml")
if os.path.isfile(docker_compose_file):
compose_file = os.path.join(dir_path, "docker-compose.yml")
if os.path.isfile(compose_file):
print(f"Found docker-compose.yml in {dir_path}.")
if dir_name in DOCKER_COMPOSE_HARD_RESTART_REQUIRED:
print(f"Directory {dir_name} detected. Performing hard restart...")
if name in DOCKER_COMPOSE_HARD_RESTART_REQUIRED:
print(f"Directory {name} detected. Performing hard restart...")
hard_restart_docker_services(dir_path)
else:
print(f"No restart required for services in {dir_path}...")
@@ -334,28 +331,53 @@ def handle_docker_compose_services(parent_directory):
print(f"No docker-compose.yml found in {dir_path}. Skipping.")
def main():
global DATABASE_CONTAINERS, IMAGES_NO_STOP_REQUIRED
parser = argparse.ArgumentParser(description='Backup Docker volumes.')
parser.add_argument('--everything', action='store_true',
help='Force file backup for all volumes and additional execute database dumps')
parser.add_argument('--shutdown', action='store_true',
help='Doesn\'t restart containers after backup')
parser.add_argument('--compose-dir', type=str, required=True, help='Path to the parent directory containing docker-compose setups')
parser.add_argument('--compose-dir', type=str, required=True,
help='Path to the parent directory containing docker-compose setups')
parser.add_argument(
'--database-containers',
nargs='+',
required=True,
help='List of container names treated as special instances for database backups'
)
parser.add_argument(
'--images-no-stop-required',
nargs='+',
required=True,
help='List of image names for which containers should not be stopped during file backup'
)
parser.add_argument(
'--images-no-backup-required',
nargs='+',
help='List of image names for which no backup should be performed (optional)'
)
args = parser.parse_args()
DATABASE_CONTAINERS = args.database_containers
IMAGES_NO_STOP_REQUIRED = args.images_no_stop_required
if args.images_no_backup_required is not None:
global IMAGES_NO_BACKUP_REQUIRED
IMAGES_NO_BACKUP_REQUIRED = args.images_no_backup_required
print('Start volume backups...')
print('💾 Start volume backups...', flush=True)
volume_names = execute_shell_command("docker volume ls --format '{{.Name}}'")
for volume_name in volume_names:
print(f'Start backup routine for volume: {volume_name}')
containers = execute_shell_command(f"docker ps --filter volume=\"{volume_name}\" --format '{{{{.Names}}}}'")
containers = execute_shell_command(
f"docker ps --filter volume=\"{volume_name}\" --format '{{{{.Names}}}}'"
)
if args.everything:
backup_everything(volume_name, containers, args.shutdown)
else:
else:
default_backup_routine_for_volume(volume_name, containers, args.shutdown)
stamp_directory()
print('Finished volume backups.')
# Handle Docker Compose services
print('Handling Docker Compose services...')
handle_docker_compose_services(args.compose_dir)

170
restore_backup.py Normal file
View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
# @todo Not tested yet. Needs to be tested
"""
restore_backup.py
A script to recover Docker volumes and database dumps from local backups.
Supports an --empty flag to clear the database objects before import (drops all tables/functions etc.).
"""
import argparse
import os
import sys
import subprocess
def run_command(cmd, capture_output=False, input=None, **kwargs):
"""Run a subprocess command and handle errors."""
try:
result = subprocess.run(cmd, check=True, capture_output=capture_output, input=input, **kwargs)
return result
except subprocess.CalledProcessError as e:
print(f"ERROR: Command '{' '.join(cmd)}' failed with exit code {e.returncode}")
if e.stdout:
print(e.stdout.decode())
if e.stderr:
print(e.stderr.decode())
sys.exit(1)
def recover_postgres(container, password, db_name, user, backup_sql, empty=False):
print("Recovering PostgreSQL dump...")
os.environ['PGPASSWORD'] = password
if empty:
print("Dropping existing PostgreSQL objects...")
# Drop all tables, views, sequences, functions in public schema
drop_sql = """
DO $$ DECLARE r RECORD;
BEGIN
FOR r IN (
SELECT table_name AS name, 'TABLE' AS type FROM information_schema.tables WHERE table_schema='public'
UNION ALL
SELECT routine_name AS name, 'FUNCTION' AS type FROM information_schema.routines WHERE specific_schema='public'
UNION ALL
SELECT sequence_name AS name, 'SEQUENCE' AS type FROM information_schema.sequences WHERE sequence_schema='public'
) LOOP
-- Use %s for type to avoid quoting the SQL keyword
EXECUTE format('DROP %s public.%I CASCADE', r.type, r.name);
END LOOP;
END
$$;
"""
run_command([
'docker', 'exec', '-i', container,
'psql', '-v', 'ON_ERROR_STOP=1', '-U', user, '-d', db_name
], input=drop_sql.encode())
print("Existing objects dropped.")
print("Importing the dump...")
with open(backup_sql, 'rb') as f:
run_command([
'docker', 'exec', '-i', container,
'psql', '-v', 'ON_ERROR_STOP=1', '-U', user, '-d', db_name
], stdin=f)
print("PostgreSQL recovery complete.")
def recover_mariadb(container, password, db_name, user, backup_sql, empty=False):
print("Recovering MariaDB dump...")
if empty:
print("Dropping existing MariaDB tables...")
# Disable foreign key checks
run_command([
'docker', 'exec', container,
'mysql', '-u', user, f"--password={password}", '-e', 'SET FOREIGN_KEY_CHECKS=0;'
])
# Get all table names
result = run_command([
'docker', 'exec', container,
'mysql', '-u', user, f"--password={password}", '-N', '-e',
f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{db_name}';"
], capture_output=True)
tables = result.stdout.decode().split()
for tbl in tables:
run_command([
'docker', 'exec', container,
'mysql', '-u', user, f"--password={password}", '-e',
f"DROP TABLE IF EXISTS `{db_name}`.`{tbl}`;"
])
# Enable foreign key checks
run_command([
'docker', 'exec', container,
'mysql', '-u', user, f"--password={password}", '-e', 'SET FOREIGN_KEY_CHECKS=1;'
])
print("Existing tables dropped.")
print("Importing the dump...")
with open(backup_sql, 'rb') as f:
run_command([
'docker', 'exec', '-i', container,
'mariadb', '-u', user, f"--password={password}", db_name
], stdin=f)
print("MariaDB recovery complete.")
def recover_files(volume_name, backup_files):
print(f"Inspecting volume {volume_name}...")
inspect = subprocess.run(['docker', 'volume', 'inspect', volume_name], stdout=subprocess.DEVNULL)
if inspect.returncode != 0:
print(f"Volume {volume_name} does not exist. Creating...")
run_command(['docker', 'volume', 'create', volume_name])
else:
print(f"Volume {volume_name} already exists.")
if not os.path.isdir(backup_files):
print(f"ERROR: Backup files folder '{backup_files}' does not exist.")
sys.exit(1)
print("Recovering files...")
run_command([
'docker', 'run', '--rm',
'-v', f"{volume_name}:/recover/",
'-v', f"{backup_files}:/backup/",
'kevinveenbirkenbach/alpine-rsync',
'sh', '-c', 'rsync -avv --delete /backup/ /recover/'
])
print("File recovery complete.")
def main():
parser = argparse.ArgumentParser(
description='Recover Docker volumes and database dumps from local backups.'
)
parser.add_argument('volume_name', help='Name of the Docker volume')
parser.add_argument('backup_hash', help='Hashed Machine ID')
parser.add_argument('version', help='Version to recover')
parser.add_argument('--db-type', choices=['postgres', 'mariadb'], help='Type of database backup')
parser.add_argument('--db-container', help='Docker container name for the database')
parser.add_argument('--db-password', help='Password for the database user')
parser.add_argument('--db-name', help='Name of the database')
parser.add_argument('--empty', action='store_true', help='Drop existing database objects before importing')
args = parser.parse_args()
volume = args.volume_name
backup_hash = args.backup_hash
version = args.version
backup_folder = os.path.join('Backups', backup_hash, 'backup-docker-to-local', version, volume)
backup_files = os.path.join(os.sep, backup_folder, 'files')
backup_sql = None
if args.db_name:
backup_sql = os.path.join(os.sep, backup_folder, 'sql', f"{args.db_name}.backup.sql")
# Database recovery
if args.db_type:
if not (args.db_container and args.db_password and args.db_name):
print("ERROR: A database backup exists, aber ein Parameter fehlt.")
sys.exit(1)
user = args.db_name
if args.db_type == 'postgres':
recover_postgres(args.db_container, args.db_password, args.db_name, user, backup_sql, empty=args.empty)
else:
recover_mariadb(args.db_container, args.db_password, args.db_name, user, backup_sql, empty=args.empty)
sys.exit(0)
# File recovery
recover_files(volume, backup_files)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""
Restore multiple PostgreSQL databases from .backup.sql files via a Docker container.
Usage:
./restore_databases.py /path/to/backup_dir container_name
"""
import argparse
import subprocess
import sys
import os
import glob
def run_command(cmd, stdin=None):
"""
Run a subprocess command and abort immediately on any failure.
:param cmd: list of command parts
:param stdin: file-like object to use as stdin
"""
subprocess.run(cmd, stdin=stdin, check=True)
def main():
parser = argparse.ArgumentParser(
description="Restore Postgres databases from backup SQL files via Docker container."
)
parser.add_argument(
"backup_dir",
help="Path to directory containing .backup.sql files"
)
parser.add_argument(
"container",
help="Name of the Postgres Docker container"
)
args = parser.parse_args()
backup_dir = args.backup_dir
container = args.container
pattern = os.path.join(backup_dir, "*.backup.sql")
sql_files = sorted(glob.glob(pattern))
if not sql_files:
print(f"No .backup.sql files found in {backup_dir}", file=sys.stderr)
sys.exit(1)
for sqlfile in sql_files:
# Extract database name by stripping the full suffix '.backup.sql'
filename = os.path.basename(sqlfile)
if not filename.endswith('.backup.sql'):
continue
dbname = filename[:-len('.backup.sql')]
print(f"=== Processing {sqlfile} → database: {dbname} ===")
# Drop the database, forcing disconnect of sessions if necessary
run_command([
"docker", "exec", "-i", container,
"psql", "-U", "postgres", "-c",
f"DROP DATABASE IF EXISTS \"{dbname}\" WITH (FORCE);"
])
# Create a fresh database
run_command([
"docker", "exec", "-i", container,
"psql", "-U", "postgres", "-c",
f"CREATE DATABASE \"{dbname}\";"
])
# Ensure the ownership role exists
print(f"Ensuring role '{dbname}' exists...")
run_command([
"docker", "exec", "-i", container,
"psql", "-U", "postgres", "-c",
(
"DO $$BEGIN "
f"IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = '{dbname}') THEN "
f"CREATE ROLE \"{dbname}\"; "
"END IF; "
"END$$;"
)
])
# Restore the dump into the database by streaming file (will abort on first error)
print(f"Restoring dump into {dbname} (this may take a while)…")
with open(sqlfile, 'rb') as infile:
run_command([
"docker", "exec", "-i", container,
"psql", "-U", "postgres", "-d", dbname
], stdin=infile)
print(f"{dbname} restored.")
print("All databases have been restored.")
if __name__ == "__main__":
main()

0
tests/__init__.py Normal file
View File

0
tests/unit/__init__.py Normal file
View File

64
tests/unit/test_backup.py Normal file
View File

@@ -0,0 +1,64 @@
# tests/unit/test_backup.py
import unittest
from unittest.mock import patch
import importlib.util
import sys
import os
import pathlib
# Prevent actual directory creation in backup script import
dummy_mkdir = lambda self, *args, **kwargs: None
original_mkdir = pathlib.Path.mkdir
pathlib.Path.mkdir = dummy_mkdir
# Create a virtual databases.csv in the project root for the module import
test_dir = os.path.dirname(__file__)
project_root = os.path.abspath(os.path.join(test_dir, '../../'))
sys.path.insert(0, project_root)
db_csv_path = os.path.join(project_root, 'databases.csv')
with open(db_csv_path, 'w') as f:
f.write('instance;database;username;password\n')
# Dynamically load the hyphenated script as module 'backup'
script_path = os.path.join(project_root, 'backup-docker-to-local.py')
spec = importlib.util.spec_from_file_location('backup', script_path)
backup = importlib.util.module_from_spec(spec)
sys.modules['backup'] = backup
spec.loader.exec_module(backup)
# Restore original mkdir
pathlib.Path.mkdir = original_mkdir
class TestIsImageWhitelisted(unittest.TestCase):
@patch('backup.get_image_info')
def test_returns_true_when_image_matches(self, mock_get_image_info):
# Simulate a container image containing 'mastodon'
mock_get_image_info.return_value = ['repo/mastodon:v4']
images = ['mastodon', 'wordpress']
self.assertTrue(
backup.is_image_whitelisted('any_container', images),
"Should return True when at least one image substring matches"
)
@patch('backup.get_image_info')
def test_returns_false_when_no_image_matches(self, mock_get_image_info):
# Simulate a container image without matching substrings
mock_get_image_info.return_value = ['repo/nginx:latest']
images = ['mastodon', 'wordpress']
self.assertFalse(
backup.is_image_whitelisted('any_container', images),
"Should return False when no image substring matches"
)
@patch('backup.get_image_info')
def test_returns_false_with_empty_image_list(self, mock_get_image_info):
# Even if get_image_info returns something, an empty list yields False
mock_get_image_info.return_value = ['repo/element:1.0']
self.assertFalse(
backup.is_image_whitelisted('any_container', []),
"Should return False when the images list is empty"
)
if __name__ == '__main__':
unittest.main()