8 Commits

Author SHA1 Message Date
61e138c1a6 Optimize OpenLDAP container resources for up to 5k users (1.25 CPU / 1.5GB RAM / 1024 PIDs). See https://chatgpt.com/share/68ef7228-4028-800f-8986-54206a51b9c1 2025-10-15 12:06:51 +02:00
07c8e036ec Deactivated 'changed_when' tracking because the change is not trackable anyway 2025-10-15 10:27:12 +02:00
0b36059cd2 feat(web-app-gitea): add optional Redis integration for caching, sessions, and queues
This update introduces conditional Redis support for Gitea, allowing connection
to either a local or centralized Redis instance depending on configuration.
Includes resource limits for the Redis service and corresponding environment
variables for cache, session, and queue backends.

Reference: ChatGPT conversation on centralized vs per-app Redis architecture (2025-10-15).
https://chatgpt.com/share/68ef5930-49c8-800f-b6b8-069e6fefda01
2025-10-15 10:20:18 +02:00
d76e384ae3 Enhance CertUtils to return the newest matching certificate and add comprehensive unit tests
- Added run_openssl_dates() to extract notBefore/notAfter timestamps.
- Modified mapping logic to store multiple cert entries per SAN with metadata.
- find_cert_for_domain() now selects the newest certificate based on notBefore and mtime.
- Exact SAN matches take precedence over wildcard matches.
- Added new unit tests (test_cert_utils_newest.py) verifying freshness logic, fallback handling, and wildcard behavior.

Reference: https://chatgpt.com/share/68ef4b4c-41d4-800f-9e50-5da4b6be1105
2025-10-15 09:21:00 +02:00
e6f4f3a6a4 feat(cli/build/defaults): ensure deterministic alphabetical sorting for applications and users
- Added sorting by application key and user key before YAML output.
- Ensures stable and reproducible file generation across runs.
- Added comprehensive unit tests verifying key order and output stability.

See: https://chatgpt.com/share/68ef4778-a848-800f-a50b-a46a3b878797
2025-10-15 09:04:39 +02:00
a80b26ed9e Moved bbb database seeding 2025-10-15 08:50:21 +02:00
45ec7b0ead Optimized include text 2025-10-15 08:39:37 +02:00
ec396d130c Optimized time schedule 2025-10-15 08:37:51 +02:00
13 changed files with 538 additions and 41 deletions

View File

@@ -83,6 +83,13 @@ class DefaultsGenerator:
print(f"Error during rendering: {e}", file=sys.stderr)
sys.exit(1)
# Sort applications by application key for stable output
apps = result.get("defaults_applications", {})
if isinstance(apps, dict) and apps:
result["defaults_applications"] = {
k: apps[k] for k in sorted(apps.keys())
}
# Write output
self.output_file.parent.mkdir(parents=True, exist_ok=True)
with self.output_file.open("w", encoding="utf-8") as f:

View File

@@ -220,6 +220,10 @@ def main():
print(f"Error building user entries: {e}", file=sys.stderr)
sys.exit(1)
# Sort users by key for deterministic output
if isinstance(users, dict) and users:
users = OrderedDict(sorted(users.items()))
# Convert OrderedDict into plain dict for YAML
default_users = {'default_users': users}
plain_data = dictify(default_users)

View File

