Add optional --become-password support with automatic Vault encryption

- Implement ensure_become_password() to handle explicit, generated, and existing become passwords
- Integrate VaultHandler for encrypted ansible_become_password storage
- Add CLI parameter --become-password to inventory creation workflow
- Ensure backwards compatibility: existing passwords remain untouched unless explicitly overridden
- Add unit test verifying non-overwrite behaviour when no password is provided
- Part of migration and refactoring for Ansible 2.20 upgrade

Reference: https://chatgpt.com/share/69301a6d-e920-800f-b19c-e5ca7c3bdd24
This commit is contained in:
2025-12-03 12:09:47 +01:00
parent e22893bdcb
commit d12f7a10a6
2 changed files with 129 additions and 0 deletions

View File

@@ -50,6 +50,7 @@ try:
except ImportError: # pragma: no cover
raise SystemExit("Please `pip install ruamel.yaml` to use `infinito create inventory`.")
from module_utils.handler.vault import VaultHandler
# ---------------------------------------------------------------------------
# Generic helpers
@@ -92,6 +93,75 @@ def build_env_with_project_root(project_root: Path) -> Dict[str, str]:
env["PYTHONPATH"] = root_str
return env
def ensure_become_password(
host_vars_file: Path,
vault_password_file: Path,
become_password: Optional[str],
) -> None:
"""
Ensure ansible_become_password exists and is stored as a vaulted string
according to the following rules:
- If become_password is provided:
Encrypt it with Ansible Vault and set/overwrite ansible_become_password.
- If become_password is not provided and ansible_become_password already exists:
Do nothing (respect the existing value, even if it is plain text).
- If become_password is not provided and ansible_become_password is missing:
Generate a random password, encrypt it, and set ansible_become_password.
The encryption is done via module_utils.handler.vault.VaultHandler so that the
resulting value is a !vault tagged scalar in host_vars.
"""
yaml_rt = YAML(typ="rt")
yaml_rt.preserve_quotes = True
# Load existing host_vars document (created earlier by ensure_host_vars_file)
if host_vars_file.exists():
with host_vars_file.open("r", encoding="utf-8") as f:
doc = yaml_rt.load(f)
if doc is None:
doc = CommentedMap()
else:
doc = CommentedMap()
if not isinstance(doc, CommentedMap):
tmp = CommentedMap()
for k, v in dict(doc).items():
tmp[k] = v
doc = tmp
current_value = doc.get("ansible_become_password")
# Case 1: no explicit password provided, but value already exists → respect it
if become_password is None and current_value is not None:
return
# Case 2: explicit password provided → use it
# Case 3: no password provided and no value present → generate a random one
if become_password is not None:
plain_password = become_password
else:
plain_password = generate_random_password()
# Use VaultHandler to encrypt the password via ansible-vault encrypt_string
handler = VaultHandler(str(vault_password_file))
snippet_text = handler.encrypt_string(plain_password, "ansible_become_password")
# Parse the snippet with ruamel.yaml to get the tagged !vault scalar node
snippet_yaml = YAML(typ="rt")
encrypted_doc = snippet_yaml.load(snippet_text) or CommentedMap()
encrypted_value = encrypted_doc.get("ansible_become_password")
if encrypted_value is None:
raise SystemExit(
"Failed to parse 'ansible_become_password' from ansible-vault output."
)
# Store the vaulted value in host_vars
doc["ansible_become_password"] = encrypted_value
with host_vars_file.open("w", encoding="utf-8") as f:
yaml_rt.dump(doc, f)
def detect_project_root() -> Path:
"""
@@ -669,6 +739,16 @@ def main(argv: Optional[List[str]] = None) -> None:
action="store_true",
help="Disable SSL for this host (sets SSL_ENABLED: false in host_vars).",
)
parser.add_argument(
"--become-password",
required=False,
help=(
"Optional become password. If omitted and ansible_become_password is "
"missing, a random one is generated and vaulted. If omitted and "
"ansible_become_password already exists, it is left unchanged."
),
)
parser.add_argument(
"--ip4",
default="127.0.0.1",
@@ -827,6 +907,14 @@ def main(argv: Optional[List[str]] = None) -> None:
ip6=args.ip6,
)
# 4b) Ensure ansible_become_password is vaulted according to CLI options
print(f"[INFO] Ensuring ansible_become_password for host '{args.host}'")
ensure_become_password(
host_vars_file=host_vars_file,
vault_password_file=vault_password_file,
become_password=args.become_password,
)
# 5) Generate credentials for all application_ids (snippets + single merge)
if application_ids:
print(f"[INFO] Generating credentials for {len(application_ids)} applications...")

View File

@@ -13,6 +13,7 @@ sys.path.insert(0, dir_path)
from cli.create.inventory import ( # type: ignore
merge_inventories,
ensure_host_vars_file,
ensure_become_password,
)
from ruamel.yaml import YAML
@@ -242,6 +243,46 @@ existing_key: foo
data2 = yaml_rt.load(f)
self.assertEqual(data2["ansible_connection"], "local")
def test_ensure_become_password_keeps_existing_when_no_cli_password(self):
"""
If no --become-password is provided and ansible_become_password already
exists in host_vars, ensure_become_password must not overwrite it and
must not attempt to generate or vault a new one.
"""
yaml_rt = YAML(typ="rt")
yaml_rt.preserve_quotes = True
with tempfile.TemporaryDirectory() as tmpdir:
host = "localhost"
host_vars_dir = Path(tmpdir)
host_vars_file = host_vars_dir / f"{host}.yml"
vault_pw_file = host_vars_dir / ".password"
# Create a dummy vault password file (not actually used in this test)
vault_pw_file.write_text("dummy\n", encoding="utf-8")
# Seed host_vars with an existing ansible_become_password value
initial = CommentedMap()
initial["ansible_become_password"] = "EXISTING_VALUE"
with host_vars_file.open("w", encoding="utf-8") as f:
yaml_rt.dump(initial, f)
# Call helper WITHOUT an explicit become_password
ensure_become_password(
host_vars_file=host_vars_file,
vault_password_file=vault_pw_file,
become_password=None,
)
# Reload and verify ansible_become_password remains unchanged
with host_vars_file.open("r", encoding="utf-8") as f:
doc = yaml_rt.load(f)
self.assertIsNotNone(doc)
self.assertIn("ansible_become_password", doc)
self.assertEqual(doc["ansible_become_password"], "EXISTING_VALUE")
if __name__ == "__main__":
unittest.main()