Extend inventory creation with --authorized-keys support, unify PATH_ADMINISTRATOR_HOME resolution via group_vars, update CI workflow to pass dummy SSH key, and add full unit test coverage for new features.

See details: https://chatgpt.com/share/6931f775-12d4-800f-9ccb-e6ce52097f9c
This commit is contained in:
2025-12-04 22:06:10 +01:00
parent 2ed58ceffc
commit bf0134b9c5
3 changed files with 330 additions and 10 deletions

View File

@@ -429,6 +429,138 @@ def ensure_ruamel_map(node: CommentedMap, key: str) -> CommentedMap:
return node[key]
def get_path_administrator_home_from_group_vars(project_root: Path) -> str:
"""
Read PATH_ADMINISTRATOR_HOME from group_vars/all/06_paths.yml.
Expected layout (relative to project_root):
group_vars/
all/
06_paths.yml
If the file or variable is missing, fall back to '/home/administrator/'
and emit a warning to stderr.
"""
paths_file = project_root / "group_vars" / "all" / "06_paths.yml"
default_path = "/home/administrator/"
if not paths_file.exists():
print(
f"[WARN] group_vars paths file not found: {paths_file}. "
f"Falling back to PATH_ADMINISTRATOR_HOME={default_path}",
file=sys.stderr,
)
return default_path
try:
with paths_file.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
except Exception as exc: # pragma: no cover
print(
f"[WARN] Failed to load {paths_file}: {exc}. "
f"Falling back to PATH_ADMINISTRATOR_HOME={default_path}",
file=sys.stderr,
)
return default_path
value = data.get("PATH_ADMINISTRATOR_HOME", default_path)
if not isinstance(value, str) or not value:
print(
f"[WARN] PATH_ADMINISTRATOR_HOME missing or invalid in {paths_file}. "
f"Falling back to {default_path}",
file=sys.stderr,
)
return default_path
# Normalize: ensure it ends with exactly one trailing slash.
value = value.rstrip("/") + "/"
return value
def ensure_administrator_authorized_keys(
inventory_dir: Path,
host: str,
authorized_keys_spec: Optional[str],
project_root: Path,
) -> None:
"""
Ensure that the administrator's authorized_keys file exists and contains
all keys provided via --authorized-keys.
Behavior:
- If authorized_keys_spec is None → do nothing.
- If authorized_keys_spec is a path to an existing file:
read all non-empty, non-comment lines in that file as keys.
- Else:
treat authorized_keys_spec as literal key text, which may contain
one or more keys separated by newlines.
The target file path mirrors the Ansible task in roles/user-administrator:
src: "{{ inventory_dir }}/files/{{ inventory_hostname }}{{ PATH_ADMINISTRATOR_HOME }}.ssh/authorized_keys"
We implement the same pattern here:
<inventory_dir>/files/<host><PATH_ADMINISTRATOR_HOME>.ssh/authorized_keys
PATH_ADMINISTRATOR_HOME is read from group_vars/all/06_paths.yml so that
Python and Ansible share a single source of truth.
"""
if not authorized_keys_spec:
return
# Read PATH_ADMINISTRATOR_HOME from group_vars/all/06_paths.yml
PATH_ADMINISTRATOR_HOME = get_path_administrator_home_from_group_vars(project_root)
# Build relative path identical to the Ansible src:
# files/{{ inventory_hostname }}{{ PATH_ADMINISTRATOR_HOME }}.ssh/authorized_keys
rel_fragment = f"{host}{PATH_ADMINISTRATOR_HOME}.ssh/authorized_keys"
# remove leading slash so it becomes relative under files/
rel_path = rel_fragment.lstrip("/")
target_path = inventory_dir / "files" / rel_path
target_path.parent.mkdir(parents=True, exist_ok=True)
spec_path = Path(authorized_keys_spec)
if spec_path.exists() and spec_path.is_file():
# Use keys from the referenced file.
source_text = spec_path.read_text(encoding="utf-8")
else:
# Treat the argument as literal key text.
source_text = authorized_keys_spec
# Normalize incoming keys: one key per non-empty, non-comment line.
new_keys: List[str] = []
for line in (source_text or "").splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
new_keys.append(stripped)
if not new_keys:
# Nothing to add.
return
existing_lines: List[str] = []
existing_keys: Set[str] = set()
if target_path.exists():
for line in target_path.read_text(encoding="utf-8").splitlines():
existing_lines.append(line)
stripped = line.strip()
if stripped and not stripped.startswith("#"):
existing_keys.add(stripped)
# Append only keys that are not yet present (by stripped line match).
for key in new_keys:
if key not in existing_keys:
existing_lines.append(key)
existing_keys.add(key)
# Write back, ensuring a trailing newline.
final_text = "\n".join(existing_lines).rstrip() + "\n"
target_path.write_text(final_text, encoding="utf-8")
# ---------------------------------------------------------------------------
# Role resolution (meta/applications/role_name.py)
# ---------------------------------------------------------------------------
@@ -748,7 +880,18 @@ def main(argv: Optional[List[str]] = None) -> None:
"ansible_become_password already exists, it is left unchanged."
),
)
parser.add_argument(
"--authorized-keys",
required=False,
help=(
"Optional SSH public keys for the 'administrator' account. "
"May be a literal key string (possibly with newlines) or a path "
"to a file containing one or more public keys. "
"All keys are ensured to exist in "
"files/<host><PATH_ADMINISTRATOR_HOME>.ssh/authorized_keys "
"under the inventory directory; missing keys are appended."
),
)
parser.add_argument(
"--ip4",
default="127.0.0.1",
@@ -849,7 +992,7 @@ def main(argv: Optional[List[str]] = None) -> None:
try:
vault_password_file.chmod(0o600)
except PermissionError:
# Best-effort; exclude if chmod is not allowed
# Best-effort; ignore if chmod is not allowed
pass
else:
print(f"[INFO] Using existing vault password file: {vault_password_file}")
@@ -915,6 +1058,19 @@ def main(argv: Optional[List[str]] = None) -> None:
become_password=args.become_password,
)
# 4c) Ensure administrator authorized_keys file contains keys from --authorized-keys
if args.authorized_keys:
print(
f"[INFO] Ensuring administrator authorized_keys for host '{args.host}' "
f"from spec: {args.authorized_keys}"
)
ensure_administrator_authorized_keys(
inventory_dir=inventory_dir,
host=args.host,
authorized_keys_spec=args.authorized_keys,
project_root=project_root,
)
# 5) Generate credentials for all application_ids (snippets + single merge)
if application_ids:
print(f"[INFO] Generating credentials for {len(application_ids)} applications...")