diff --git a/cli/build/defaults/applications.py b/cli/build/defaults/applications.py index f485f681..f1e149d5 100644 --- a/cli/build/defaults/applications.py +++ b/cli/build/defaults/applications.py @@ -58,22 +58,28 @@ class DefaultsGenerator: continue config_data = load_yaml_file(config_file) - if config_data: - try: - gid_number = self.gid_lookup.run([application_id], roles_dir=str(self.roles_dir))[0] - except Exception as e: - print(f"Warning: failed to determine gid for '{application_id}': {e}", file=sys.stderr) - sys.exit(1) + if not config_data: + # Empty or null config → still register the application with empty defaults + self.log(f"Empty config for {role_name}, adding empty defaults for '{application_id}'") + result["defaults_applications"][application_id] = {} + continue - config_data["group_id"] = gid_number - result["defaults_applications"][application_id] = config_data + # Existing non-empty config: keep current behavior + try: + gid_number = self.gid_lookup.run([application_id], roles_dir=str(self.roles_dir))[0] + except Exception as e: + print(f"Warning: failed to determine gid for '{application_id}': {e}", file=sys.stderr) + sys.exit(1) - # Inject users mapping as Jinja2 references - users_meta = load_yaml_file(role_dir / "users" / "main.yml") - users_data = users_meta.get("users", {}) - transformed = {user: f"{{{{ users[\"{user}\"] }}}}" for user in users_data} - if transformed: - result["defaults_applications"][application_id]["users"] = transformed + config_data["group_id"] = gid_number + result["defaults_applications"][application_id] = config_data + + # Inject users mapping as Jinja2 references (unchanged) + users_meta = load_yaml_file(role_dir / "users" / "main.yml") + users_data = users_meta.get("users", {}) + transformed = {user: f"{{{{ users[\"{user}\"] }}}}" for user in users_data} + if transformed: + result["defaults_applications"][application_id]["users"] = transformed # Render placeholders in entire result context self.log("Starting placeholder rendering...") @@ -102,6 +108,95 @@ class DefaultsGenerator: rel = self.output_file print(f"✅ Generated: {rel}") + def test_empty_config_mapping_adds_empty_defaults(self): + """ + If a role has vars/main.yml and config/main.yml exists but contains an + empty mapping ({}), the generator must still emit an empty-dict entry + for that application_id. + """ + role_empty_cfg = self.roles_dir / "role-empty-config" + (role_empty_cfg / "vars").mkdir(parents=True, exist_ok=True) + (role_empty_cfg / "config").mkdir(parents=True, exist_ok=True) + + # application_id is defined… + (role_empty_cfg / "vars" / "main.yml").write_text( + "application_id: emptycfg\n", + encoding="utf-8", + ) + # …but config is an explicit empty mapping + (role_empty_cfg / "config" / "main.yml").write_text( + "{}\n", + encoding="utf-8", + ) + + result = subprocess.run( + [ + "python3", + str(self.script_path), + "--roles-dir", + str(self.roles_dir), + "--output-file", + str(self.output_file), + ], + capture_output=True, + text=True, + ) + self.assertEqual(result.returncode, 0, msg=result.stderr) + + data = yaml.safe_load(self.output_file.read_text()) + apps = data.get("defaults_applications", {}) + + self.assertIn("emptycfg", apps) + self.assertEqual( + apps["emptycfg"], + {}, + msg="Role with {} config should produce an empty defaults mapping", + ) + + def test_empty_config_file_adds_empty_defaults(self): + """ + If a role has vars/main.yml and config/main.yml exists but is an empty + file (or only whitespace), the generator must still emit an empty-dict + entry for that application_id. + """ + role_empty_file = self.roles_dir / "role-empty-config-file" + (role_empty_file / "vars").mkdir(parents=True, exist_ok=True) + (role_empty_file / "config").mkdir(parents=True, exist_ok=True) + + (role_empty_file / "vars" / "main.yml").write_text( + "application_id: emptyfileapp\n", + encoding="utf-8", + ) + # Create an empty file (no YAML content at all) + (role_empty_file / "config" / "main.yml").write_text( + "", + encoding="utf-8", + ) + + result = subprocess.run( + [ + "python3", + str(self.script_path), + "--roles-dir", + str(self.roles_dir), + "--output-file", + str(self.output_file), + ], + capture_output=True, + text=True, + ) + self.assertEqual(result.returncode, 0, msg=result.stderr) + + data = yaml.safe_load(self.output_file.read_text()) + apps = data.get("defaults_applications", {}) + + self.assertIn("emptyfileapp", apps) + self.assertEqual( + apps["emptyfileapp"], + {}, + msg="Role with empty config file should produce an empty defaults mapping", + ) + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Generate defaults_applications YAML...") parser.add_argument("--roles-dir", default="roles", help="Path to the roles directory") diff --git a/cli/create/credentials.py b/cli/create/credentials.py index 8c7ccb91..f972d52c 100644 --- a/cli/create/credentials.py +++ b/cli/create/credentials.py @@ -9,6 +9,14 @@ Usage example: --inventory-file host_vars/echoserver.yml \ --vault-password-file .pass/echoserver.txt \ --set credentials.database_password=mysecret + +With snippet mode (no file changes, just YAML output): + + infinito create credentials \ + --role-path roles/web-app-akaunting \ + --inventory-file host_vars/echoserver.yml \ + --vault-password-file .pass/echoserver.txt \ + --snippet """ import argparse @@ -92,7 +100,14 @@ def to_vault_block(vault_handler: VaultHandler, value: Union[str, Any], label: s Return a ruamel scalar tagged as !vault. If the input value is already vault-encrypted (string contains $ANSIBLE_VAULT or is a !vault scalar), reuse/wrap. Otherwise, encrypt plaintext via ansible-vault. + + Special rule: + - Empty strings ("") are NOT encrypted and are returned as plain "". """ + # Empty strings should not be encrypted + if isinstance(value, str) and value == "": + return "" + # Already a ruamel !vault scalar → reuse if _is_ruamel_vault(value): return value @@ -105,7 +120,6 @@ def to_vault_block(vault_handler: VaultHandler, value: Union[str, Any], label: s snippet = vault_handler.encrypt_string(str(value), label) return _make_vault_scalar_from_text(snippet) - def parse_overrides(pairs: list[str]) -> Dict[str, str]: """ Parse --set key=value pairs into a dict. @@ -139,6 +153,23 @@ def main() -> int: "-y", "--yes", action="store_true", help="Non-interactive: assume 'yes' for all overwrite confirmations when --force is used" ) + parser.add_argument( + "--snippet", + action="store_true", + help=( + "Do not modify the inventory file. Instead, print a YAML snippet with " + "the generated credentials to stdout. The snippet contains only the " + "application's credentials (and ansible_become_password if provided)." + ), + ) + parser.add_argument( + "--allow-empty-plain", + action="store_true", + help=( + "Allow 'plain' credentials in the schema without an explicit --set override. " + "Missing plain values will be set to an empty string before encryption." + ), + ) args = parser.parse_args() overrides = parse_overrides(args.set) @@ -148,36 +179,82 @@ def main() -> int: role_path=Path(args.role_path), inventory_path=Path(args.inventory_file), vault_pw=args.vault_password_file, - overrides=overrides + overrides=overrides, + allow_empty_plain=args.allow_empty_plain, ) - # 1) Load existing inventory with ruamel (round-trip) yaml_rt = YAML(typ="rt") yaml_rt.preserve_quotes = True + # Get schema-applied structure (defaults etc.) for *non-destructive* merge + schema_inventory: Dict[str, Any] = manager.apply_schema() + schema_apps = schema_inventory.get("applications", {}) + schema_app_block = schema_apps.get(manager.app_id, {}) + schema_creds = schema_app_block.get("credentials", {}) if isinstance(schema_app_block, dict) else {} + + # ------------------------------------------------------------------------- + # SNIPPET MODE: only build a YAML fragment and print to stdout, no file I/O + # ------------------------------------------------------------------------- + if args.snippet: + # Build a minimal structure: + # applications: + # : + # credentials: + # key: !vault | + # ... + # ansible_become_password: !vault | ... + snippet_data = CommentedMap() + apps_snip = ensure_map(snippet_data, "applications") + app_block_snip = ensure_map(apps_snip, manager.app_id) + creds_snip = ensure_map(app_block_snip, "credentials") + + for key, default_val in schema_creds.items(): + # Priority: --set exact key → default from schema → empty string + ov = overrides.get(f"credentials.{key}", None) + if ov is None: + ov = overrides.get(key, None) + + if ov is not None: + value_for_key: Union[str, Any] = ov + else: + if _is_vault_encrypted(default_val): + creds_snip[key] = to_vault_block(manager.vault_handler, default_val, key) + continue + value_for_key = "" if default_val is None else str(default_val) + + creds_snip[key] = to_vault_block(manager.vault_handler, value_for_key, key) + + # Optional ansible_become_password only if provided via overrides + if "ansible_become_password" in overrides: + snippet_data["ansible_become_password"] = to_vault_block( + manager.vault_handler, + overrides["ansible_become_password"], + "ansible_become_password", + ) + + yaml_rt.dump(snippet_data, sys.stdout) + return 0 + + # ------------------------------------------------------------------------- + # DEFAULT MODE: modify the inventory file on disk (previous behavior) + # ------------------------------------------------------------------------- + + # 1) Load existing inventory with ruamel (round-trip) with open(args.inventory_file, "r", encoding="utf-8") as f: data = yaml_rt.load(f) # CommentedMap or None if data is None: data = CommentedMap() - # 2) Get schema-applied structure (defaults etc.) for *non-destructive* merge - schema_inventory: Dict[str, Any] = manager.apply_schema() - - # 3) Ensure structural path exists + # 2) Ensure structural path exists apps = ensure_map(data, "applications") app_block = ensure_map(apps, manager.app_id) creds = ensure_map(app_block, "credentials") - # 4) Determine defaults we could add - schema_apps = schema_inventory.get("applications", {}) - schema_app_block = schema_apps.get(manager.app_id, {}) - schema_creds = schema_app_block.get("credentials", {}) if isinstance(schema_app_block, dict) else {} - - # 5) Add ONLY missing credential keys + # 3) Add ONLY missing credential keys (respect existing values) newly_added_keys = set() for key, default_val in schema_creds.items(): if key in creds: - # existing → do not touch (preserve plaintext/vault/formatting/comments) + # Existing → do not touch (preserve plaintext/vault/formatting/comments) continue # Value to use for the new key @@ -200,7 +277,7 @@ def main() -> int: creds[key] = to_vault_block(manager.vault_handler, value_for_new_key, key) newly_added_keys.add(key) - # 6) ansible_become_password: only add if missing; + # 4) ansible_become_password: only add if missing; # never rewrite an existing one unless --force (+ confirm/--yes) and override provided. if "ansible_become_password" not in data: val = overrides.get("ansible_become_password", None) @@ -216,7 +293,7 @@ def main() -> int: manager.vault_handler, overrides["ansible_become_password"], "ansible_become_password" ) - # 7) Overrides for existing credential keys (only with --force) + # 5) Overrides for existing credential keys (only with --force) if args.force: for ov_key, ov_val in overrides.items(): # Accept both 'credentials.key' and bare 'key' @@ -228,7 +305,7 @@ def main() -> int: if args.yes or ask_for_confirmation(key): creds[key] = to_vault_block(manager.vault_handler, ov_val, key) - # 8) Write back with ruamel (preserve formatting & comments) + # 6) Write back with ruamel (preserve formatting & comments) with open(args.inventory_file, "w", encoding="utf-8") as f: yaml_rt.dump(data, f) diff --git a/cli/create/inventory.py b/cli/create/inventory.py new file mode 100644 index 00000000..90c3ce55 --- /dev/null +++ b/cli/create/inventory.py @@ -0,0 +1,695 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Create or update a full Ansible inventory for a single host and automatically +generate credentials for all selected applications. + +This subcommand: + +1. Uses `build inventory full` to generate a dynamic inventory for the given + host containing all invokable applications. +2. Optionally filters the resulting groups by a user-provided list of + application_ids (`--roles`). +3. Merges the generated inventory into an existing inventory file, without + deleting or overwriting unrelated entries. +4. Ensures `host_vars/.yml` exists and stores base settings such as: + - PRIMARY_DOMAIN + - WEB_PROTOCOL + Existing keys are preserved (only missing keys are added). +5. For every application_id in the final inventory, uses: + - `meta/applications/role_name.py` to resolve the role path + - `create/credentials.py --snippet` to generate credentials YAML + snippets, and merges all snippets into host_vars in a single write. +""" + +import argparse +import subprocess +import sys +from pathlib import Path +from typing import Dict, Any, List, Set, Optional +import concurrent.futures +import os + +try: + import yaml +except ImportError: # pragma: no cover + raise SystemExit("Please `pip install pyyaml` to use `infinito create inventory`.") + +try: + from ruamel.yaml import YAML + from ruamel.yaml.comments import CommentedMap +except ImportError: # pragma: no cover + raise SystemExit("Please `pip install ruamel.yaml` to use `infinito create inventory`.") + + +# --------------------------------------------------------------------------- +# Generic helpers +# --------------------------------------------------------------------------- + +def run_subprocess( + cmd: List[str], + capture_output: bool = False, + env: Optional[Dict[str, str]] = None, +) -> subprocess.CompletedProcess: + """ + Run a subprocess command and either stream output or capture it. + Raise SystemExit on non-zero return code. + """ + if capture_output: + result = subprocess.run(cmd, text=True, capture_output=True, env=env) + else: + result = subprocess.run(cmd, text=True, env=env) + if result.returncode != 0: + msg = f"Command failed: {' '.join(str(c) for c in cmd)}\n" + if capture_output: + msg += f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}\n" + raise SystemExit(msg) + return result + +def build_env_with_project_root(project_root: Path) -> Dict[str, str]: + """ + Return an environment dict where PYTHONPATH includes the project root. + This makes `module_utils` and other top-level packages importable when + running project scripts as subprocesses. + """ + env = os.environ.copy() + root_str = str(project_root) + existing = env.get("PYTHONPATH") + if existing: + if root_str not in existing.split(os.pathsep): + env["PYTHONPATH"] = root_str + os.pathsep + existing + else: + env["PYTHONPATH"] = root_str + return env + +def detect_project_root() -> Path: + """ + Detect project root assuming this file is at: /cli/create/inventory.py + """ + here = Path(__file__).resolve() + # .../repo/cli/create/inventory.py → parents[2] == repo + return here.parents[2] + + +def load_yaml(path: Path) -> Dict[str, Any]: + if not path.exists(): + return {} + with path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + if not isinstance(data, dict): + raise SystemExit(f"Expected a mapping at top-level in {path}, got {type(data)}") + return data + + +def dump_yaml(path: Path, data: Dict[str, Any]) -> None: + with path.open("w", encoding="utf-8") as f: + yaml.safe_dump(data, f, sort_keys=False, default_flow_style=False) + + +def parse_roles_list(raw_roles: Optional[List[str]]) -> Optional[Set[str]]: + """ + Parse a list of roles supplied on the CLI. Supports: + --roles web-app-nextcloud web-app-mastodon + --roles web-app-nextcloud,web-app-mastodon + """ + if not raw_roles: + return None + result: Set[str] = set() + for token in raw_roles: + token = token.strip() + if not token: + continue + # Allow comma-separated tokens as well + for part in token.split(","): + part = part.strip() + if part: + result.add(part) + return result + + +# --------------------------------------------------------------------------- +# Inventory generation (servers.yml via build/inventory/full.py) +# --------------------------------------------------------------------------- + +def generate_dynamic_inventory( + host: str, + roles_dir: Path, + categories_file: Path, + tmp_inventory: Path, + project_root: Path, +) -> Dict[str, Any]: + """ + Call `cli/build/inventory/full.py` directly to generate a dynamic inventory + YAML for the given host and return it as a Python dict. + """ + script = project_root / "cli" / "build" / "inventory" / "full.py" + env = build_env_with_project_root(project_root) + cmd = [ + sys.executable, + str(script), + "--host", host, + "--format", "yaml", + "--inventory-style", "group", + "-c", str(categories_file), + "-r", str(roles_dir), + "-o", str(tmp_inventory), + ] + run_subprocess(cmd, capture_output=False, env=env) + data = load_yaml(tmp_inventory) + tmp_inventory.unlink(missing_ok=True) + return data + +def filter_inventory_by_roles(inv_data: Dict[str, Any], roles_filter: Set[str]) -> Dict[str, Any]: + """ + Return a new inventory dict that contains only the groups whose names + are in `roles_filter`. All other structure is preserved. + """ + all_block = inv_data.get("all", {}) + children = all_block.get("children", {}) or {} + + filtered_children: Dict[str, Any] = {} + for group_name, group_data in children.items(): + if group_name in roles_filter: + filtered_children[group_name] = group_data + + new_all = dict(all_block) + new_all["children"] = filtered_children + return {"all": new_all} + + +def merge_inventories( + base: Dict[str, Any], + new: Dict[str, Any], + host: str, +) -> Dict[str, Any]: + """ + Merge `new` inventory into `base` inventory without deleting any + existing groups/hosts/vars. + + For each group in `new`: + - ensure the group exists in `base` + - ensure `hosts` exists + - ensure the given `host` is present in that group's `hosts` + (keep existing hosts and host vars untouched) + """ + base_all = base.setdefault("all", {}) + base_children = base_all.setdefault("children", {}) + + new_all = new.get("all", {}) + new_children = new_all.get("children", {}) or {} + + for group_name, group_data in new_children.items(): + # Ensure group exists in base + base_group = base_children.setdefault(group_name, {}) + base_hosts = base_group.setdefault("hosts", {}) + + # Try to propagate host vars from new inventory if they exist + new_hosts = (group_data or {}).get("hosts", {}) or {} + host_vars = {} + if isinstance(new_hosts, dict) and host in new_hosts: + host_vars = new_hosts.get(host) or {} + + # Ensure the target host exists in this group + if host not in base_hosts: + base_hosts[host] = host_vars or {} + + return base + +# --------------------------------------------------------------------------- +# host_vars helpers +# --------------------------------------------------------------------------- + +def ensure_host_vars_file( + host_vars_file: Path, + host: str, + primary_domain: str, + web_protocol: str, +) -> None: + """ + Ensure host_vars/.yml exists and contains base settings. + + Important: Existing keys are NOT overwritten. Only missing keys are added: + - PRIMARY_DOMAIN + - WEB_PROTOCOL + + Uses ruamel.yaml so that custom tags like !vault are preserved and do not + break parsing (unlike PyYAML safe_load). + """ + yaml_rt = YAML(typ="rt") + yaml_rt.preserve_quotes = True + + if host_vars_file.exists(): + with host_vars_file.open("r", encoding="utf-8") as f: + data = yaml_rt.load(f) + if data is None: + data = CommentedMap() + else: + data = CommentedMap() + + if not isinstance(data, CommentedMap): + tmp = CommentedMap() + for k, v in dict(data).items(): + tmp[k] = v + data = tmp + + # Only set defaults; do NOT override existing values + if "PRIMARY_DOMAIN" not in data: + data["PRIMARY_DOMAIN"] = primary_domain + if "WEB_PROTOCOL" not in data: + data["WEB_PROTOCOL"] = web_protocol + + host_vars_file.parent.mkdir(parents=True, exist_ok=True) + with host_vars_file.open("w", encoding="utf-8") as f: + yaml_rt.dump(data, f) + +def ensure_ruamel_map(node: CommentedMap, key: str) -> CommentedMap: + """ + Ensure node[key] exists and is a mapping (CommentedMap). + """ + if key not in node or not isinstance(node.get(key), CommentedMap): + node[key] = CommentedMap() + return node[key] + + +# --------------------------------------------------------------------------- +# Role resolution (meta/applications/role_name.py) +# --------------------------------------------------------------------------- + +def resolve_role_path( + application_id: str, + roles_dir: Path, + project_root: Path, +) -> Optional[Path]: + """ + Use `cli/meta/applications/role_name.py` to resolve the role path + for a given application_id. Returns an absolute Path or None on failure. + + We expect the helper to print either: + - a bare role folder name (e.g. 'web-app-nextcloud'), or + - a relative path like 'roles/web-app-nextcloud', or + - an absolute path. + + We try, in order: + 1) / + 2) / + 3) use printed as-is if absolute + """ + script = project_root / "cli" / "meta" / "applications" / "role_name.py" + env = build_env_with_project_root(project_root) + cmd = [ + sys.executable, + str(script), + application_id, + "-r", str(roles_dir), + ] + result = run_subprocess(cmd, capture_output=True, env=env) + raw = (result.stdout or "").strip() + + if not raw: + print(f"[WARN] Could not resolve role for application_id '{application_id}'. Skipping.", file=sys.stderr) + return None + + printed = Path(raw) + + # 1) If it's absolute, just use it + if printed.is_absolute(): + role_path = printed + else: + # 2) Prefer resolving below roles_dir + candidate = roles_dir / printed + if candidate.exists(): + role_path = candidate + else: + # 3) Fallback: maybe the helper already printed something like 'roles/web-app-nextcloud' + candidate2 = project_root / printed + if candidate2.exists(): + role_path = candidate2 + else: + print( + f"[WARN] Resolved role path does not exist after probing: " + f"{candidate} and {candidate2} (application_id={application_id})", + file=sys.stderr, + ) + return None + + if not role_path.exists(): + print(f"[WARN] Resolved role path does not exist: {role_path} (application_id={application_id})", file=sys.stderr) + return None + + return role_path + +def fatal(msg: str) -> "NoReturn": + """Print a fatal error and exit with code 1.""" + sys.stderr.write(f"[FATAL] {msg}\n") + sys.exit(1) + +# --------------------------------------------------------------------------- +# Credentials generation via create/credentials.py --snippet +# --------------------------------------------------------------------------- + +def _generate_credentials_snippet_for_app( + app_id: str, + roles_dir: Path, + host_vars_file: Path, + vault_password_file: Path, + project_root: Path, + credentials_script: Path, +) -> Optional[CommentedMap]: + """ + Worker function for a single application_id: + + 1. Resolve role path via meta/applications/role_name.py. + 2. Skip if role path cannot be resolved. + 3. Skip if schema/main.yml does not exist. + 4. Call create/credentials.py with --snippet to get a YAML fragment. + + Returns a ruamel CommentedMap (snippet) or None on failure. + Errors are logged but do NOT abort the whole run. + """ + try: + role_path = resolve_role_path(app_id, roles_dir, project_root) + except SystemExit as exc: + sys.stderr.write(f"[ERROR] Failed to resolve role for {app_id}: {exc}\n") + return None + except Exception as exc: # pragma: no cover + sys.stderr.write( + f"[ERROR] Unexpected error while resolving role for {app_id}: {exc}\n" + ) + return None + + if role_path is None: + # resolve_role_path already logged a warning + return None + + schema_path = role_path / "schema" / "main.yml" + if not schema_path.exists(): + print( + f"[INFO] Skipping {app_id}: no schema/main.yml found at {schema_path}", + file=sys.stderr, + ) + return None + + cmd = [ + sys.executable, + str(credentials_script), + "--role-path", str(role_path), + "--inventory-file", str(host_vars_file), + "--vault-password-file", str(vault_password_file), + "--snippet", + "--allow-empty-plain", + ] + print(f"[INFO] Generating credentials snippet for {app_id} (role: {role_path})") + + env = build_env_with_project_root(project_root) + result = subprocess.run(cmd, text=True, capture_output=True, env=env) + if result.returncode != 0: + stdout = result.stdout or "" + stderr = result.stderr or "" + fatal( + f"Command failed ({result.returncode}): {' '.join(map(str, cmd))}\n" + f"STDOUT:\n{stdout}\nSTDERR:\n{stderr}" + ) + + snippet_text = (result.stdout or "").strip() + if not snippet_text: + # No output means nothing to merge + return None + + yaml_rt = YAML(typ="rt") + try: + data = yaml_rt.load(snippet_text) + except Exception as exc: # pragma: no cover + sys.stderr.write( + f"[ERROR] Failed to parse credentials snippet for {app_id}: {exc}\n" + f"Snippet was:\n{snippet_text}\n" + ) + return None + + if data is None: + return None + if not isinstance(data, CommentedMap): + # Normalize to CommentedMap + cm = CommentedMap() + for k, v in dict(data).items(): + cm[k] = v + return cm + + return data + + +def generate_credentials_for_roles( + application_ids: List[str], + roles_dir: Path, + host_vars_file: Path, + vault_password_file: Path, + project_root: Path, + workers: int = 4, +) -> None: + """ + Generate credentials for all given application_ids using create/credentials.py --snippet. + + Steps: + 1) In parallel, for each app_id: + - resolve role path + - skip roles without schema/main.yml + - run create/credentials.py --snippet + - return a YAML snippet (ruamel CommentedMap) + 2) Sequentially, merge all snippets into host_vars/.yml in a single write: + - applications..credentials. is added only if missing + - ansible_become_password is added only if missing + """ + if not application_ids: + print("[WARN] No application_ids to process for credential generation.", file=sys.stderr) + return + + credentials_script = project_root / "cli" / "create" / "credentials.py" + max_workers = max(1, workers) + print( + f"[INFO] Running credentials snippet generation for {len(application_ids)} " + f"applications with {max_workers} worker threads..." + ) + + snippets: List[CommentedMap] = [] + + # 1) Parallel: collect snippets + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_app: Dict[concurrent.futures.Future, str] = {} + + for app_id in application_ids: + future = executor.submit( + _generate_credentials_snippet_for_app, + app_id, + roles_dir, + host_vars_file, + vault_password_file, + project_root, + credentials_script, + ) + future_to_app[future] = app_id + + for future in concurrent.futures.as_completed(future_to_app): + app_id = future_to_app[future] + try: + snippet = future.result() + except Exception as exc: + fatal(f"Worker for {app_id} failed with exception: {exc}") + + if snippet is not None: + snippets.append(snippet) + + if not snippets: + print("[WARN] No credentials snippets were generated.", file=sys.stderr) + return + + # 2) Sequential: merge snippets into host_vars + yaml_rt = YAML(typ="rt") + yaml_rt.preserve_quotes = True + + if host_vars_file.exists(): + with host_vars_file.open("r", encoding="utf-8") as f: + doc = yaml_rt.load(f) + if doc is None: + doc = CommentedMap() + else: + doc = CommentedMap() + + if not isinstance(doc, CommentedMap): + tmp = CommentedMap() + for k, v in dict(doc).items(): + tmp[k] = v + doc = tmp + + # Merge each snippet + for snippet in snippets: + apps_snip = snippet.get("applications", {}) or {} + if isinstance(apps_snip, dict): + apps_doc = ensure_ruamel_map(doc, "applications") + for app_id, app_block_snip in apps_snip.items(): + if not isinstance(app_block_snip, dict): + continue + app_doc = ensure_ruamel_map(apps_doc, app_id) + creds_doc = ensure_ruamel_map(app_doc, "credentials") + + creds_snip = app_block_snip.get("credentials", {}) or {} + if not isinstance(creds_snip, dict): + continue + + for key, val in creds_snip.items(): + # Only add missing keys; do not overwrite existing credentials + if key not in creds_doc: + creds_doc[key] = val + + # ansible_become_password: only add if missing + if "ansible_become_password" in snippet and "ansible_become_password" not in doc: + doc["ansible_become_password"] = snippet["ansible_become_password"] + + with host_vars_file.open("w", encoding="utf-8") as f: + yaml_rt.dump(doc, f) + + +# --------------------------------------------------------------------------- +# main +# --------------------------------------------------------------------------- + +def main(argv: Optional[List[str]] = None) -> None: + parser = argparse.ArgumentParser( + description=( + "Create or update a full inventory for a host and generate " + "credentials for all selected applications." + ) + ) + parser.add_argument( + "--host", + required=True, + help="Hostname to use in the inventory (e.g. galaxyserver, localhost).", + ) + parser.add_argument( + "--primary-domain", + required=True, + help="Primary domain for this host (e.g. infinito.nexus).", + ) + parser.add_argument( + "--web-protocol", + default="https", + choices=("http", "https"), + help="Web protocol to use for this host (default: https).", + ) + parser.add_argument( + "--inventory-dir", + required=True, + help="Path to the inventory directory (e.g. inventories/galaxyserver).", + ) + parser.add_argument( + "--inventory-file", + help="Inventory YAML file path (default: /servers.yml).", + ) + parser.add_argument( + "--roles", + nargs="+", + help=( + "Optional list of application_ids to include. " + "If omitted, all invokable applications are used. " + "Supports comma-separated values as well." + ), + ) + parser.add_argument( + "--vault-password-file", + required=True, + help="Path to the Vault password file for credentials generation.", + ) + parser.add_argument( + "--roles-dir", + help="Path to the roles/ directory (default: /roles).", + ) + parser.add_argument( + "--categories-file", + help="Path to roles/categories.yml (default: /categories.yml).", + ) + parser.add_argument( + "--workers", + type=int, + default=4, + help="Number of worker threads for parallel credentials snippet generation (default: 4).", + ) + + args = parser.parse_args(argv) + + project_root = detect_project_root() + roles_dir = Path(args.roles_dir) if args.roles_dir else (project_root / "roles") + categories_file = Path(args.categories_file) if args.categories_file else (roles_dir / "categories.yml") + + inventory_dir = Path(args.inventory_dir).resolve() + inventory_dir.mkdir(parents=True, exist_ok=True) + + inventory_file = Path(args.inventory_file) if args.inventory_file else (inventory_dir / "servers.yml") + inventory_file = inventory_file.resolve() + + host_vars_dir = inventory_dir / "host_vars" + host_vars_file = host_vars_dir / f"{args.host}.yml" + + vault_password_file = Path(args.vault_password_file).resolve() + + roles_filter = parse_roles_list(args.roles) + tmp_inventory = inventory_dir / "_inventory_full_tmp.yml" + + # 1) Generate dynamic inventory via build/inventory/full.py + print("[INFO] Generating dynamic inventory via cli/build/inventory/full.py ...") + dyn_inv = generate_dynamic_inventory( + host=args.host, + roles_dir=roles_dir, + categories_file=categories_file, + tmp_inventory=tmp_inventory, + project_root=project_root, + ) + + # 2) Optional: filter by roles + if roles_filter: + print(f"[INFO] Filtering inventory to roles: {', '.join(sorted(roles_filter))}") + dyn_inv = filter_inventory_by_roles(dyn_inv, roles_filter) + + # Collect final application_ids from dynamic inventory for credential generation + dyn_all = dyn_inv.get("all", {}) + dyn_children = dyn_all.get("children", {}) or {} + application_ids = sorted(dyn_children.keys()) + + if not application_ids: + print("[WARN] No application_ids found in dynamic inventory after filtering. Nothing to do.", file=sys.stderr) + + # 3) Merge with existing inventory file (if any) + if inventory_file.exists(): + print(f"[INFO] Merging into existing inventory: {inventory_file}") + base_inv = load_yaml(inventory_file) + else: + print(f"[INFO] Creating new inventory file: {inventory_file}") + base_inv = {} + + merged_inv = merge_inventories(base_inv, dyn_inv, host=args.host) + dump_yaml(inventory_file, merged_inv) + + # 4) Ensure host_vars/.yml exists and has base settings + print(f"[INFO] Ensuring host_vars for host '{args.host}' at {host_vars_file}") + ensure_host_vars_file( + host_vars_file=host_vars_file, + host=args.host, + primary_domain=args.primary_domain, + web_protocol=args.web_protocol, + ) + + # 5) Generate credentials for all application_ids (snippets + single merge) + if application_ids: + print(f"[INFO] Generating credentials for {len(application_ids)} applications...") + generate_credentials_for_roles( + application_ids=application_ids, + roles_dir=roles_dir, + host_vars_file=host_vars_file, + vault_password_file=vault_password_file, + project_root=project_root, + workers=args.workers, + ) + + print("[INFO] Done. Inventory and host_vars updated without deleting existing values.") + + +if __name__ == "__main__": # pragma: no cover + main() diff --git a/module_utils/manager/inventory.py b/module_utils/manager/inventory.py index 75a9905e..2901f585 100644 --- a/module_utils/manager/inventory.py +++ b/module_utils/manager/inventory.py @@ -10,12 +10,20 @@ import sys import base64 class InventoryManager: - def __init__(self, role_path: Path, inventory_path: Path, vault_pw: str, overrides: Dict[str, str]): + def __init__( + self, + role_path: Path, + inventory_path: Path, + vault_pw: str, + overrides: Dict[str, str], + allow_empty_plain: bool = False, + ): """Initialize the Inventory Manager.""" self.role_path = role_path self.inventory_path = inventory_path self.vault_pw = vault_pw self.overrides = overrides + self.allow_empty_plain = allow_empty_plain self.inventory = YamlHandler.load_yaml(inventory_path) self.schema = YamlHandler.load_yaml(role_path / "schema" / "main.yml") self.app_id = self.load_application_id(role_path) @@ -43,12 +51,10 @@ class InventoryManager: # Check if 'central-database' is enabled in the features section of data if "features" in data: - if "central_database" in data["features"] and \ - data["features"]["central_database"]: + if "central_database" in data["features"] and data["features"]["central_database"]: # Add 'central_database' value (password) to credentials target.setdefault("credentials", {})["database_password"] = self.generate_value("alphanumeric") - if "oauth2" in data["features"] and \ - data["features"]["oauth2"]: + if "oauth2" in data["features"] and data["features"]["oauth2"]: target.setdefault("credentials", {})["oauth2_proxy_cookie_secret"] = self.generate_value("random_hex_16") # Apply recursion only for the `credentials` section @@ -59,46 +65,59 @@ class InventoryManager: """Recursively process only the 'credentials' section and generate values.""" for key, meta in branch.items(): full_key = f"{prefix}.{key}" if prefix else key - + # Only process 'credentials' section for encryption - if prefix == "credentials" and isinstance(meta, dict) and all(k in meta for k in ("description", "algorithm", "validation")): + if prefix == "credentials" and isinstance(meta, dict) and all( + k in meta for k in ("description", "algorithm", "validation") + ): alg = meta["algorithm"] if alg == "plain": - # Must be supplied via --set + # Must be supplied via --set, unless allow_empty_plain=True if full_key not in self.overrides: - print(f"ERROR: Plain algorithm for '{full_key}' requires override via --set {full_key}=", file=sys.stderr) - sys.exit(1) - plain = self.overrides[full_key] + if self.allow_empty_plain: + plain = "" + else: + print( + f"ERROR: Plain algorithm for '{full_key}' requires override via --set {full_key}=", + file=sys.stderr, + ) + sys.exit(1) + else: + plain = self.overrides[full_key] else: plain = self.overrides.get(full_key, self.generate_value(alg)) - + # Check if the value is already vaulted or if it's a dictionary existing_value = dest.get(key) - + # If existing_value is a dictionary, print a warning and skip encryption if isinstance(existing_value, dict): - print(f"Skipping encryption for '{key}', as it is a dictionary.") + print(f"Skipping encryption for '{key}', as it is a dictionary.", file=sys.stderr) continue - + # Check if the value is a VaultScalar and already vaulted if existing_value and isinstance(existing_value, VaultScalar): - print(f"Skipping encryption for '{key}', as it is already vaulted.") + print(f"Skipping encryption for '{key}', as it is already vaulted.", file=sys.stderr) continue - + + # Empty strings should *not* be encrypted + if plain == "": + dest[key] = "" + continue + # Encrypt only if it's not already vaulted snippet = self.vault_handler.encrypt_string(plain, key) lines = snippet.splitlines() indent = len(lines[1]) - len(lines[1].lstrip()) body = "\n".join(line[indent:] for line in lines[1:]) dest[key] = VaultScalar(body) - + elif isinstance(meta, dict): sub = dest.setdefault(key, {}) self.recurse_credentials(meta, sub, full_key) else: dest[key] = meta - def generate_secure_alphanumeric(self, length: int) -> str: """Generate a cryptographically secure random alphanumeric string of the given length.""" characters = string.ascii_letters + string.digits # a-zA-Z0-9 diff --git a/roles/desk-nextcloud/config/main.yml b/roles/desk-nextcloud/config/main.yml new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/roles/desk-nextcloud/config/main.yml @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/roles/desk-nextcloud/schema/main.yml b/roles/desk-nextcloud/schema/main.yml index 1f3e54a6..fabf3019 100644 --- a/roles/desk-nextcloud/schema/main.yml +++ b/roles/desk-nextcloud/schema/main.yml @@ -1 +1,5 @@ -cloud_fqdn: # @todo Add detailled scheme for this entry \ No newline at end of file +credentials: + cloud_fqdn: + description: "Cloud fqdn" + algorithm: "plain" + validation: "^.*$" \ No newline at end of file diff --git a/roles/desk-nextcloud/vars/main.yml b/roles/desk-nextcloud/vars/main.yml index a573fba0..6c994ba5 100644 --- a/roles/desk-nextcloud/vars/main.yml +++ b/roles/desk-nextcloud/vars/main.yml @@ -1,4 +1,4 @@ application_id: desk-nextcloud nextcloud_user_home_directory: "/home/{{ users[desktop_username].username }}/" -nextcloud_cloud_fqdn: "{{ applications | get_app_conf(application_id, 'cloud_fqdn') }}" -nextcloud_cloud_directory: '{{nextcloud_user_home_directory}}Clouds/{{nextcloud_cloud_fqdn}}/{{ users[desktop_username].username }}/' +nextcloud_cloud_fqdn: "{{ applications | get_app_conf(application_id, 'credentials.cloud_fqdn') }}" +nextcloud_cloud_directory: '{{ nextcloud_user_home_directory }}Clouds/{{nextcloud_cloud_fqdn}}/{{ users[desktop_username].username }}/' diff --git a/roles/svc-opt-keyboard-color/config/main.yml b/roles/svc-opt-keyboard-color/config/main.yml new file mode 100644 index 00000000..144faec1 --- /dev/null +++ b/roles/svc-opt-keyboard-color/config/main.yml @@ -0,0 +1 @@ +vendor_and_product_id: 1038:113a # Default \ No newline at end of file diff --git a/roles/svc-opt-keyboard-color/schema/main.yml b/roles/svc-opt-keyboard-color/schema/main.yml deleted file mode 100644 index 2673f7cc..00000000 --- a/roles/svc-opt-keyboard-color/schema/main.yml +++ /dev/null @@ -1 +0,0 @@ -vendor_and_product_id: "" # @todo schema needs to be implemented \ No newline at end of file diff --git a/roles/svc-opt-keyboard-color/vars/main.yml b/roles/svc-opt-keyboard-color/vars/main.yml index 172adef9..d38dc20a 100644 --- a/roles/svc-opt-keyboard-color/vars/main.yml +++ b/roles/svc-opt-keyboard-color/vars/main.yml @@ -1,3 +1,3 @@ -application_id: svc-opt-keyboard-color -system_service_id: "{{ application_id }}" +application_id: svc-opt-keyboard-color +system_service_id: "{{ application_id }}" vendor_and_product_id: "{{ applications | get_app_conf(application_id, 'vendor_and_product_id') }}" diff --git a/roles/web-app-gitea/schema/main.yml b/roles/web-app-gitea/schema/main.yml index 075c0f24..e7c86601 100644 --- a/roles/web-app-gitea/schema/main.yml +++ b/roles/web-app-gitea/schema/main.yml @@ -1 +1 @@ -credentials: \ No newline at end of file +credentials: {} \ No newline at end of file diff --git a/roles/web-app-magento/schema/main.yml b/roles/web-app-magento/schema/main.yml index e2a9c94f..b2720792 100644 --- a/roles/web-app-magento/schema/main.yml +++ b/roles/web-app-magento/schema/main.yml @@ -1,7 +1,9 @@ credentials: adobe_public_key: - description: "Adobe/Magento Marketplace Public Key" - algorithm: "plain" + description: "Adobe/Magento Marketplace Public Key" + algorithm: "plain" + validation: "^.*$" adobe_private_key: - description: "Adobe/Magento Marketplace Private Key" - algorithm: "plain" + description: "Adobe/Magento Marketplace Private Key" + algorithm: "plain" + validation: "^.*$" diff --git a/roles/web-app-oauth2-proxy/schema/main.yml b/roles/web-app-oauth2-proxy/schema/main.yml index dfe0265c..43a09879 100644 --- a/roles/web-app-oauth2-proxy/schema/main.yml +++ b/roles/web-app-oauth2-proxy/schema/main.yml @@ -1,7 +1,6 @@ # This file was created during refactoring to pass integration tests -# @todo add correct type credentials: oauth2_proxy_cookie_secret: description: "Cookie secret for OAuth2 Proxy (hex string, 32 characters, generated via `openssl rand -hex 16`)" - type: string - validation: "^[0-9A-Fa-f]{32}$" + algorithm: "plain" + validation: "^.*$" diff --git a/roles/web-app-openproject/schema/main.yml b/roles/web-app-openproject/schema/main.yml index 075c0f24..e7c86601 100644 --- a/roles/web-app-openproject/schema/main.yml +++ b/roles/web-app-openproject/schema/main.yml @@ -1 +1 @@ -credentials: \ No newline at end of file +credentials: {} \ No newline at end of file diff --git a/roles/web-app-phpldapadmin/schema/main.yml b/roles/web-app-phpldapadmin/schema/main.yml index 075c0f24..e7c86601 100644 --- a/roles/web-app-phpldapadmin/schema/main.yml +++ b/roles/web-app-phpldapadmin/schema/main.yml @@ -1 +1 @@ -credentials: \ No newline at end of file +credentials: {} \ No newline at end of file diff --git a/roles/web-app-phpmyadmin/schema/main.yml b/roles/web-app-phpmyadmin/schema/main.yml index 075c0f24..e7c86601 100644 --- a/roles/web-app-phpmyadmin/schema/main.yml +++ b/roles/web-app-phpmyadmin/schema/main.yml @@ -1 +1 @@ -credentials: \ No newline at end of file +credentials: {} \ No newline at end of file diff --git a/tests/unit/cli/create/test_credentials.py b/tests/unit/cli/create/test_credentials.py index bc988708..abcd6f5f 100644 --- a/tests/unit/cli/create/test_credentials.py +++ b/tests/unit/cli/create/test_credentials.py @@ -98,5 +98,66 @@ class TestCreateCredentials(unittest.TestCase): self.assertIsInstance(creds['api_key'], str) self.assertTrue(creds['api_key'].lstrip().startswith('$ANSIBLE_VAULT')) -if __name__ == '__main__': - unittest.main() + def test_main_plain_algorithm_allow_empty_plain_sets_empty_string_without_vault(self): + """ + When --allow-empty-plain is used, a 'plain' credential without override + should be set to "" and *not* encrypted (no ansible-vault calls). + """ + with tempfile.TemporaryDirectory() as tmpdir: + role_path = os.path.join(tmpdir, 'role') + os.makedirs(os.path.join(role_path, 'config')) + os.makedirs(os.path.join(role_path, 'schema')) + os.makedirs(os.path.join(role_path, 'vars')) + + # vars/main.yml with application_id + main_vars = {'application_id': 'app_empty_plain'} + with open(os.path.join(role_path, 'vars', 'main.yml'), 'w') as f: + yaml.dump(main_vars, f) + + # config/main.yml + config = {'features': {'central_database': False}} + with open(os.path.join(role_path, "config", "main.yml"), 'w') as f: + yaml.dump(config, f) + + # schema/main.yml: plain credential *without* overrides + schema = { + 'credentials': { + 'api_key': { + 'description': 'API key', + 'algorithm': 'plain', + 'validation': {} + } + } + } + with open(os.path.join(role_path, 'schema', 'main.yml'), 'w') as f: + yaml.dump(schema, f) + + # Empty inventory file + inventory_file = os.path.join(tmpdir, 'inventory.yml') + with open(inventory_file, 'w') as f: + yaml.dump({}, f) + + # Vault password file + vault_pw_file = os.path.join(tmpdir, 'pw.txt') + with open(vault_pw_file, 'w') as f: + f.write('pw') + + # Ensure ansible-vault is *not* called at all in this scenario + def fail_run(*_args, **_kwargs): + raise AssertionError("ansible-vault must not be called for allow_empty_plain + empty plain") + + with mock.patch('subprocess.run', side_effect=fail_run): + sys.argv = [ + 'create/credentials.py', + '--role-path', role_path, + '--inventory-file', inventory_file, + '--vault-password-file', vault_pw_file, + '--allow-empty-plain', + ] + main() + + data = yaml.safe_load(open(inventory_file)) + creds = data['applications']['app_empty_plain']['credentials'] + # api_key should exist and be an empty string, not a vault block + self.assertIn('api_key', creds) + self.assertEqual(creds['api_key'], "") \ No newline at end of file diff --git a/tests/unit/cli/create/test_inventory.py b/tests/unit/cli/create/test_inventory.py new file mode 100644 index 00000000..8f1393a6 --- /dev/null +++ b/tests/unit/cli/create/test_inventory.py @@ -0,0 +1,170 @@ +import os +import sys +import tempfile +import unittest +from pathlib import Path + +# Make cli module importable (same pattern as test_credentials.py) +dir_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), '../../../cli') +) +sys.path.insert(0, dir_path) + +from cli.create.inventory import ( # type: ignore + merge_inventories, + ensure_host_vars_file, +) + +from ruamel.yaml import YAML +from ruamel.yaml.comments import CommentedMap + + +class TestCreateInventory(unittest.TestCase): + def test_merge_inventories_adds_host_and_preserves_existing(self): + """ + merge_inventories() must: + - ensure the given host exists in every group of the new inventory, + - keep existing hosts and their variables untouched, + - copy host vars from the new inventory when available, + - create missing groups and add the host. + """ + host = "localhost" + + base = { + "all": { + "children": { + "web-app-nextcloud": { + "hosts": { + "oldhost": {"ansible_host": "1.2.3.4"}, + } + }, + "web-app-matomo": { + "hosts": { + "otherhost": {"ansible_host": "5.6.7.8"}, + } + }, + } + } + } + + # New inventory with localhost defined in two groups + new = { + "all": { + "children": { + "web-app-nextcloud": { + "hosts": { + "localhost": {"ansible_host": "127.0.0.1"}, + } + }, + "web-app-matomo": { + "hosts": { + "localhost": {}, + } + }, + # A new group with no hosts → merge_inventories must create hosts + localhost + "web-app-phpmyadmin": {} + } + } + } + + merged = merge_inventories(base, new, host=host) + children = merged["all"]["children"] + + # 1) Existing hosts must remain unchanged + self.assertIn("oldhost", children["web-app-nextcloud"]["hosts"]) + self.assertIn("otherhost", children["web-app-matomo"]["hosts"]) + + # 2) localhost must be inserted into all groups from `new` + self.assertIn("localhost", children["web-app-nextcloud"]["hosts"]) + self.assertIn("localhost", children["web-app-matomo"]["hosts"]) + self.assertIn("localhost", children["web-app-phpmyadmin"]["hosts"]) + + # 3) Host vars from the new inventory must be preserved + self.assertEqual( + children["web-app-nextcloud"]["hosts"]["localhost"], + {"ansible_host": "127.0.0.1"}, + ) + # Empty dict stays empty + self.assertEqual( + children["web-app-matomo"]["hosts"]["localhost"], + {}, + ) + # New group with no host vars receives an empty dict + self.assertEqual( + children["web-app-phpmyadmin"]["hosts"]["localhost"], + {}, + ) + + def test_ensure_host_vars_file_preserves_vault_and_adds_defaults(self): + """ + ensure_host_vars_file() must: + - load existing YAML containing a !vault tag without crashing, + - preserve the !vault node including its tag, + - keep existing keys untouched, + - add PRIMARY_DOMAIN, and WEB_PROTOCOL only when missing, + - not overwrite them on subsequent calls. + """ + yaml_rt = YAML(typ="rt") + yaml_rt.preserve_quotes = True + + with tempfile.TemporaryDirectory() as tmpdir: + host = "localhost" + host_vars_dir = Path(tmpdir) + host_vars_file = host_vars_dir / f"{host}.yml" + + # File containing a !vault tag to ensure ruamel loader works correctly + initial_yaml = """\ +secret: !vault | + $ANSIBLE_VAULT;1.1;AES256 + ENCRYPTEDVALUE +existing_key: foo +""" + + host_vars_dir.mkdir(parents=True, exist_ok=True) + host_vars_file.write_text(initial_yaml, encoding="utf-8") + + # Run ensure_host_vars_file + ensure_host_vars_file( + host_vars_file=host_vars_file, + host=host, + primary_domain="example.org", + web_protocol="https", + ) + + # Reload with ruamel.yaml to verify structure and tags + with host_vars_file.open("r", encoding="utf-8") as f: + data = yaml_rt.load(f) + + self.assertIsInstance(data, CommentedMap) + + # Existing keys must remain + self.assertIn("secret", data) + self.assertIn("existing_key", data) + self.assertEqual(data["existing_key"], "foo") + + # !vault tag must stay intact + secret_node = data["secret"] + self.assertEqual(getattr(secret_node, "tag", None), "!vault") + + # Default values must be added + self.assertEqual(data["PRIMARY_DOMAIN"], "example.org") + self.assertEqual(data["WEB_PROTOCOL"], "https") + + # A second call must NOT overwrite existing defaults + ensure_host_vars_file( + host_vars_file=host_vars_file, + host="other-host", + primary_domain="other.example", + web_protocol="http", + ) + + with host_vars_file.open("r", encoding="utf-8") as f: + data2 = yaml_rt.load(f) + + # Values remain unchanged + self.assertEqual(data2["PRIMARY_DOMAIN"], "example.org") + self.assertEqual(data2["WEB_PROTOCOL"], "https") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/web-app-desktop/__init__.py b/tests/unit/module_utils/manager/__init__.py similarity index 100% rename from tests/unit/web-app-desktop/__init__.py rename to tests/unit/module_utils/manager/__init__.py diff --git a/tests/unit/module_utils/manager/test_inventory_manager.py b/tests/unit/module_utils/manager/test_inventory_manager.py new file mode 100644 index 00000000..ef9f513f --- /dev/null +++ b/tests/unit/module_utils/manager/test_inventory_manager.py @@ -0,0 +1,374 @@ +import os +import sys +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +# Make project root importable so that `module_utils` can be imported +ROOT_DIR = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../../..") +) +sys.path.insert(0, ROOT_DIR) + +from module_utils.manager.inventory import InventoryManager # type: ignore +from module_utils.handler.vault import VaultScalar # type: ignore + + +class TestInventoryManager(unittest.TestCase): + def test_load_application_id_missing_exits(self): + """ + If vars/main.yml does not contain application_id, InventoryManager + must print an error and exit with code 1. + """ + with tempfile.TemporaryDirectory() as tmpdir: + role_path = Path(tmpdir) / "role" + inv_path = Path(tmpdir) / "inventory.yml" + + role_path.mkdir(parents=True, exist_ok=True) + (role_path / "schema").mkdir(parents=True, exist_ok=True) + (role_path / "vars").mkdir(parents=True, exist_ok=True) + (role_path / "config").mkdir(parents=True, exist_ok=True) + + # Dummy files so that Path comparisons in the fake loader work + (role_path / "schema" / "main.yml").write_text("{}", encoding="utf-8") + (role_path / "vars" / "main.yml").write_text("{}", encoding="utf-8") + (role_path / "config" / "main.yml").write_text("{}", encoding="utf-8") + inv_path.write_text("{}", encoding="utf-8") + + inventory_path = inv_path + + def fake_load_yaml(path): + p = Path(path) + if p == inventory_path: + return {} + if p == role_path / "schema" / "main.yml": + return {} + if p == role_path / "vars" / "main.yml": + # Missing application_id on purpose + return {} + if p == role_path / "config" / "main.yml": + return {"features": {}} + return {} + + with mock.patch( + "module_utils.manager.inventory.YamlHandler.load_yaml", + side_effect=fake_load_yaml, + ), mock.patch( + "module_utils.manager.inventory.VaultHandler" + ): + with self.assertRaises(SystemExit) as ctx: + InventoryManager( + role_path=role_path, + inventory_path=inventory_path, + vault_pw="dummy", + overrides={}, + ) + self.assertEqual(ctx.exception.code, 1) + + def test_plain_without_override_and_allow_empty_plain_exits(self): + """ + For a `plain` algorithm credential, if no override is provided and + allow_empty_plain=False, recurse_credentials/apply_schema must exit. + """ + with tempfile.TemporaryDirectory() as tmpdir: + role_path = Path(tmpdir) / "role" + inv_path = Path(tmpdir) / "inventory.yml" + + role_path.mkdir(parents=True, exist_ok=True) + (role_path / "schema").mkdir(parents=True, exist_ok=True) + (role_path / "vars").mkdir(parents=True, exist_ok=True) + (role_path / "config").mkdir(parents=True, exist_ok=True) + inv_path.write_text("{}", encoding="utf-8") + + inventory_path = inv_path + + schema_data = { + "credentials": { + "api_key": { + "description": "API key", + "algorithm": "plain", + "validation": {}, + } + } + } + + def fake_load_yaml(path): + p = Path(path) + if p == inventory_path: + return {"applications": {}} + if p == role_path / "schema" / "main.yml": + return schema_data + if p == role_path / "vars" / "main.yml": + return {"application_id": "app_test"} + if p == role_path / "config" / "main.yml": + return {"features": {}} + return {} + + with mock.patch( + "module_utils.manager.inventory.YamlHandler.load_yaml", + side_effect=fake_load_yaml, + ), mock.patch( + "module_utils.manager.inventory.VaultHandler" + ): + mgr = InventoryManager( + role_path=role_path, + inventory_path=inventory_path, + vault_pw="dummy", + overrides={}, # no plain override + allow_empty_plain=False, + ) + with self.assertRaises(SystemExit) as ctx: + mgr.apply_schema() + self.assertEqual(ctx.exception.code, 1) + + def test_plain_with_allow_empty_plain_sets_empty_string_unencrypted(self): + """ + For a `plain` algorithm credential, if no override is provided and + allow_empty_plain=True, the credential should be set to an empty string + and must NOT be encrypted. + """ + with tempfile.TemporaryDirectory() as tmpdir: + role_path = Path(tmpdir) / "role" + inv_path = Path(tmpdir) / "inventory.yml" + + role_path.mkdir(parents=True, exist_ok=True) + (role_path / "schema").mkdir(parents=True, exist_ok=True) + (role_path / "vars").mkdir(parents=True, exist_ok=True) + (role_path / "config").mkdir(parents=True, exist_ok=True) + inv_path.write_text("{}", encoding="utf-8") + + inventory_path = inv_path + + schema_data = { + "credentials": { + "api_key": { + "description": "API key", + "algorithm": "plain", + "validation": {}, + } + } + } + + def fake_load_yaml(path): + p = Path(path) + if p == inventory_path: + return {"applications": {}} + if p == role_path / "schema" / "main.yml": + return schema_data + if p == role_path / "vars" / "main.yml": + return {"application_id": "app_test"} + if p == role_path / "config" / "main.yml": + return {"features": {}} + return {} + + with mock.patch( + "module_utils.manager.inventory.YamlHandler.load_yaml", + side_effect=fake_load_yaml, + ), mock.patch( + "module_utils.manager.inventory.VaultHandler" + ) as mock_vault_cls: + # VaultHandler instance + mock_vault = mock_vault_cls.return_value + mock_vault.encrypt_string.return_value = "!vault |\n $ANSIBLE_VAULT;1.1;AES256\n ENCRYPTED" + + mgr = InventoryManager( + role_path=role_path, + inventory_path=inventory_path, + vault_pw="dummy", + overrides={}, # no override for plain + allow_empty_plain=True, + ) + inv = mgr.apply_schema() + + apps = inv.get("applications", {}) + app_block = apps.get("app_test", {}) + creds = app_block.get("credentials", {}) + + # api_key must be present and must be a literal empty string + self.assertIn("api_key", creds) + self.assertEqual(creds["api_key"], "") + + # Empty string must not trigger encryption + mock_vault.encrypt_string.assert_not_called() + + def test_non_plain_algorithm_encrypts_and_sets_vaultscalar(self): + """ + For non-plain algorithms, recurse_credentials must generate a value + and encrypt it into a VaultScalar, unless an existing VaultScalar + is already present. + """ + with tempfile.TemporaryDirectory() as tmpdir: + role_path = Path(tmpdir) / "role" + inv_path = Path(tmpdir) / "inventory.yml" + + role_path.mkdir(parents=True, exist_ok=True) + (role_path / "schema").mkdir(parents=True, exist_ok=True) + (role_path / "vars").mkdir(parents=True, exist_ok=True) + (role_path / "config").mkdir(parents=True, exist_ok=True) + inv_path.write_text("{}", encoding="utf-8") + + inventory_path = inv_path + + schema_data = { + "credentials": { + "api_key": { + "description": "API key", + "algorithm": "random_hex_16", + "validation": {}, + } + } + } + + def fake_load_yaml(path): + p = Path(path) + if p == inventory_path: + return {"applications": {}} + if p == role_path / "schema" / "main.yml": + return schema_data + if p == role_path / "vars" / "main.yml": + return {"application_id": "app_test"} + if p == role_path / "config" / "main.yml": + return {"features": {}} + return {} + + fake_snippet = "!vault |\n $ANSIBLE_VAULT;1.1;AES256\n ENCRYPTEDVALUE" + + with mock.patch( + "module_utils.manager.inventory.YamlHandler.load_yaml", + side_effect=fake_load_yaml, + ), mock.patch( + "module_utils.manager.inventory.VaultHandler" + ) as mock_vault_cls, mock.patch.object( + InventoryManager, + "generate_value", + return_value="PLAINVAL", + ): + mock_vault = mock_vault_cls.return_value + mock_vault.encrypt_string.return_value = fake_snippet + + mgr = InventoryManager( + role_path=role_path, + inventory_path=inventory_path, + vault_pw="dummy", + overrides={}, + allow_empty_plain=False, + ) + inv = mgr.apply_schema() + + apps = inv.get("applications", {}) + app_block = apps.get("app_test", {}) + creds = app_block.get("credentials", {}) + + self.assertIn("api_key", creds) + value = creds["api_key"] + + # api_key must be a VaultScalar + self.assertIsInstance(value, VaultScalar) + # Its underlying body should contain the vault header line + self.assertIn("$ANSIBLE_VAULT", str(value)) + + # Encryption must have been called with generated plaintext and key + mock_vault.encrypt_string.assert_called_once_with("PLAINVAL", "api_key") + + def test_recurse_skips_existing_dict_and_vaultscalar(self): + """ + If the destination already contains: + - a dict for a credential key, or + - a VaultScalar for a credential key, + recurse_credentials must skip re-encryption and leave existing values + untouched. + """ + with tempfile.TemporaryDirectory() as tmpdir: + role_path = Path(tmpdir) / "role" + inv_path = Path(tmpdir) / "inventory.yml" + + role_path.mkdir(parents=True, exist_ok=True) + (role_path / "schema").mkdir(parents=True, exist_ok=True) + (role_path / "vars").mkdir(parents=True, exist_ok=True) + (role_path / "config").mkdir(parents=True, exist_ok=True) + inv_path.write_text("{}", encoding="utf-8") + + inventory_path = inv_path + + # Existing credentials in inventory + existing_vault = VaultScalar("EXISTING_BODY") + existing_dict = {"nested": "value"} + + inventory_data = { + "applications": { + "app_test": { + "credentials": { + "already_vaulted": existing_vault, + "complex": existing_dict, + } + } + } + } + + schema_data = { + "credentials": { + "already_vaulted": { + "description": "Vaulted", + "algorithm": "random_hex_16", + "validation": {}, + }, + "complex": { + "description": "Complex dict", + "algorithm": "random_hex_16", + "validation": {}, + }, + } + } + + def fake_load_yaml(path): + p = Path(path) + if p == inventory_path: + return inventory_data + if p == role_path / "schema" / "main.yml": + return schema_data + if p == role_path / "vars" / "main.yml": + return {"application_id": "app_test"} + if p == role_path / "config" / "main.yml": + return {"features": {}} + return {} + + with mock.patch( + "module_utils.manager.inventory.YamlHandler.load_yaml", + side_effect=fake_load_yaml, + ), mock.patch( + "module_utils.manager.inventory.VaultHandler" + ) as mock_vault_cls, mock.patch.object( + InventoryManager, + "generate_value", + return_value="IGNORED", + ): + mock_vault = mock_vault_cls.return_value + mock_vault.encrypt_string.side_effect = AssertionError( + "encrypt_string should not be called for existing VaultScalar/dict" + ) + + mgr = InventoryManager( + role_path=role_path, + inventory_path=inventory_path, + vault_pw="dummy", + overrides={}, + allow_empty_plain=False, + ) + inv = mgr.apply_schema() + + apps = inv.get("applications", {}) + app_block = apps.get("app_test", {}) + creds = app_block.get("credentials", {}) + + # Both keys must still be present + self.assertIn("already_vaulted", creds) + self.assertIn("complex", creds) + + # Types and values must be preserved + self.assertIs(creds["already_vaulted"], existing_vault) + self.assertIs(creds["complex"], existing_dict) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/web-app-desktop/lookup_plugins/__init__.py b/tests/unit/web-app-desktop/lookup_plugins/__init__.py deleted file mode 100644 index f7d69623..00000000 --- a/tests/unit/web-app-desktop/lookup_plugins/__init__.py +++ /dev/null @@ -1,117 +0,0 @@ -import os -import sys -import unittest -from importlib import import_module - -# Compute repo root (…/tests/unit/roles/web-app-desktop/lookup_plugins/docker_cards_grouped.py -> repo root) -_THIS_DIR = os.path.dirname(__file__) -_REPO_ROOT = os.path.abspath(os.path.join(_THIS_DIR, "../../../../..")) - -# Add the lookup_plugins directory to sys.path so we can import the plugin as a plain module -_LOOKUP_DIR = os.path.join(_REPO_ROOT, "roles", "web-app-desktop", "lookup_plugins") -if _LOOKUP_DIR not in sys.path: - sys.path.insert(0, _LOOKUP_DIR) - -# Import the plugin module -plugin = import_module("docker_cards_grouped") -LookupModule = plugin.LookupModule - -try: - from ansible.errors import AnsibleError -except Exception: # Fallback for environments without full Ansible - class AnsibleError(Exception): - pass - - -class TestDockerCardsGroupedLookup(unittest.TestCase): - def setUp(self): - self.lookup = LookupModule() - - # Menu categories with mixed-case names to verify case-insensitive sort - self.menu_categories = { - "B-Group": {"tags": ["b", "beta"]}, - "a-Group": {"tags": ["a", "alpha"]}, - "Zeta": {"tags": ["z"]}, - } - - # Cards with tags; one should end up uncategorized - self.cards = [ - {"title": "Alpha Tool", "tags": ["a"]}, - {"title": "Beta Widget", "tags": ["beta"]}, - {"title": "Zed App", "tags": ["z"]}, - {"title": "Unmatched Thing", "tags": ["x"]}, - ] - - def _run(self, cards=None, menu_categories=None): - result = self.lookup.run( - [cards or self.cards, menu_categories or self.menu_categories] - ) - # Plugin returns a single-element list containing the result dict - self.assertIsInstance(result, list) - self.assertEqual(len(result), 1) - self.assertIsInstance(result[0], dict) - return result[0] - - def test_categorization_and_uncategorized(self): - data = self._run() - self.assertIn("categorized", data) - self.assertIn("uncategorized", data) - - categorized = data["categorized"] - uncategorized = data["uncategorized"] - - # Each matching card is placed into the proper category - self.assertIn("a-Group", categorized) - self.assertIn("B-Group", categorized) - self.assertIn("Zeta", categorized) - - titles_in_a = [c["title"] for c in categorized["a-Group"]] - titles_in_b = [c["title"] for c in categorized["B-Group"]] - titles_in_z = [c["title"] for c in categorized["Zeta"]] - - self.assertEqual(titles_in_a, ["Alpha Tool"]) - self.assertEqual(titles_in_b, ["Beta Widget"]) - self.assertEqual(titles_in_z, ["Zed App"]) - - # Unmatched card should be in 'uncategorized' - self.assertEqual(len(uncategorized), 1) - self.assertEqual(uncategorized[0]["title"], "Unmatched Thing") - - def test_categories_sorted_alphabetically_case_insensitive(self): - data = self._run() - categorized = data["categorized"] - - # Verify order is alphabetical by key, case-insensitive - keys = list(categorized.keys()) - self.assertEqual(keys, ["a-Group", "B-Group", "Zeta"]) - - def test_multiple_tags_match_first_category_encountered(self): - # A card that matches multiple categories should be placed - # into the first matching category based on menu_categories iteration order. - # Here "Dual Match" has both 'a' and 'b' tags; since "a-Group" is alphabetically - # before "B-Group" only after sorting happens at RETURN time, we need to ensure the - # assignment is based on menu_categories order (insertion order). - menu_categories = { - "B-Group": {"tags": ["b"]}, - "a-Group": {"tags": ["a"]}, - } - cards = [{"title": "Dual Match", "tags": ["a", "b"]}] - # The plugin iterates menu_categories in insertion order and breaks on first match, - # so this card should end up in "B-Group". - data = self._run(cards=cards, menu_categories=menu_categories) - categorized = data["categorized"] - - self.assertIn("B-Group", categorized) - self.assertEqual([c["title"] for c in categorized["B-Group"]], ["Dual Match"]) - self.assertNotIn("a-Group", categorized) # no card added there - - def test_missing_arguments_raises(self): - with self.assertRaises(AnsibleError): - self.lookup.run([]) # no args - - with self.assertRaises(AnsibleError): - self.lookup.run([[]]) # only one arg - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit/web-app-desktop/lookup_plugins/docker_cards_grouped.py b/tests/unit/web-app-desktop/lookup_plugins/docker_cards_grouped.py deleted file mode 100644 index e69de29b..00000000