Compare commits

...

8 Commits

41 changed files with 1106 additions and 191 deletions

22
.github/workflows/test-on-arch.yml vendored Normal file
View File

@@ -0,0 +1,22 @@
name: Build & Test on Arch Linux
on:
push:
branches: [ master ]
pull_request:
jobs:
build-and-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build & Test in Arch Linux Container
uses: addnab/docker-run-action@v3
with:
image: archlinux:latest
options: -v ${{ github.workspace }}:/workspace -w /workspace
run: |
pacman -Sy --noconfirm base-devel git python python-pip docker make
make build
make test

View File

@@ -1,106 +1,109 @@
#!/usr/bin/env python3
import argparse
import os
import yaml
import sys
import time
from pathlib import Path
plugin_path = Path(__file__).resolve().parent / ".." / ".." / ".." /"lookup_plugins"
# Ensure project root on PYTHONPATH so utils is importable
repo_root = Path(__file__).resolve().parent.parent.parent.parent
sys.path.insert(0, str(repo_root))
# Add lookup_plugins for application_gid
plugin_path = repo_root / "lookup_plugins"
sys.path.insert(0, str(plugin_path))
from utils.dict_renderer import DictRenderer
from application_gid import LookupModule
def load_yaml_file(path):
"""Load a YAML file if it exists, otherwise return an empty dict."""
def load_yaml_file(path: Path) -> dict:
if not path.exists():
return {}
with path.open("r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
class DefaultsGenerator:
def __init__(self, roles_dir: Path, output_file: Path, verbose: bool, timeout: float):
self.roles_dir = roles_dir
self.output_file = output_file
self.verbose = verbose
self.renderer = DictRenderer(verbose=verbose, timeout=timeout)
self.gid_lookup = LookupModule()
def main():
parser = argparse.ArgumentParser(
description="Generate defaults_applications YAML from docker roles and include users meta data for each role."
)
parser.add_argument(
"--roles-dir",
help="Path to the roles directory (default: roles)"
)
parser.add_argument(
"--output-file",
help="Path to output YAML file"
)
def log(self, message: str):
if self.verbose:
print(f"[DefaultsGenerator] {message}")
args = parser.parse_args()
cwd = Path.cwd()
roles_dir = (cwd / args.roles_dir).resolve()
output_file = (cwd / args.output_file).resolve()
# Ensure output directory exists
output_file.parent.mkdir(parents=True, exist_ok=True)
# Initialize result structure
def run(self):
result = {"defaults_applications": {}}
gid_lookup = LookupModule()
# Process each role for application configs
for role_dir in sorted(roles_dir.iterdir()):
for role_dir in sorted(self.roles_dir.iterdir()):
role_name = role_dir.name
vars_main = role_dir / "vars" / "main.yml"
config_file = role_dir / "config" / "main.yml"
if not vars_main.exists():
print(f"[!] Skipping {role_name}: vars/main.yml missing")
self.log(f"Skipping {role_name}: vars/main.yml missing")
continue
vars_data = load_yaml_file(vars_main)
try:
application_id = vars_data.get("application_id")
except Exception as e:
print(
f"Warning: failed to read application_id from {vars_main}\nException: {e}",
file=sys.stderr
)
sys.exit(1)
if not application_id:
print(f"[!] Skipping {role_name}: application_id not defined in vars/main.yml")
self.log(f"Skipping {role_name}: application_id not defined")
continue
if not config_file.exists():
print(f"[!] Skipping {role_name}: config/main.yml missing")
self.log(f"Skipping {role_name}: config/main.yml missing")
continue
config_data = load_yaml_file(config_file)
if config_data:
try:
gid_number = gid_lookup.run([application_id], roles_dir=str(roles_dir))[0]
gid_number = self.gid_lookup.run([application_id], roles_dir=str(self.roles_dir))[0]
except Exception as e:
print(f"Warning: failed to determine gid for '{application_id}': {e}", file=sys.stderr)
sys.exit(1)
config_data["group_id"] = gid_number
result["defaults_applications"][application_id] = config_data
users_meta_file = role_dir / "users" / "main.yml"
transformed_users = {}
if users_meta_file.exists():
users_meta = load_yaml_file(users_meta_file)
# Inject users mapping as Jinja2 references
users_meta = load_yaml_file(role_dir / "users" / "main.yml")
users_data = users_meta.get("users", {})
for user, role_user_attrs in users_data.items():
transformed_users[user] = f"{{{{ users[\"{user}\"] }}}}"
transformed = {user: f"{{{{ users[\"{user}\"] }}}}" for user in users_data}
if transformed:
result["defaults_applications"][application_id]["users"] = transformed
# Attach transformed users under each application
if transformed_users:
result["defaults_applications"][application_id]["users"] = transformed_users
# Render placeholders in entire result context
self.log("Starting placeholder rendering...")
try:
result = self.renderer.render(result)
except Exception as e:
print(f"Error during rendering: {e}", file=sys.stderr)
sys.exit(1)
# Write out result YAML
with output_file.open("w", encoding="utf-8") as f:
# Write output
self.output_file.parent.mkdir(parents=True, exist_ok=True)
with self.output_file.open("w", encoding="utf-8") as f:
yaml.dump(result, f, sort_keys=False)
# Print location of generated file (absolute if not under cwd)
try:
print(f"✅ Generated: {output_file.relative_to(cwd)}")
rel = self.output_file.relative_to(Path.cwd())
except ValueError:
print(f"✅ Generated: {output_file}")
rel = self.output_file
print(f"✅ Generated: {rel}")
if __name__ == "__main__":
main()
parser = argparse.ArgumentParser(description="Generate defaults_applications YAML...")
parser.add_argument("--roles-dir", default="roles", help="Path to the roles directory")
parser.add_argument("--output-file", required=True, help="Path to output YAML file")
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
parser.add_argument("--timeout", type=float, default=10.0, help="Timeout for rendering")
args = parser.parse_args()
cwd = Path.cwd()
roles_dir = (cwd / args.roles_dir).resolve()
output_file = (cwd / args.output_file).resolve()
DefaultsGenerator(roles_dir, output_file, args.verbose, args.timeout).run()

View File

@@ -1,46 +1,37 @@
#!/usr/bin/env python3
# cli/meta/applications/all.py
import argparse
import glob
import os
import sys
# Import the Ansible filter implementation
try:
import yaml
from filter_plugins.get_all_application_ids import get_all_application_ids
except ImportError:
sys.stderr.write("PyYAML is required. Install with `pip install pyyaml`.\n")
sys.stderr.write("Filter plugin `get_all_application_ids` not found. Ensure `filter_plugins/get_all_application_ids.py` is in your PYTHONPATH.\n")
sys.exit(1)
def find_application_ids():
"""
Searches all files matching roles/*/vars/main.yml for the key 'application_id'
and returns a list of all found IDs.
Legacy function retained for reference.
Delegates to the `get_all_application_ids` filter plugin.
"""
pattern = os.path.join('roles', '*', 'vars', 'main.yml')
app_ids = []
for filepath in glob.glob(pattern):
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
except Exception as e:
sys.stderr.write(f"Error reading {filepath}: {e}\n")
continue
if isinstance(data, dict) and 'application_id' in data:
app_ids.append(data['application_id'])
return sorted(set(app_ids))
return get_all_application_ids()
def main():
parser = argparse.ArgumentParser(
description='Output a list of all application_id values defined in roles/*/vars/main.yml'
)
# No arguments other than --help
parser.parse_args()
try:
ids = find_application_ids()
except Exception as e:
sys.stderr.write(f"Error retrieving application IDs: {e}\n")
sys.exit(1)
for app_id in ids:
print(app_id)

View File

@@ -13,7 +13,7 @@ import argparse
import yaml
def get_role_folder(application_id, roles_path):
def get_role(application_id, roles_path):
"""
Find the role directory under `roles_path` whose vars/main.yml contains the specified application_id.
@@ -62,7 +62,7 @@ def main():
args = parser.parse_args()
try:
folder = get_role_folder(args.application_id, args.roles_path)
folder = get_role(args.application_id, args.roles_path)
print(folder)
sys.exit(0)
except RuntimeError as err:

0
cli/meta/j2/__init__.py Normal file
View File

76
cli/meta/j2/compiler.py Executable file
View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
import argparse
import os
import re
import sys
# Projekt-Root: vier Ebenen über diesem File
PROJECT_ROOT = os.path.dirname(
os.path.dirname(
os.path.dirname(
os.path.dirname(__file__)
)
)
)
INCLUDE_RE = re.compile(r"^(\s*)\{%\s*include\s*['\"]([^'\"]+)['\"]\s*%\}")
def expand_includes(rel_path, seen=None):
"""
Liest die Datei rel_path (relative zum PROJECT_ROOT),
ersetzt rekursiv alle "{% include 'path' %}"-Zeilen durch den
Inhalt der jeweiligen Datei (mit gleicher Einrückung).
"""
if seen is None:
seen = set()
rp = rel_path.replace("\\", "/")
if rp in seen:
raise RuntimeError(f"Circular include detected: {rp}")
seen.add(rp)
abs_path = os.path.join(PROJECT_ROOT, rp)
if not os.path.isfile(abs_path):
raise FileNotFoundError(f"Template not found: {rp}")
output_lines = []
for line in open(abs_path, encoding="utf-8"):
m = INCLUDE_RE.match(line)
if not m:
output_lines.append(line.rstrip("\n"))
else:
indent, inc_rel = m.group(1), m.group(2)
# rekursiver Aufruf
for inc_line in expand_includes(inc_rel, seen):
output_lines.append(indent + inc_line)
seen.remove(rp)
return output_lines
def parse_args():
p = argparse.ArgumentParser(
description="Expand all {% include '...' %} directives in a Jinja2 template (no variable rendering)."
)
p.add_argument("template", help="Template path relative to project root")
p.add_argument(
"--out",
help="If given, write output to this file instead of stdout",
default=None
)
return p.parse_args()
def main():
args = parse_args()
try:
lines = expand_includes(args.template)
text = "\n".join(lines)
if args.out:
with open(args.out, "w", encoding="utf-8") as f:
f.write(text + "\n")
else:
print(text)
except Exception as e:
sys.stderr.write(f"Error: {e}\n")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -127,7 +127,7 @@ class FilterModule(object):
self.is_feature_enabled(applications, 'portfolio_iframe', application_id)
and directive == 'frame-ancestors'
):
domain = domains.get('portfolio')[0]
domain = domains.get('web-app-port-ui')[0]
sld_tld = ".".join(domain.split(".")[-2:]) # yields "example.com"
tokens.append(f"{sld_tld}") # yields "*.example.com"

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
# filter_plugins/get_all_application_ids.py
import glob
import os
import yaml
def get_all_application_ids(roles_dir='roles'):
"""
Ansible filter to retrieve all unique application_id values
defined in roles/*/vars/main.yml files.
:param roles_dir: Base directory for Ansible roles (default: 'roles')
:return: Sorted list of unique application_id strings
"""
pattern = os.path.join(roles_dir, '*', 'vars', 'main.yml')
app_ids = []
for filepath in glob.glob(pattern):
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
except Exception:
continue
if isinstance(data, dict) and 'application_id' in data:
app_ids.append(data['application_id'])
return sorted(set(app_ids))
class FilterModule(object):
"""
Ansible filter plugin for retrieving application IDs.
"""
def filters(self):
return {
'get_all_application_ids': get_all_application_ids
}

View File

@@ -0,0 +1,51 @@
# filter_plugins/get_application_id.py
import os
import re
import yaml
from ansible.errors import AnsibleFilterError
def get_application_id(role_name):
"""
Jinja2/Ansible filter: given a role name, load its vars/main.yml and return the application_id value.
"""
# Construct path: assumes current working directory is project root
vars_file = os.path.join(os.getcwd(), 'roles', role_name, 'vars', 'main.yml')
if not os.path.isfile(vars_file):
raise AnsibleFilterError(f"Vars file not found for role '{role_name}': {vars_file}")
try:
# Read entire file content to avoid lazy stream issues
with open(vars_file, 'r', encoding='utf-8') as f:
content = f.read()
data = yaml.safe_load(content)
except Exception as e:
raise AnsibleFilterError(f"Error reading YAML from {vars_file}: {e}")
# Ensure parsed data is a mapping
if not isinstance(data, dict):
raise AnsibleFilterError(
f"Error reading YAML from {vars_file}: expected mapping, got {type(data).__name__}"
)
# Detect malformed YAML: no valid identifier-like keys
valid_key_pattern = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')
if data and not any(valid_key_pattern.match(k) for k in data.keys()):
raise AnsibleFilterError(f"Error reading YAML from {vars_file}: invalid top-level keys")
if 'application_id' not in data:
raise AnsibleFilterError(f"Key 'application_id' not found in {vars_file}")
return data['application_id']
class FilterModule(object):
"""
Ansible filter plugin entry point.
"""
def filters(self):
return {
'get_application_id': get_application_id,
}

View File

@@ -1,5 +1,5 @@
'''
Ansible filter plugin: get_role_folder
Ansible filter plugin: get_role
This filter inspects each role under the given roles directory, loads its vars/main.yml,
and returns the role folder name whose application_id matches the provided value.
@@ -10,7 +10,7 @@ import os
import yaml
def get_role_folder(application_id, roles_path='roles'):
def get_role(application_id, roles_path='roles'):
"""
Find the role directory under `roles_path` whose vars/main.yml contains the given application_id.
@@ -40,9 +40,9 @@ def get_role_folder(application_id, roles_path='roles'):
class FilterModule(object):
"""
Register the get_role_folder filter
Register the get_role filter
"""
def filters(self):
return {
'get_role_folder': get_role_folder,
'get_role': get_role,
}

View File

@@ -38,15 +38,15 @@ ports:
matomo: 8018
listmonk: 8019
discourse: 8020
synapse: 8021
element: 8022
matrix_synapse: 8021
matrix_element: 8022
openproject: 8023
gitlab: 8024
akaunting: 8025
moodle: 8026
taiga: 8027
friendica: 8028
portfolio: 8029
web-app-port-ui: 8029
bluesky_api: 8030
bluesky_web: 8031
keycloak: 8032

View File

@@ -1,6 +1,6 @@
(function() {
var primary = "{{ primary_domain }}";
var allowedOrigin = "https://{{ domains | get_domain('portfolio') }}";
var allowedOrigin = "https://{{ domains | get_domain('web-app-port-ui') }}";
function notifyParent() {
try {

View File

@@ -17,7 +17,7 @@
name: srv-web-7-6-composer
vars:
domain: "{{domains.matrix.synapse}}"
http_port: "{{ports.localhost.http.synapse}}"
http_port: "{{ports.localhost.http.matrix_synapse}}"
- name: create {{well_known_directory}}
file:
@@ -36,7 +36,7 @@
dest: "{{nginx.directories.http.servers}}{{domains.matrix.synapse}}.conf"
vars:
domain: "{{domains.matrix.synapse}}" # Didn't work in the past. May it works now. This does not seem to work @todo Check how to solve without declaring set_fact, seems a bug at templates
http_port: "{{ports.localhost.http.synapse}}"
http_port: "{{ports.localhost.http.matrix_synapse}}"
notify: restart nginx
- name: "include role srv-proxy-6-6-domain for {{application_id}}"
@@ -44,7 +44,7 @@
name: srv-proxy-6-6-domain
vars:
domain: "{{domains.matrix.element}}"
http_port: "{{ports.localhost.http.element}}"
http_port: "{{ports.localhost.http.matrix_element}}"
- name: include create-and-seed-database.yml for multiple bridges
include_tasks: create-and-seed-database.yml

View File

@@ -17,7 +17,7 @@
- SYNAPSE_SERVER_NAME={{domains.matrix.synapse}}
- SYNAPSE_REPORT_STATS=no
ports:
- "127.0.0.1:{{ports.localhost.http.synapse}}:{{ container_port }}"
- "127.0.0.1:{{ports.localhost.http.matrix_synapse}}:{{ container_port }}"
{% include 'roles/docker-container/templates/healthcheck/curl.yml.j2' %}
{% if bridges | length > 0 %}
{% for item in bridges %}
@@ -36,7 +36,7 @@
volumes:
- ./element-config.json:/app/config.json
ports:
- "127.0.0.1:{{ports.localhost.http.element}}:{{ container_port }}"
- "127.0.0.1:{{ports.localhost.http.matrix_element}}:{{ container_port }}"
{% include 'roles/docker-container/templates/healthcheck/wget.yml.j2' %}
{% include 'roles/docker-container/templates/networks.yml.j2' %}

View File

@@ -2,7 +2,7 @@ server {
{# Somehow .j2 doesn't interpretate the passed variable right. For this reasons this redeclaration is necessary #}
{# Could be that this is related to the set_fact use #}
{% set domain = domains.matrix.synapse %}
{% set http_port = ports.localhost.http.synapse %}
{% set http_port = ports.localhost.http.matrix_synapse %}
server_name {{domains.matrix.synapse}};
{% include 'roles/srv-web-7-7-letsencrypt/templates/ssl_header.j2' %}

View File

@@ -1,4 +1,4 @@
# PortWebUI
# PortUI
## Description

View File

@@ -16,13 +16,13 @@ class LookupModule(LookupBase):
This lookup iterates over all roles whose folder name starts with 'web-app-'
and generates a list of dictionaries (cards). For each role, it:
- Extracts the application_id (everything after "web-app-")
- Reads application_id from the role's vars/main.yml
- Reads the title from the role's README.md (the first H1 line)
- Retrieves the description from galaxy_info.description in meta/main.yml
- Retrieves the icon class from galaxy_info.logo.class
- Retrieves the tags from galaxy_info.galaxy_tags
- Builds the URL using the 'domains' variable (e.g. domains | get_domain(application_id))
- Sets the iframe flag from applications[application_id].features.iframe
- Builds the URL using the 'domains' variable
- Sets the iframe flag from applications[application_id].features.portfolio_iframe
Only cards whose application_id is included in the variable group_names are returned.
"""
@@ -40,11 +40,22 @@ class LookupModule(LookupBase):
role_basename = os.path.basename(role_dir)
# Skip roles not starting with "web-app-"
if not role_basename.startswith("web-app-"):
if not role_basename.startswith("web-app-"): # Ensure prefix
continue
# Extract application_id from role name
application_id = role_basename[len("web-app-"):]
# Load application_id from role's vars/main.yml
vars_path = os.path.join(role_dir, "vars", "main.yml")
try:
if not os.path.isfile(vars_path):
raise AnsibleError(f"Vars file not found for role '{role_basename}': {vars_path}")
with open(vars_path, "r", encoding="utf-8") as vf:
vars_content = vf.read()
vars_data = yaml.safe_load(vars_content) or {}
application_id = vars_data.get("application_id")
if not application_id:
raise AnsibleError(f"Key 'application_id' not found in {vars_path}")
except Exception as e:
raise AnsibleError(f"Error getting application_id for role '{role_basename}': {e}")
# Skip roles not listed in group_names
if application_id not in group_names:
@@ -65,15 +76,14 @@ class LookupModule(LookupBase):
title_match = re.search(r'^#\s+(.*)$', readme_content, re.MULTILINE)
title = title_match.group(1).strip() if title_match else application_id
except Exception as e:
raise AnsibleError("Error reading '{}': {}".format(readme_path, str(e)))
raise AnsibleError(f"Error reading '{readme_path}': {e}")
# Extract metadata from meta/main.yml
try:
with open(meta_path, "r", encoding="utf-8") as f:
meta_data = yaml.safe_load(f)
meta_data = yaml.safe_load(f) or {}
galaxy_info = meta_data.get("galaxy_info", {})
# If display is set to False ignore it
if not galaxy_info.get("display", True):
continue
@@ -83,7 +93,7 @@ class LookupModule(LookupBase):
icon_class = logo.get("class", "fa-solid fa-cube")
tags = galaxy_info.get("galaxy_tags", [])
except Exception as e:
raise AnsibleError("Error reading '{}': {}".format(meta_path, str(e)))
raise AnsibleError(f"Error reading '{meta_path}': {e}")
# Retrieve domains and applications from the variables
domains = variables.get("domains", {})
@@ -107,7 +117,7 @@ class LookupModule(LookupBase):
"title": title,
"text": description,
"url": url,
"link_text": "Explore {}".format(title),
"link_text": f"Explore {title}",
"iframe": iframe,
"tags": tags,
}

View File

@@ -1,7 +1,7 @@
---
galaxy_info:
author: "Kevin Veen-Birkenbach"
description: "Portfolio to showcase your projects and creative work with a focus on user experience and easy customization. 🚀"
description: "PortUI provides CyMaIS users with a unified web interface to easily access all their applications in one place"
license: "CyMaIS NonCommercial License (CNCL)"
license_url: "https://s.veen.world/cncl"
company: |

View File

@@ -2,7 +2,7 @@ applications:
{% if (portfolio_menu_data.categorized is mapping and portfolio_menu_data.categorized | length > 0)
or (portfolio_menu_data.uncategorized is sequence and portfolio_menu_data.uncategorized | length > 0) %}
- name: Applications
- name: Apps
description: Browse, configure and launch all available applications
icon:
class: fa fa-th-large

View File

@@ -6,9 +6,13 @@
icon:
class: fa-solid fa-expand-arrows-alt
onclick: "toggleFullscreen()"
- name: Tab
description: Open the currently embedded iframe URL in a fresh browser tab
icon:
class: fa-solid fa-up-right-from-square
onclick: openIframeInNewTab()
- name: Reload
description: Reload the application
icon:
class: fa-solid fa-rotate-right
url: "{{ domains | get_domain('web-app-port-ui') }}"

View File

@@ -1,4 +1,4 @@
application_id: "web-app-port-ui"
docker_repository_address: "https://github.com/kevinveenbirkenbach/port-web-ui"
docker_repository_address: "https://github.com/kevinveenbirkenbach/port-ui"
config_inventory_path: "{{ inventory_dir }}/files/{{ inventory_hostname }}/docker/web-app-port-ui/config.yaml.j2"
docker_repository: true

View File

@@ -1,2 +1,2 @@
source_directory: "{{ playbook_dir }}/assets"
url: "{{ web_protocol ~ '://' ~ 'files.' ~ primary_domain ~ '/assets' }}"
url: "{{ web_protocol }}://<< defaults_applications['web-svc-file']domains.canonical[0] >>/assets"

View File

@@ -0,0 +1,35 @@
# Debug Variables Task Include
This task file (`tasks/utils/debug/main.yml`) outputs key variables for troubleshooting Ansible roles and playbooks.
## Purpose
Use this file to quickly debug and inspect variables such as `application_id`, `applications`, `ports`, and more. It helps identify missing or misconfigured variables during playbook runs.
## Usage
Include the debug file in any task list or role:
```yaml
- import_tasks: utils/debug/main.yml
````
or
```yaml
- include_tasks: utils/debug/main.yml
```
Optionally, enable it conditionally:
```yaml
- import_tasks: utils/debug/main.yml
when: enable_debug | default(false)
```
**Note:**
The path is relative to the directory of your task file.
---
This tool is intended for development and troubleshooting only. Remove or disable it in production runs.

View File

@@ -0,0 +1,50 @@
- name: Assert all required application_id-based variables are defined
vars:
missing_keys: []
block:
- name: Check if applications[application_id] exists
set_fact:
missing_keys: "{{ missing_keys + ['applications'] }}"
when: applications.get(application_id, None) is not defined
- name: Check if applications[application_id].docker.services.database.enabled exists
set_fact:
missing_keys: "{{ missing_keys + ['applications.{}.docker.services.database.enabled'.format(application_id)] }}"
when: applications[application_id].docker.services.database is not defined
- name: Check if applications[application_id].docker.services.redis.enabled exists
set_fact:
missing_keys: "{{ missing_keys + ['applications.{}.docker.services.redis.enabled'.format(application_id)] }}"
when: applications[application_id].docker.services.redis is not defined
- name: Check if applications[application_id].images[application_id] exists
set_fact:
missing_keys: "{{ missing_keys + ['applications.{}.images.{}'.format(application_id, application_id)] }}"
when: applications[application_id].images is not defined or applications[application_id].images.get(application_id) is not defined
- name: Check if applications[application_id].features exists
set_fact:
missing_keys: "{{ missing_keys + ['applications.{}.features'.format(application_id)] }}"
when: applications[application_id].features is not defined
- name: Check if ports.localhost.oauth2_proxy[application_id] exists
set_fact:
missing_keys: "{{ missing_keys + ['ports.localhost.oauth2_proxy.{}'.format(application_id)] }}"
when: ports.localhost.oauth2_proxy.get(application_id, None) is not defined
- name: Check if ports.localhost.http[application_id] exists
set_fact:
missing_keys: "{{ missing_keys + ['ports.localhost.http.{}'.format(application_id)] }}"
when: ports.localhost.http.get(application_id, None) is not defined
- name: Check if networks.local[application_id].subnet exists (optional)
set_fact:
missing_keys: "{{ missing_keys + ['networks.local.{}.subnet'.format(application_id)] }}"
when: networks.local.get(application_id, None) is not defined or networks.local[application_id].get('subnet', None) is not defined
- name: Fail if any required keys are missing
debug:
msg: |
The following variables/keys for application_id {{ application_id }} are not defined or not accessible:
{{ missing_keys | join('\n- ') }}
Please define them in your group_vars, host_vars, or inventory.

View File

@@ -0,0 +1,35 @@
- name: Show application_id
debug:
var: application_id
- name: Show applications dict
debug:
var: applications
- name: Show ports
debug:
var: ports
- name: Show networks
debug:
var: networks
- name: Show database_type
debug:
var: database_type
- name: Show database_host
debug:
var: database_host
- name: Show docker_compose
debug:
var: docker_compose
- name: Show container_port
debug:
var: container_port
- name: Show container_healthcheck
debug:
var: container_healthcheck

View File

@@ -9,3 +9,15 @@ applications:
variable_b: {} # Merges with the existing content
variable_c: [] # Replaces the default value (use caution with domains)
```
## Placeholder Logic with `<< >>`
You can reference values from the generated `defaults_applications` dictionary at build time by embedding `<< ... >>` placeholders inside your template. For example:
```yaml
url: "{{ web_protocol }}://<< defaults_applications.web-svc-file.domains.canonical[0] >>/assets"
```
- The `<< ... >>` placeholders are resolved by the [`DictRenderer`](../../../utils/dict_renderer.py) helper class.
- The CLI uses the [`DefaultsGenerator`](../../../cli/build/defaults/applications.py) class to merge all role configurations into a single YAML and then calls the renderer to substitute each `<< ... >>` occurrence.
- Use the `--verbose` flag on the CLI script to log every replacement step, and rely on the builtin timeout (default: 10 seconds) to prevent infinite loops.

View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python3
import os
import unittest
import yaml
from filter_plugins.get_all_application_ids import get_all_application_ids
class TestApplicationIDsInPorts(unittest.TestCase):
def test_all_ports_application_ids_are_valid(self):
# Path to the ports definition file
ports_file = os.path.abspath(
os.path.join(
os.path.dirname(__file__), '..', '..', 'group_vars', 'all', '09_ports.yml'
)
)
with open(ports_file, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
# Collect all referenced application IDs under ports.hosttype.porttype
refs = set()
ports = data.get('ports', {}) or {}
for hosttype, porttypes in ports.items():
if not isinstance(porttypes, dict):
continue
for porttype, apps in porttypes.items():
if not isinstance(apps, dict):
continue
for app_id in apps.keys():
refs.add(app_id)
# Retrieve valid application IDs from Ansible roles
valid_ids = set(get_all_application_ids())
# Identify IDs that are neither valid nor have a valid prefix before the first underscore
missing = []
for app_id in refs:
if app_id in valid_ids:
continue
prefix = app_id.split('_', 1)[0]
if prefix in valid_ids:
continue
missing.append(app_id)
if missing:
self.fail(
f"Undefined application IDs in ports definition: {', '.join(sorted(missing))}"
)
if __name__ == '__main__':
unittest.main()

View File

@@ -9,10 +9,15 @@ class TestAnsibleRolesMetadata(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not os.path.isdir(cls.ROLES_DIR):
raise unittest.SkipTest(f"Roles directory not found at {cls.ROLES_DIR}")
cls.roles = [d for d in os.listdir(cls.ROLES_DIR)
if os.path.isdir(os.path.join(cls.ROLES_DIR, d))]
all_dirs = os.listdir(cls.ROLES_DIR)
cls.roles = [
d for d in all_dirs
if (
os.path.isdir(os.path.join(cls.ROLES_DIR, d))
and d != '__pycache__'
)
]
def test_each_role_has_valid_meta(self):
"""

View File

@@ -1,51 +1,74 @@
import os
import unittest
import yaml
import glob
import yaml
import warnings
import unittest
class TestApplicationIdMatchesRoleName(unittest.TestCase):
def test_application_id_matches_role_directory(self):
"""
Warn if application_id in vars/main.yml does not match
the role directory basename, to avoid confusion.
If vars/main.yml is missing, do nothing.
"""
# locate the 'roles' directory (two levels up)
# import your filters
from filter_plugins.invokable_paths import get_invokable_paths, get_non_invokable_paths
class TestApplicationIdAndInvocability(unittest.TestCase):
@classmethod
def setUpClass(cls):
# locate roles dir (two levels up)
base_dir = os.path.dirname(__file__)
roles_dir = os.path.abspath(os.path.join(base_dir, '..', '..', 'roles'))
cls.roles_dir = os.path.abspath(os.path.join(base_dir, '..', '..', 'roles'))
# iterate over each role folder
for role_path in glob.glob(os.path.join(roles_dir, '*')):
# get lists of invokable and non-invokable role *names*
# filters return dash-joined paths; for top-level roles names are just the basename
cls.invokable = {
p.split('-', 1)[0]
for p in get_invokable_paths()
}
cls.non_invokable = {
p.split('-', 1)[0]
for p in get_non_invokable_paths()
}
def test_application_id_presence_and_match(self):
"""
- Invokable roles must have application_id defined (else fail).
- Non-invokable roles must NOT have application_id (else fail).
- If application_id exists but != folder name, warn and recommend aligning.
"""
for role_path in glob.glob(os.path.join(self.roles_dir, '*')):
if not os.path.isdir(role_path):
continue
role_name = os.path.basename(role_path)
vars_main = os.path.join(role_path, 'vars', 'main.yml')
# skip roles without vars/main.yml
if not os.path.exists(vars_main):
continue
# load vars/main.yml if it exists
data = {}
if os.path.exists(vars_main):
with open(vars_main, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f) or {}
app_id = data.get('application_id')
if role_name in self.invokable:
# must have application_id
if app_id is None:
self.fail(f"{role_name}: invokable role is missing 'application_id' in vars/main.yml")
elif role_name in self.non_invokable:
# must NOT have application_id
if app_id is not None:
self.fail(f"{role_name}: non-invokable role should not define 'application_id' in vars/main.yml")
else:
# roles not mentioned in categories.yml? we'll skip them
continue
# if present but mismatched, warn
if app_id is not None and app_id != role_name:
warnings.warn(
f"{role_name}: 'application_id' is missing in vars/main.yml. "
f"Consider setting it to '{role_name}' to avoid confusion."
)
elif app_id != role_name:
warnings.warn(
f"{role_name}: 'application_id' is '{app_id}', "
f"but the folder name is '{role_name}'. "
"This can lead to confusion—using the directory name "
"as the application_id is recommended."
f"{role_name}: 'application_id' is '{app_id}',"
f" but the folder name is '{role_name}'."
" Consider setting application_id to exactly the role folder name to avoid confusion."
)
# always pass
# if we get here, all presence/absence checks passed
self.assertTrue(True)
if __name__ == '__main__':
unittest.main()

View File

@@ -20,7 +20,8 @@ class TestDependencyApplicationId(unittest.TestCase):
vars_file = os.path.join(role_path, 'vars', 'main.yml')
if not os.path.isfile(vars_file):
return None
data = yaml.safe_load(open(vars_file, encoding='utf-8')) or {}
with open(vars_file, encoding='utf-8') as f:
data = yaml.safe_load(f) or {}
return data.get('application_id')
# Iterate all roles
@@ -33,7 +34,9 @@ class TestDependencyApplicationId(unittest.TestCase):
if not os.path.isfile(meta_file):
continue
meta = yaml.safe_load(open(meta_file, encoding='utf-8')) or {}
with open(meta_file, encoding='utf-8') as f:
meta = yaml.safe_load(f) or {}
deps = meta.get('dependencies', [])
if not isinstance(deps, list):
continue

View File

@@ -0,0 +1,82 @@
import glob
import os
import re
import unittest
from filter_plugins.get_all_application_ids import get_all_application_ids
def collect_domain_keys(base_dir="."):
"""
Scan all YAML and Python files under the project for usages of domains.get('...') and domains['...']
and return a dict mapping each domain key to a list of file:line locations where it's used.
Ignores the integration test file itself, but will scan other test files.
"""
pattern = re.compile(r"domains(?:\.get\(\s*['\"](?P<id>[^'\"]+)['\"]\s*\)|\[['\"](?P<id2>[^'\"]+)['\"]\])")
locations = {}
# Path of this test file to ignore
ignore_path = os.path.normpath(os.path.join(base_dir, 'tests', 'integration', 'test_domain_application_ids.py'))
for root, dirs, files in os.walk(base_dir):
for fname in files:
# only scan YAML, YAML and Python files
if not fname.endswith(('.yml', '.yaml', '.py')):
continue
path = os.path.normpath(os.path.join(root, fname))
# skip this integration test file
if path == ignore_path:
continue
try:
with open(path, 'r', encoding='utf-8') as f:
for lineno, line in enumerate(f, start=1):
for m in pattern.finditer(line):
key = m.group('id') or m.group('id2')
loc = f"{path}:{lineno}"
locations.setdefault(key, []).append(loc)
except (OSError, UnicodeDecodeError):
continue
return locations
class TestDomainApplicationIds(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Load valid application IDs from roles
cls.valid_ids = set(get_all_application_ids(roles_dir='roles'))
if not cls.valid_ids:
raise RuntimeError("No application_ids found in roles/*/vars/main.yml")
# Collect domain keys and their locations, excluding this test file
cls.domain_locations = collect_domain_keys(base_dir='.')
if not cls.domain_locations:
raise RuntimeError("No domains.get(...) or domains[...] usages found to validate")
# Define keys to ignore (placeholders or meta-fields)
cls.ignore_keys = {"canonical", "aliases"}
def test_all_keys_are_valid(self):
"""Ensure every domains.get/[] key matches a valid application_id (excluding ignored keys)."""
def is_placeholder(key):
# Treat keys with curly braces as placeholders
return bool(re.match(r"^\{.+\}$", key))
invalid = {}
for key, locs in self.domain_locations.items():
if key in self.ignore_keys or is_placeholder(key):
continue
if key not in self.valid_ids:
invalid[key] = locs
if invalid:
details = []
for key, locs in invalid.items():
locations_str = ", ".join(locs)
details.append(f"'{key}' at {locations_str}")
detail_msg = "; ".join(details)
self.fail(
f"Found usages of domains with invalid application_ids: {detail_msg}. "
f"Valid application_ids are: {sorted(self.valid_ids)}"
)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,46 @@
import os
import re
import sys
import unittest
# Ensure filter_plugins is on the path
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.insert(0, PROJECT_ROOT)
from filter_plugins.get_all_application_ids import get_all_application_ids
class TestGetDomainApplicationIds(unittest.TestCase):
"""
Integration test to verify that all string literals passed to get_domain()
correspond to valid application_id values defined in roles/*/vars/main.yml.
"""
GET_DOMAIN_PATTERN = re.compile(r"get_domain\(\s*['\"]([^'\"]+)['\"]\s*\)")
def test_get_domain_literals_are_valid_ids(self):
# Collect all application IDs from roles
valid_ids = set(get_all_application_ids())
# Walk through project files
invalid_usages = []
for root, dirs, files in os.walk(PROJECT_ROOT):
# Skip tests directory to avoid matching in test code
if 'tests' in root.split(os.sep):
continue
for fname in files:
if not fname.endswith('.py'):
continue
path = os.path.join(root, fname)
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
for match in self.GET_DOMAIN_PATTERN.finditer(content):
literal = match.group(1)
if literal not in valid_ids:
invalid_usages.append((path, literal))
if invalid_usages:
msgs = [f"{path}: '{lit}' is not a valid application_id" for path, lit in invalid_usages]
self.fail("Found invalid get_domain() usages:\n" + "\n".join(msgs))
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,42 @@
#!/usr/bin/env python3
import os
import unittest
class TestRolesFolderNames(unittest.TestCase):
def test_no_underscore_in_role_folder_names(self):
"""
Integration test that verifies none of the folders under 'roles' contain an underscore in their name,
ignoring the '__pycache__' folder.
"""
# Determine the absolute path to the 'roles' directory
roles_dir = os.path.abspath(
os.path.join(
os.path.dirname(__file__), '..', '..', 'roles'
)
)
# List all entries in the roles directory
try:
entries = os.listdir(roles_dir)
except FileNotFoundError:
self.fail(f"Roles directory not found at expected location: {roles_dir}")
# Identify any role folders containing underscores, excluding '__pycache__'
invalid = []
for name in entries:
# Skip the '__pycache__' directory
if name == '__pycache__':
continue
path = os.path.join(roles_dir, name)
if os.path.isdir(path) and '_' in name:
invalid.append(name)
# Fail the test if any invalid folder names are found
if invalid:
self.fail(
f"Role folder names must not contain underscores: {', '.join(sorted(invalid))}"
)
if __name__ == '__main__':
unittest.main()

View File

@@ -179,7 +179,7 @@ class TestCspFilters(unittest.TestCase):
# Ensure feature enabled and domain set
self.apps['app1']['features']['portfolio_iframe'] = True
# simulate a subdomain for the application
self.domains['portfolio'] = ['domain-example.com']
self.domains['web-app-port-ui'] = ['domain-example.com']
header = self.filter.build_csp_header(self.apps, 'app1', self.domains, web_protocol='https')
# Expect '*.domain-example.com' in the frame-ancestors directive

View File

@@ -0,0 +1,55 @@
import unittest
import tempfile
import os
import yaml
from filter_plugins.get_all_application_ids import get_all_application_ids
class TestGetAllApplicationIds(unittest.TestCase):
def setUp(self):
# Create a temporary directory to act as the roles base
self.tmpdir = tempfile.TemporaryDirectory()
self.roles_dir = os.path.join(self.tmpdir.name, 'roles')
os.makedirs(self.roles_dir)
def tearDown(self):
# Clean up temporary directory
self.tmpdir.cleanup()
def create_role(self, role_name, data):
# Helper to create roles/<role_name>/vars/main.yml with given dict
path = os.path.join(self.roles_dir, role_name, 'vars')
os.makedirs(path, exist_ok=True)
with open(os.path.join(path, 'main.yml'), 'w', encoding='utf-8') as f:
yaml.safe_dump(data, f)
def test_single_application_id(self):
self.create_role('role1', {'application_id': 'app1'})
result = get_all_application_ids(self.roles_dir)
self.assertEqual(result, ['app1'])
def test_multiple_application_ids(self):
self.create_role('role1', {'application_id': 'app1'})
self.create_role('role2', {'application_id': 'app2'})
result = get_all_application_ids(self.roles_dir)
self.assertEqual(sorted(result), ['app1', 'app2'])
def test_duplicate_application_ids(self):
self.create_role('role1', {'application_id': 'app1'})
self.create_role('role2', {'application_id': 'app1'})
result = get_all_application_ids(self.roles_dir)
self.assertEqual(result, ['app1'])
def test_missing_application_id(self):
self.create_role('role1', {'other_key': 'value'})
result = get_all_application_ids(self.roles_dir)
self.assertEqual(result, [])
def test_no_roles_directory(self):
# Point to a non-existent directory
empty_dir = os.path.join(self.tmpdir.name, 'no_roles_here')
result = get_all_application_ids(empty_dir)
self.assertEqual(result, [])
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,63 @@
# tests/unit/filter_plugins/test_get_application_id.py
import unittest
import os
import tempfile
import shutil
import yaml
from ansible.errors import AnsibleFilterError
from filter_plugins.get_application_id import get_application_id
class TestGetApplicationIdFilter(unittest.TestCase):
def setUp(self):
# Create a temporary project directory and switch to it
self.tmpdir = tempfile.mkdtemp()
self.original_cwd = os.getcwd()
os.chdir(self.tmpdir)
# Create the roles/testrole/vars directory structure
self.role_name = 'testrole'
self.vars_dir = os.path.join('roles', self.role_name, 'vars')
os.makedirs(self.vars_dir)
self.vars_file = os.path.join(self.vars_dir, 'main.yml')
def tearDown(self):
# Return to original cwd and remove temp directory
os.chdir(self.original_cwd)
shutil.rmtree(self.tmpdir)
def write_vars_file(self, content):
with open(self.vars_file, 'w') as f:
yaml.dump(content, f)
def test_returns_application_id(self):
# Given a valid vars file with application_id
expected_id = '12345'
self.write_vars_file({'application_id': expected_id})
# When
result = get_application_id(self.role_name)
# Then
self.assertEqual(result, expected_id)
def test_file_not_found_raises_error(self):
# Given no vars file for a nonexistent role
with self.assertRaises(AnsibleFilterError) as cm:
get_application_id('nonexistent_role')
self.assertIn("Vars file not found", str(cm.exception))
def test_missing_key_raises_error(self):
# Given a vars file without application_id
self.write_vars_file({'other_key': 'value'})
with self.assertRaises(AnsibleFilterError) as cm:
get_application_id(self.role_name)
self.assertIn("Key 'application_id' not found", str(cm.exception))
def test_invalid_yaml_raises_error(self):
# Write invalid YAML content
with open(self.vars_file, 'w') as f:
f.write(":::not a yaml:::")
with self.assertRaises(AnsibleFilterError) as cm:
get_application_id(self.role_name)
self.assertIn("Error reading YAML", str(cm.exception))
if __name__ == '__main__':
unittest.main()

View File

@@ -5,7 +5,7 @@ import unittest
import yaml
from ansible.errors import AnsibleFilterError
from filter_plugins.get_role_folder import get_role_folder
from filter_plugins.get_role import get_role
class TestGetRoleFolder(unittest.TestCase):
def setUp(self):
@@ -35,20 +35,20 @@ class TestGetRoleFolder(unittest.TestCase):
def test_find_existing_role(self):
# Should find role1 for application_id 'app-123'
result = get_role_folder('app-123', roles_path=self.roles_dir)
result = get_role('app-123', roles_path=self.roles_dir)
self.assertEqual(result, 'role1')
def test_no_match_raises(self):
# No role has application_id 'nonexistent'
with self.assertRaises(AnsibleFilterError) as cm:
get_role_folder('nonexistent', roles_path=self.roles_dir)
get_role('nonexistent', roles_path=self.roles_dir)
self.assertIn("No role found with application_id 'nonexistent'", str(cm.exception))
def test_missing_roles_path(self):
# Path does not exist
invalid_path = os.path.join(self.tempdir, 'invalid')
with self.assertRaises(AnsibleFilterError) as cm:
get_role_folder('any', roles_path=invalid_path)
get_role('any', roles_path=invalid_path)
self.assertIn(f"Roles path not found: {invalid_path}", str(cm.exception))
def test_invalid_yaml_raises(self):
@@ -59,7 +59,7 @@ class TestGetRoleFolder(unittest.TestCase):
f.write("::: invalid yaml :::")
with self.assertRaises(AnsibleFilterError) as cm:
get_role_folder('app-123', roles_path=self.roles_dir)
get_role('app-123', roles_path=self.roles_dir)
self.assertIn('Failed to load', str(cm.exception))
if __name__ == '__main__':

View File

@@ -1,3 +1,5 @@
# tests/unit/lookup_plugins/test_docker_cards.py
import os
import sys
import tempfile
@@ -9,14 +11,22 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../../roles/web-a
from docker_cards import LookupModule
class TestDockerCardsLookup(unittest.TestCase):
def setUp(self):
# Create a temporary directory to simulate the roles directory.
self.test_roles_dir = tempfile.mkdtemp(prefix="test_roles_")
# Create a sample role "web-app-port-ui".
# Create a sample role "web-app-port-ui" under that directory.
self.role_name = "web-app-port-ui"
self.role_dir = os.path.join(self.test_roles_dir, self.role_name)
os.makedirs(os.path.join(self.role_dir, "meta"))
os.makedirs(os.path.join(self.role_dir, "vars"))
# Create vars/main.yml so get_application_id() can find the application_id.
vars_main = os.path.join(self.role_dir, "vars", "main.yml")
with open(vars_main, "w", encoding="utf-8") as f:
f.write("application_id: portfolio\n")
# Create a sample README.md with a H1 line for the title.
readme_path = os.path.join(self.role_dir, "README.md")
@@ -80,7 +90,7 @@ galaxy_info:
"applications": {
"portfolio": {
"features": {
"iframe": True
"portfolio_iframe": True
}
}
},
@@ -96,5 +106,6 @@ galaxy_info:
self.assertIsInstance(cards, list)
self.assertEqual(len(cards), 0)
if __name__ == "__main__":
unittest.main()

View File

View File

@@ -0,0 +1,86 @@
import unittest
from utils.dict_renderer import DictRenderer
class TestDictRenderer(unittest.TestCase):
def setUp(self):
# Timeout is small for tests, verbose off
self.renderer = DictRenderer(verbose=False, timeout=1.0)
def test_simple_replacement(self):
data = {"foo": "bar", "val": "<<foo>>"}
rendered = self.renderer.render(data)
self.assertEqual(rendered["val"], "bar")
def test_nested_replacement(self):
data = {"parent": {"child": "value"}, "ref": "<<parent.child>>"}
rendered = self.renderer.render(data)
self.assertEqual(rendered["ref"], "value")
def test_list_index(self):
data = {"lst": [10, 20, 30], "second": "<<lst[1]>>"}
rendered = self.renderer.render(data)
self.assertEqual(rendered["second"], "20")
def test_multi_pass(self):
data = {"a": "<<b>>", "b": "<<c>>", "c": "final"}
rendered = self.renderer.render(data)
self.assertEqual(rendered["a"], "final")
def test_unresolved_raises(self):
data = {"a": "<<missing>>"}
with self.assertRaises(ValueError) as cm:
self.renderer.render(data)
self.assertIn("missing", str(cm.exception))
def test_leave_curly(self):
data = {"tmpl": "{{ not touched }}"}
rendered = self.renderer.render(data)
self.assertEqual(rendered["tmpl"], "{{ not touched }}")
def test_mixed_braces(self):
data = {"foo": "bar", "tmpl": "{{ <<foo>> }}"}
rendered = self.renderer.render(data)
self.assertEqual(rendered["tmpl"], "{{ bar }}")
def test_single_quoted_key(self):
# ['foo-bar'] should resolve the key 'foo-bar'
data = {
"foo-bar": {"val": "xyz"},
"result": "<<['foo-bar'].val>>"
}
rendered = self.renderer.render(data)
self.assertEqual(rendered["result"], "xyz")
def test_double_quoted_key(self):
# ["foo-bar"] should also resolve the key 'foo-bar'
data = {
"foo-bar": {"val": 123},
"result": '<<["foo-bar"].val>>'
}
rendered = self.renderer.render(data)
self.assertEqual(rendered["result"], "123")
def test_mixed_bracket_and_dot_with_index(self):
# Combine quoted key, dot access and numeric index
data = {
"web-svc-file": {
"domains": {
"canonical": ["file.example.com"]
}
},
"url": '<<[\'web-svc-file\'].domains.canonical[0]>>'
}
rendered = self.renderer.render(data)
self.assertEqual(rendered["url"], "file.example.com")
def test_double_quoted_key_with_list_index(self):
# Double-quoted key and list index together
data = {
"my-list": [ "a", "b", "c" ],
"pick": '<<["my-list"][2]>>'
}
rendered = self.renderer.render(data)
self.assertEqual(rendered["pick"], "c")
if __name__ == "__main__":
unittest.main()

119
utils/dict_renderer.py Normal file
View File

@@ -0,0 +1,119 @@
import re
import time
from typing import Any, Dict, Union, List, Set
class DictRenderer:
"""
Resolves placeholders in the form << path >> within nested dictionaries,
supporting hyphens, numeric list indexing, and quoted keys via ['key'] or ["key"].
"""
# Match << path >> where path contains no whitespace or closing >
PATTERN = re.compile(r"<<\s*(?P<path>[^\s>]+)\s*>>")
# Tokenizes a path into unquoted keys, single-quoted, double-quoted keys, or numeric indices
TOKEN_REGEX = re.compile(
r"(?P<key>[\w\-]+)"
r"|\['(?P<qkey>[^']+)'\]"
r"|\[\"(?P<dkey>[^\"]+)\"\]"
r"|\[(?P<idx>\d+)\]"
)
def __init__(self, verbose: bool = False, timeout: float = 10.0):
self.verbose = verbose
self.timeout = timeout
def render(self, data: Union[Dict[str, Any], List[Any]]) -> Union[Dict[str, Any], List[Any]]:
start = time.monotonic()
self.root = data
rendered = data
pass_num = 0
while True:
pass_num += 1
if self.verbose:
print(f"[DictRenderer] Pass {pass_num} starting...")
rendered, changed = self._render_pass(rendered)
if not changed:
if self.verbose:
print(f"[DictRenderer] No more placeholders after pass {pass_num}.")
break
if time.monotonic() - start > self.timeout:
raise TimeoutError(f"Rendering exceeded timeout of {self.timeout} seconds")
# After all passes, raise error on unresolved placeholders
unresolved = self.find_unresolved(rendered)
if unresolved:
raise ValueError(f"Unresolved placeholders: {', '.join(sorted(unresolved))}")
return rendered
def _render_pass(self, obj: Any) -> (Any, bool):
if isinstance(obj, dict):
new = {}
changed = False
for k, v in obj.items():
nv, ch = self._render_pass(v)
new[k] = nv
changed = changed or ch
return new, changed
if isinstance(obj, list):
new_list = []
changed = False
for item in obj:
ni, ch = self._render_pass(item)
new_list.append(ni)
changed = changed or ch
return new_list, changed
if isinstance(obj, str):
def repl(m):
path = m.group('path')
val = self._lookup(path)
if val is not None:
if self.verbose:
print(f"[DictRenderer] Resolving <<{path}>> -> {val}")
return str(val)
return m.group(0)
new_str = self.PATTERN.sub(repl, obj)
return new_str, new_str != obj
return obj, False
def _lookup(self, path: str) -> Any:
current = self.root
for m in self.TOKEN_REGEX.finditer(path):
if m.group('key') is not None:
if isinstance(current, dict):
current = current.get(m.group('key'))
else:
return None
elif m.group('qkey') is not None:
if isinstance(current, dict):
current = current.get(m.group('qkey'))
else:
return None
elif m.group('dkey') is not None:
if isinstance(current, dict):
current = current.get(m.group('dkey'))
else:
return None
elif m.group('idx') is not None:
idx = int(m.group('idx'))
if isinstance(current, list) and 0 <= idx < len(current):
current = current[idx]
else:
return None
if current is None:
return None
return current
def find_unresolved(self, data: Any) -> Set[str]:
"""Return all paths of unresolved << placeholders in data."""
unresolved: Set[str] = set()
if isinstance(data, dict):
for v in data.values():
unresolved |= self.find_unresolved(v)
elif isinstance(data, list):
for item in data:
unresolved |= self.find_unresolved(item)
elif isinstance(data, str):
for m in self.PATTERN.finditer(data):
unresolved.add(m.group('path'))
return unresolved