@@ -1,4 +1,3 @@
# Service Timers
## Meta
@@ -30,22 +29,22 @@ SYS_SCHEDULE_HEALTH_NGINX: "*-*-* {{ HOURS_SERVER_AWAKE }}:00
SYS_SCHEDULE_HEALTH_MSMTP: "*-*-* 00:00:00" # Check once per day SMTP Server
### Schedule for cleanup tasks
SYS_SCHEDULE_CLEANUP_CERTS: "*-*-* 11,23:00:00" # Deletes and revokes unused certs
SYS_SCHEDULE_CLEANUP_FAILED_BACKUPS: "*-*-* 12:00:00" # Clean up failed docker backups every noon
SYS_SCHEDULE_CLEANUP_BACKUPS: "*-*-* 00,06,12,18:15:00" # Cleanup backups every 6 hours, MUST be called before disc space cleanup
SYS_SCHEDULE_CLEANUP_DISC_SPACE: "*-*-* 00,06,12,18:30:00" # Cleanup disc space every 6 hours
SYS_SCHEDULE_CLEANUP_CERTS: "*-*-* 20:00" # Deletes and revokes unused certs once per day
SYS_SCHEDULE_CLEANUP_FAILED_BACKUPS: "*-*-* 21:00" # Clean up failed docker backups once per day
SYS_SCHEDULE_CLEANUP_BACKUPS: "*-*-* 22:00" # Cleanup backups once per day, MUST be called before disc space cleanup
SYS_SCHEDULE_CLEANUP_DISC_SPACE: "*-*-* 23:00" # Cleanup disc space once per day
### Schedule for repair services
SYS_SCHEDULE_REPAIR_BTRFS_AUTO_BALANCER: "Sat *-*-01..07 00:00:00" # Execute btrfs auto balancer every first Saturday of a month
SYS_SCHEDULE_REPAIR_DOCKER_HARD: "Sun *-*-* 05:30:00" # Restart docker instances every Sunday at 8:00 AM
SYS_SCHEDULE_REPAIR_DOCKER_HARD: "Sun *-*-* 00:00:00" # Restart docker instances every Sunday
### Schedule for backup tasks
SYS_SCHEDULE_BACKUP_REMOTE_TO_LOCAL: "*-*-* 00:15:00" # Pull Backup of the previous day
SYS_SCHEDULE_BACKUP_DOCKER_TO_LOCAL: "*-*-* 00:30:00" # Backup the current day
SYS_SCHEDULE_BACKUP_REMOTE_TO_LOCAL: "*-*-* 00:30:00" # Pull Backup of the previous day
SYS_SCHEDULE_BACKUP_DOCKER_TO_LOCAL: "*-*-* 01:00:00" # Backup the current day
### Schedule for Maintenance Tasks
SYS_SCHEDULE_MAINTANANCE_LETSENCRYPT_RENEW: "*-*-* 12,00:15:00" # Renew Mailu certificates twice per day
SYS_SCHEDULE_MAINTANANCE_LETSENCRYPT_DEPLOY: "*-*-* 12,00:30:00" # Deploy letsencrypt certificates twice per day to docker containers
SYS_SCHEDULE_MAINTANANCE_LETSENCRYPT_RENEW: "*-*-* 10,22:00:00" # Renew Mailu certificates twice per day
SYS_SCHEDULE_MAINTANANCE_LETSENCRYPT_DEPLOY: "*-*-* 11,23:00:00" # Deploy letsencrypt certificates twice per day to docker containers
SYS_SCHEDULE_MAINTANANCE_NEXTCLOUD: "21" # Do nextcloud maintenance between 21:00 and 01:00
### Animation

View File

