5 Commits

Author SHA1 Message Date
05d7ddc491 svc-bkp-rmt-2-loc: migrate pull script to Python + add unit tests; lock down backup-provider ACLs
- Replace Bash pull-specific-host.sh with Python pull-specific-host.py (argparse, identical logic)
- Update role vars and runner template to call python script
- Add __init__.py files for test discovery/imports
- Add unittest: tests/unit/roles/svc-bkp-rmt-2-loc/files/test_pull_specific_host.py (mocks subprocess/os/time; covers success, no types, find-fail, retry-exhaustion)
- Backup provider SSH wrapper: align allowed ls path (backup-docker-to-local)
- Split user role tasks: 01_core (sudoers), 02_permissions_ssh (SSH keys + wrapper), 03_permissions_folders (ownership + default ACLs + depth-limited chown/chmod)
- Ensure default ACLs grant rwx to 'backup' and none to group/other; keep sudo rsync working

Ref: ChatGPT discussion (2025-10-14) — https://chatgpt.com/share/68ee920a-9b98-800f-8806-ddcfe0255149
2025-10-14 20:10:49 +02:00
e54436821c Refactor sys-front-inj-all dependencies handling
Moved CDN and logout role inclusions into a dedicated '01_dependencies.yml' file for better modularity and reusability.
Added variable injection support via 'vars:' to allow flexible configuration like 'proxy_extra_configuration'.

See: https://chatgpt.com/share/68ee880d-cd80-800f-8dda-9e981631a5c7
2025-10-14 19:27:56 +02:00
ed73a37795 Improve get_app_conf robustness and add skip_missing_app parameter support
- Added new optional parameter 'skip_missing_app' to get_app_conf() in module_utils/config_utils.py to safely return defaults when applications are missing.
- Updated group_vars/all/00_general.yml and roles/web-app-nextcloud/config/main.yml to include skip_missing_app=True in all Nextcloud-related calls.
- Added comprehensive unit tests under tests/unit/module_utils/test_config_utils.py covering missing app handling, schema enforcement, nested lists, and index edge cases.

Ref: https://chatgpt.com/share/68ee6b5c-6db0-800f-bc20-d51470d7b39f
2025-10-14 17:25:37 +02:00
adff9271fd Solved rmt backup bugs 2025-10-14 16:29:42 +02:00
2f0fb2cb69 Merged network definitions before application definitions 2025-10-14 15:52:28 +02:00
23 changed files with 527 additions and 148 deletions

View File

@@ -76,8 +76,9 @@ _applications_nextcloud_oidc_flavor: >-
False,
'oidc_login'
if applications
| get_app_conf('web-app-nextcloud','features.ldap',False, True)
else 'sociallogin'
| get_app_conf('web-app-nextcloud','features.ldap',False, True, True)
else 'sociallogin',
True
)
}}

View File

