mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2024-11-10 15:01:05 +01:00
188 lines
7.0 KiB
Python
188 lines
7.0 KiB
Python
import argparse
|
|
import subprocess
|
|
import time
|
|
import os
|
|
from datetime import datetime
|
|
|
|
# Global variable definition

# Pause (in seconds) between two checks while waiting for a service to stop;
# also the divisor used to turn a timeout into a number of attempts.
BREAK_TIME_SECONDS = 5

# Name prefix of the freezer's own units; the action name ("freeze"/"defrost")
# is appended to it to build the unit name.
FREEZER_SERVICES_PREFIX = "system-maintenance-service-"
|
|
|
|
class AttemptException(Exception):
    """Signal that the maximum number of wait attempts has been exceeded."""
|
|
|
|
def parse_time_to_seconds(time_str):
    """
    Convert a time string (e.g., '1h', '30min', '45s') to seconds.

    The string must consist of an integer followed by one of the unit
    suffixes 's' (seconds), 'min' (minutes) or 'h' (hours). Leading and
    trailing whitespace is ignored.

    Args:
        time_str: The duration to convert, e.g. '30min'.

    Returns:
        The duration in seconds as an int.

    Raises:
        ValueError: If the unit suffix is unknown or the part in front of
            the unit is not an integer.
    """
    units = {"s": 1, "min": 60, "h": 3600}
    text = time_str.strip()

    # No unit is a suffix of another one, so the lookup order is irrelevant.
    for unit, factor in units.items():
        if not text.endswith(unit):
            continue
        number = text[:-len(unit)]
        try:
            return int(number) * factor
        except ValueError:
            # Replace int()'s generic message with one that names the input.
            raise ValueError(
                f"Invalid time value {time_str!r}: expected an integer "
                f"followed by one of {', '.join(units)}"
            ) from None

    raise ValueError(
        f"Invalid time unit in {time_str!r}: expected one of {', '.join(units)}"
    )
|
|
|
|
def service_file_exists(service_name, service_type="service"):
    """
    Tell whether the unit file "<service_name>.<service_type>" is present
    as a regular file in /etc/systemd/system/.

    The path being checked is printed as debug output.
    """
    unit_file = os.path.join(
        "/etc/systemd/system/",
        f"{service_name}.{service_type}",
    )

    # Debug output: show which unit file is being looked up
    print(f"Checking {unit_file}")
    return os.path.isfile(unit_file)
|
|
|
|
def check_service_active(service_name):
    """
    Ask `systemctl is-active` about a unit and report whether its state is
    'active' or 'activating'.

    The result is also printed. Returns True for those two states and
    False for any other output.
    """
    completed = subprocess.run(
        ['systemctl', 'is-active', service_name],
        stdout=subprocess.PIPE,
    )
    state = completed.stdout.decode('utf-8').strip()
    running = state == 'active' or state == 'activating'

    label = 'active' if running else 'not active'
    print(f"Service {service_name} is {label}.")
    return running
|
|
|
|
def check_any_service_active(services):
    """
    Return True as soon as one of the given services is active or
    activating; return False if none is (or the list is empty).
    """
    for name in services:
        if check_service_active(name):
            # Stop at the first hit; remaining services are not queried.
            return True
    return False