@@ -6,6 +6,7 @@ __metaclass__ = type
import os
import subprocess
import time
from datetime import datetime
class CertUtils:
_domain_cert_mapping = None
@@ -22,6 +23,30 @@ class CertUtils:
except subprocess.CalledProcessError:
return ""
@staticmethod
def run_openssl_dates(cert_path):
"""
Returns (not_before_ts, not_after_ts) as POSIX timestamps or (None, None) on failure.
"""
try:
output = subprocess.check_output(
['openssl', 'x509', '-in', cert_path, '-noout', '-startdate', '-enddate'],
universal_newlines=True
)
nb, na = None, None
for line in output.splitlines():
line = line.strip()
if line.startswith('notBefore='):
nb = line.split('=', 1)[1].strip()
elif line.startswith('notAfter='):
na = line.split('=', 1)[1].strip()
def _parse(openssl_dt):
# OpenSSL format example: "Oct 10 12:34:56 2025 GMT"
return int(datetime.strptime(openssl_dt, "%b %d %H:%M:%S %Y %Z").timestamp())
return (_parse(nb) if nb else None, _parse(na) if na else None)
except Exception:
return (None, None)
@staticmethod
def extract_sans(cert_text):
dns_entries = []
@@ -59,7 +84,6 @@ class CertUtils:
else:
return domain == san
@classmethod
def build_snapshot(cls, cert_base_path):
snapshot = []
@@ -82,6 +106,17 @@ class CertUtils:
@classmethod
def refresh_cert_mapping(cls, cert_base_path, debug=False):
"""
Build mapping: SAN -> list of entries
entry = {
'folder': str,
'cert_path': str,
'mtime': float,
'not_before': int|None,
'not_after': int|None,
'is_wildcard': bool
}
"""
cert_files = cls.list_cert_files(cert_base_path)
mapping = {}
for cert_path in cert_files:
@@ -90,46 +125,82 @@ class CertUtils:
continue
sans = cls.extract_sans(cert_text)
folder = os.path.basename(os.path.dirname(cert_path))
try:
mtime = os.stat(cert_path).st_mtime
except FileNotFoundError:
mtime = 0.0
nb, na = cls.run_openssl_dates(cert_path)
for san in sans:
if san not in mapping:
mapping[san] = folder
entry = {
'folder': folder,
'cert_path': cert_path,
'mtime': mtime,
'not_before': nb,
'not_after': na,
'is_wildcard': san.startswith('*.'),
}
mapping.setdefault(san, []).append(entry)
cls._domain_cert_mapping = mapping
if debug:
print(f"[DEBUG] Refreshed domain-to-cert mapping: {mapping}")
print(f"[DEBUG] Refreshed domain-to-cert mapping (counts): "
f"{ {k: len(v) for k, v in mapping.items()} }")
@classmethod
def ensure_cert_mapping(cls, cert_base_path, debug=False):
if cls._domain_cert_mapping is None or cls.snapshot_changed(cert_base_path):
cls.refresh_cert_mapping(cert_base_path, debug)
@staticmethod
def _score_entry(entry):
"""
Return tuple used for sorting newest-first:
(not_before or -inf, mtime)
"""
nb = entry.get('not_before')
mtime = entry.get('mtime', 0.0)
return (nb if nb is not None else -1, mtime)
@classmethod
def find_cert_for_domain(cls, domain, cert_base_path, debug=False):
cls.ensure_cert_mapping(cert_base_path, debug)
exact_match = None
wildcard_match = None
candidates_exact = []
candidates_wild = []
for san, folder in cls._domain_cert_mapping.items():
for san, entries in cls._domain_cert_mapping.items():
if san == domain:
exact_match = folder
break
if san.startswith('*.'):
candidates_exact.extend(entries)
elif san.startswith('*.'):
base = san[2:]
if domain.count('.') == base.count('.') + 1 and domain.endswith('.' + base):
wildcard_match = folder
candidates_wild.extend(entries)
if exact_match:
if debug:
print(f"[DEBUG] Exact match for {domain} found in {exact_match}")
return exact_match
def _pick_newest(entries):
if not entries:
return None
# newest by (not_before, mtime)
best = max(entries, key=cls._score_entry)
return best
if wildcard_match:
if debug:
print(f"[DEBUG] Wildcard match for {domain} found in {wildcard_match}")
return wildcard_match
best_exact = _pick_newest(candidates_exact)
best_wild = _pick_newest(candidates_wild)
if best_exact and debug:
print(f"[DEBUG] Best exact match for {domain}: {best_exact['folder']} "
f"(not_before={best_exact['not_before']}, mtime={best_exact['mtime']})")
if best_wild and debug:
print(f"[DEBUG] Best wildcard match for {domain}: {best_wild['folder']} "
f"(not_before={best_wild['not_before']}, mtime={best_wild['mtime']})")
# Prefer exact if it exists; otherwise wildcard
chosen = best_exact or best_wild
if chosen:
return chosen['folder']
if debug:
print(f"[DEBUG] No certificate folder found for {domain}")
return None

