mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-11-26 06:43:10 +00:00
Refactor JVM memory filters, add Redis sizing and Docker cleanup service
- Replace jvm_filters with unified memory_filters (JVM + Redis helpers) - Add redis_maxmemory_mb filter and unit tests - Introduce sys-ctl-cln-docker role (systemd-based Docker prune + anon volumes) - Refactor disk space health check to Python script and wire SIZE_PERCENT_CLEANUP_DISC_SPACE - Adjust schedules and services for Docker cleanup and disk space health See discussion: https://chatgpt.com/share/6925c1c5-ee38-800f-84b6-da29ccfa7537
This commit is contained in:
@@ -1,123 +0,0 @@
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
# Importiere das Filtermodul
|
||||
# Pfad relativ zum Projekt; falls nötig, passe den Importpfad an
|
||||
import importlib
|
||||
jvm_filters = importlib.import_module("filter_plugins.jvm_filters")
|
||||
|
||||
|
||||
class TestJvmFilters(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Dummy applications dict – Inhalt egal, da get_app_conf gemockt wird
|
||||
self.apps = {"whatever": True}
|
||||
self.app_id = "web-app-confluence" # entity_name wird gemockt
|
||||
|
||||
# -----------------------------
|
||||
# Helpers
|
||||
# -----------------------------
|
||||
def _with_conf(self, mem_limit: str, mem_res: str):
|
||||
"""
|
||||
Context manager der get_app_conf/get_entity_name passend patched.
|
||||
"""
|
||||
patches = [
|
||||
patch("filter_plugins.jvm_filters.get_entity_name", return_value="confluence"),
|
||||
patch(
|
||||
"filter_plugins.jvm_filters.get_app_conf",
|
||||
side_effect=lambda apps, app_id, key, required=True: (
|
||||
mem_limit if key.endswith(".mem_limit")
|
||||
else mem_res if key.endswith(".mem_reservation")
|
||||
else None
|
||||
),
|
||||
),
|
||||
]
|
||||
ctxs = [p.start() for p in patches]
|
||||
self.addCleanup(lambda: [p.stop() for p in patches])
|
||||
return ctxs
|
||||
|
||||
# -----------------------------
|
||||
# Tests: jvm_max_mb / jvm_min_mb Sizing
|
||||
# -----------------------------
|
||||
def test_sizing_8g_limit_6g_reservation(self):
|
||||
# mem_limit=8g → candidates: 70% = 5734MB (floor 8*0.7=5.6GB→ 5734MB via int math 8*7//10=5)
|
||||
# int math: (8*1024)*7//10 = (8192)*7//10 = 5734
|
||||
# limit-1024 = 8192-1024 = 7168
|
||||
# 12288
|
||||
# → Xmx = min(5734, 7168, 12288) = 5734 → floor at 1024 keeps 5734
|
||||
# Xms = min(Xmx//2=2867, res=6144, Xmx=5734) = 2867 (>=512)
|
||||
self._with_conf("8g", "6g")
|
||||
xmx = jvm_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
xms = jvm_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
self.assertEqual(xmx, 5734)
|
||||
self.assertEqual(xms, 2867)
|
||||
|
||||
def test_sizing_6g_limit_4g_reservation(self):
|
||||
# limit=6g → 70%: (6144*7)//10 = 4300, limit-1024=5120, 12288 → Xmx=4300
|
||||
# Xms=min(4300//2=2150, 4096, 4300)=2150
|
||||
self._with_conf("6g", "4g")
|
||||
xmx = jvm_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
xms = jvm_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
self.assertEqual(xmx, 4300)
|
||||
self.assertEqual(xms, 2150)
|
||||
|
||||
def test_sizing_16g_limit_12g_reservation_cap_12288(self):
|
||||
# limit=16g → 70%: (16384*7)//10 = 11468, limit-1024=15360, cap=12288 → Xmx=min(11468,15360,12288)=11468
|
||||
# Xms=min(11468//2=5734, 12288 (12g), 11468) = 5734
|
||||
self._with_conf("16g", "12g")
|
||||
xmx = jvm_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
xms = jvm_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
self.assertEqual(xmx, 11468)
|
||||
self.assertEqual(xms, 5734)
|
||||
|
||||
def test_floor_small_limit_results_in_min_1024(self):
|
||||
# limit=1g → 70%: 716, limit-1024=0, 12288 → min=0 → floor → 1024
|
||||
self._with_conf("1g", "512m")
|
||||
xmx = jvm_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
self.assertEqual(xmx, 1024)
|
||||
|
||||
def test_floor_small_reservation_results_in_min_512(self):
|
||||
# limit groß genug, aber reservation sehr klein → Xms floored to 512
|
||||
self._with_conf("4g", "128m")
|
||||
xms = jvm_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
self.assertEqual(xms, 512)
|
||||
|
||||
# -----------------------------
|
||||
# Tests: Fehlerfälle / Validierung
|
||||
# -----------------------------
|
||||
def test_invalid_unit_raises(self):
|
||||
with patch("filter_plugins.jvm_filters.get_entity_name", return_value="confluence"), \
|
||||
patch("filter_plugins.jvm_filters.get_app_conf", side_effect=lambda apps, app_id, key, required=True:
|
||||
"8Q" if key.endswith(".mem_limit") else "4g"):
|
||||
with self.assertRaises(jvm_filters.AnsibleFilterError):
|
||||
jvm_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
|
||||
def test_zero_limit_raises(self):
|
||||
with patch("filter_plugins.jvm_filters.get_entity_name", return_value="confluence"), \
|
||||
patch("filter_plugins.jvm_filters.get_app_conf", side_effect=lambda apps, app_id, key, required=True:
|
||||
"0" if key.endswith(".mem_limit") else "4g"):
|
||||
with self.assertRaises(jvm_filters.AnsibleFilterError):
|
||||
jvm_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
|
||||
def test_zero_reservation_raises(self):
|
||||
with patch("filter_plugins.jvm_filters.get_entity_name", return_value="confluence"), \
|
||||
patch("filter_plugins.jvm_filters.get_app_conf", side_effect=lambda apps, app_id, key, required=True:
|
||||
"8g" if key.endswith(".mem_limit") else "0"):
|
||||
with self.assertRaises(jvm_filters.AnsibleFilterError):
|
||||
jvm_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
|
||||
def test_entity_name_is_derived_not_passed(self):
|
||||
# Sicherstellen, dass get_entity_name() aufgerufen wird und kein externer Parameter nötig ist
|
||||
with patch("filter_plugins.jvm_filters.get_entity_name", return_value="confluence") as mock_entity, \
|
||||
patch("filter_plugins.jvm_filters.get_app_conf", side_effect=lambda apps, app_id, key, required=True:
|
||||
"8g" if key.endswith(".mem_limit") else "6g"):
|
||||
xmx = jvm_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
xms = jvm_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
self.assertGreater(xmx, 0)
|
||||
self.assertGreater(xms, 0)
|
||||
self.assertEqual(mock_entity.call_count, 3)
|
||||
for call in mock_entity.call_args_list:
|
||||
self.assertEqual(call.args[0], self.app_id)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
225
tests/unit/filter_plugins/test_memory_filters.py
Normal file
225
tests/unit/filter_plugins/test_memory_filters.py
Normal file
@@ -0,0 +1,225 @@
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
import importlib
|
||||
memory_filters = importlib.import_module("filter_plugins.memory_filters")
|
||||
|
||||
|
||||
class TestMemoryFilters(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Dummy applications dict – content does not matter because get_app_conf is mocked
|
||||
self.apps = {"whatever": True}
|
||||
self.app_id = "web-app-confluence" # entity_name will be mocked
|
||||
|
||||
# -----------------------------
|
||||
# Helpers
|
||||
# -----------------------------
|
||||
def _with_conf(self, mem_limit: str, mem_res: str):
|
||||
"""
|
||||
Patch get_app_conf/get_entity_name so that mem_limit and mem_reservation
|
||||
can be controlled in tests.
|
||||
"""
|
||||
patches = [
|
||||
patch("filter_plugins.memory_filters.get_entity_name", return_value="confluence"),
|
||||
patch(
|
||||
"filter_plugins.memory_filters.get_app_conf",
|
||||
side_effect=lambda apps, app_id, key, required=True, **kwargs: (
|
||||
mem_limit if key.endswith(".mem_limit")
|
||||
else mem_res if key.endswith(".mem_reservation")
|
||||
else None
|
||||
),
|
||||
),
|
||||
]
|
||||
mocks = [p.start() for p in patches]
|
||||
self.addCleanup(lambda: [p.stop() for p in patches])
|
||||
return mocks
|
||||
|
||||
# -----------------------------
|
||||
# Tests: jvm_max_mb / jvm_min_mb sizing
|
||||
# -----------------------------
|
||||
def test_sizing_8g_limit_6g_reservation(self):
|
||||
# mem_limit = 8g
|
||||
# candidates:
|
||||
# 70%: (8 * 1024) * 7 // 10 = 5734
|
||||
# limit - 1024: 8192 - 1024 = 7168
|
||||
# cap: 12288
|
||||
# -> Xmx = 5734
|
||||
# Xms = min(5734 // 2 = 2867, 6144, 5734) = 2867
|
||||
self._with_conf("8g", "6g")
|
||||
xmx = memory_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
xms = memory_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
self.assertEqual(xmx, 5734)
|
||||
self.assertEqual(xms, 2867)
|
||||
|
||||
def test_sizing_6g_limit_4g_reservation(self):
|
||||
# mem_limit = 6g
|
||||
# 70%: (6144 * 7) // 10 = 4300
|
||||
# limit - 1024: 6144 - 1024 = 5120
|
||||
# cap: 12288
|
||||
# -> Xmx = 4300
|
||||
# Xms = min(4300 // 2 = 2150, 4096, 4300) = 2150
|
||||
self._with_conf("6g", "4g")
|
||||
xmx = memory_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
xms = memory_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
self.assertEqual(xmx, 4300)
|
||||
self.assertEqual(xms, 2150)
|
||||
|
||||
def test_sizing_16g_limit_12g_reservation_cap_12288(self):
|
||||
# mem_limit = 16g
|
||||
# 70%: (16384 * 7) // 10 = 11468
|
||||
# limit - 1024: 16384 - 1024 = 15360
|
||||
# cap: 12288
|
||||
# -> Xmx = 11468
|
||||
# Xms = min(11468 // 2 = 5734, 12288, 11468) = 5734
|
||||
self._with_conf("16g", "12g")
|
||||
xmx = memory_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
xms = memory_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
self.assertEqual(xmx, 11468)
|
||||
self.assertEqual(xms, 5734)
|
||||
|
||||
def test_floor_small_limit_results_in_min_1024(self):
|
||||
# mem_limit = 1g
|
||||
# 70%: ~716 MB, limit - 1024 = 0, cap: 12288
|
||||
# -> min candidates = 0 => floored to 1024 MB
|
||||
self._with_conf("1g", "512m")
|
||||
xmx = memory_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
self.assertEqual(xmx, 1024)
|
||||
|
||||
def test_floor_small_reservation_results_in_min_512(self):
|
||||
# mem_limit is large enough, but reservation is tiny -> floored to 512
|
||||
self._with_conf("4g", "128m")
|
||||
xms = memory_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
self.assertEqual(xms, 512)
|
||||
|
||||
# -----------------------------
|
||||
# Tests: JVM failure cases / validation
|
||||
# -----------------------------
|
||||
def test_invalid_unit_raises(self):
|
||||
with patch("filter_plugins.memory_filters.get_entity_name", return_value="confluence"), \
|
||||
patch("filter_plugins.memory_filters.get_app_conf",
|
||||
side_effect=lambda apps, app_id, key, required=True, **kwargs:
|
||||
"8Q" if key.endswith(".mem_limit") else "4g"):
|
||||
with self.assertRaises(memory_filters.AnsibleFilterError):
|
||||
memory_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
|
||||
def test_zero_limit_raises(self):
|
||||
with patch("filter_plugins.memory_filters.get_entity_name", return_value="confluence"), \
|
||||
patch("filter_plugins.memory_filters.get_app_conf",
|
||||
side_effect=lambda apps, app_id, key, required=True, **kwargs:
|
||||
"0" if key.endswith(".mem_limit") else "4g"):
|
||||
with self.assertRaises(memory_filters.AnsibleFilterError):
|
||||
memory_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
|
||||
def test_zero_reservation_raises(self):
|
||||
with patch("filter_plugins.memory_filters.get_entity_name", return_value="confluence"), \
|
||||
patch("filter_plugins.memory_filters.get_app_conf",
|
||||
side_effect=lambda apps, app_id, key, required=True, **kwargs:
|
||||
"8g" if key.endswith(".mem_limit") else "0"):
|
||||
with self.assertRaises(memory_filters.AnsibleFilterError):
|
||||
memory_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
|
||||
def test_entity_name_is_derived_not_passed(self):
|
||||
"""
|
||||
Ensure get_entity_name() is called internally and the app_id is not
|
||||
passed around manually from the template.
|
||||
"""
|
||||
with patch("filter_plugins.memory_filters.get_entity_name", return_value="confluence") as mock_entity, \
|
||||
patch("filter_plugins.memory_filters.get_app_conf",
|
||||
side_effect=lambda apps, app_id, key, required=True, **kwargs:
|
||||
"8g" if key.endswith(".mem_limit") else "6g"):
|
||||
xmx = memory_filters.jvm_max_mb(self.apps, self.app_id)
|
||||
xms = memory_filters.jvm_min_mb(self.apps, self.app_id)
|
||||
self.assertGreater(xmx, 0)
|
||||
self.assertGreater(xms, 0)
|
||||
self.assertEqual(mock_entity.call_count, 3)
|
||||
for call in mock_entity.call_args_list:
|
||||
self.assertEqual(call.args[0], self.app_id)
|
||||
|
||||
# -----------------------------
|
||||
# Tests: redis_maxmemory_mb
|
||||
# -----------------------------
|
||||
def test_redis_maxmemory_default_factor_uses_80_percent_of_limit(self):
|
||||
# mem_limit = 1g → 1024 MB
|
||||
# factor = 0.8 → int(1024 * 0.8) = 819
|
||||
self._with_conf("1g", "512m")
|
||||
maxmem = memory_filters.redis_maxmemory_mb(self.apps, self.app_id)
|
||||
self.assertEqual(maxmem, 819)
|
||||
|
||||
def test_redis_maxmemory_custom_factor_and_min_mb(self):
|
||||
# mem_limit = 1g → 1024 MB
|
||||
# factor = 0.5 → 512 MB
|
||||
# min_mb = 128 → result stays 512
|
||||
self._with_conf("1g", "512m")
|
||||
maxmem = memory_filters.redis_maxmemory_mb(
|
||||
self.apps,
|
||||
self.app_id,
|
||||
factor=0.5,
|
||||
min_mb=128,
|
||||
)
|
||||
self.assertEqual(maxmem, 512)
|
||||
|
||||
def test_redis_maxmemory_honors_minimum_floor(self):
|
||||
# mem_limit = 32m → 32 MB
|
||||
# factor = 0.8 → int(32 * 0.8) = 25 < min_mb(64)
|
||||
# → result = 64
|
||||
self._with_conf("32m", "16m")
|
||||
maxmem = memory_filters.redis_maxmemory_mb(self.apps, self.app_id)
|
||||
self.assertEqual(maxmem, 64)
|
||||
|
||||
def test_redis_maxmemory_zero_limit_raises(self):
|
||||
# mem_limit = 0 → must raise AnsibleFilterError
|
||||
self._with_conf("0", "512m")
|
||||
with self.assertRaises(memory_filters.AnsibleFilterError):
|
||||
memory_filters.redis_maxmemory_mb(self.apps, self.app_id)
|
||||
|
||||
def test_redis_maxmemory_invalid_unit_raises(self):
|
||||
# mem_limit = "8Q" → invalid unit → must raise
|
||||
self._with_conf("8Q", "512m")
|
||||
with self.assertRaises(memory_filters.AnsibleFilterError):
|
||||
memory_filters.redis_maxmemory_mb(self.apps, self.app_id)
|
||||
|
||||
def test_redis_maxmemory_does_not_call_get_entity_name(self):
|
||||
"""
|
||||
Ensure redis_maxmemory_mb does NOT rely on entity name resolution
|
||||
(it should always use the hard-coded 'redis' service name).
|
||||
"""
|
||||
patches = [
|
||||
patch("filter_plugins.memory_filters.get_entity_name"),
|
||||
patch(
|
||||
"filter_plugins.memory_filters.get_app_conf",
|
||||
side_effect=lambda apps, app_id, key, required=True, **kwargs: (
|
||||
"4g" if key.endswith(".mem_limit") else "2g"
|
||||
),
|
||||
),
|
||||
]
|
||||
mocks = [p.start() for p in patches]
|
||||
self.addCleanup(lambda: [p.stop() for p in patches])
|
||||
|
||||
entity_mock = mocks[0]
|
||||
|
||||
maxmem = memory_filters.redis_maxmemory_mb(self.apps, self.app_id)
|
||||
# 4g → 4096 MB, factor 0.8 → 3276
|
||||
self.assertEqual(maxmem, 3276)
|
||||
entity_mock.assert_not_called()
|
||||
|
||||
def test_redis_maxmemory_uses_default_when_mem_limit_missing(self):
|
||||
"""
|
||||
When docker.services.redis.mem_limit is not configured, the filter
|
||||
should fall back to its internal default (256m).
|
||||
"""
|
||||
def fake_get_app_conf(apps, app_id, key, required=True, **kwargs):
|
||||
# Simulate missing mem_limit: return the provided default
|
||||
if key.endswith(".mem_limit"):
|
||||
return kwargs.get("default")
|
||||
return None
|
||||
|
||||
with patch("filter_plugins.memory_filters.get_app_conf", side_effect=fake_get_app_conf), \
|
||||
patch("filter_plugins.memory_filters.get_entity_name", return_value="confluence"):
|
||||
maxmem = memory_filters.redis_maxmemory_mb(self.apps, self.app_id)
|
||||
|
||||
# default_mb = 256 → factor 0.8 → floor(256 * 0.8) = 204
|
||||
self.assertEqual(maxmem, 204)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
123
tests/unit/roles/sys-ctl-hlth-disc-space/files/script.py
Normal file
123
tests/unit/roles/sys-ctl-hlth-disc-space/files/script.py
Normal file
@@ -0,0 +1,123 @@
|
||||
#!/usr/bin/env python3
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
import pathlib
|
||||
import contextlib
|
||||
import importlib.util
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
|
||||
|
||||
def load_target_module():
|
||||
"""
|
||||
Load the target script (roles/sys-ctl-hlth-disc-space/files/script.py)
|
||||
via its file path so that dashes in the directory name are not an issue.
|
||||
"""
|
||||
# tests/unit/roles/sys-ctl-hlth-disc-space/files/script.py
|
||||
test_file_path = pathlib.Path(__file__).resolve()
|
||||
repo_root = test_file_path.parents[4] # go up: files -> ... -> unit -> tests -> <root>
|
||||
|
||||
script_path = repo_root / "roles" / "sys-ctl-hlth-disc-space" / "files" / "script.py"
|
||||
if not script_path.is_file():
|
||||
raise FileNotFoundError(f"Target script not found at: {script_path}")
|
||||
|
||||
spec = importlib.util.spec_from_file_location("disk_space_script", script_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
assert spec.loader is not None
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
# Load the module once for all tests
|
||||
SCRIPT_MODULE = load_target_module()
|
||||
|
||||
|
||||
class TestDiskSpaceScript(unittest.TestCase):
|
||||
def test_get_disk_usage_percentages_parses_output(self):
|
||||
"""
|
||||
Ensure get_disk_usage_percentages parses 'df --output=pcent' correctly
|
||||
and returns integer percentages without the '%' sign.
|
||||
"""
|
||||
# Fake df output, including header line and various spacings
|
||||
fake_df_output = "Use%\n 10%\n 50%\n100%\n"
|
||||
|
||||
with mock.patch.object(
|
||||
SCRIPT_MODULE.subprocess,
|
||||
"run",
|
||||
return_value=SimpleNamespace(stdout=fake_df_output, returncode=0),
|
||||
):
|
||||
result = SCRIPT_MODULE.get_disk_usage_percentages()
|
||||
|
||||
self.assertEqual(result, [10, 50, 100])
|
||||
|
||||
def test_main_exits_zero_when_below_threshold(self):
|
||||
"""
|
||||
If all filesystems are below or equal the threshold,
|
||||
main() should exit with status code 0.
|
||||
"""
|
||||
# First call: 'df' (printing only) -> we don't care about stdout here
|
||||
df_print_cp = SimpleNamespace(stdout="Filesystem ...\n", returncode=0)
|
||||
# Second call: 'df --output=pcent'
|
||||
df_pcent_cp = SimpleNamespace(stdout="Use%\n 10%\n 50%\n 80%\n", returncode=0)
|
||||
|
||||
def fake_run(args, capture_output=False, text=False, check=False):
|
||||
# Decide which fake result to return based on the arguments
|
||||
if args == ["df", "--output=pcent"]:
|
||||
return df_pcent_cp
|
||||
elif args == ["df"]:
|
||||
return df_print_cp
|
||||
else:
|
||||
raise AssertionError(f"Unexpected subprocess.run args: {args}")
|
||||
|
||||
with mock.patch.object(SCRIPT_MODULE.subprocess, "run", side_effect=fake_run):
|
||||
with mock.patch.object(sys, "argv", ["script.py", "80"]):
|
||||
with mock.patch.object(SCRIPT_MODULE.sys, "exit", side_effect=SystemExit) as mock_exit:
|
||||
# Capture stdout to avoid clutter in test output
|
||||
with contextlib.redirect_stdout(io.StringIO()):
|
||||
with self.assertRaises(SystemExit):
|
||||
SCRIPT_MODULE.main()
|
||||
|
||||
# Expect no filesystem above 80% -> exit code 0
|
||||
mock_exit.assert_called_once_with(0)
|
||||
|
||||
def test_main_exits_with_error_count_and_prints_warnings(self):
|
||||
"""
|
||||
If some filesystems exceed the threshold, main() should:
|
||||
- Print a warning for each filesystem that exceeds it
|
||||
- Exit with a status code equal to the number of such filesystems
|
||||
"""
|
||||
df_print_cp = SimpleNamespace(stdout="Filesystem ...\n", returncode=0)
|
||||
# Two filesystems above threshold (90%, 95%), one below (60%)
|
||||
df_pcent_cp = SimpleNamespace(stdout="Use%\n 60%\n 90%\n 95%\n", returncode=0)
|
||||
|
||||
def fake_run(args, capture_output=False, text=False, check=False):
|
||||
if args == ["df", "--output=pcent"]:
|
||||
return df_pcent_cp
|
||||
elif args == ["df"]:
|
||||
return df_print_cp
|
||||
else:
|
||||
raise AssertionError(f"Unexpected subprocess.run args: {args}")
|
||||
|
||||
with mock.patch.object(SCRIPT_MODULE.subprocess, "run", side_effect=fake_run):
|
||||
with mock.patch.object(sys, "argv", ["script.py", "80"]):
|
||||
with mock.patch.object(SCRIPT_MODULE.sys, "exit", side_effect=SystemExit) as mock_exit:
|
||||
buffer = io.StringIO()
|
||||
with contextlib.redirect_stdout(buffer):
|
||||
with self.assertRaises(SystemExit):
|
||||
SCRIPT_MODULE.main()
|
||||
|
||||
# Expect exit code 2 (two filesystems over 80%)
|
||||
mock_exit.assert_called_once_with(2)
|
||||
|
||||
output = buffer.getvalue()
|
||||
self.assertIn("Checking disk space usage...", output)
|
||||
self.assertIn("WARNING: 90% exceeds the limit of 80%.", output)
|
||||
self.assertIn("WARNING: 95% exceeds the limit of 80%.", output)
|
||||
# Ensure the "below threshold" value does not produce a warning
|
||||
self.assertNotIn("60% exceeds the limit of 80%.", output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user