@@ -24,7 +24,7 @@ class ConfigEntryNotSetError(AppConfigKeyError):
pass
def get_app_conf(applications, application_id, config_path, strict=True, default=None):
def get_app_conf(applications, application_id, config_path, strict=True, default=None, skip_missing_app=False):
# Path to the schema file for this application
schema_path = os.path.join('roles', application_id, 'schema', 'main.yml')
@@ -133,6 +133,9 @@ def get_app_conf(applications, application_id, config_path, strict=True, default
try:
obj = applications[application_id]
except KeyError:
if skip_missing_app:
# Simply return default instead of failing
return default if default is not None else False
raise AppConfigKeyError(
f"Application ID '{application_id}' not found in applications dict.\n"
f"path_trace: {path_trace}\n"

View File

@@ -153,6 +153,11 @@ roles:
description: "Core AI building blocks—model serving, OpenAI-compatible gateways, vector databases, orchestration, and chat UIs."
icon: "fas fa-brain"
invokable: true
bkp:
title: "Backup Services"
description: "Service-level backup and recovery components—handling automated data snapshots, remote backups, synchronization services, and backup orchestration across databases, files, and containers."
icon: "fas fa-database"
invokable: true
user:
title: "Users & Access"
description: "User accounts & access control"

View File

View File

@@ -0,0 +1,132 @@
#!/usr/bin/env python3
import argparse
import os
import subprocess
import time
import sys
def run_command(command, capture_output=True, check=False, shell=True):
"""Run a shell command and return its output as string."""
try:
result = subprocess.run(
command,
capture_output=capture_output,
shell=shell,
text=True,
check=check
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
if capture_output:
print(e.stdout)
print(e.stderr)
raise
def pull_backups(hostname: str):
print(f"pulling backups from: {hostname}")
errors = 0
print("loading meta data...")
remote_host = f"backup@{hostname}"
print(f"host address: {remote_host}")
remote_machine_id = run_command(f'ssh "{remote_host}" sha256sum /etc/machine-id')[:64]
print(f"remote machine id: {remote_machine_id}")
general_backup_machine_dir = f"/Backups/{remote_machine_id}/"
print(f"backup dir: {general_backup_machine_dir}")
try:
remote_backup_types = run_command(
f'ssh "{remote_host}" "find {general_backup_machine_dir} -maxdepth 1 -type d -execdir basename {{}} ;"'
).splitlines()
print(f"backup types: {' '.join(remote_backup_types)}")
except subprocess.CalledProcessError:
sys.exit(1)
for backup_type in remote_backup_types:
if backup_type == remote_machine_id:
continue
print(f"backup type: {backup_type}")
general_backup_type_dir = f"{general_backup_machine_dir}{backup_type}/"
general_versions_dir = general_backup_type_dir
# local previous version
try:
local_previous_version_dir = run_command(f"ls -d {general_versions_dir}* | tail -1")
except subprocess.CalledProcessError:
local_previous_version_dir = ""
print(f"last local backup: {local_previous_version_dir}")
# remote versions
remote_backup_versions = run_command(
f'ssh "{remote_host}" "ls -d /Backups/{remote_machine_id}/backup-docker-to-local/*"'
).splitlines()
print(f"remote backup versions: {' '.join(remote_backup_versions)}")
remote_last_backup_dir = remote_backup_versions[-1] if remote_backup_versions else ""
print(f"last remote backup: {remote_last_backup_dir}")
remote_source_path = f"{remote_host}:{remote_last_backup_dir}/"
print(f"source path: {remote_source_path}")
local_backup_destination_path = remote_last_backup_dir
print(f"backup destination: {local_backup_destination_path}")
print("creating local backup destination folder...")
os.makedirs(local_backup_destination_path, exist_ok=True)
rsync_command = (
f'rsync -abP --delete --delete-excluded --rsync-path="sudo rsync" '
f'--link-dest="{local_previous_version_dir}" "{remote_source_path}" "{local_backup_destination_path}"'
)
print("starting backup...")
print(f"executing: {rsync_command}")
retry_count = 0
max_retries = 12
retry_delay = 300 # 5 minutes
last_retry_start = 0
max_retry_duration = 43200 # 12 hours
rsync_exit_code = 1
while retry_count < max_retries:
print(f"Retry attempt: {retry_count + 1}")
if retry_count > 0:
current_time = int(time.time())
last_retry_duration = current_time - last_retry_start
if last_retry_duration >= max_retry_duration:
print("Last retry took more than 12 hours, increasing max retries to 12.")
max_retries = 12
last_retry_start = int(time.time())
rsync_exit_code = os.system(rsync_command)
if rsync_exit_code == 0:
break
retry_count += 1
time.sleep(retry_delay)
if rsync_exit_code != 0:
print(f"Error: rsync failed after {max_retries} attempts")
errors += 1
sys.exit(errors)
def main():
parser = argparse.ArgumentParser(
description="Pull backups from a remote backup host via rsync."
)
parser.add_argument(
"hostname",
help="Hostname from which backup should be pulled"
)
args = parser.parse_args()
pull_backups(args.hostname)
if __name__ == "__main__":
main()

View File

@@ -1,85 +0,0 @@
#!/bin/bash
# @param $1 hostname from which backup should be pulled
echo "pulling backups from: $1" &&
# error counter
errors=0 &&
echo "loading meta data..." &&
remote_host="backup@$1" &&
echo "host address: $remote_host" &&
remote_machine_id="$( (ssh "$remote_host" sha256sum /etc/machine-id) | head -c 64 )" &&
echo "remote machine id: $remote_machine_id" &&
general_backup_machine_dir="/Backups/$remote_machine_id/" &&
echo "backup dir: $general_backup_machine_dir" &&
remote_backup_types="$(ssh "$remote_host" "find $general_backup_machine_dir -maxdepth 1 -type d -execdir basename {} ;")" &&
echo "backup types: $remote_backup_types" || exit 1
for backup_type in $remote_backup_types; do
if [ "$backup_type" != "$remote_machine_id" ]; then
echo "backup type: $backup_type" &&
general_backup_type_dir="$general_backup_machine_dir""$backup_type/" &&
general_versions_dir="$general_backup_type_dir" &&
local_previous_version_dir="$(ls -d $general_versions_dir* | tail -1)" &&
echo "last local backup: $local_previous_version_dir" &&
remote_backup_versions="$(ssh "$remote_host" ls -d "$general_backup_type_dir"\*)" &&
echo "remote backup versions: $remote_backup_versions" &&
remote_last_backup_dir=$(echo "$remote_backup_versions" | tail -1) &&
echo "last remote backup: $remote_last_backup_dir" &&
remote_source_path="$remote_host:$remote_last_backup_dir/" &&
echo "source path: $remote_source_path" &&
local_backup_destination_path=$remote_last_backup_dir &&
echo "backup destination: $local_backup_destination_path" &&
echo "creating local backup destination folder..." &&
mkdir -vp "$local_backup_destination_path" &&
echo "starting backup..."
rsync_command='rsync -abP --delete --delete-excluded --rsync-path="sudo rsync" --link-dest="'$local_previous_version_dir'" "'$remote_source_path'" "'$local_backup_destination_path'"'
echo "executing: $rsync_command"
retry_count=0
max_retries=12
retry_delay=300 # Retry delay in seconds (5 minutes)
last_retry_start=0
max_retry_duration=43200 # Maximum duration for a single retry attempt (12 hours)
while [[ $retry_count -lt $max_retries ]]; do
echo "Retry attempt: $((retry_count + 1))"
if [[ $retry_count -gt 0 ]]; then
current_time=$(date +%s)
last_retry_duration=$((current_time - last_retry_start))
if [[ $last_retry_duration -ge $max_retry_duration ]]; then
echo "Last retry took more than 12 hours, increasing max retries to 12."
max_retries=12
fi
fi
last_retry_start=$(date +%s)
eval "$rsync_command"
rsync_exit_code=$?
if [[ $rsync_exit_code -eq 0 ]]; then
break
fi
retry_count=$((retry_count + 1))
sleep $retry_delay
done
if [[ $rsync_exit_code -ne 0 ]]; then
echo "Error: rsync failed after $max_retries attempts"
((errors += 1))
fi
fi
done
exit $errors;

View File

@@ -10,15 +10,15 @@
- include_tasks: utils/run_once.yml
when: run_once_svc_bkp_rmt_2_loc is not defined
- name: "create {{ DOCKER_BACKUP_REMOTE_2_LOCAL_DIR }}"
- name: "Create Directory '{{ DOCKER_BACKUP_REMOTE_2_LOCAL_DIR }}'"
file:
path: "{{ DOCKER_BACKUP_REMOTE_2_LOCAL_DIR }}"
state: directory
mode: "0755"
- name: create svc-bkp-rmt-2-loc.sh
- name: "Deploy '{{ DOCKER_BACKUP_REMOTE_2_LOCAL_SCRIPT }}'"
copy:
src: svc-bkp-rmt-2-loc.sh
src: "{{ DOCKER_BACKUP_REMOTE_2_LOCAL_FILE }}"
dest: "{{ DOCKER_BACKUP_REMOTE_2_LOCAL_SCRIPT }}"
mode: "0755"

View File

@@ -3,6 +3,6 @@
hosts="{{ DOCKER_BACKUP_REMOTE_2_LOCAL_BACKUP_PROVIDERS | join(' ') }}";
errors=0
for host in $hosts; do
bash {{ DOCKER_BACKUP_REMOTE_2_LOCAL_SCRIPT }} $host || ((errors+=1));
python {{ DOCKER_BACKUP_REMOTE_2_LOCAL_SCRIPT }} $host || ((errors+=1));
done;
exit $errors;

View File

@@ -1,5 +1,9 @@
# General
application_id: svc-bkp-rmt-2-loc
system_service_id: "{{ application_id }}"
system_service_id: "{{ application_id }}"
# Role Specific
DOCKER_BACKUP_REMOTE_2_LOCAL_DIR: '{{ PATH_ADMINISTRATOR_SCRIPTS }}{{ application_id }}/'
DOCKER_BACKUP_REMOTE_2_LOCAL_SCRIPT: "{{ DOCKER_BACKUP_REMOTE_2_LOCAL_DIR }}svc-bkp-rmt-2-loc.sh"
DOCKER_BACKUP_REMOTE_2_LOCAL_FILE: 'pull-specific-host.py'
DOCKER_BACKUP_REMOTE_2_LOCAL_SCRIPT: "{{ [ DOCKER_BACKUP_REMOTE_2_LOCAL_DIR , DOCKER_BACKUP_REMOTE_2_LOCAL_FILE ] | path_join }}"
DOCKER_BACKUP_REMOTE_2_LOCAL_BACKUP_PROVIDERS: "{{ applications | get_app_conf(application_id, 'backup_providers') }}"

View File

@@ -13,7 +13,7 @@ get_backup_types="find /Backups/$hashed_machine_id/ -maxdepth 1 -type d -execdir
# @todo This configuration is not scalable yet. If other backup services then sys-ctl-bkp-docker-2-loc are integrated, this logic needs to be optimized
get_version_directories="ls -d /Backups/$hashed_machine_id/sys-ctl-bkp-docker-2-loc/*"
get_version_directories="ls -d /Backups/$hashed_machine_id/backup-docker-to-local/*"
last_version_directory="$($get_version_directories | tail -1)"
rsync_command="sudo rsync --server --sender -blogDtpre.iLsfxCIvu . $last_version_directory/"

View File

@@ -3,30 +3,6 @@
name: backup
create_home: yes
- name: create .ssh directory
file:
path: /home/backup/.ssh
state: directory
owner: backup
group: backup
mode: '0700'
- name: create /home/backup/.ssh/authorized_keys
template:
src: "authorized_keys.j2"
dest: /home/backup/.ssh/authorized_keys
owner: backup
group: backup
mode: '0644'
- name: create /home/backup/ssh-wrapper.sh
copy:
src: "ssh-wrapper.sh"
dest: /home/backup/ssh-wrapper.sh
owner: backup
group: backup
mode: '0700'
- name: grant backup sudo rights
copy:
src: "backup"
@@ -35,3 +11,9 @@
owner: root
group: root
notify: sshd restart
- include_tasks: 02_permissions_ssh.yml
- include_tasks: 03_permissions_folders.yml
- include_tasks: utils/run_once.yml

View File

@@ -0,0 +1,23 @@
- name: create .ssh directory
file:
path: /home/backup/.ssh
state: directory
owner: backup
group: backup
mode: '0700'
- name: create /home/backup/.ssh/authorized_keys
template:
src: "authorized_keys.j2"
dest: /home/backup/.ssh/authorized_keys
owner: backup
group: backup
mode: '0644'
- name: create /home/backup/ssh-wrapper.sh
copy:
src: "ssh-wrapper.sh"
dest: /home/backup/ssh-wrapper.sh
owner: backup
group: backup
mode: '0700'

View File

@@ -0,0 +1,64 @@
# Ensure the backups root exists and is owned by backup
- name: Ensure backups root exists and owned by backup
file:
path: "{{ BACKUPS_FOLDER_PATH }}"
state: directory
owner: backup
group: backup
mode: "0700"
# Explicit ACL so 'backup' has rwx, others none
- name: Grant ACL rwx on backups root to backup user
ansible.posix.acl:
path: "{{ BACKUPS_FOLDER_PATH }}"
entity: backup
etype: user
permissions: rwx
state: present
# Set default ACLs so new entries inherit rwx for backup and nothing for others
- name: Set default ACL (inherit) for backup user under backups root
ansible.posix.acl:
path: "{{ BACKUPS_FOLDER_PATH }}"
entity: backup
etype: user
permissions: rwx
default: true
state: present
# Remove default ACLs for group/others (defensive hardening)
# Default ACLs so new entries inherit only backup's rwx
- name: Default ACL for backup user (inherit)
ansible.posix.acl:
path: "{{ BACKUPS_FOLDER_PATH }}"
etype: user
entity: backup
permissions: rwx
default: true
state: present
# Explicitly set default group/other to no permissions (instead of absent)
- name: Default ACL for group -> none
ansible.posix.acl:
path: "{{ BACKUPS_FOLDER_PATH }}"
etype: group
permissions: '---'
default: true
state: present
- name: Default ACL for other -> none
ansible.posix.acl:
path: "{{ BACKUPS_FOLDER_PATH }}"
etype: other
permissions: '---'
default: true
state: present
- name: Fix ownership level 0..2 directories to backup:backup
ansible.builtin.shell: >
find "{{ BACKUPS_FOLDER_PATH }}" -mindepth 0 -maxdepth 2 -xdev -type d -exec chown backup:backup {} +
- name: Fix perms level 0..2 directories to 0700
ansible.builtin.shell: >
find "{{ BACKUPS_FOLDER_PATH }}" -mindepth 0 -maxdepth 2 -xdev -type d -exec chmod 700 {} +

View File

@@ -1,4 +1,2 @@
- block:
- include_tasks: 01_core.yml
- include_tasks: utils/run_once.yml
- include_tasks: 01_core.yml
when: run_once_sys_bkp_provider_user is not defined

View File

@@ -0,0 +1,16 @@
- name: "Load CDN for '{{ domain }}'"
include_role:
name: web-svc-cdn
public: false
when:
- application_id != 'web-svc-cdn'
- run_once_web_svc_cdn is not defined
- name: Load Logout for '{{ domain }}'
include_role:
name: web-svc-logout
public: false
when:
- run_once_web_svc_logout is not defined
- application_id != 'web-svc-logout'
- inj_enabled.logout

View File

@@ -10,22 +10,10 @@
set_fact:
inj_enabled: "{{ applications | inj_enabled(application_id, SRV_WEB_INJ_COMP_FEATURES_ALL) }}"
- name: "Load CDN for '{{ domain }}'"
include_role:
name: web-svc-cdn
public: false
when:
- application_id != 'web-svc-cdn'
- run_once_web_svc_cdn is not defined
- name: Load Logout for '{{ domain }}'
include_role:
name: web-svc-logout
public: false
when:
- run_once_web_svc_logout is not defined
- application_id != 'web-svc-logout'
- inj_enabled.logout
- name: "Included dependent services"
include_tasks: 01_dependencies.yml
vars:
proxy_extra_configuration: ""
- name: Reinitialize 'inj_enabled' for '{{ domain }}', after loading the required webservices
set_fact:

View File

@@ -91,7 +91,7 @@ docker:
mem_reservation: "128m"
mem_limit: "512m"
pids_limit: 256
enabled: "{{ applications | get_app_conf('web-app-nextcloud', 'features.oidc', False) }}" # Activate OIDC for Nextcloud
enabled: "{{ applications | get_app_conf('web-app-nextcloud', 'features.oidc', False, True, True) }}" # Activate OIDC for Nextcloud
# floavor decides which OICD plugin should be used.
# Available options: oidc_login, sociallogin
# @see https://apps.nextcloud.com/apps/oidc_login
@@ -194,7 +194,7 @@ plugins:
enabled: false
fileslibreofficeedit:
# Nextcloud LibreOffice integration: allows online editing of documents with LibreOffice (https://apps.nextcloud.com/apps/fileslibreofficeedit)
enabled: "{{ not (applications | get_app_conf('web-app-nextcloud', 'plugins.richdocuments.enabled', False, True)) }}"
enabled: "{{ not (applications | get_app_conf('web-app-nextcloud', 'plugins.richdocuments.enabled', False, True, True)) }}"
forms:
# Nextcloud forms: facilitates creation of forms and surveys (https://apps.nextcloud.com/apps/forms)
enabled: true
@@ -292,13 +292,13 @@ plugins:
# enabled: false
twofactor_nextcloud_notification:
# Nextcloud two-factor notification: sends notifications for two-factor authentication events (https://apps.nextcloud.com/apps/twofactor_nextcloud_notification)
enabled: "{{ not applications | get_app_conf('web-app-nextcloud', 'features.oidc', False, True) }}" # Deactivate 2FA if oidc is active
enabled: "{{ not applications | get_app_conf('web-app-nextcloud', 'features.oidc', False, True, True) }}" # Deactivate 2FA if oidc is active
twofactor_totp:
# Nextcloud two-factor TOTP: provides time-based one-time password authentication (https://apps.nextcloud.com/apps/twofactor_totp)
enabled: "{{ not applications | get_app_conf('web-app-nextcloud', 'features.oidc', False, True) }}" # Deactivate 2FA if oidc is active
enabled: "{{ not applications | get_app_conf('web-app-nextcloud', 'features.oidc', False, True, True) }}" # Deactivate 2FA if oidc is active
user_ldap:
# Nextcloud user LDAP: integrates LDAP for user management and authentication (https://apps.nextcloud.com/apps/user_ldap)
enabled: "{{ applications | get_app_conf('web-app-nextcloud', 'features.ldap', False, True) }}"
enabled: "{{ applications | get_app_conf('web-app-nextcloud', 'features.ldap', False, True, True) }}"
user_directory:
enabled: true # Enables the LDAP User Directory Search
user_oidc:

View File

@@ -16,6 +16,10 @@
users: "{{ default_users | combine(users| default({}), recursive=True) }}"
no_log: "{{ MASK_CREDENTIALS_IN_LOGS | bool }}"
- name: Merge networks definitions
set_fact:
networks: "{{ defaults_networks | combine(networks | default({}, true), recursive=True) }}"
- name: Merge application definitions
set_fact:
applications: "{{ defaults_applications | merge_with_defaults(applications | default({}, true)) }}"
@@ -92,10 +96,6 @@
)) |
generate_all_domains(WWW_REDIRECT_ENABLED | bool)
}}
- name: Merge networks definitions
set_fact:
networks: "{{ defaults_networks | combine(networks | default({}, true), recursive=True) }}"
- name: Merge OIDC configuration
set_fact:
@@ -128,6 +128,7 @@
- svc-net # 3. Load network roles
- svc-db # 4. Load database roles
- svc-prx # 5. Load proxy roles
- svc-ai # 6. Load ai roles
- svc-ai # 6. Load AI roles
- svc-bkp # 7. Load Backup Roles
loop_control:
label: "{{ item }}-roles.yml"

