fix(credentials, akaunting):

- update cli/create/credentials.py to handle vault literals correctly:
  * strip 'vault |' headers and keep only ANSIBLE_VAULT body
  * skip reprocessing keys added in same run (no duplicate confirmation prompts)
  * detect both 'vault' and 'ANSIBLE_VAULT' as already encrypted

Refs: https://chatgpt.com/share/68aed780-ad4c-800f-877d-aa4c40a47755
This commit is contained in:
2025-08-27 12:02:36 +02:00
parent d9980c0d8f
commit 19889a8cfc

View File

@@ -1,14 +1,29 @@
#!/usr/bin/env python3
"""
Selectively add & vault NEW credentials in your inventory, preserving comments
and formatting. Existing values are left untouched unless --force is used.
Usage example:
infinito create credentials \
--role-path roles/web-app-akaunting \
--inventory-file host_vars/echoserver.yml \
--vault-password-file .pass/echoserver.txt \
--set credentials.database_password=mysecret
"""
import argparse import argparse
import subprocess
import sys import sys
from pathlib import Path from pathlib import Path
import yaml from typing import Dict, Any, Union
from typing import Dict, Any
from module_utils.manager.inventory import InventoryManager
from module_utils.handler.vault import VaultHandler, VaultScalar
from module_utils.handler.yaml import YamlHandler
from yaml.dumper import SafeDumper
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap
from module_utils.manager.inventory import InventoryManager
from module_utils.handler.vault import VaultHandler # uses your existing handler
# ---------- helpers ----------
def ask_for_confirmation(key: str) -> bool: def ask_for_confirmation(key: str) -> bool:
"""Prompt the user for confirmation to overwrite an existing value.""" """Prompt the user for confirmation to overwrite an existing value."""
@@ -18,35 +33,117 @@ def ask_for_confirmation(key: str) -> bool:
return confirmation == 'y' return confirmation == 'y'
def main(): def ensure_map(node: CommentedMap, key: str) -> CommentedMap:
"""
Ensure node[key] exists and is a mapping (CommentedMap) for round-trip safety.
"""
if key not in node or not isinstance(node.get(key), CommentedMap):
node[key] = CommentedMap()
return node[key]
def _is_ruamel_vault(val: Any) -> bool:
"""Detect if a ruamel scalar already carries the !vault tag."""
try:
return getattr(val, 'tag', None) == '!vault'
except Exception:
return False
def _is_vault_encrypted(val: Any) -> bool:
"""
Detect if value is already a vault string or a ruamel !vault scalar.
Accept both '$ANSIBLE_VAULT' and '!vault' markers.
"""
if _is_ruamel_vault(val):
return True
if isinstance(val, str) and ("$ANSIBLE_VAULT" in val or "!vault" in val):
return True
return False
def _vault_body(text: str) -> str:
"""
Return only the vault body starting from the first line that contains
'$ANSIBLE_VAULT'. If not found, return the original text.
Also strips any leading '!vault |' header if present.
"""
lines = text.splitlines()
for i, ln in enumerate(lines):
if "$ANSIBLE_VAULT" in ln:
return "\n".join(lines[i:])
return text
def _make_vault_scalar_from_text(text: str) -> Any:
"""
Build a ruamel object representing a literal block scalar tagged with !vault
by parsing a tiny YAML snippet. This avoids depending on yaml_set_tag().
"""
body = _vault_body(text)
indented = " " + body.replace("\n", "\n ") # proper block scalar indentation
snippet = f"v: !vault |\n{indented}\n"
y = YAML(typ="rt")
return y.load(snippet)["v"]
def to_vault_block(vault_handler: VaultHandler, value: Union[str, Any], label: str) -> Any:
"""
Return a ruamel scalar tagged as !vault. If the input value is already
vault-encrypted (string contains $ANSIBLE_VAULT or is a !vault scalar), reuse/wrap.
Otherwise, encrypt plaintext via ansible-vault.
"""
# Already a ruamel !vault scalar → reuse
if _is_ruamel_vault(value):
return value
# Already an encrypted string (may include '!vault |' or just the header)
if isinstance(value, str) and ("$ANSIBLE_VAULT" in value or "!vault" in value):
return _make_vault_scalar_from_text(value)
# Plaintext → encrypt now
snippet = vault_handler.encrypt_string(str(value), label)
return _make_vault_scalar_from_text(snippet)
def parse_overrides(pairs: list[str]) -> Dict[str, str]:
"""
Parse --set key=value pairs into a dict.
Supports both 'credentials.key=val' and 'key=val' (short) forms.
"""
out: Dict[str, str] = {}
for pair in pairs:
k, v = pair.split("=", 1)
out[k.strip()] = v.strip()
return out
# ---------- main ----------
def main() -> int:
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Selectively vault credentials + become-password in your inventory." description="Selectively add & vault NEW credentials in your inventory, preserving comments/formatting."
) )
parser.add_argument("--role-path", required=True, help="Path to your role")
parser.add_argument("--inventory-file", required=True, help="Host vars file to update")
parser.add_argument("--vault-password-file", required=True, help="Vault password file")
parser.add_argument( parser.add_argument(
"--role-path", required=True, help="Path to your role" "--set", nargs="*", default=[],
) help="Override values key[.subkey]=VALUE (applied to NEW keys; with --force also to existing)"
parser.add_argument(
"--inventory-file", required=True, help="Host vars file to update"
)
parser.add_argument(
"--vault-password-file", required=True, help="Vault password file"
)
parser.add_argument(
"--set", nargs="*", default=[], help="Override values key.subkey=VALUE"
) )
parser.add_argument( parser.add_argument(
"-f", "--force", action="store_true", "-f", "--force", action="store_true",
help="Force overwrite without confirmation" help="Allow overrides to replace existing values (will ask per key unless combined with --yes)"
)
parser.add_argument(
"-y", "--yes", action="store_true",
help="Non-interactive: assume 'yes' for all overwrite confirmations when --force is used"
) )
args = parser.parse_args() args = parser.parse_args()
# Parse overrides overrides = parse_overrides(args.set)
overrides = {
k.strip(): v.strip()
for pair in args.set for k, v in [pair.split("=", 1)]
}
# Initialize inventory manager # Initialize inventory manager (provides schema + app_id + vault)
manager = InventoryManager( manager = InventoryManager(
role_path=Path(args.role_path), role_path=Path(args.role_path),
inventory_path=Path(args.inventory_file), inventory_path=Path(args.inventory_file),
@@ -54,62 +151,90 @@ def main():
overrides=overrides overrides=overrides
) )
# Load existing credentials to preserve # 1) Load existing inventory with ruamel (round-trip)
existing_apps = manager.inventory.get("applications", {}) yaml_rt = YAML(typ="rt")
existing_creds = {} yaml_rt.preserve_quotes = True
if manager.app_id in existing_apps:
existing_creds = existing_apps[manager.app_id].get("credentials", {}).copy()
# Apply schema (may generate defaults) with open(args.inventory_file, "r", encoding="utf-8") as f:
updated_inventory = manager.apply_schema() data = yaml_rt.load(f) # CommentedMap or None
if data is None:
data = CommentedMap()
# Restore existing database_password if present # 2) Get schema-applied structure (defaults etc.) for *non-destructive* merge
apps = updated_inventory.setdefault("applications", {}) schema_inventory: Dict[str, Any] = manager.apply_schema()
app_block = apps.setdefault(manager.app_id, {})
creds = app_block.setdefault("credentials", {})
if "database_password" in existing_creds:
creds["database_password"] = existing_creds["database_password"]
# Store original plaintext values # 3) Ensure structural path exists
original_plain = {key: str(val) for key, val in creds.items()} apps = ensure_map(data, "applications")
app_block = ensure_map(apps, manager.app_id)
creds = ensure_map(app_block, "credentials")
for key, raw_val in list(creds.items()): # 4) Determine defaults we could add
# Skip if already vaulted schema_apps = schema_inventory.get("applications", {})
if isinstance(raw_val, VaultScalar) or str(raw_val).lstrip().startswith("$ANSIBLE_VAULT"): schema_app_block = schema_apps.get(manager.app_id, {})
schema_creds = schema_app_block.get("credentials", {}) if isinstance(schema_app_block, dict) else {}
# 5) Add ONLY missing credential keys
newly_added_keys = set()
for key, default_val in schema_creds.items():
if key in creds:
# existing → do not touch (preserve plaintext/vault/formatting/comments)
continue continue
# Determine plaintext # Value to use for the new key
plain = original_plain.get(key, "") # Priority: --set exact key → default from schema → empty string
if key in overrides and (args.force or ask_for_confirmation(key)): ov = overrides.get(f"credentials.{key}", None)
plain = overrides[key] if ov is None:
ov = overrides.get(key, None)
# Encrypt the plaintext if ov is not None:
encrypted = manager.vault_handler.encrypt_string(plain, key) value_for_new_key: Union[str, Any] = ov
lines = encrypted.splitlines()
indent = len(lines[1]) - len(lines[1].lstrip())
body = "\n".join(line[indent:] for line in lines[1:])
creds[key] = VaultScalar(body)
# Vault top-level become password if present
if "ansible_become_password" in updated_inventory:
val = str(updated_inventory["ansible_become_password"])
if val.lstrip().startswith("$ANSIBLE_VAULT"):
updated_inventory["ansible_become_password"] = VaultScalar(val)
else: else:
snippet = manager.vault_handler.encrypt_string( if _is_vault_encrypted(default_val):
val, "ansible_become_password" # Schema already provides a vault value → take it as-is
creds[key] = to_vault_block(manager.vault_handler, default_val, key)
newly_added_keys.add(key)
continue
value_for_new_key = "" if default_val is None else str(default_val)
# Insert as !vault literal (encrypt if needed)
creds[key] = to_vault_block(manager.vault_handler, value_for_new_key, key)
newly_added_keys.add(key)
# 6) ansible_become_password: only add if missing;
# never rewrite an existing one unless --force (+ confirm/--yes) and override provided.
if "ansible_become_password" not in data:
val = overrides.get("ansible_become_password", None)
if val is not None:
data["ansible_become_password"] = to_vault_block(
manager.vault_handler, val, "ansible_become_password"
) )
lines = snippet.splitlines() else:
indent = len(lines[1]) - len(lines[1].lstrip()) if args.force and "ansible_become_password" in overrides:
body = "\n".join(line[indent:] for line in lines[1:]) do_overwrite = args.yes or ask_for_confirmation("ansible_become_password")
updated_inventory["ansible_become_password"] = VaultScalar(body) if do_overwrite:
data["ansible_become_password"] = to_vault_block(
manager.vault_handler, overrides["ansible_become_password"], "ansible_become_password"
)
# Write back to file # 7) Overrides for existing credential keys (only with --force)
if args.force:
for ov_key, ov_val in overrides.items():
# Accept both 'credentials.key' and bare 'key'
key = ov_key.split(".", 1)[1] if ov_key.startswith("credentials.") else ov_key
if key in creds:
# If we just added it in this run, don't ask again or rewrap
if key in newly_added_keys:
continue
if args.yes or ask_for_confirmation(key):
creds[key] = to_vault_block(manager.vault_handler, ov_val, key)
# 8) Write back with ruamel (preserve formatting & comments)
with open(args.inventory_file, "w", encoding="utf-8") as f: with open(args.inventory_file, "w", encoding="utf-8") as f:
yaml.dump(updated_inventory, f, sort_keys=False, Dumper=SafeDumper) yaml_rt.dump(data, f)
print(f"Inventory selectively vaulted{args.inventory_file}") print(f"Added new credentials without touching existing formatting/comments{args.inventory_file}")
return 0
if __name__ == "__main__": if __name__ == "__main__":
main() sys.exit(main())