View File

@@ -8,6 +8,11 @@ docker:
image: "bitnamilegacy/openldap"
name: "openldap"
version: "latest"
cpus: 1.25
# Optimized for up to 5k users
mem_reservation: 1g
mem_limit: 1.5g
pids_limit: 1024
network: "openldap"
volumes:
data: "openldap_data"

View File

@@ -57,8 +57,10 @@
- name: Fix ownership level 0..2 directories to backup:backup
ansible.builtin.shell: >
find "{{ BACKUPS_FOLDER_PATH }}" -mindepth 0 -maxdepth 2 -xdev -type d -exec chown backup:backup {} +
changed_when: false
- name: Fix perms level 0..2 directories to 0700
ansible.builtin.shell: >
find "{{ BACKUPS_FOLDER_PATH }}" -mindepth 0 -maxdepth 2 -xdev -type d -exec chmod 700 {} +
changed_when: false

View File

@@ -14,13 +14,14 @@
name: sys-stk-full-stateless
vars:
docker_compose_flush_handlers: false
- name: "include 04_seed-database-to-backup.yml"
include_tasks: "{{ [ playbook_dir, 'roles/sys-ctl-bkp-docker-2-loc/tasks/04_seed-database-to-backup.yml' ] | path_join }}"
- name: "Unset 'proxy_extra_configuration'"
set_fact:
proxy_extra_configuration: null
- name: "Include Seed routines for '{{ application_id }}' database backup"
include_tasks: "{{ [ playbook_dir, 'roles/sys-ctl-bkp-docker-2-loc/tasks/04_seed-database-to-backup.yml' ] | path_join }}"
- name: configure websocket_upgrade.conf
copy:
src: "websocket_upgrade.conf"

View File

@@ -47,7 +47,17 @@ docker:
version: "latest"
backup:
no_stop_required: true
port: 3000
name: "gitea"
port: 3000
name: "gitea"
cpus: 1.0
mem_reservation: 1g
mem_limit: 2g
pids_limit: 1024
redis:
enabled: false
cpus: 0.25
mem_reservation: 0.2g
mem_limit: 0.3g
pids_limit: 512
volumes:
data: "gitea_data"

View File

@@ -11,7 +11,7 @@ USER_GID=1000
# Logging configuration
GITEA__log__MODE=console
GITEA__log__LEVEL={% if MODE_DEBUG | bool %}Debug{% else %}Info{% endif %}
GITEA__log__LEVEL={% if MODE_DEBUG | bool %}Debug{% else %}Info{% endif %}
# Database
DB_TYPE=mysql
@@ -20,6 +20,28 @@ DB_NAME={{ database_name }}
DB_USER={{ database_username }}
DB_PASSWD={{ database_password }}
{% if GITEA_REDIS_ENABLED | bool %}
# ------------------------------------------------
# Redis Configuration for Gitea
# ------------------------------------------------
# @see https://docs.gitea.com/administration/config-cheat-sheet#cache-cache
GITEA__cache__ENABLED=true
GITEA__cache__ADAPTER=redis
# use a different Redis DB index than oauth2-proxy
GITEA__cache__HOST=redis://{{ GITEA_REDIS_ADDRESS }}/1
# Store sessions in Redis (instead of the internal DB)
GITEA__session__PROVIDER=redis
GITEA__session__PROVIDER_CONFIG=network=tcp,addr={{ GITEA_REDIS_ADDRESS }},db=2,pool_size=100,idle_timeout=180
# Use Redis for background task queues
GITEA__queue__TYPE=redis
GITEA__queue__CONN_STR=redis://{{ GITEA_REDIS_ADDRESS }}/3
{% endif %}
# SSH
SSH_PORT={{ports.public.ssh[application_id]}}
SSH_LISTEN_PORT=22

View File

