mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-12-17 22:32:52 +00:00
Refactor setup workflow and make install robust via virtualenv
- Introduce a dedicated Python virtualenv (deps target) and run all setup scripts through it - Fix missing PyYAML errors in clean, CI, and Nix environments - Refactor build defaults into cli/setup for clearer semantics - Make setup deterministic and independent from system Python - Replace early Makefile shell expansion with runtime evaluation - Rename messy-test to test-messy and update deploy logic and tests accordingly - Keep setup and test targets consistent across Makefile, CLI, and unit tests https://chatgpt.com/share/693de226-00ac-800f-8cbd-06552b2f283c
This commit is contained in:
0
cli/setup/__init__.py
Normal file
0
cli/setup/__init__.py
Normal file
212
cli/setup/applications.py
Normal file
212
cli/setup/applications.py
Normal file
@@ -0,0 +1,212 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import yaml
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
# Ensure project root on PYTHONPATH so module_utils is importable
|
||||
repo_root = Path(__file__).resolve().parent.parent.parent.parent
|
||||
sys.path.insert(0, str(repo_root))
|
||||
|
||||
# Add lookup_plugins for application_gid
|
||||
plugin_path = repo_root / "lookup_plugins"
|
||||
sys.path.insert(0, str(plugin_path))
|
||||
|
||||
from module_utils.dict_renderer import DictRenderer
|
||||
from application_gid import LookupModule
|
||||
|
||||
def load_yaml_file(path: Path) -> dict:
|
||||
if not path.exists():
|
||||
return {}
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
return yaml.safe_load(f) or {}
|
||||
|
||||
class DefaultsGenerator:
|
||||
def __init__(self, roles_dir: Path, output_file: Path, verbose: bool, timeout: float):
|
||||
self.roles_dir = roles_dir
|
||||
self.output_file = output_file
|
||||
self.verbose = verbose
|
||||
self.renderer = DictRenderer(verbose=verbose, timeout=timeout)
|
||||
self.gid_lookup = LookupModule()
|
||||
|
||||
def log(self, message: str):
|
||||
if self.verbose:
|
||||
print(f"[DefaultsGenerator] {message}")
|
||||
|
||||
def run(self):
|
||||
result = {"defaults_applications": {}}
|
||||
|
||||
for role_dir in sorted(self.roles_dir.iterdir()):
|
||||
role_name = role_dir.name
|
||||
vars_main = role_dir / "vars" / "main.yml"
|
||||
config_file = role_dir / "config" / "main.yml"
|
||||
|
||||
if not vars_main.exists():
|
||||
self.log(f"Skipping {role_name}: vars/main.yml missing")
|
||||
continue
|
||||
|
||||
vars_data = load_yaml_file(vars_main)
|
||||
application_id = vars_data.get("application_id")
|
||||
if not application_id:
|
||||
self.log(f"Skipping {role_name}: application_id not defined")
|
||||
continue
|
||||
|
||||
if not config_file.exists():
|
||||
self.log(f"Config missing for {role_name}, adding empty defaults for '{application_id}'")
|
||||
result["defaults_applications"][application_id] = {}
|
||||
continue
|
||||
|
||||
config_data = load_yaml_file(config_file)
|
||||
if not config_data:
|
||||
# Empty or null config → still register the application with empty defaults
|
||||
self.log(f"Empty config for {role_name}, adding empty defaults for '{application_id}'")
|
||||
result["defaults_applications"][application_id] = {}
|
||||
continue
|
||||
|
||||
# Existing non-empty config: keep current behavior
|
||||
try:
|
||||
gid_number = self.gid_lookup.run([application_id], roles_dir=str(self.roles_dir))[0]
|
||||
except Exception as e:
|
||||
print(f"Warning: failed to determine gid for '{application_id}': {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
config_data["group_id"] = gid_number
|
||||
result["defaults_applications"][application_id] = config_data
|
||||
|
||||
# Inject users mapping as Jinja2 references (unchanged)
|
||||
users_meta = load_yaml_file(role_dir / "users" / "main.yml")
|
||||
users_data = users_meta.get("users", {})
|
||||
transformed = {user: f"{{{{ users[\"{user}\"] }}}}" for user in users_data}
|
||||
if transformed:
|
||||
result["defaults_applications"][application_id]["users"] = transformed
|
||||
|
||||
# Render placeholders in entire result context
|
||||
self.log("Starting placeholder rendering...")
|
||||
try:
|
||||
result = self.renderer.render(result)
|
||||
except Exception as e:
|
||||
print(f"Error during rendering: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Sort applications by application key for stable output
|
||||
apps = result.get("defaults_applications", {})
|
||||
if isinstance(apps, dict) and apps:
|
||||
result["defaults_applications"] = {
|
||||
k: apps[k] for k in sorted(apps.keys())
|
||||
}
|
||||
|
||||
# Write output
|
||||
self.output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
with self.output_file.open("w", encoding="utf-8") as f:
|
||||
yaml.dump(result, f, sort_keys=False)
|
||||
|
||||
# Print location of generated file (absolute if not under cwd)
|
||||
try:
|
||||
rel = self.output_file.relative_to(Path.cwd())
|
||||
except ValueError:
|
||||
rel = self.output_file
|
||||
print(f"✅ Generated: {rel}")
|
||||
|
||||
def test_empty_config_mapping_adds_empty_defaults(self):
|
||||
"""
|
||||
If a role has vars/main.yml and config/main.yml exists but contains an
|
||||
empty mapping ({}), the generator must still emit an empty-dict entry
|
||||
for that application_id.
|
||||
"""
|
||||
role_empty_cfg = self.roles_dir / "role-empty-config"
|
||||
(role_empty_cfg / "vars").mkdir(parents=True, exist_ok=True)
|
||||
(role_empty_cfg / "config").mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# application_id is defined…
|
||||
(role_empty_cfg / "vars" / "main.yml").write_text(
|
||||
"application_id: emptycfg\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
# …but config is an explicit empty mapping
|
||||
(role_empty_cfg / "config" / "main.yml").write_text(
|
||||
"{}\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
[
|
||||
"python3",
|
||||
str(self.script_path),
|
||||
"--roles-dir",
|
||||
str(self.roles_dir),
|
||||
"--output-file",
|
||||
str(self.output_file),
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
self.assertEqual(result.returncode, 0, msg=result.stderr)
|
||||
|
||||
data = yaml.safe_load(self.output_file.read_text())
|
||||
apps = data.get("defaults_applications", {})
|
||||
|
||||
self.assertIn("emptycfg", apps)
|
||||
self.assertEqual(
|
||||
apps["emptycfg"],
|
||||
{},
|
||||
msg="Role with {} config should produce an empty defaults mapping",
|
||||
)
|
||||
|
||||
def test_empty_config_file_adds_empty_defaults(self):
|
||||
"""
|
||||
If a role has vars/main.yml and config/main.yml exists but is an empty
|
||||
file (or only whitespace), the generator must still emit an empty-dict
|
||||
entry for that application_id.
|
||||
"""
|
||||
role_empty_file = self.roles_dir / "role-empty-config-file"
|
||||
(role_empty_file / "vars").mkdir(parents=True, exist_ok=True)
|
||||
(role_empty_file / "config").mkdir(parents=True, exist_ok=True)
|
||||
|
||||
(role_empty_file / "vars" / "main.yml").write_text(
|
||||
"application_id: emptyfileapp\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
# Create an empty file (no YAML content at all)
|
||||
(role_empty_file / "config" / "main.yml").write_text(
|
||||
"",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
result = subprocess.run(
|
||||
[
|
||||
"python3",
|
||||
str(self.script_path),
|
||||
"--roles-dir",
|
||||
str(self.roles_dir),
|
||||
"--output-file",
|
||||
str(self.output_file),
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
self.assertEqual(result.returncode, 0, msg=result.stderr)
|
||||
|
||||
data = yaml.safe_load(self.output_file.read_text())
|
||||
apps = data.get("defaults_applications", {})
|
||||
|
||||
self.assertIn("emptyfileapp", apps)
|
||||
self.assertEqual(
|
||||
apps["emptyfileapp"],
|
||||
{},
|
||||
msg="Role with empty config file should produce an empty defaults mapping",
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Generate defaults_applications YAML...")
|
||||
parser.add_argument("--roles-dir", default="roles", help="Path to the roles directory")
|
||||
parser.add_argument("--output-file", required=True, help="Path to output YAML file")
|
||||
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
|
||||
parser.add_argument("--timeout", type=float, default=10.0, help="Timeout for rendering")
|
||||
|
||||
args = parser.parse_args()
|
||||
cwd = Path.cwd()
|
||||
roles_dir = (cwd / args.roles_dir).resolve()
|
||||
output_file = (cwd / args.output_file).resolve()
|
||||
|
||||
DefaultsGenerator(roles_dir, output_file, args.verbose, args.timeout).run()
|
||||
253
cli/setup/users.py
Normal file
253
cli/setup/users.py
Normal file
@@ -0,0 +1,253 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import yaml
|
||||
import glob
|
||||
from collections import OrderedDict
|
||||
|
||||
|
||||
def represent_str(dumper, data):
|
||||
"""
|
||||
Custom YAML string representer that forces double quotes around any string
|
||||
containing a Jinja2 placeholder ({{ ... }}).
|
||||
"""
|
||||
if isinstance(data, str) and '{{' in data:
|
||||
return dumper.represent_scalar(
|
||||
'tag:yaml.org,2002:str',
|
||||
data,
|
||||
style='"'
|
||||
)
|
||||
return dumper.represent_scalar(
|
||||
'tag:yaml.org,2002:str',
|
||||
data
|
||||
)
|
||||
|
||||
|
||||
def build_users(defs, primary_domain, start_id, become_pwd):
|
||||
"""
|
||||
Construct user entries with auto-incremented UID/GID, default username/email,
|
||||
and optional description.
|
||||
|
||||
Args:
|
||||
defs (OrderedDict): Mapping of user keys to their override settings.
|
||||
primary_domain (str): The primary domain for email addresses (e.g. 'example.com').
|
||||
start_id (int): Starting number for UID/GID allocation (e.g. 1001).
|
||||
become_pwd (str): Default password string for users without an override.
|
||||
|
||||
Returns:
|
||||
OrderedDict: Complete user definitions with all required fields filled in.
|
||||
|
||||
Raises:
|
||||
ValueError: If there are duplicate UIDs, usernames, or emails.
|
||||
"""
|
||||
users = OrderedDict()
|
||||
used_uids = set()
|
||||
|
||||
# Collect any preset UIDs to avoid collisions
|
||||
for key, overrides in defs.items():
|
||||
if 'uid' in overrides:
|
||||
uid = overrides['uid']
|
||||
if uid in used_uids:
|
||||
raise ValueError(f"Duplicate uid {uid} for user '{key}'")
|
||||
used_uids.add(uid)
|
||||
|
||||
next_uid = start_id
|
||||
def allocate_uid():
|
||||
nonlocal next_uid
|
||||
# Find the next free UID not already used
|
||||
while next_uid in used_uids:
|
||||
next_uid += 1
|
||||
free_uid = next_uid
|
||||
used_uids.add(free_uid)
|
||||
next_uid += 1
|
||||
return free_uid
|
||||
|
||||
# Build each user entry
|
||||
for key, overrides in defs.items():
|
||||
username = overrides.get('username', key)
|
||||
email = overrides.get('email', f"{username}@{primary_domain}")
|
||||
description = overrides.get('description')
|
||||
roles = overrides.get('roles', [])
|
||||
password = overrides.get('password', become_pwd)
|
||||
reserved = overrides.get('reserved', False)
|
||||
|
||||
# Determine UID and GID
|
||||
if 'uid' in overrides:
|
||||
uid = overrides['uid']
|
||||
else:
|
||||
uid = allocate_uid()
|
||||
gid = overrides.get('gid', uid)
|
||||
|
||||
entry = {
|
||||
'username': username,
|
||||
'email': email,
|
||||
'password': password,
|
||||
'uid': uid,
|
||||
'gid': gid,
|
||||
'roles': roles
|
||||
}
|
||||
if description is not None:
|
||||
entry['description'] = description
|
||||
|
||||
if reserved:
|
||||
entry['reserved'] = reserved
|
||||
|
||||
users[key] = entry
|
||||
|
||||
# Ensure uniqueness of usernames and emails
|
||||
seen_usernames = set()
|
||||
seen_emails = set()
|
||||
|
||||
for key, entry in users.items():
|
||||
un = entry['username']
|
||||
em = entry['email']
|
||||
if un in seen_usernames:
|
||||
raise ValueError(f"Duplicate username '{un}' in merged users")
|
||||
if em in seen_emails:
|
||||
raise ValueError(f"Duplicate email '{em}' in merged users")
|
||||
seen_usernames.add(un)
|
||||
seen_emails.add(em)
|
||||
|
||||
return users
|
||||
|
||||
|
||||
def load_user_defs(roles_directory):
|
||||
"""
|
||||
Scan all roles/*/users/main.yml files and merge any 'users:' sections.
|
||||
|
||||
Args:
|
||||
roles_directory (str): Path to the directory containing role subdirectories.
|
||||
|
||||
Returns:
|
||||
OrderedDict: Merged user definitions from all roles.
|
||||
|
||||
Raises:
|
||||
ValueError: On invalid format or conflicting override values.
|
||||
"""
|
||||
pattern = os.path.join(roles_directory, '*/users/main.yml')
|
||||
files = sorted(glob.glob(pattern))
|
||||
merged = OrderedDict()
|
||||
|
||||
for filepath in files:
|
||||
with open(filepath, 'r') as f:
|
||||
data = yaml.safe_load(f) or {}
|
||||
users = data.get('users', {})
|
||||
if not isinstance(users, dict):
|
||||
continue
|
||||
|
||||
for key, overrides in users.items():
|
||||
if not isinstance(overrides, dict):
|
||||
raise ValueError(f"Invalid definition for user '{key}' in {filepath}")
|
||||
|
||||
if key not in merged:
|
||||
merged[key] = overrides.copy()
|
||||
else:
|
||||
existing = merged[key]
|
||||
for field, value in overrides.items():
|
||||
if field in existing and existing[field] != value:
|
||||
raise ValueError(
|
||||
f"Conflict for user '{key}': field '{field}' has existing value '{existing[field]}', tried to set '{value}' in {filepath}"
|
||||
)
|
||||
existing.update(overrides)
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
def dictify(data):
|
||||
"""
|
||||
Recursively convert OrderedDict to regular dict for YAML dumping.
|
||||
"""
|
||||
if isinstance(data, OrderedDict):
|
||||
return {k: dictify(v) for k, v in data.items()}
|
||||
if isinstance(data, dict):
|
||||
return {k: dictify(v) for k, v in data.items()}
|
||||
if isinstance(data, list):
|
||||
return [dictify(v) for v in data]
|
||||
return data
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Generate a users.yml by merging all roles/*/users/main.yml definitions.'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--roles-dir', '-r', required=True,
|
||||
help='Directory containing roles (e.g., roles/*/users/main.yml).'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--output', '-o', required=True,
|
||||
help='Path to the output YAML file (e.g., users.yml).'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--start-id', '-s', type=int, default=1001,
|
||||
help='Starting UID/GID number (default: 1001).'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--reserved-usernames', '-e',
|
||||
help='Comma-separated list of usernames to reserve.',
|
||||
default=None
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
primary_domain = '{{ SYSTEM_EMAIL.DOMAIN }}'
|
||||
become_pwd = '{{ lookup("password", "/dev/null length=42 chars=ascii_letters,digits") }}'
|
||||
|
||||
try:
|
||||
definitions = load_user_defs(args.roles_dir)
|
||||
except ValueError as e:
|
||||
print(f"Error merging user definitions: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Add reserved/ users if specified
|
||||
if args.reserved_usernames:
|
||||
for name in args.reserved_usernames.split(','):
|
||||
user_key = name.strip()
|
||||
if not user_key:
|
||||
continue
|
||||
if user_key in definitions:
|
||||
print(
|
||||
f"Warning: reserved user '{user_key}' already defined; skipping (not changing existing definition).",
|
||||
file=sys.stderr
|
||||
)
|
||||
else:
|
||||
definitions[user_key] = {}
|
||||
# Mark user as reserved
|
||||
definitions[user_key]["reserved"] = True
|
||||
try:
|
||||
users = build_users(
|
||||
definitions,
|
||||
primary_domain,
|
||||
args.start_id,
|
||||
become_pwd
|
||||
)
|
||||
except ValueError as e:
|
||||
print(f"Error building user entries: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Sort users by key for deterministic output
|
||||
if isinstance(users, dict) and users:
|
||||
users = OrderedDict(sorted(users.items()))
|
||||
|
||||
# Convert OrderedDict into plain dict for YAML
|
||||
default_users = {'default_users': users}
|
||||
plain_data = dictify(default_users)
|
||||
|
||||
# Register custom string representer
|
||||
yaml.SafeDumper.add_representer(str, represent_str)
|
||||
|
||||
# Dump the YAML file
|
||||
with open(args.output, 'w') as f:
|
||||
yaml.safe_dump(
|
||||
plain_data,
|
||||
f,
|
||||
default_flow_style=False,
|
||||
sort_keys=False,
|
||||
width=120
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user