diff --git a/cli/create_credentials.py b/cli/create_credentials.py index 38721a81..a68d3e91 100644 --- a/cli/create_credentials.py +++ b/cli/create_credentials.py @@ -1,205 +1,56 @@ -#!/usr/bin/env python3 import argparse -import secrets -import hashlib -import bcrypt import subprocess import sys from pathlib import Path - import yaml -from yaml.loader import SafeLoader +from typing import Dict, Any +from utils.manager.inventory import InventoryManager +from utils.handler.vault import VaultHandler, VaultScalar +from utils.handler.yaml import YamlHandler from yaml.dumper import SafeDumper -# ───────────────────────────────────────────────────────────────────────────── -# On load: treat any !vault tag as plain text -def _vault_constructor(loader, node): - return node.value -SafeLoader.add_constructor('!vault', _vault_constructor) - -# A str subclass so PyYAML emits !vault literal blocks on dump -class VaultScalar(str): - pass - -def _vault_representer(dumper, data): - return dumper.represent_scalar('!vault', data, style='|') - -SafeDumper.add_representer(VaultScalar, _vault_representer) -# ───────────────────────────────────────────────────────────────────────────── - -def generate_value(algorithm: str) -> str: - if algorithm == "random_hex": - return secrets.token_hex(64) - if algorithm == "sha256": - return hashlib.sha256(secrets.token_bytes(32)).hexdigest() - if algorithm == "sha1": - return hashlib.sha1(secrets.token_bytes(20)).hexdigest() - if algorithm == "bcrypt": - pw = secrets.token_urlsafe(16).encode() - return bcrypt.hashpw(pw, bcrypt.gensalt()).decode() - # we should never auto-generate for "plain" - return "undefined" - -def wrap_existing_vaults(node): - """ - Recursively wrap any str that begins with '$ANSIBLE_VAULT' - in a VaultScalar so it dumps as a literal block. - """ - if isinstance(node, dict): - return {k: wrap_existing_vaults(v) for k, v in node.items()} - if isinstance(node, list): - return [wrap_existing_vaults(v) for v in node] - if isinstance(node, str) and node.lstrip().startswith("$ANSIBLE_VAULT"): - return VaultScalar(node) - return node - -def load_yaml_plain(path: Path) -> dict: - """ - Load any YAML (vaulted or not) via SafeLoader + our !vault constructor, - then wrap existing vault‐blocks for correct literal dumping. - """ - text = path.read_text() - data = yaml.load(text, Loader=SafeLoader) or {} - return wrap_existing_vaults(data) - -def encrypt_with_vault(value: str, name: str, vault_password_file: str) -> str: - cmd = [ - "ansible-vault", "encrypt_string", - value, f"--name={name}", - "--vault-password-file", vault_password_file - ] - proc = subprocess.run(cmd, capture_output=True, text=True) - if proc.returncode != 0: - raise RuntimeError(f"ansible-vault encrypt_string failed:\n{proc.stderr}") - return proc.stdout - -def parse_overrides(pairs: list[str]) -> dict: - out = {} - for p in pairs: - if "=" in p: - k, v = p.split("=", 1) - out[k.strip()] = v.strip() - return out - -def load_application_id(role_path: Path) -> str: - vars_file = role_path / "vars" / "main.yml" - data = load_yaml_plain(vars_file) - app_id = data.get("application_id") - if not app_id: - print(f"ERROR: 'application_id' missing in {vars_file}", file=sys.stderr) - sys.exit(1) - return app_id - -def apply_schema(schema: dict, - inventory: dict, - app_id: str, - overrides: dict, - vault_pw: str) -> dict: - apps = inventory.setdefault("applications", {}) - target = apps.setdefault(app_id, {}) - - def recurse(branch: dict, dest: dict, prefix: str = ""): - for key, meta in branch.items(): - full_key = f"{prefix}.{key}" if prefix else key - - # leaf node spec - if isinstance(meta, dict) and all(k in meta for k in ("description","algorithm","validation")): - alg = meta["algorithm"] - if alg == "plain": - # must be supplied via --set - if full_key not in overrides: - print(f"ERROR: Plain algorithm for '{full_key}' requires override via --set {full_key}=", file=sys.stderr) - sys.exit(1) - plain = overrides[full_key] - else: - # generate or override - plain = overrides.get(full_key, generate_value(alg)) - - snippet = encrypt_with_vault(plain, key, vault_pw) - lines = snippet.splitlines() - indent = len(lines[1]) - len(lines[1].lstrip()) - body = "\n".join(line[indent:] for line in lines[1:]) - dest[key] = VaultScalar(body) - - # nested mapping - elif isinstance(meta, dict): - sub = dest.setdefault(key, {}) - recurse(meta, sub, full_key) - - # literal passthrough - else: - dest[key] = meta - - recurse(schema, target) - return inventory - -def encrypt_leaves(branch: dict, vault_pw: str): - for k, v in list(branch.items()): - if isinstance(v, dict): - encrypt_leaves(v, vault_pw) - else: - plain = str(v) - # skip if already vaulted - if plain.lstrip().startswith("$ANSIBLE_VAULT"): - continue - snippet = encrypt_with_vault(plain, k, vault_pw) - lines = snippet.splitlines() - indent = len(lines[1]) - len(lines[1].lstrip()) - body = "\n".join(line[indent:] for line in lines[1:]) - branch[k] = VaultScalar(body) - -def encrypt_credentials_branch(node, vault_pw: str): - if isinstance(node, dict): - for key, val in node.items(): - if key == "credentials" and isinstance(val, dict): - encrypt_leaves(val, vault_pw) - else: - encrypt_credentials_branch(val, vault_pw) - elif isinstance(node, list): - for item in node: - encrypt_credentials_branch(item, vault_pw) - def main(): parser = argparse.ArgumentParser( description="Selectively vault credentials + become-password in your inventory." ) - parser.add_argument("--role-path", required=True, help="Path to your role") - parser.add_argument("--inventory-file", required=True, help="host_vars file to update") - parser.add_argument("--vault-password-file",required=True, help="Vault password file") + parser.add_argument("--role-path", required=True, help="Path to your role") + parser.add_argument("--inventory-file", required=True, help="Host vars file to update") + parser.add_argument("--vault-password-file", required=True, help="Vault password file") parser.add_argument("--set", nargs="*", default=[], help="Override values key.subkey=VALUE") args = parser.parse_args() - role_path = Path(args.role_path) - inv_file = Path(args.inventory_file) - vault_pw = args.vault_password_file - overrides = parse_overrides(args.set) + # Parsing overrides + overrides = {k.strip(): v.strip() for pair in args.set for k, v in [pair.split("=", 1)]} - # 1) Load & wrap any existing vault blocks - inventory = load_yaml_plain(inv_file) + # Initialize the Inventory Manager + manager = InventoryManager( + role_path=Path(args.role_path), + inventory_path=Path(args.inventory_file), + vault_pw=args.vault_password_file, + overrides=overrides + ) - # 2) Merge schema-driven credentials (plain ones must be overridden) - schema = load_yaml_plain(role_path / "meta" / "schema.yml") - app_id = load_application_id(role_path) - inventory = apply_schema(schema, inventory, app_id, overrides, vault_pw) + # 1) Apply schema and update inventory + updated_inventory = manager.apply_schema() - # 3) Vault any leaves under 'credentials:' mappings - encrypt_credentials_branch(inventory, vault_pw) + # 2) Vault any leaves under 'credentials:' mappings + manager.vault_handler.encrypt_leaves(updated_inventory, args.vault_password_file) - # 4) Vault top-level ansible_become_password if present - if "ansible_become_password" in inventory: - val = str(inventory["ansible_become_password"]) + # 3) Vault top-level ansible_become_password if present + if "ansible_become_password" in updated_inventory: + val = str(updated_inventory["ansible_become_password"]) if not val.lstrip().startswith("$ANSIBLE_VAULT"): - snippet = encrypt_with_vault(val, "ansible_become_password", vault_pw) + snippet = manager.vault_handler.encrypt_string(val, "ansible_become_password") lines = snippet.splitlines() indent = len(lines[1]) - len(lines[1].lstrip()) body = "\n".join(line[indent:] for line in lines[1:]) - inventory["ansible_become_password"] = VaultScalar(body) + updated_inventory["ansible_become_password"] = VaultScalar(body) - # 5) Overwrite file with proper !vault literal blocks only where needed - with open(inv_file, "w", encoding="utf-8") as f: - yaml.dump(inventory, f, sort_keys=False, Dumper=SafeDumper) + # 4) Save the updated inventory to file + with open(args.inventory_file, "w", encoding="utf-8") as f: + yaml.dump(updated_inventory, f, sort_keys=False, Dumper=SafeDumper) - print(f"✅ Inventory selectively vaulted → {inv_file}") + print(f"✅ Inventory selectively vaulted → {args.inventory_file}") if __name__ == "__main__": main() diff --git a/cli/utils/__init__.py b/cli/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cli/utils/handler/__init__.py b/cli/utils/handler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cli/utils/handler/vault.py b/cli/utils/handler/vault.py new file mode 100644 index 00000000..2dbaaf22 --- /dev/null +++ b/cli/utils/handler/vault.py @@ -0,0 +1,50 @@ +import subprocess +from typing import Any, Dict + +from yaml.loader import SafeLoader +from yaml.dumper import SafeDumper + +class VaultScalar(str): + """A subclass of str to represent vault-encrypted strings.""" + pass + +def _vault_constructor(loader, node): + """Custom constructor to handle !vault tag as plain text.""" + return node.value + +def _vault_representer(dumper, data): + """Custom representer to dump VaultScalar as literal blocks.""" + return dumper.represent_scalar('!vault', data, style='|') + +SafeLoader.add_constructor('!vault', _vault_constructor) +SafeDumper.add_representer(VaultScalar, _vault_representer) + +class VaultHandler: + def __init__(self, vault_password_file: str): + self.vault_password_file = vault_password_file + + def encrypt_string(self, value: str, name: str) -> str: + """Encrypt a string using ansible-vault.""" + cmd = [ + "ansible-vault", "encrypt_string", + value, f"--name={name}", + "--vault-password-file", self.vault_password_file + ] + proc = subprocess.run(cmd, capture_output=True, text=True) + if proc.returncode != 0: + raise RuntimeError(f"ansible-vault encrypt_string failed:\n{proc.stderr}") + return proc.stdout + + def encrypt_leaves(self, branch: Dict[str, Any], vault_pw: str): + """Recursively encrypt all leaves (plain text values) under the credentials section.""" + for key, value in branch.items(): + if isinstance(value, dict): + self.encrypt_leaves(value, vault_pw) # Recurse into nested dictionaries + else: + # Skip if already vaulted (i.e., starts with $ANSIBLE_VAULT) + if isinstance(value, str) and not value.lstrip().startswith("$ANSIBLE_VAULT"): + snippet = self.encrypt_string(value, key) + lines = snippet.splitlines() + indent = len(lines[1]) - len(lines[1].lstrip()) + body = "\n".join(line[indent:] for line in lines[1:]) + branch[key] = VaultScalar(body) # Store encrypted value as VaultScalar diff --git a/cli/utils/handler/yaml.py b/cli/utils/handler/yaml.py new file mode 100644 index 00000000..9edb48d3 --- /dev/null +++ b/cli/utils/handler/yaml.py @@ -0,0 +1,23 @@ +import yaml +from yaml.loader import SafeLoader +from typing import Any, Dict +from utils.handler.vault import VaultScalar + +class YamlHandler: + @staticmethod + def load_yaml(path) -> Dict: + """Load the YAML file and wrap existing !vault entries.""" + text = path.read_text() + data = yaml.load(text, Loader=SafeLoader) or {} + return YamlHandler.wrap_existing_vaults(data) + + @staticmethod + def wrap_existing_vaults(node: Any) -> Any: + """Recursively wrap any str that begins with '$ANSIBLE_VAULT' in a VaultScalar so it dumps as a literal block.""" + if isinstance(node, dict): + return {k: YamlHandler.wrap_existing_vaults(v) for k, v in node.items()} + if isinstance(node, list): + return [YamlHandler.wrap_existing_vaults(v) for v in node] + if isinstance(node, str) and node.lstrip().startswith("$ANSIBLE_VAULT"): + return VaultScalar(node) + return node \ No newline at end of file diff --git a/cli/utils/manager/inventory.py b/cli/utils/manager/inventory.py new file mode 100644 index 00000000..f424f605 --- /dev/null +++ b/cli/utils/manager/inventory.py @@ -0,0 +1,99 @@ +import secrets +import hashlib +import bcrypt +from pathlib import Path +from typing import Dict +from utils.handler.yaml import YamlHandler +from utils.handler.vault import VaultHandler, VaultScalar + +class InventoryManager: + def __init__(self, role_path: Path, inventory_path: Path, vault_pw: str, overrides: Dict[str, str]): + """Initialize the Inventory Manager.""" + self.role_path = role_path + self.inventory_path = inventory_path + self.vault_pw = vault_pw + self.overrides = overrides + self.inventory = YamlHandler.load_yaml(inventory_path) + self.schema = YamlHandler.load_yaml(role_path / "meta" / "schema.yml") + self.app_id = self.load_application_id(role_path) + + self.vault_handler = VaultHandler(vault_pw) + + def load_application_id(self, role_path: Path) -> str: + """Load the application ID from the role's vars/main.yml file.""" + vars_file = role_path / "vars" / "main.yml" + data = YamlHandler.load_yaml(vars_file) + app_id = data.get("application_id") + if not app_id: + print(f"ERROR: 'application_id' missing in {vars_file}", file=sys.stderr) + sys.exit(1) + return app_id + + def apply_schema(self) -> Dict: + """Apply the schema and return the updated inventory.""" + apps = self.inventory.setdefault("applications", {}) + target = apps.setdefault(self.app_id, {}) + + # Load the data from vars/main.yml + vars_file = self.role_path / "vars" / "main.yml" + data = YamlHandler.load_yaml(vars_file) + + # Check if 'central-database' is enabled in the features section of data + if "features" in data and \ + "central-database" in data["features"] and \ + data["features"]["central_database"]: + # Add 'database_password' to credentials if 'central-database' is True + target.setdefault("credentials", {})["database_password"] = { + "value": self.generate_value("alphanumeric") # Generate the password value + } + + self.recurse(self.schema, target) + return self.inventory + + def recurse(self, branch: dict, dest: dict, prefix: str = ""): + """Recursively process the schema and generate values.""" + for key, meta in branch.items(): + full_key = f"{prefix}.{key}" if prefix else key + + if isinstance(meta, dict) and all(k in meta for k in ("description", "algorithm", "validation")): + alg = meta["algorithm"] + if alg == "plain": + # Must be supplied via --set + if full_key not in self.overrides: + print(f"ERROR: Plain algorithm for '{full_key}' requires override via --set {full_key}=", file=sys.stderr) + sys.exit(1) + plain = self.overrides[full_key] + else: + plain = self.overrides.get(full_key, self.generate_value(alg)) + + snippet = self.vault_handler.encrypt_string(plain, key) + lines = snippet.splitlines() + indent = len(lines[1]) - len(lines[1].lstrip()) + body = "\n".join(line[indent:] for line in lines[1:]) + dest[key] = VaultScalar(body) + + elif isinstance(meta, dict): + sub = dest.setdefault(key, {}) + self.recurse(meta, sub, full_key) + else: + dest[key] = meta + + def generate_secure_alphanumeric(length: int) -> str: + """Generate a cryptographically secure random alphanumeric string of the given length.""" + characters = string.ascii_letters + string.digits # a-zA-Z0-9 + return ''.join(secrets.choice(characters) for _ in range(length)) + + def generate_value(self, algorithm: str) -> str: + """Generate a value based on the provided algorithm.""" + if algorithm == "random_hex": + return secrets.token_hex(64) + if algorithm == "sha256": + return hashlib.sha256(secrets.token_bytes(32)).hexdigest() + if algorithm == "sha1": + return hashlib.sha1(secrets.token_bytes(20)).hexdigest() + if algorithm == "bcrypt": + pw = secrets.token_urlsafe(16).encode() + return bcrypt.hashpw(pw, bcrypt.gensalt()).decode() + if algorithm == "alphanumeric": + return generate_secure_alphanumeric(64) + return "undefined" \ No newline at end of file diff --git a/filter_plugins/csp_filters.py b/filter_plugins/csp_filters.py index 17cf06ac..9d90a9bd 100644 --- a/filter_plugins/csp_filters.py +++ b/filter_plugins/csp_filters.py @@ -110,7 +110,7 @@ class FilterModule(object): self.is_feature_enabled(applications, matomo_feature_name, application_id) and directive in ['script-src', 'connect-src'] ): - matomo_domain = domains.get('matomo') + matomo_domain = domains.get('matomo')[0] if matomo_domain: tokens.append(f"{web_protocol}://{matomo_domain}") diff --git a/group_vars/all/15_about.yml b/group_vars/all/15_about.yml index ef7ad12f..b571fe16 100644 --- a/group_vars/all/15_about.yml +++ b/group_vars/all/15_about.yml @@ -9,12 +9,12 @@ defaults_service_provider: city: "Cybertown" postal_code: "00001" country: "Nexusland" - logo: "{{ applications['assets-server'].url | safe_var | safe_join('img/logo.png') }}" + logo: "{{ applications['assets-server'].url ~ '/img/logo.png' }}" platform: titel: "CyMaIS Demo" subtitel: "The Future of Self-Hosted Infrastructure. Secure. Automated. Sovereign." - logo: "{{ applications['assets-server'].url | safe_var | safe_join('img/logo.png') }}" - favicon: "{{ applications['assets-server'].url | safe_var | safe_join('img/favicon.ico') }}" + logo: "{{ applications['assets-server'].url ~ '/img/logo.png' }}" + favicon: "{{ applications['assets-server'].url ~ '/img/favicon.ico' }}" contact: bluesky: >- {{ ('@' ~ users.administrator.username ~ '.' ~ domains.bluesky.api) @@ -30,4 +30,4 @@ defaults_service_provider: legal: editorial_responsible: "Johannes Gutenberg" source_code: "https://github.com/kevinveenbirkenbach/cymais" - imprint: "{{web_protocol}}://{{domains['html-server']}}/imprint.html" \ No newline at end of file + imprint: "{{web_protocol}}://{{ domains | get_domain('html-server') }}/imprint.html" \ No newline at end of file diff --git a/roles/docker-akaunting/vars/configuration.yml b/roles/docker-akaunting/vars/configuration.yml index 1252db45..24d95c17 100644 --- a/roles/docker-akaunting/vars/configuration.yml +++ b/roles/docker-akaunting/vars/configuration.yml @@ -8,8 +8,6 @@ features: portfolio_iframe: false central_database: true credentials: -# database_password: Needs to be defined in inventory file -# setup_admin_password: Needs to be defined in inventory file domains: canonical: - "accounting.{{ primary_domain }}" diff --git a/roles/docker-funkwhale/vars/configuration.yml b/roles/docker-funkwhale/vars/configuration.yml index a7e25f9a..2ef05a94 100644 --- a/roles/docker-funkwhale/vars/configuration.yml +++ b/roles/docker-funkwhale/vars/configuration.yml @@ -2,7 +2,7 @@ version: "1.4.0" features: matomo: true css: true - portfolio_iframe: true + portfolio_iframe: true ldap: true central_database: true credentials: diff --git a/roles/docker-nextcloud/vars/plugins/bbb.yml b/roles/docker-nextcloud/vars/plugins/bbb.yml index 9d5a22ee..dbe74cb7 100644 --- a/roles/docker-nextcloud/vars/plugins/bbb.yml +++ b/roles/docker-nextcloud/vars/plugins/bbb.yml @@ -4,4 +4,4 @@ plugin_configuration: configvalue: "{{ applications.bigbluebutton.credentials.shared_secret }}" - appid: "bbb" configkey: "api.url" - configvalue: "{{ web_protocol }}://{{domains | get_domain(''bigbluebutton'')}}{{applications.bigbluebutton.api_suffix}}" \ No newline at end of file + configvalue: "{{ web_protocol }}://{{domains | get_domain('bigbluebutton')}}{{applications.bigbluebutton.api_suffix}}" \ No newline at end of file diff --git a/roles/nginx-serve-assets/vars/configuration.yml b/roles/nginx-serve-assets/vars/configuration.yml index 57142bbf..914b3185 100644 --- a/roles/nginx-serve-assets/vars/configuration.yml +++ b/roles/nginx-serve-assets/vars/configuration.yml @@ -1,3 +1,2 @@ source_directory: "{{ playbook_dir }}/assets" -url: "{{ web_protocol ~ '://' ~ 'file-server' - | load_configuration('domains.canonical[0]') ~ '/assets' }}" \ No newline at end of file +url: "{{ web_protocol ~ '://' ~ 'files.' ~ primary_domain ~ '/assets' }}" \ No newline at end of file diff --git a/tests/unit/test_csp_filters.py b/tests/unit/test_csp_filters.py index 3dc9dc44..d055125d 100644 --- a/tests/unit/test_csp_filters.py +++ b/tests/unit/test_csp_filters.py @@ -49,7 +49,7 @@ class TestCspFilters(unittest.TestCase): 'app2': {} } self.domains = { - 'matomo': 'matomo.example.org' + 'matomo': ['matomo.example.org'] } def test_get_csp_whitelist_list(self):