#!/usr/bin/env python3 import os import sys import argparse import yaml import glob from collections import OrderedDict def represent_str(dumper, data): """ Custom YAML string representer that forces double quotes around any string containing a Jinja2 placeholder ({{ ... }}). """ if isinstance(data, str) and '{{' in data: return dumper.represent_scalar( 'tag:yaml.org,2002:str', data, style='"' ) return dumper.represent_scalar( 'tag:yaml.org,2002:str', data ) def build_users(defs, primary_domain, start_id, become_pwd): """ Construct user entries with auto-incremented UID/GID, default username/email, and optional description. Args: defs (OrderedDict): Mapping of user keys to their override settings. primary_domain (str): The primary domain for email addresses (e.g. 'example.com'). start_id (int): Starting number for UID/GID allocation (e.g. 1001). become_pwd (str): Default password string for users without an override. Returns: OrderedDict: Complete user definitions with all required fields filled in. Raises: ValueError: If there are duplicate UIDs, usernames, or emails. """ users = OrderedDict() used_uids = set() # Collect any preset UIDs to avoid collisions for key, overrides in defs.items(): if 'uid' in overrides: uid = overrides['uid'] if uid in used_uids: raise ValueError(f"Duplicate uid {uid} for user '{key}'") used_uids.add(uid) next_uid = start_id def allocate_uid(): nonlocal next_uid # Find the next free UID not already used while next_uid in used_uids: next_uid += 1 free_uid = next_uid used_uids.add(free_uid) next_uid += 1 return free_uid # Build each user entry for key, overrides in defs.items(): username = overrides.get('username', key) email = overrides.get('email', f"{username}@{primary_domain}") description = overrides.get('description') roles = overrides.get('roles', []) password = overrides.get('password', become_pwd) # Determine UID and GID if 'uid' in overrides: uid = overrides['uid'] else: uid = allocate_uid() gid = overrides.get('gid', uid) entry = { 'username': username, 'email': email, 'password': password, 'uid': uid, 'gid': gid, 'roles': roles } if description is not None: entry['description'] = description users[key] = entry # Ensure uniqueness of usernames and emails seen_usernames = set() seen_emails = set() for key, entry in users.items(): un = entry['username'] em = entry['email'] if un in seen_usernames: raise ValueError(f"Duplicate username '{un}' in merged users") if em in seen_emails: raise ValueError(f"Duplicate email '{em}' in merged users") seen_usernames.add(un) seen_emails.add(em) return users def load_user_defs(roles_directory): """ Scan all roles/*/meta/users.yml files and merge any 'users:' sections. Args: roles_directory (str): Path to the directory containing role subdirectories. Returns: OrderedDict: Merged user definitions from all roles. Raises: ValueError: On invalid format or conflicting override values. """ pattern = os.path.join(roles_directory, '*/meta/users.yml') files = sorted(glob.glob(pattern)) merged = OrderedDict() for filepath in files: with open(filepath, 'r') as f: data = yaml.safe_load(f) or {} users = data.get('users', {}) if not isinstance(users, dict): continue for key, overrides in users.items(): if not isinstance(overrides, dict): raise ValueError(f"Invalid definition for user '{key}' in {filepath}") if key not in merged: merged[key] = overrides.copy() else: existing = merged[key] for field, value in overrides.items(): if field in existing and existing[field] != value: raise ValueError( f"Conflict for user '{key}': field '{field}' has existing value '{existing[field]}', tried to set '{value}' in {filepath}" ) existing.update(overrides) return merged def dictify(data): """ Recursively convert OrderedDict to regular dict for YAML dumping. """ if isinstance(data, OrderedDict): return {k: dictify(v) for k, v in data.items()} if isinstance(data, dict): return {k: dictify(v) for k, v in data.items()} if isinstance(data, list): return [dictify(v) for v in data] return data def parse_args(): parser = argparse.ArgumentParser( description='Generate a users.yml by merging all roles/*/meta/users.yml definitions.' ) parser.add_argument( '--roles-dir', '-r', required=True, help='Directory containing roles (e.g., roles/*/meta/users.yml).' ) parser.add_argument( '--output', '-o', required=True, help='Path to the output YAML file (e.g., users.yml).' ) parser.add_argument( '--start-id', '-s', type=int, default=1001, help='Starting UID/GID number (default: 1001).' ) parser.add_argument( '--extra-users', '-e', help='Comma-separated list of additional usernames to include.', default=None ) return parser.parse_args() def main(): args = parse_args() primary_domain = '{{ primary_domain }}' become_pwd = '{{ lookup("password", "/dev/null length=42 chars=ascii_letters,digits") }}' try: definitions = load_user_defs(args.roles_dir) except ValueError as e: print(f"Error merging user definitions: {e}", file=sys.stderr) sys.exit(1) # Add extra users if specified if args.extra_users: for name in args.extra_users.split(','): user_key = name.strip() if not user_key: continue if user_key in definitions: print(f"Warning: extra user '{user_key}' already defined; skipping.", file=sys.stderr) else: definitions[user_key] = {} try: users = build_users( definitions, primary_domain, args.start_id, become_pwd ) except ValueError as e: print(f"Error building user entries: {e}", file=sys.stderr) sys.exit(1) # Convert OrderedDict into plain dict for YAML default_users = {'default_users': users} plain_data = dictify(default_users) # Register custom string representer yaml.SafeDumper.add_representer(str, represent_str) # Dump the YAML file with open(args.output, 'w') as f: yaml.safe_dump( plain_data, f, default_flow_style=False, sort_keys=False, width=120 ) if __name__ == '__main__': main()