@@ -22,9 +22,13 @@ GITEA_LDAP_AUTH_ARGS:
- '--email-attribute "{{ LDAP.USER.ATTRIBUTES.MAIL }}"'
- '--public-ssh-key-attribute "{{ LDAP.USER.ATTRIBUTES.SSH_PUBLIC_KEY }}"'
- '--synchronize-users'
GITEA_VERSION: "{{ applications | get_app_conf(application_id, 'docker.services.gitea.version') }}"
GITEA_IMAGE: "{{ applications | get_app_conf(application_id, 'docker.services.gitea.image') }}"
GITEA_CONTAINER: "{{ applications | get_app_conf(application_id, 'docker.services.gitea.name') }}"
GITEA_VOLUME: "{{ applications | get_app_conf(application_id, 'docker.volumes.data') }}"
GITEA_USER: "git"
GITEA_CONFIG: "/data/gitea/conf/app.ini"
GITEA_VERSION: "{{ applications | get_app_conf(application_id, 'docker.services.gitea.version') }}"
GITEA_IMAGE: "{{ applications | get_app_conf(application_id, 'docker.services.gitea.image') }}"
GITEA_CONTAINER: "{{ applications | get_app_conf(application_id, 'docker.services.gitea.name') }}"
GITEA_VOLUME: "{{ applications | get_app_conf(application_id, 'docker.volumes.data') }}"
GITEA_USER: "git"
GITEA_CONFIG: "/data/gitea/conf/app.ini"
## Redis
GITEA_REDIS_ENABLED: "{{ applications | get_app_conf(application_id, 'docker.services.redis.enabled') }}"
GITEA_REDIS_ADDRESS: "redis:6379"

View File

@@ -108,6 +108,89 @@ class TestGenerateDefaultApplications(unittest.TestCase):
self.assertIn("nocfgdirapp", apps)
self.assertEqual(apps["nocfgdirapp"], {})
def test_applications_sorted_by_key(self):
    """
    defaults_applications keys must be emitted in alphabetical order.
    """
    # Roles are created in deliberately non-alphabetical order.
    fixtures = [
        ("web-app-zeta", "zeta", "z: 1\n"),
        ("web-app-alpha", "alpha", "a: 1\n"),
        ("web-app-mu", "mu", "m: 1\n"),
    ]
    for role_name, app_id, cfg_text in fixtures:
        role_dir = self.roles_dir / role_name
        (role_dir / "vars").mkdir(parents=True, exist_ok=True)
        (role_dir / "config").mkdir(parents=True, exist_ok=True)
        (role_dir / "vars" / "main.yml").write_text(f"application_id: {app_id}\n")
        (role_dir / "config" / "main.yml").write_text(cfg_text)

    # Run the generator as a subprocess, like the other tests do.
    proc = subprocess.run(
        ["python3", str(self.script_path),
         "--roles-dir", str(self.roles_dir),
         "--output-file", str(self.output_file)],
        capture_output=True, text=True
    )
    self.assertEqual(proc.returncode, 0, msg=proc.stderr)

    # dicts preserve insertion order (3.7+) and PyYAML keeps document
    # order, so the file's key order is observable after a round-trip.
    data = yaml.safe_load(self.output_file.read_text())
    apps = data.get("defaults_applications", {})
    file_keys = list(apps.keys())
    self.assertEqual(
        file_keys,
        sorted(file_keys),
        msg=f"Applications are not sorted: {file_keys}"
    )
    # Sanity: all expected apps present.
    for expected in ("alpha", "mu", "zeta", "testapp"):
        self.assertIn(expected, apps)
