From 5b18f39ccd440f21e479e184d349826440b4a2df Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Tue, 2 Dec 2025 13:44:16 +0100 Subject: [PATCH] Extend inventory create CLI with include/ignore filters and SSL/network defaults - https://chatgpt.com/share/692edecd-54c4-800f-b01b-35cf395a60f0 --- cli/create/inventory.py | 224 +++++++++++++++++++----- tests/unit/cli/create/test_inventory.py | 30 +++- 2 files changed, 206 insertions(+), 48 deletions(-) diff --git a/cli/create/inventory.py b/cli/create/inventory.py index 90c3ce55..43319660 100644 --- a/cli/create/inventory.py +++ b/cli/create/inventory.py @@ -9,27 +9,35 @@ This subcommand: 1. Uses `build inventory full` to generate a dynamic inventory for the given host containing all invokable applications. -2. Optionally filters the resulting groups by a user-provided list of - application_ids (`--roles`). +2. Optionally filters the resulting groups by: + - --include: only listed application_ids are kept + - --ignore: listed application_ids are removed + - --roles: legacy include filter (used only if --include/--ignore are not set) 3. Merges the generated inventory into an existing inventory file, without deleting or overwriting unrelated entries. 4. Ensures `host_vars/.yml` exists and stores base settings such as: - - PRIMARY_DOMAIN - - WEB_PROTOCOL + - PRIMARY_DOMAIN (optional) + - SSL_ENABLED + - networks.internet.ip4 + - networks.internet.ip6 Existing keys are preserved (only missing keys are added). 5. For every application_id in the final inventory, uses: - `meta/applications/role_name.py` to resolve the role path - `create/credentials.py --snippet` to generate credentials YAML snippets, and merges all snippets into host_vars in a single write. +6. If --vault-password-file is not provided, a file `.password` is created + in the inventory directory (if missing) and used as vault password file. """ import argparse import subprocess import sys from pathlib import Path -from typing import Dict, Any, List, Set, Optional +from typing import Dict, Any, List, Set, Optional, NoReturn import concurrent.futures -import os +import os +import secrets +import string try: import yaml @@ -67,6 +75,7 @@ def run_subprocess( raise SystemExit(msg) return result + def build_env_with_project_root(project_root: Path) -> Dict[str, str]: """ Return an environment dict where PYTHONPATH includes the project root. @@ -83,6 +92,7 @@ def build_env_with_project_root(project_root: Path) -> Dict[str, str]: env["PYTHONPATH"] = root_str return env + def detect_project_root() -> Path: """ Detect project root assuming this file is at: /cli/create/inventory.py @@ -109,9 +119,10 @@ def dump_yaml(path: Path, data: Dict[str, Any]) -> None: def parse_roles_list(raw_roles: Optional[List[str]]) -> Optional[Set[str]]: """ - Parse a list of roles supplied on the CLI. Supports: - --roles web-app-nextcloud web-app-mastodon - --roles web-app-nextcloud,web-app-mastodon + Parse a list of IDs supplied on the CLI. Supports: + --include web-app-nextcloud web-app-mastodon + --include web-app-nextcloud,web-app-mastodon + Same logic is reused for --ignore and --roles. """ if not raw_roles: return None @@ -128,6 +139,14 @@ def parse_roles_list(raw_roles: Optional[List[str]]) -> Optional[Set[str]]: return result +def generate_random_password(length: int = 64) -> str: + """ + Generate a random password using ASCII letters and digits. + """ + alphabet = string.ascii_letters + string.digits + return "".join(secrets.choice(alphabet) for _ in range(length)) + + # --------------------------------------------------------------------------- # Inventory generation (servers.yml via build/inventory/full.py) # --------------------------------------------------------------------------- @@ -160,17 +179,20 @@ def generate_dynamic_inventory( tmp_inventory.unlink(missing_ok=True) return data -def filter_inventory_by_roles(inv_data: Dict[str, Any], roles_filter: Set[str]) -> Dict[str, Any]: + +def _filter_inventory_children( + inv_data: Dict[str, Any], + predicate, +) -> Dict[str, Any]: """ - Return a new inventory dict that contains only the groups whose names - are in `roles_filter`. All other structure is preserved. + Generic helper: keep only children for which predicate(group_name, group_data) is True. """ all_block = inv_data.get("all", {}) children = all_block.get("children", {}) or {} filtered_children: Dict[str, Any] = {} for group_name, group_data in children.items(): - if group_name in roles_filter: + if predicate(group_name, group_data): filtered_children[group_name] = group_data new_all = dict(all_block) @@ -178,6 +200,36 @@ def filter_inventory_by_roles(inv_data: Dict[str, Any], roles_filter: Set[str]) return {"all": new_all} +def filter_inventory_by_roles(inv_data: Dict[str, Any], roles_filter: Set[str]) -> Dict[str, Any]: + """ + Legacy: keep only groups whose names are in roles_filter. + """ + return _filter_inventory_children( + inv_data, + lambda group_name, _group_data: group_name in roles_filter, + ) + + +def filter_inventory_by_include(inv_data: Dict[str, Any], include_set: Set[str]) -> Dict[str, Any]: + """ + Keep only groups whose names are in include_set. + """ + return _filter_inventory_children( + inv_data, + lambda group_name, _group_data: group_name in include_set, + ) + + +def filter_inventory_by_ignore(inv_data: Dict[str, Any], ignore_set: Set[str]) -> Dict[str, Any]: + """ + Keep all groups except those whose names are in ignore_set. + """ + return _filter_inventory_children( + inv_data, + lambda group_name, _group_data: group_name not in ignore_set, + ) + + def merge_inventories( base: Dict[str, Any], new: Dict[str, Any], @@ -216,6 +268,7 @@ def merge_inventories( return base + # --------------------------------------------------------------------------- # host_vars helpers # --------------------------------------------------------------------------- @@ -223,15 +276,19 @@ def merge_inventories( def ensure_host_vars_file( host_vars_file: Path, host: str, - primary_domain: str, - web_protocol: str, + primary_domain: Optional[str], + ssl_disabled: bool, + ip4: str, + ip6: str, ) -> None: """ Ensure host_vars/.yml exists and contains base settings. Important: Existing keys are NOT overwritten. Only missing keys are added: - - PRIMARY_DOMAIN - - WEB_PROTOCOL + - PRIMARY_DOMAIN (only if primary_domain is provided) + - SSL_ENABLED (true by default, false if --ssl-disabled is used) + - networks.internet.ip4 + - networks.internet.ip6 Uses ruamel.yaml so that custom tags like !vault are preserved and do not break parsing (unlike PyYAML safe_load). @@ -254,15 +311,34 @@ def ensure_host_vars_file( data = tmp # Only set defaults; do NOT override existing values - if "PRIMARY_DOMAIN" not in data: + if primary_domain is not None and "PRIMARY_DOMAIN" not in data: data["PRIMARY_DOMAIN"] = primary_domain - if "WEB_PROTOCOL" not in data: - data["WEB_PROTOCOL"] = web_protocol + + if "SSL_ENABLED" not in data: + # By default SSL is enabled; --ssl-disabled flips this to false + data["SSL_ENABLED"] = not ssl_disabled + + # networks.internet.ip4 / ip6 + networks = data.get("networks") + if not isinstance(networks, CommentedMap): + networks = CommentedMap() + data["networks"] = networks + + internet = networks.get("internet") + if not isinstance(internet, CommentedMap): + internet = CommentedMap() + networks["internet"] = internet + + if "ip4" not in internet: + internet["ip4"] = ip4 + if "ip6" not in internet: + internet["ip6"] = ip6 host_vars_file.parent.mkdir(parents=True, exist_ok=True) with host_vars_file.open("w", encoding="utf-8") as f: yaml_rt.dump(data, f) + def ensure_ruamel_map(node: CommentedMap, key: str) -> CommentedMap: """ Ensure node[key] exists and is a mapping (CommentedMap). @@ -339,11 +415,13 @@ def resolve_role_path( return role_path -def fatal(msg: str) -> "NoReturn": + +def fatal(msg: str) -> NoReturn: """Print a fatal error and exit with code 1.""" sys.stderr.write(f"[FATAL] {msg}\n") sys.exit(1) + # --------------------------------------------------------------------------- # Credentials generation via create/credentials.py --snippet # --------------------------------------------------------------------------- @@ -559,26 +637,36 @@ def main(argv: Optional[List[str]] = None) -> None: "credentials for all selected applications." ) ) + parser.add_argument( + "inventory_dir", + help="Inventory directory (e.g. inventories/galaxyserver).", + ) parser.add_argument( "--host", - required=True, - help="Hostname to use in the inventory (e.g. galaxyserver, localhost).", + required=False, + default="localhost", + help="Hostname to use in the inventory (default: localhost).", ) parser.add_argument( "--primary-domain", - required=True, - help="Primary domain for this host (e.g. infinito.nexus).", + required=False, + default=None, + help="Primary domain for this host (e.g. infinito.nexus). Optional.", ) parser.add_argument( - "--web-protocol", - default="https", - choices=("http", "https"), - help="Web protocol to use for this host (default: https).", + "--ssl-disabled", + action="store_true", + help="Disable SSL for this host (sets SSL_ENABLED: false in host_vars).", ) parser.add_argument( - "--inventory-dir", - required=True, - help="Path to the inventory directory (e.g. inventories/galaxyserver).", + "--ip4", + default="127.0.0.1", + help="IPv4 address for networks.internet.ip4 (default: 127.0.0.1).", + ) + parser.add_argument( + "--ip6", + default="::1", + help='IPv6 address for networks.internet.ip6 (default: "::1").', ) parser.add_argument( "--inventory-file", @@ -588,15 +676,35 @@ def main(argv: Optional[List[str]] = None) -> None: "--roles", nargs="+", help=( - "Optional list of application_ids to include. " - "If omitted, all invokable applications are used. " + "Optional legacy list of application_ids to include. " + "Used only if neither --include nor --ignore is specified. " "Supports comma-separated values as well." ), ) + parser.add_argument( + "--include", + nargs="+", + help=( + "Only include the listed application_ids in the inventory. " + "Mutually exclusive with --ignore." + ), + ) + parser.add_argument( + "--ignore", + nargs="+", + help=( + "Exclude the listed application_ids from the inventory. " + "Mutually exclusive with --include." + ), + ) parser.add_argument( "--vault-password-file", - required=True, - help="Path to the Vault password file for credentials generation.", + required=False, + default=None, + help=( + "Path to the Vault password file for credentials generation. " + "If omitted, /.password is created or reused." + ), ) parser.add_argument( "--roles-dir", @@ -615,6 +723,15 @@ def main(argv: Optional[List[str]] = None) -> None: args = parser.parse_args(argv) + # Parse include/ignore/roles lists + include_filter = parse_roles_list(args.include) + ignore_filter = parse_roles_list(args.ignore) + roles_filter = parse_roles_list(args.roles) + + # Enforce mutual exclusivity: only one of --include / --ignore may be used + if include_filter and ignore_filter: + fatal("Options --include and --ignore are mutually exclusive. Please use only one of them.") + project_root = detect_project_root() roles_dir = Path(args.roles_dir) if args.roles_dir else (project_root / "roles") categories_file = Path(args.categories_file) if args.categories_file else (roles_dir / "categories.yml") @@ -628,9 +745,24 @@ def main(argv: Optional[List[str]] = None) -> None: host_vars_dir = inventory_dir / "host_vars" host_vars_file = host_vars_dir / f"{args.host}.yml" - vault_password_file = Path(args.vault_password_file).resolve() + # Vault password file: use provided one, otherwise create/reuse .password in inventory_dir + if args.vault_password_file: + vault_password_file = Path(args.vault_password_file).resolve() + else: + vault_password_file = inventory_dir / ".password" + if not vault_password_file.exists(): + print(f"[INFO] No --vault-password-file provided. Creating {vault_password_file} ...") + password = generate_random_password() + with vault_password_file.open("w", encoding="utf-8") as f: + f.write(password + "\n") + try: + vault_password_file.chmod(0o600) + except PermissionError: + # Best-effort; ignore if chmod is not allowed + pass + else: + print(f"[INFO] Using existing vault password file: {vault_password_file}") - roles_filter = parse_roles_list(args.roles) tmp_inventory = inventory_dir / "_inventory_full_tmp.yml" # 1) Generate dynamic inventory via build/inventory/full.py @@ -643,9 +775,15 @@ def main(argv: Optional[List[str]] = None) -> None: project_root=project_root, ) - # 2) Optional: filter by roles - if roles_filter: - print(f"[INFO] Filtering inventory to roles: {', '.join(sorted(roles_filter))}") + # 2) Apply filters: include → ignore → legacy roles + if include_filter: + print(f"[INFO] Including only application_ids: {', '.join(sorted(include_filter))}") + dyn_inv = filter_inventory_by_include(dyn_inv, include_filter) + elif ignore_filter: + print(f"[INFO] Ignoring application_ids: {', '.join(sorted(ignore_filter))}") + dyn_inv = filter_inventory_by_ignore(dyn_inv, ignore_filter) + elif roles_filter: + print(f"[INFO] Filtering inventory to roles (legacy): {', '.join(sorted(roles_filter))}") dyn_inv = filter_inventory_by_roles(dyn_inv, roles_filter) # Collect final application_ids from dynamic inventory for credential generation @@ -673,7 +811,9 @@ def main(argv: Optional[List[str]] = None) -> None: host_vars_file=host_vars_file, host=args.host, primary_domain=args.primary_domain, - web_protocol=args.web_protocol, + ssl_disabled=args.ssl_disabled, + ip4=args.ip4, + ip6=args.ip6, ) # 5) Generate credentials for all application_ids (snippets + single merge) diff --git a/tests/unit/cli/create/test_inventory.py b/tests/unit/cli/create/test_inventory.py index 8f1393a6..338df1e6 100644 --- a/tests/unit/cli/create/test_inventory.py +++ b/tests/unit/cli/create/test_inventory.py @@ -101,7 +101,7 @@ class TestCreateInventory(unittest.TestCase): - load existing YAML containing a !vault tag without crashing, - preserve the !vault node including its tag, - keep existing keys untouched, - - add PRIMARY_DOMAIN, and WEB_PROTOCOL only when missing, + - add PRIMARY_DOMAIN, SSL_ENABLED and networks.internet.ip4/ip6 only when missing, - not overwrite them on subsequent calls. """ yaml_rt = YAML(typ="rt") @@ -123,12 +123,14 @@ existing_key: foo host_vars_dir.mkdir(parents=True, exist_ok=True) host_vars_file.write_text(initial_yaml, encoding="utf-8") - # Run ensure_host_vars_file + # Run ensure_host_vars_file (first time) ensure_host_vars_file( host_vars_file=host_vars_file, host=host, primary_domain="example.org", - web_protocol="https", + ssl_disabled=False, + ip4="127.0.0.1", + ip6="::1", ) # Reload with ruamel.yaml to verify structure and tags @@ -148,14 +150,26 @@ existing_key: foo # Default values must be added self.assertEqual(data["PRIMARY_DOMAIN"], "example.org") - self.assertEqual(data["WEB_PROTOCOL"], "https") + self.assertIn("SSL_ENABLED", data) + self.assertTrue(data["SSL_ENABLED"]) + + self.assertIn("networks", data) + self.assertIsInstance(data["networks"], CommentedMap) + self.assertIn("internet", data["networks"]) + self.assertIsInstance(data["networks"]["internet"], CommentedMap) + + internet = data["networks"]["internet"] + self.assertEqual(internet["ip4"], "127.0.0.1") + self.assertEqual(internet["ip6"], "::1") # A second call must NOT overwrite existing defaults ensure_host_vars_file( host_vars_file=host_vars_file, host="other-host", primary_domain="other.example", - web_protocol="http", + ssl_disabled=True, # would switch to false if it overwrote + ip4="10.0.0.1", + ip6="::2", ) with host_vars_file.open("r", encoding="utf-8") as f: @@ -163,7 +177,11 @@ existing_key: foo # Values remain unchanged self.assertEqual(data2["PRIMARY_DOMAIN"], "example.org") - self.assertEqual(data2["WEB_PROTOCOL"], "https") + self.assertTrue(data2["SSL_ENABLED"]) + + internet2 = data2["networks"]["internet"] + self.assertEqual(internet2["ip4"], "127.0.0.1") + self.assertEqual(internet2["ip6"], "::1") if __name__ == "__main__":