|
|
|
|
def manage_timer(service, action):
    """
    Manage a systemd timer for a service.

    'start' starts and then enables "<service>.timer"; 'stop' stops and then
    disables it. A confirmation line is printed on success.

    Args:
        service: Unit name without the ".timer" suffix.
        action: Either 'start' or 'stop'.

    Raises:
        ValueError: If action is neither 'start' nor 'stop'.
        SystemExit: With exit code 1 if one of the systemctl calls returns a
            non-zero exit status.
    """
    if action not in ('start', 'stop'):
        raise ValueError("Invalid action specified for manage_timer")

    # action -> (follow-up systemctl verb, past tense of action, final state).
    # The past tense is spelled out because "stop" + "ed" yields "stoped".
    follow_up, performed, state = {
        'start': ('enable', 'started', 'enabled'),
        'stop': ('disable', 'stopped', 'disabled'),
    }[action]

    timer_name = f"{service}.timer"
    try:
        subprocess.run(['systemctl', action, timer_name], check=True)
        subprocess.run(['systemctl', follow_up, timer_name], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error managing timer {timer_name}: {e}")
        # Raise SystemExit directly; the exit() helper is injected by the
        # site module and is not meant to be used in programs.
        raise SystemExit(1) from e

    print(f"{timer_name} {performed} and {state}.")
|
|
|
|
def stop_timer(service):
    """
    Stop and disable a systemd timer for a service if it exists.

    The freezer's own defrost unit is skipped: its timer is left untouched
    and only a notice is printed. For any other service, a message is
    printed instead when no "<service>.timer" unit file exists.
    """
    if service == f"{FREEZER_SERVICES_PREFIX}defrost":
        print(f"Ignoring {service}. It's the initializer of freezer.")
        # Actually skip it; previously the notice was printed but the timer
        # was stopped and disabled anyway.
        return

    if not service_file_exists(service, "timer"):
        print(f"Timer {service}.timer does not exist.")
        return

    manage_timer(service, 'stop')
|
|
|
|
def filter_services(services, ignored_services):
    """
    Build a new list with every entry of services that does not appear in
    ignored_services, keeping the original order.
    """
    kept = []
    for candidate in services:
        if candidate in ignored_services:
            continue
        kept.append(candidate)
    return kept
|
|
|
|
def stop_all_timers(services):
    """
    Call stop_timer once for each service in the given list, in order.
    """
    for unit in services:
        stop_timer(unit)
|
|
|
|
def wait_for_all_services_to_stop(filtered_services, max_attempts, attempt):
    """
    Block until none of the given services is active any more.

    Services are polled one after another. Each poll that finds a service
    still active increments the shared attempt counter, prints a progress
    line and sleeps BREAK_TIME_SECONDS before polling again.

    Returns the attempt counter after the last service has stopped.
    Raises AttemptException once the counter exceeds max_attempts.
    """
    for unit in filtered_services:
        while True:
            if not check_service_active(unit):
                break

            attempt += 1
            if attempt > max_attempts:
                raise AttemptException(f"Maximum attempts ({max_attempts}) reached. Exiting.")

            timestamp = datetime.now().isoformat()
            print(f"{timestamp}#{attempt}/{max_attempts}: Waiting for {BREAK_TIME_SECONDS} seconds for {unit} to stop...")
            time.sleep(BREAK_TIME_SECONDS)
    return attempt
|
|
|
|
def freeze(filtered_services, timeout_sec):
    """
    Stop the timers of the given services and wait until the services
    themselves are no longer active.

    The timeout is converted into a maximum number of wait attempts; the
    attempt counter is carried across loop iterations. A final message is
    printed once no service is active.
    """
    limit = get_max_attempts(timeout_sec)
    used = 0

    while True:
        if not check_any_service_active(filtered_services):
            break
        stop_all_timers(filtered_services)
        used = wait_for_all_services_to_stop(filtered_services, limit, used)

    print("All required services have stopped.")
|
|
|
|
def get_max_attempts(timeout_sec):
    """
    Return how many whole BREAK_TIME_SECONDS intervals fit into timeout_sec.
    """
    whole_intervals, _remainder = divmod(timeout_sec, BREAK_TIME_SECONDS)
    return whole_intervals
|
|
|
|
def defrost(filtered_services, timeout_sec):
    """
    Defrost services by starting and enabling their timers.

    First waits until none of the given services is active. If that takes
    more attempts than the timeout allows, the defrost unit's own timer is
    stopped and the process exits with status 0. Otherwise the timer of each
    given service, and finally of the defrost unit itself, is started and
    enabled when a matching ".timer" unit file exists.

    Args:
        filtered_services: Names of the services whose timers to re-enable.
        timeout_sec: Maximum waiting time in seconds.

    Raises:
        SystemExit: With exit code 0 when waiting exceeded the allowed
            number of attempts.
    """
    running_service = f"{FREEZER_SERVICES_PREFIX}defrost"
    max_attempts = get_max_attempts(timeout_sec)
    try:
        # The attempt counter starts at 0; its final value is not needed here.
        wait_for_all_services_to_stop(filtered_services, max_attempts, 0)
    except AttemptException as e:
        print(e)
        print("Defrosting was not possible. The execution of other services took to long.")
        manage_timer(running_service, "stop")
        # Raise SystemExit directly; the exit() helper is injected by the
        # site module and is not meant to be used in programs.
        raise SystemExit(0)

    for service in filtered_services + [running_service]:
        print(f"Unfreezing: {service}")
        if service_file_exists(service, "timer"):
            manage_timer(service, "start")
        else:
            print("No timer to activate for service.")
    print("All required services are started.")
|
|
|
|
def main(services, ignored_services, action, timeout_sec):
    """
    Main function to process the command-line arguments and perform actions.

    The freezer unit belonging to the current action is always added to the
    ignore list, then the remaining services are frozen or defrosted.
    Finally the output of `systemctl list-timers` is shown as an overview.

    Args:
        services: Names of the services to handle.
        ignored_services: Names of services to leave untouched; may be None.
            The passed list is not modified.
        action: 'freeze' or 'defrost'; any other value only prints the
            overview.
        timeout_sec: Maximum waiting time in seconds.
    """
    # Work on a copy so the caller's list is not mutated by the append below.
    ignored_services = list(ignored_services or [])

    # Ignoring the current running service
    running_service = f"{FREEZER_SERVICES_PREFIX}{action}"
    if running_service not in ignored_services:
        ignored_services.append(running_service)

    filtered_services = filter_services(services, ignored_services)
    print(f"Services to handle: {services}")
    print(f"Services to ignore: {ignored_services}")
    print(f"Services filtered: {filtered_services}")

    if action == 'freeze':
        print("Freezing services.")
        freeze(filtered_services, timeout_sec)
    elif action == 'defrost':
        print("Unfreezing services.")
        defrost(filtered_services, timeout_sec)
    print("Overview:")
    subprocess.run(['systemctl', 'list-timers'])
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the arguments and hand them to main().
    cli = argparse.ArgumentParser(
        description='Freezes and defrosts systemd services and timers.'
    )
    cli.add_argument(
        'action',
        choices=['freeze', 'defrost'],
        help='Action to perform: freeze or defrost services.',
    )
    cli.add_argument(
        'services',
        nargs='+',
        help='List of services to apply the action to.',
    )
    cli.add_argument(
        '--ignore',
        nargs='*',
        help='List of services to ignore in the action.',
        default=[],
    )
    cli.add_argument(
        '--timeout',
        help='Timeout for freezing services (e.g., 1h, 30min, 45s).',
        default='1h',
    )
    options = cli.parse_args()

    main(
        options.services,
        options.ignore or [],
        options.action,
        parse_time_to_seconds(options.timeout),
    )
|