def test_sorting_is_stable_across_runs(self):
    """
    Running the generator multiple times yields identical content
    (stable sort).
    """
    # Additional roles created in deliberately unsorted order.
    for role_name, app_id in [
        ("web-app-beta", "beta"),
        ("web-app-delta", "delta"),
    ]:
        role_dir = self.roles_dir / role_name
        (role_dir / "vars").mkdir(parents=True, exist_ok=True)
        (role_dir / "config").mkdir(parents=True, exist_ok=True)
        (role_dir / "vars" / "main.yml").write_text(f"application_id: {app_id}\n")
        (role_dir / "config" / "main.yml").write_text("key: value\n")

    def _generate():
        # One generator invocation; returns the produced file content.
        proc = subprocess.run(
            ["python3", str(self.script_path),
             "--roles-dir", str(self.roles_dir),
             "--output-file", str(self.output_file)],
            capture_output=True, text=True
        )
        self.assertEqual(proc.returncode, 0, msg=proc.stderr)
        return self.output_file.read_text()

    first = _generate()
    # Touch role dirs to simulate filesystem-order/mtime differences
    # between runs.
    for entry in self.roles_dir.iterdir():
        os.utime(entry, None)
    second = _generate()
    self.assertEqual(
        first, second,
        msg="Output differs between runs; sorting should be stable."
    )
if __name__ == "__main__":
unittest.main()

View File

@@ -132,5 +132,122 @@ class TestGenerateUsers(unittest.TestCase):
finally:
shutil.rmtree(tmp)
def test_cli_users_sorted_by_key(self):
    """
    default_users keys must be written in alphabetical order.
    """
    import tempfile
    import subprocess
    from pathlib import Path

    workdir = Path(tempfile.mkdtemp())
    try:
        roles_dir = workdir / "roles"
        roles_dir.mkdir()
        # Multiple roles with users, created in unsorted order.
        fixtures = [
            ("role-zeta", {"zeta": {"email": "z@ex"}}),
            ("role-alpha", {"alpha": {"email": "a@ex"}}),
            ("role-mu", {"mu": {"email": "m@ex"}}),
            ("role-beta", {"beta": {"email": "b@ex"}}),
        ]
        for role_name, users_map in fixtures:
            users_dir = roles_dir / role_name / "users"
            users_dir.mkdir(parents=True, exist_ok=True)
            with open(users_dir / "main.yml", "w") as fh:
                yaml.safe_dump({"users": users_map}, fh)

        out_file = workdir / "users.yml"
        # Script path resolved relative to the repo root, as in the
        # other tests of this module.
        script_path = Path(__file__).resolve().parents[5] / "cli" / "build" / "defaults" / "users.py"
        proc = subprocess.run(
            ["python3", str(script_path),
             "--roles-dir", str(roles_dir),
             "--output", str(out_file)],
            capture_output=True, text=True
        )
        self.assertEqual(proc.returncode, 0, msg=proc.stderr)
        self.assertTrue(out_file.exists(), "Output file was not created.")

        data = yaml.safe_load(out_file.read_text())
        self.assertIn("default_users", data)
        generated = data["default_users"]
        file_keys = list(generated.keys())
        # Expect alphabetical order.
        self.assertEqual(
            file_keys, sorted(file_keys),
            msg=f"Users are not sorted alphabetically: {file_keys}"
        )
        # Sanity: all expected keys present.
        for expected in ["alpha", "beta", "mu", "zeta"]:
            self.assertIn(expected, generated)
    finally:
        shutil.rmtree(workdir)
def test_cli_users_sorting_stable_across_runs(self):
    """
    Running the generator multiple times yields identical content
    (stable sort).
    """
    import tempfile
    import subprocess
    from pathlib import Path

    workdir = Path(tempfile.mkdtemp())
    try:
        roles_dir = workdir / "roles"
        roles_dir.mkdir()
        # Creation order is deliberately unsorted.
        for role_name, users_map in [
            ("role-d", {"duser": {"email": "d@ex"}}),
            ("role-a", {"auser": {"email": "a@ex"}}),
            ("role-c", {"cuser": {"email": "c@ex"}}),
            ("role-b", {"buser": {"email": "b@ex"}}),
        ]:
            users_dir = roles_dir / role_name / "users"
            users_dir.mkdir(parents=True, exist_ok=True)
            with open(users_dir / "main.yml", "w") as fh:
                yaml.safe_dump({"users": users_map}, fh)

        out_file = workdir / "users.yml"
        script_path = Path(__file__).resolve().parents[5] / "cli" / "build" / "defaults" / "users.py"

        def _run_generator():
            # One generator invocation; returns the produced file content.
            proc = subprocess.run(
                ["python3", str(script_path),
                 "--roles-dir", str(roles_dir),
                 "--output", str(out_file)],
                capture_output=True, text=True
            )
            self.assertEqual(proc.returncode, 0, msg=proc.stderr)
            return out_file.read_text()

        first = _run_generator()
        # Touch dirs to shuffle filesystem mtimes between runs.
        for entry in roles_dir.iterdir():
            os.utime(entry, None)
        second = _run_generator()
        self.assertEqual(
            first, second,
            msg="Output differs between runs; user sorting should be stable."
        )
    finally:
        shutil.rmtree(workdir)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,172 @@
