From bf0134b9c59364ececee9f87c735ebd7614ac0fa Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Thu, 4 Dec 2025 22:06:10 +0100 Subject: [PATCH] Extend inventory creation with --authorized-keys support, unify PATH_ADMINISTRATOR_HOME resolution via group_vars, update CI workflow to pass dummy SSH key, and add full unit test coverage for new features. See details: https://chatgpt.com/share/6931f775-12d4-800f-9ccb-e6ce52097f9c --- .github/workflows/test-deploy.yml | 28 +++-- cli/create/inventory.py | 160 +++++++++++++++++++++++- tests/unit/cli/create/test_inventory.py | 152 ++++++++++++++++++++++ 3 files changed, 330 insertions(+), 10 deletions(-) diff --git a/.github/workflows/test-deploy.yml b/.github/workflows/test-deploy.yml index 8942bd11..85122a8c 100644 --- a/.github/workflows/test-deploy.yml +++ b/.github/workflows/test-deploy.yml @@ -1,4 +1,3 @@ -# .github/workflows/test-deploy.yml name: Build & Test Infinito.Nexus CLI in Docker Container on: @@ -18,9 +17,13 @@ jobs: env: EXCLUDED_ROLES: > drv-lid-switch, + svc-db-memcached, + svc-db-redis, svc-net-wireguard-core, svc-net-wireguard-firewalled, svc-net-wireguard-plain, + svc-bkp-loc-2-usb, + svc-bkp-rmt-2-loc, svc-opt-keyboard-color, svc-opt-ssd-hdd, web-app-bridgy-fed, @@ -40,29 +43,38 @@ jobs: - name: Show Docker version run: docker version - # First deploy: normal + debug + # 1) First deploy: normal + debug (mit Build) - name: First deploy (normal + debug) run: | - python -m cli.deploy.container --build --exclude "$EXCLUDED_ROLES" -- \ + python -m cli.deploy.container run --image "$INFINITO_IMAGE" --build -- \ + --exclude "$EXCLUDED_ROLES" \ + --authorized-keys "ssh-ed25519 AAAA_TEST_DUMMY_KEY github-ci-dummy@infinito" \ + -- \ -T server \ --debug \ --skip-cleanup \ --skip-tests - # Second deploy: reset + debug + # 2) Second deploy: reset + debug (ohne Build, nur reuse Image) - name: Second deploy (--reset --debug) run: | - python -m cli.deploy.container --exclude "$EXCLUDED_ROLES" -- \ + python -m cli.deploy.container run --image "$INFINITO_IMAGE" -- \ + --exclude "$EXCLUDED_ROLES" \ + --authorized-keys "ssh-ed25519 AAAA_TEST_DUMMY_KEY github-ci-dummy@infinito" \ + -- \ -T server \ --reset \ --debug \ --skip-cleanup \ --skip-tests - # Third deploy: async, no debug + # 3) Third deploy: async deploy – no debug - name: Third deploy (async deploy – no debug) run: | - python -m cli.deploy.container --exclude "$EXCLUDED_ROLES" -- \ + python -m cli.deploy.container run --image "$INFINITO_IMAGE" -- \ + --exclude "$EXCLUDED_ROLES" \ + --authorized-keys "ssh-ed25519 AAAA_TEST_DUMMY_KEY github-ci-dummy@infinito" \ + -- \ -T server \ --skip-cleanup \ - --skip-tests + --skip-tests \ No newline at end of file diff --git a/cli/create/inventory.py b/cli/create/inventory.py index f7b2a587..ab5315a1 100644 --- a/cli/create/inventory.py +++ b/cli/create/inventory.py @@ -429,6 +429,138 @@ def ensure_ruamel_map(node: CommentedMap, key: str) -> CommentedMap: return node[key] +def get_path_administrator_home_from_group_vars(project_root: Path) -> str: + """ + Read PATH_ADMINISTRATOR_HOME from group_vars/all/06_paths.yml. + + Expected layout (relative to project_root): + + group_vars/ + all/ + 06_paths.yml + + If the file or variable is missing, fall back to '/home/administrator/' + and emit a warning to stderr. + """ + paths_file = project_root / "group_vars" / "all" / "06_paths.yml" + default_path = "/home/administrator/" + + if not paths_file.exists(): + print( + f"[WARN] group_vars paths file not found: {paths_file}. " + f"Falling back to PATH_ADMINISTRATOR_HOME={default_path}", + file=sys.stderr, + ) + return default_path + + try: + with paths_file.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + except Exception as exc: # pragma: no cover + print( + f"[WARN] Failed to load {paths_file}: {exc}. " + f"Falling back to PATH_ADMINISTRATOR_HOME={default_path}", + file=sys.stderr, + ) + return default_path + + value = data.get("PATH_ADMINISTRATOR_HOME", default_path) + if not isinstance(value, str) or not value: + print( + f"[WARN] PATH_ADMINISTRATOR_HOME missing or invalid in {paths_file}. " + f"Falling back to {default_path}", + file=sys.stderr, + ) + return default_path + + # Normalize: ensure it ends with exactly one trailing slash. + value = value.rstrip("/") + "/" + return value + + +def ensure_administrator_authorized_keys( + inventory_dir: Path, + host: str, + authorized_keys_spec: Optional[str], + project_root: Path, +) -> None: + """ + Ensure that the administrator's authorized_keys file exists and contains + all keys provided via --authorized-keys. + + Behavior: + - If authorized_keys_spec is None → do nothing. + - If authorized_keys_spec is a path to an existing file: + read all non-empty, non-comment lines in that file as keys. + - Else: + treat authorized_keys_spec as literal key text, which may contain + one or more keys separated by newlines. + + The target file path mirrors the Ansible task in roles/user-administrator: + + src: "{{ inventory_dir }}/files/{{ inventory_hostname }}{{ PATH_ADMINISTRATOR_HOME }}.ssh/authorized_keys" + + We implement the same pattern here: + /files/.ssh/authorized_keys + + PATH_ADMINISTRATOR_HOME is read from group_vars/all/06_paths.yml so that + Python and Ansible share a single source of truth. + """ + if not authorized_keys_spec: + return + + # Read PATH_ADMINISTRATOR_HOME from group_vars/all/06_paths.yml + PATH_ADMINISTRATOR_HOME = get_path_administrator_home_from_group_vars(project_root) + + # Build relative path identical to the Ansible src: + # files/{{ inventory_hostname }}{{ PATH_ADMINISTRATOR_HOME }}.ssh/authorized_keys + rel_fragment = f"{host}{PATH_ADMINISTRATOR_HOME}.ssh/authorized_keys" + # remove leading slash so it becomes relative under files/ + rel_path = rel_fragment.lstrip("/") + target_path = inventory_dir / "files" / rel_path + target_path.parent.mkdir(parents=True, exist_ok=True) + + spec_path = Path(authorized_keys_spec) + if spec_path.exists() and spec_path.is_file(): + # Use keys from the referenced file. + source_text = spec_path.read_text(encoding="utf-8") + else: + # Treat the argument as literal key text. + source_text = authorized_keys_spec + + # Normalize incoming keys: one key per non-empty, non-comment line. + new_keys: List[str] = [] + for line in (source_text or "").splitlines(): + stripped = line.strip() + if not stripped or stripped.startswith("#"): + continue + new_keys.append(stripped) + + if not new_keys: + # Nothing to add. + return + + existing_lines: List[str] = [] + existing_keys: Set[str] = set() + + if target_path.exists(): + for line in target_path.read_text(encoding="utf-8").splitlines(): + existing_lines.append(line) + stripped = line.strip() + if stripped and not stripped.startswith("#"): + existing_keys.add(stripped) + + # Append only keys that are not yet present (by stripped line match). + for key in new_keys: + if key not in existing_keys: + existing_lines.append(key) + existing_keys.add(key) + + # Write back, ensuring a trailing newline. + final_text = "\n".join(existing_lines).rstrip() + "\n" + target_path.write_text(final_text, encoding="utf-8") + + # --------------------------------------------------------------------------- # Role resolution (meta/applications/role_name.py) # --------------------------------------------------------------------------- @@ -748,7 +880,18 @@ def main(argv: Optional[List[str]] = None) -> None: "ansible_become_password already exists, it is left unchanged." ), ) - + parser.add_argument( + "--authorized-keys", + required=False, + help=( + "Optional SSH public keys for the 'administrator' account. " + "May be a literal key string (possibly with newlines) or a path " + "to a file containing one or more public keys. " + "All keys are ensured to exist in " + "files/.ssh/authorized_keys " + "under the inventory directory; missing keys are appended." + ), + ) parser.add_argument( "--ip4", default="127.0.0.1", @@ -849,7 +992,7 @@ def main(argv: Optional[List[str]] = None) -> None: try: vault_password_file.chmod(0o600) except PermissionError: - # Best-effort; exclude if chmod is not allowed + # Best-effort; ignore if chmod is not allowed pass else: print(f"[INFO] Using existing vault password file: {vault_password_file}") @@ -915,6 +1058,19 @@ def main(argv: Optional[List[str]] = None) -> None: become_password=args.become_password, ) + # 4c) Ensure administrator authorized_keys file contains keys from --authorized-keys + if args.authorized_keys: + print( + f"[INFO] Ensuring administrator authorized_keys for host '{args.host}' " + f"from spec: {args.authorized_keys}" + ) + ensure_administrator_authorized_keys( + inventory_dir=inventory_dir, + host=args.host, + authorized_keys_spec=args.authorized_keys, + project_root=project_root, + ) + # 5) Generate credentials for all application_ids (snippets + single merge) if application_ids: print(f"[INFO] Generating credentials for {len(application_ids)} applications...") diff --git a/tests/unit/cli/create/test_inventory.py b/tests/unit/cli/create/test_inventory.py index 8fefd97a..da455d08 100644 --- a/tests/unit/cli/create/test_inventory.py +++ b/tests/unit/cli/create/test_inventory.py @@ -17,6 +17,8 @@ from cli.create.inventory import ( # type: ignore parse_roles_list, filter_inventory_by_include, filter_inventory_by_ignore, + get_path_administrator_home_from_group_vars, + ensure_administrator_authorized_keys, ) from ruamel.yaml import YAML @@ -353,6 +355,156 @@ existing_key: foo self.assertIsNotNone(doc) self.assertIn("ansible_become_password", doc) self.assertEqual(doc["ansible_become_password"], "EXISTING_VALUE") + + def test_get_path_administrator_home_from_group_vars_reads_value(self): + """ + get_path_administrator_home_from_group_vars() must: + - read PATH_ADMINISTRATOR_HOME from group_vars/all/06_paths.yml, + - normalize it to have exactly one trailing slash. + """ + with tempfile.TemporaryDirectory() as tmpdir: + project_root = Path(tmpdir) + + # Create group_vars/all/06_paths.yml with a custom PATH_ADMINISTRATOR_HOME + gv_dir = project_root / "group_vars" / "all" + gv_dir.mkdir(parents=True, exist_ok=True) + paths_file = gv_dir / "06_paths.yml" + paths_file.write_text( + 'PATH_ADMINISTRATOR_HOME: "/custom/admin"\n', + encoding="utf-8", + ) + + value = get_path_administrator_home_from_group_vars(project_root) + # Must normalize to exactly one trailing slash + self.assertEqual(value, "/custom/admin/") + + def test_get_path_administrator_home_from_group_vars_falls_back_if_missing(self): + """ + If group_vars/all/06_paths.yml does not exist or does not define + PATH_ADMINISTRATOR_HOME, the helper must fall back to '/home/administrator/'. + """ + with tempfile.TemporaryDirectory() as tmpdir: + project_root = Path(tmpdir) + + # No group_vars/all/06_paths.yml present + value = get_path_administrator_home_from_group_vars(project_root) + self.assertEqual(value, "/home/administrator/") + + # Now create an empty 06_paths.yml without PATH_ADMINISTRATOR_HOME + gv_dir = project_root / "group_vars" / "all" + gv_dir.mkdir(parents=True, exist_ok=True) + paths_file = gv_dir / "06_paths.yml" + paths_file.write_text("", encoding="utf-8") + + value2 = get_path_administrator_home_from_group_vars(project_root) + self.assertEqual(value2, "/home/administrator/") + + def test_ensure_administrator_authorized_keys_uses_file_and_deduplicates(self): + """ + ensure_administrator_authorized_keys() must: + - read PATH_ADMINISTRATOR_HOME from group_vars/all/06_paths.yml, + - treat authorized_keys_spec as file path when it exists, + - append keys that are not yet present, + - not duplicate existing keys. + """ + with tempfile.TemporaryDirectory() as tmpdir: + tmp = Path(tmpdir) + + # Fake project_root with group_vars/all/06_paths.yml + project_root = tmp / "project" + project_root.mkdir(parents=True, exist_ok=True) + gv_dir = project_root / "group_vars" / "all" + gv_dir.mkdir(parents=True, exist_ok=True) + paths_file = gv_dir / "06_paths.yml" + paths_file.write_text( + 'PATH_ADMINISTRATOR_HOME: "/home/administrator/"\n', + encoding="utf-8", + ) + + # Inventory dir (separate from project_root, as in real usage) + inventory_dir = tmp / "inventory" + inventory_dir.mkdir(parents=True, exist_ok=True) + + host = "galaxyserver" + + # Prepare a source authorized_keys file with two keys + keys_file = tmp / "keys.pub" + key1 = "ssh-ed25519 AAAA... key1@example" + key2 = "ssh-ed25519 AAAA... key2@example" + keys_file.write_text(f"{key1}\n{key2}\n", encoding="utf-8") + + # Pre-create target file with key1 already present and a comment + # Path must match: files/.ssh/authorized_keys + # PATH_ADMINISTRATOR_HOME = /home/administrator/ + target_rel = f"{host}/home/administrator/.ssh/authorized_keys" + target_path = inventory_dir / "files" / target_rel + target_path.parent.mkdir(parents=True, exist_ok=True) + target_path.write_text( + f"# existing authorized_keys\n{key1}\n", + encoding="utf-8", + ) + + # Run helper with spec pointing to the keys file + ensure_administrator_authorized_keys( + inventory_dir=inventory_dir, + host=host, + authorized_keys_spec=str(keys_file), + project_root=project_root, + ) + + # Verify that file contains key1 only once and key2 appended + content = target_path.read_text(encoding="utf-8").strip().splitlines() + self.assertIn("# existing authorized_keys", content) + self.assertEqual(content.count(key1), 1) + self.assertEqual(content.count(key2), 1) + + def test_ensure_administrator_authorized_keys_accepts_literal_keys_string(self): + """ + When authorized_keys_spec is not a path to an existing file, + ensure_administrator_authorized_keys() must treat it as literal content. + """ + with tempfile.TemporaryDirectory() as tmpdir: + tmp = Path(tmpdir) + + project_root = tmp / "project" + project_root.mkdir(parents=True, exist_ok=True) + gv_dir = project_root / "group_vars" / "all" + gv_dir.mkdir(parents=True, exist_ok=True) + paths_file = gv_dir / "06_paths.yml" + paths_file.write_text( + 'PATH_ADMINISTRATOR_HOME: "/home/administrator/"\n', + encoding="utf-8", + ) + + inventory_dir = tmp / "inventory" + inventory_dir.mkdir(parents=True, exist_ok=True) + + host = "localhost" + + key1 = "ssh-rsa AAAA... literal1@example" + key2 = "ssh-rsa AAAA... literal2@example" + literal_spec = f"{key1}\n{key2}\n" + + ensure_administrator_authorized_keys( + inventory_dir=inventory_dir, + host=host, + authorized_keys_spec=literal_spec, + project_root=project_root, + ) + + # Target file should now exist with both keys + target_rel = f"{host}/home/administrator/.ssh/authorized_keys" + target_path = inventory_dir / "files" / target_rel + self.assertTrue(target_path.exists()) + + lines = [ + line.strip() + for line in target_path.read_text(encoding="utf-8").splitlines() + if line.strip() and not line.strip().startswith("#") + ] + self.assertIn(key1, lines) + self.assertIn(key2, lines) + if __name__ == "__main__": unittest.main()