View File

@@ -0,0 +1,125 @@
import os
import shutil
import tempfile
import unittest
from module_utils.config_utils import (
get_app_conf,
AppConfigKeyError,
ConfigEntryNotSetError,
)
class TestGetAppConf(unittest.TestCase):
def setUp(self):
# Isolate working directory so that schema files can be discovered
self._cwd = os.getcwd()
self.tmpdir = tempfile.mkdtemp(prefix="cfgutilstest_")
os.chdir(self.tmpdir)
# Minimal schema structure:
# roles/web-app-demo/schema/main.yml
os.makedirs(os.path.join("roles", "web-app-demo", "schema"), exist_ok=True)
with open(os.path.join("roles", "web-app-demo", "schema", "main.yml"), "w") as f:
f.write(
# Defines 'features.defined_but_unset' in schema (without a value in applications),
# plus 'features.oidc' and 'features.nested.list'
"features:\n"
" oidc: {}\n"
" defined_but_unset: {}\n"
" nested:\n"
" list:\n"
" - {}\n"
)
# Example configuration with actual values
self.applications = {
"web-app-demo": {
"features": {
"oidc": True,
"nested": {
"list": ["first", "second"]
}
}
}
}
def tearDown(self):
os.chdir(self._cwd)
shutil.rmtree(self.tmpdir, ignore_errors=True)
# --- Tests ---
def test_missing_app_with_skip_missing_app_returns_default_true(self):
"""If app ID is missing and skip_missing_app=True, it should return the default (True)."""
apps = {"some-other-app": {}}
val = get_app_conf(apps, "web-app-nextcloud", "features.oidc",
strict=True, default=True, skip_missing_app=True)
self.assertTrue(val)
def test_missing_app_with_skip_missing_app_returns_default_false(self):
"""If app ID is missing and skip_missing_app=True, it should return the default (False)."""
apps = {"svc-bkp-rmt-2-loc": {}}
val = get_app_conf(apps, "web-app-nextcloud", "features.oidc",
strict=True, default=False, skip_missing_app=True)
self.assertFalse(val)
def test_missing_app_without_skip_missing_app_and_strict_true_raises(self):
"""Missing app ID without skip_missing_app and strict=True should raise."""
apps = {}
with self.assertRaises(AppConfigKeyError):
get_app_conf(apps, "web-app-nextcloud", "features.oidc",
strict=True, default=True, skip_missing_app=False)
def test_missing_app_without_skip_missing_app_and_strict_false_raises(self):
apps = {}
with self.assertRaises(AppConfigKeyError):
get_app_conf(apps, "web-app-nextcloud", "features.oidc",
strict=False, default=True, skip_missing_app=False)
def test_existing_app_returns_expected_value(self):
"""Existing app and key should return the configured value."""
val = get_app_conf(self.applications, "web-app-demo", "features.oidc",
strict=True, default=False, skip_missing_app=False)
self.assertTrue(val)
def test_nested_list_index_access(self):
"""Accessing list indices should work correctly."""
val0 = get_app_conf(self.applications, "web-app-demo", "features.nested.list[0]",
strict=True, default=None, skip_missing_app=False)
val1 = get_app_conf(self.applications, "web-app-demo", "features.nested.list[1]",
strict=True, default=None, skip_missing_app=False)
self.assertEqual(val0, "first")
self.assertEqual(val1, "second")
def test_schema_defined_but_unset_raises_in_strict_mode(self):
"""Schema-defined but unset value should raise in strict mode."""
with self.assertRaises(ConfigEntryNotSetError):
get_app_conf(self.applications, "web-app-demo", "features.defined_but_unset",
strict=True, default=False, skip_missing_app=False)
def test_schema_defined_but_unset_strict_false_returns_default(self):
"""Schema-defined but unset value should return default when strict=False."""
val = get_app_conf(self.applications, "web-app-demo", "features.defined_but_unset",
strict=False, default=True, skip_missing_app=False)
self.assertTrue(val)
def test_invalid_key_format_raises(self):
"""Invalid key format in path should raise AppConfigKeyError."""
with self.assertRaises(AppConfigKeyError):
get_app_conf(self.applications, "web-app-demo", "features.nested.list[not-an-int]",
strict=True, default=None, skip_missing_app=False)
def test_index_out_of_range_respects_strict(self):
"""Out-of-range index should respect strict parameter."""
# strict=False returns default
val = get_app_conf(self.applications, "web-app-demo", "features.nested.list[99]",
strict=False, default="fallback", skip_missing_app=False)
self.assertEqual(val, "fallback")
# strict=True raises
with self.assertRaises(AppConfigKeyError):
get_app_conf(self.applications, "web-app-demo", "features.nested.list[99]",
strict=True, default=None, skip_missing_app=False)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,122 @@
import unittest
import sys
import types
from pathlib import Path
from unittest.mock import patch, MagicMock
import subprocess
import time
import os
def load_module():
"""
Dynamically load the target script:
roles/svc-bkp-rmt-2-loc/files/pull-specific-host.py
relative to this test file.
"""
here = Path(__file__).resolve()
# tests/unit/roles/svc-bkp-rmt-2-loc/files -> up 5 levels to repo root
repo_root = here.parents[5]
target_path = repo_root / "roles" / "svc-bkp-rmt-2-loc" / "files" / "pull-specific-host.py"
if not target_path.exists():
raise FileNotFoundError(f"Cannot find script at {target_path}")
spec = types.ModuleType("pull_specific_host_module")
code = target_path.read_text(encoding="utf-8")
exec(compile(code, str(target_path), "exec"), spec.__dict__)
return spec
class PullSpecificHostTests(unittest.TestCase):
def setUp(self):
self.mod = load_module()
self.hash64 = "a" * 64
self.host = "1.2.3.4"
self.remote = f"backup@{self.host}"
self.base = f"/Backups/{self.hash64}/"
self.backup_type = "backup-docker-to-local"
self.type_dir = f"{self.base}{self.backup_type}/"
self.last_local = f"{self.type_dir}20250101000000"
self.last_remote = f"{self.type_dir}20250202000000"
def _completed(self, stdout="", returncode=0):
return subprocess.CompletedProcess(args="mock", returncode=returncode, stdout=stdout, stderr="")
def _run_side_effect_success(self, command, capture_output=True, shell=True, text=True, check=False):
cmd = command if isinstance(command, str) else " ".join(command)
if cmd.startswith(f'ssh "{self.remote}" sha256sum /etc/machine-id'):
return self._completed(stdout=f"{self.hash64} /etc/machine-id\n")
if cmd.startswith(f'ssh "{self.remote}" "find {self.base} -maxdepth 1 -type d -execdir basename {{}} ;"'):
return self._completed(stdout=f"{self.hash64}\n{self.backup_type}\n")
if cmd.startswith(f"ls -d {self.type_dir}* | tail -1"):
return self._completed(stdout=self.last_local)
if cmd.startswith(f'ssh "{self.remote}" "ls -d {self.type_dir}*'):
return self._completed(stdout=f"{self.last_remote}\n")
return self._completed(stdout="")
def _run_side_effect_find_fail(self, command, capture_output=True, shell=True, text=True, check=False):
cmd = command if isinstance(command, str) else " ".join(command)
if cmd.startswith(f'ssh "backup@{self.host}" "find {self.base} -maxdepth 1 -type d -execdir basename {{}} ;"'):
raise subprocess.CalledProcessError(returncode=1, cmd=cmd, output="", stderr="find: error")
if cmd.startswith(f'ssh "backup@{self.host}" sha256sum /etc/machine-id'):
return self._completed(stdout=f"{self.hash64} /etc/machine-id\n")
return self._completed(stdout="")
def _run_side_effect_no_types(self, command, capture_output=True, shell=True, text=True, check=False):
cmd = command if isinstance(command, str) else " ".join(command)
if cmd.startswith(f'ssh "{self.remote}" sha256sum /etc/machine-id'):
return self._completed(stdout=f"{self.hash64} /etc/machine-id\n")
if cmd.startswith(f'ssh "{self.remote}" "find {self.base} -maxdepth 1 -type d -execdir basename {{}} ;"'):
return self._completed(stdout="")
return self._completed(stdout="")
@patch("time.sleep", new=lambda *a, **k: None)
@patch.object(os, "makedirs")
@patch.object(os, "system")
@patch.object(subprocess, "run")
def test_success_rsync_zero_exit(self, mock_run, mock_system, _mkd):
mock_run.side_effect = self._run_side_effect_success
mock_system.return_value = 0
with self.assertRaises(SystemExit) as cm:
self.mod.pull_backups(self.host)
self.assertEqual(cm.exception.code, 0)
self.assertTrue(mock_system.called, "rsync (os.system) should be called")
@patch("time.sleep", new=lambda *a, **k: None)
@patch.object(os, "makedirs")
@patch.object(os, "system")
@patch.object(subprocess, "run")
def test_no_backup_types_exit_zero(self, mock_run, mock_system, _mkd):
mock_run.side_effect = self._run_side_effect_no_types
mock_system.return_value = 0
with self.assertRaises(SystemExit) as cm:
self.mod.pull_backups(self.host)
self.assertEqual(cm.exception.code, 0)
self.assertFalse(mock_system.called, "rsync should not be called when no types found")
@patch("time.sleep", new=lambda *a, **k: None)
@patch.object(os, "makedirs")
@patch.object(os, "system")
@patch.object(subprocess, "run")
def test_find_failure_exits_one(self, mock_run, mock_system, _mkd):
mock_run.side_effect = self._run_side_effect_find_fail
mock_system.return_value = 0
with self.assertRaises(SystemExit) as cm:
self.mod.pull_backups(self.host)
self.assertEqual(cm.exception.code, 1)
self.assertFalse(mock_system.called, "rsync should not be called when find fails")
@patch("time.sleep", new=lambda *a, **k: None)
@patch.object(os, "makedirs")
@patch.object(os, "system")
@patch.object(subprocess, "run")
def test_rsync_fails_after_retries_exit_nonzero(self, mock_run, mock_system, _mkd):
mock_run.side_effect = self._run_side_effect_success
mock_system.side_effect = [1] * 12 # 12 retries in the script
with self.assertRaises(SystemExit) as cm:
self.mod.pull_backups(self.host)
self.assertEqual(cm.exception.code, 1)
self.assertEqual(mock_system.call_count, 12, "rsync should have retried 12 times")
if __name__ == "__main__":
unittest.main()