#!/usr/bin/env python3
import os
import sys
import unittest
from types import SimpleNamespace
from unittest.mock import patch
# Add the project root/module_utils to the import path
CURRENT_DIR = os.path.dirname(__file__)
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, "../../.."))
sys.path.insert(0, PROJECT_ROOT)
from module_utils.cert_utils import CertUtils
def _san_block(*entries):
    """
    Helper: builds a minimal OpenSSL text snippet that contains SAN entries.

    Each entry becomes a ``DNS:`` item in the Subject Alternative Name list.
    Example: _san_block('example.com', '*.example.com')
    """
    # NOTE(review): the snippet's original leading whitespace was flattened
    # by the diff view; presumably CertUtils.extract_sans matches the DNS:
    # list regardless of indentation — confirm against the parser.
    sans = ", ".join(f"DNS:{e}" for e in entries)
    return f"""
Certificate:
Data:
Version: 3 (0x2)
...
X509v3 extensions:
X509v3 Subject Alternative Name:
{sans}
"""
class TestCertUtilsFindNewest(unittest.TestCase):
    """
    Unit tests for CertUtils' newest-certificate selection: freshness by
    notBefore, mtime fallback, exact-vs-wildcard precedence, wildcard
    label matching, and snapshot-triggered mapping refresh.
    """

    def setUp(self):
        # Reset internal caches before each test so state from one test
        # cannot leak into the next.
        CertUtils._domain_cert_mapping = None
        CertUtils._cert_snapshot = None

    def _mock_stat_map(self, mtime_map, size_map=None):
        # Build an os.stat replacement serving per-path st_mtime/st_size
        # from the given dicts (defaults: mtime 0.0, size 1234).
        size_map = size_map or {}
        def _stat_side_effect(path):
            return SimpleNamespace(
                st_mtime=mtime_map.get(path, 0.0),
                st_size=size_map.get(path, 1234),
            )
        return _stat_side_effect

    def test_prefers_newest_by_not_before(self):
        """
        Two certs with the same SAN 'www.example.com':
        - a/cert.pem: older notBefore
        - b/cert.pem: newer notBefore -> should be selected
        """
        files = [
            "/etc/letsencrypt/live/a/cert.pem",
            "/etc/letsencrypt/live/b/cert.pem",
        ]
        san_text = _san_block("www.example.com")
        with patch.object(CertUtils, "list_cert_files", return_value=files), \
             patch.object(CertUtils, "run_openssl", return_value=san_text), \
             patch.object(CertUtils, "run_openssl_dates") as mock_dates, \
             patch("os.stat", side_effect=self._mock_stat_map({
                 files[0]: 1000,
                 files[1]: 1001,
             })):
            mock_dates.side_effect = [(10, 100000), (20, 100000)]  # older/newer
            folder = CertUtils.find_cert_for_domain("www.example.com", "/etc/letsencrypt/live", debug=False)
            self.assertEqual(folder, "b", "Should return the folder with the newest notBefore date.")

    def test_fallback_to_mtime_when_not_before_missing(self):
        """
        When not_before is missing, mtime should be used as a fallback.
        """
        files = [
            "/etc/letsencrypt/live/a/cert.pem",
            "/etc/letsencrypt/live/b/cert.pem",
        ]
        san_text = _san_block("www.example.com")
        with patch.object(CertUtils, "list_cert_files", return_value=files), \
             patch.object(CertUtils, "run_openssl", return_value=san_text), \
             patch.object(CertUtils, "run_openssl_dates", return_value=(None, None)), \
             patch("os.stat", side_effect=self._mock_stat_map({
                 files[0]: 1000,
                 files[1]: 2000,
             })):
            folder = CertUtils.find_cert_for_domain("www.example.com", "/etc/letsencrypt/live", debug=False)
            self.assertEqual(folder, "b", "Should fall back to mtime and select the newest file.")

    def test_exact_beats_wildcard_even_if_wildcard_newer(self):
        """
        Exact matches must take precedence over wildcard matches,
        even if the wildcard certificate is newer.
        """
        files = [
            "/etc/letsencrypt/live/exact/cert.pem",
            "/etc/letsencrypt/live/wild/cert.pem",
        ]
        text_exact = _san_block("api.example.com")
        text_wild = _san_block("*.example.com")
        with patch.object(CertUtils, "list_cert_files", return_value=files), \
             patch.object(CertUtils, "run_openssl") as mock_text, \
             patch.object(CertUtils, "run_openssl_dates") as mock_dates, \
             patch("os.stat", side_effect=self._mock_stat_map({
                 files[0]: 1000,  # exact is older
                 files[1]: 5000,  # wildcard is much newer
             })):
            mock_text.side_effect = [text_exact, text_wild]
            mock_dates.side_effect = [(10, 100000), (99, 100000)]
            folder = CertUtils.find_cert_for_domain("api.example.com", "/etc/letsencrypt/live", debug=False)
            self.assertEqual(
                folder, "exact",
                "Exact match must win even if the wildcard certificate is newer."
            )

    def test_wildcard_one_label_only(self):
        """
        Wildcards (*.example.com) must only match one additional label.
        """
        files = ["/etc/letsencrypt/live/wild/cert.pem"]
        text_wild = _san_block("*.example.com")
        with patch.object(CertUtils, "list_cert_files", return_value=files), \
             patch.object(CertUtils, "run_openssl", return_value=text_wild), \
             patch.object(CertUtils, "run_openssl_dates", return_value=(50, 100000)), \
             patch("os.stat", side_effect=self._mock_stat_map({files[0]: 1000})):
            # should match
            self.assertEqual(
                CertUtils.find_cert_for_domain("api.example.com", "/etc/letsencrypt/live"),
                "wild"
            )
            # too deep -> should not match
            self.assertIsNone(
                CertUtils.find_cert_for_domain("deep.api.example.com", "/etc/letsencrypt/live"),
                "Wildcard must not match multiple labels."
            )
            # base domain not covered
            self.assertIsNone(
                CertUtils.find_cert_for_domain("example.com", "/etc/letsencrypt/live"),
                "Base domain is not covered by *.example.com."
            )

    def test_snapshot_refresh_rebuilds_mapping(self):
        """
        ensure_cert_mapping() should rebuild mapping when snapshot changes.
        """
        # Pre-seed a stale mapping; a changed snapshot must replace it.
        CertUtils._domain_cert_mapping = {"www.example.com": [{"folder": "old", "mtime": 1, "not_before": 1}]}
        with patch.object(CertUtils, "snapshot_changed", return_value=True), \
             patch.object(CertUtils, "refresh_cert_mapping") as mock_refresh:
            def _set_new_mapping(cert_base_path, debug=False):
                CertUtils._domain_cert_mapping = {
                    "www.example.com": [{"folder": "new", "mtime": 999, "not_before": 999}]
                }
            mock_refresh.side_effect = _set_new_mapping
            folder = CertUtils.find_cert_for_domain("www.example.com", "/etc/letsencrypt/live", debug=False)
            self.assertEqual(folder, "new", "Mapping must be refreshed when snapshot changes.")
if __name__ == "__main__":
unittest.main()