From d12f7a10a69fb991e085a72c7818d304a34292e1 Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Wed, 3 Dec 2025 12:09:47 +0100 Subject: [PATCH] Add optional --become-password support with automatic Vault encryption - Implement ensure_become_password() to handle explicit, generated, and existing become passwords - Integrate VaultHandler for encrypted ansible_become_password storage - Add CLI parameter --become-password to inventory creation workflow - Ensure backwards compatibility: existing passwords remain untouched unless explicitly overridden - Add unit test verifying non-overwrite behaviour when no password is provided - Part of migration and refactoring for Ansible 2.20 upgrade Reference: https://chatgpt.com/share/69301a6d-e920-800f-b19c-e5ca7c3bdd24 --- cli/create/inventory.py | 88 +++++++++++++++++++++++++ tests/unit/cli/create/test_inventory.py | 41 ++++++++++++ 2 files changed, 129 insertions(+) diff --git a/cli/create/inventory.py b/cli/create/inventory.py index 131a6847..92c6129a 100644 --- a/cli/create/inventory.py +++ b/cli/create/inventory.py @@ -50,6 +50,7 @@ try: except ImportError: # pragma: no cover raise SystemExit("Please `pip install ruamel.yaml` to use `infinito create inventory`.") +from module_utils.handler.vault import VaultHandler # --------------------------------------------------------------------------- # Generic helpers @@ -92,6 +93,75 @@ def build_env_with_project_root(project_root: Path) -> Dict[str, str]: env["PYTHONPATH"] = root_str return env +def ensure_become_password( + host_vars_file: Path, + vault_password_file: Path, + become_password: Optional[str], +) -> None: + """ + Ensure ansible_become_password exists and is stored as a vaulted string + according to the following rules: + + - If become_password is provided: + Encrypt it with Ansible Vault and set/overwrite ansible_become_password. + - If become_password is not provided and ansible_become_password already exists: + Do nothing (respect the existing value, even if it is plain text). + - If become_password is not provided and ansible_become_password is missing: + Generate a random password, encrypt it, and set ansible_become_password. + + The encryption is done via module_utils.handler.vault.VaultHandler so that the + resulting value is a !vault tagged scalar in host_vars. + """ + yaml_rt = YAML(typ="rt") + yaml_rt.preserve_quotes = True + + # Load existing host_vars document (created earlier by ensure_host_vars_file) + if host_vars_file.exists(): + with host_vars_file.open("r", encoding="utf-8") as f: + doc = yaml_rt.load(f) + if doc is None: + doc = CommentedMap() + else: + doc = CommentedMap() + + if not isinstance(doc, CommentedMap): + tmp = CommentedMap() + for k, v in dict(doc).items(): + tmp[k] = v + doc = tmp + + current_value = doc.get("ansible_become_password") + + # Case 1: no explicit password provided, but value already exists → respect it + if become_password is None and current_value is not None: + return + + # Case 2: explicit password provided → use it + # Case 3: no password provided and no value present → generate a random one + if become_password is not None: + plain_password = become_password + else: + plain_password = generate_random_password() + + # Use VaultHandler to encrypt the password via ansible-vault encrypt_string + handler = VaultHandler(str(vault_password_file)) + snippet_text = handler.encrypt_string(plain_password, "ansible_become_password") + + # Parse the snippet with ruamel.yaml to get the tagged !vault scalar node + snippet_yaml = YAML(typ="rt") + encrypted_doc = snippet_yaml.load(snippet_text) or CommentedMap() + encrypted_value = encrypted_doc.get("ansible_become_password") + if encrypted_value is None: + raise SystemExit( + "Failed to parse 'ansible_become_password' from ansible-vault output." + ) + + # Store the vaulted value in host_vars + doc["ansible_become_password"] = encrypted_value + + with host_vars_file.open("w", encoding="utf-8") as f: + yaml_rt.dump(doc, f) + def detect_project_root() -> Path: """ @@ -669,6 +739,16 @@ def main(argv: Optional[List[str]] = None) -> None: action="store_true", help="Disable SSL for this host (sets SSL_ENABLED: false in host_vars).", ) + parser.add_argument( + "--become-password", + required=False, + help=( + "Optional become password. If omitted and ansible_become_password is " + "missing, a random one is generated and vaulted. If omitted and " + "ansible_become_password already exists, it is left unchanged." + ), + ) + parser.add_argument( "--ip4", default="127.0.0.1", @@ -827,6 +907,14 @@ def main(argv: Optional[List[str]] = None) -> None: ip6=args.ip6, ) + # 4b) Ensure ansible_become_password is vaulted according to CLI options + print(f"[INFO] Ensuring ansible_become_password for host '{args.host}'") + ensure_become_password( + host_vars_file=host_vars_file, + vault_password_file=vault_password_file, + become_password=args.become_password, + ) + # 5) Generate credentials for all application_ids (snippets + single merge) if application_ids: print(f"[INFO] Generating credentials for {len(application_ids)} applications...") diff --git a/tests/unit/cli/create/test_inventory.py b/tests/unit/cli/create/test_inventory.py index 9f03262c..951aef63 100644 --- a/tests/unit/cli/create/test_inventory.py +++ b/tests/unit/cli/create/test_inventory.py @@ -13,6 +13,7 @@ sys.path.insert(0, dir_path) from cli.create.inventory import ( # type: ignore merge_inventories, ensure_host_vars_file, + ensure_become_password, ) from ruamel.yaml import YAML @@ -242,6 +243,46 @@ existing_key: foo data2 = yaml_rt.load(f) self.assertEqual(data2["ansible_connection"], "local") + + def test_ensure_become_password_keeps_existing_when_no_cli_password(self): + """ + If no --become-password is provided and ansible_become_password already + exists in host_vars, ensure_become_password must not overwrite it and + must not attempt to generate or vault a new one. + """ + + yaml_rt = YAML(typ="rt") + yaml_rt.preserve_quotes = True + + with tempfile.TemporaryDirectory() as tmpdir: + host = "localhost" + host_vars_dir = Path(tmpdir) + host_vars_file = host_vars_dir / f"{host}.yml" + vault_pw_file = host_vars_dir / ".password" + + # Create a dummy vault password file (not actually used in this test) + vault_pw_file.write_text("dummy\n", encoding="utf-8") + + # Seed host_vars with an existing ansible_become_password value + initial = CommentedMap() + initial["ansible_become_password"] = "EXISTING_VALUE" + with host_vars_file.open("w", encoding="utf-8") as f: + yaml_rt.dump(initial, f) + + # Call helper WITHOUT an explicit become_password + ensure_become_password( + host_vars_file=host_vars_file, + vault_password_file=vault_pw_file, + become_password=None, + ) + + # Reload and verify ansible_become_password remains unchanged + with host_vars_file.open("r", encoding="utf-8") as f: + doc = yaml_rt.load(f) + + self.assertIsNotNone(doc) + self.assertIn("ansible_become_password", doc) + self.assertEqual(doc["ansible_become_password"], "EXISTING_VALUE") if __name__ == "__main__": unittest.main()