18 Commits

SHA1 Message Date
f443300f70 Used PKGMGR as basis for infinito.nexus 2025-12-16 22:53:30 +01:00
b5749415d1 Added ruff linter 2025-12-16 22:24:57 +01:00
3123ac4a08 Fixed Unit Tests https://chatgpt.com/share/6941cd96-81b4-800f-9017-2e4bcdc4bd52 2025-12-16 22:22:47 +01:00
d8361fe00a Deactivated venv. Was probably a bug due to refactoring. 2025-12-16 21:44:13 +01:00
253a18921f Optimized Dockerfile 2025-12-16 21:26:52 +01:00
22f1e24773 split tests into lint/unit/integration and enhance vars-usage lint
- Introduce separate test-lint, test-unit, and test-integration targets
- Add TEST_PATTERN support to filter unittest discovery per test category
- Move vars-usage check to tests/lint and classify it as a static lint test
- Enhance vars-usage lint to report defining files and line numbers
- Keep test-messy for backwards compatibility and Ansible syntax check

https://chatgpt.com/share/6941c04c-b8d0-800f-9fe8-a5c01a1e1032
2025-12-16 21:25:34 +01:00
14548cbc52 Replaced incorrect make messy-build with make setup 2025-12-16 20:51:37 +01:00
57154bc6e7 Optimized command documentation 2025-12-16 20:47:05 +01:00
a7140f0097 Adapted python import path 2025-12-16 20:41:52 +01:00
0e89d89b45 Make sound support optional and guard against missing audio dependencies
- Move simpleaudio to optional dependency (audio extra)
- Add DummySound fallback when optional audio libs are unavailable
- Import simpleaudio/numpy lazily with ImportError handling
- Remove Docker-specific sound disabling logic
- Improve typing and robustness of sound utilities

https://chatgpt.com/share/693dec1d-60bc-800f-8ffe-3886a9c265bd
2025-12-13 23:43:36 +01:00
d0882433c8 Refactor setup workflow and make install robust via virtualenv
- Introduce a dedicated Python virtualenv (deps target) and run all setup scripts through it
- Fix missing PyYAML errors in clean, CI, and Nix environments
- Refactor build defaults into cli/setup for clearer semantics
- Make setup deterministic and independent from system Python
- Replace early Makefile shell expansion with runtime evaluation
- Rename messy-test to test-messy and update deploy logic and tests accordingly
- Keep setup and test targets consistent across Makefile, CLI, and unit tests

https://chatgpt.com/share/693de226-00ac-800f-8cbd-06552b2f283c
2025-12-13 23:00:13 +01:00
600d7a1fe8 Ignored python package build files 2025-12-13 22:13:53 +01:00
0580839705 Makefile: unify Python interpreter via PYTHON variable
Avoids mixed system/Nix/venv Python usage and fixes missing PyYAML errors.

https://chatgpt.com/share/693dd6b2-14f0-800f-9b95-368d58b68f49
2025-12-13 22:12:12 +01:00
7070100363 Added missing PyYAML 2025-12-13 21:41:29 +01:00
ad813df0c5 Switch to pyproject.toml for Python dependencies
Introduce pyproject.toml as the single source of truth for Python dependencies.
Remove legacy requirements.txt and simplify requirements.yml to Ansible collections only.
Drop pytest in favor of the built-in unittest framework.

https://chatgpt.com/share/693dbe8c-8b64-800f-a6e5-41b7d21ae7e0
2025-12-13 20:29:09 +01:00
f8e2aa2b93 Added mirrors 2025-12-13 09:27:04 +01:00
d0a2c3fada Release version 0.2.1 2025-12-10 21:14:47 +01:00
75eaecce5b **Remove obsolete installation/administration docs, fix pgAdmin server mode condition, normalize git repository vars, and ensure correct application_id for web-app-sphinx**
* Remove outdated `Installation.md` and `Administration.md` documentation from Akaunting and Peertube roles
* Fix `server_mode` conditional in `web-app-pgadmin` to avoid unintended defaults
* Normalize formatting of git repository variables in `web-app-roulette-wheel`
* Explicitly set `application_id` when loading `sys-stk-full-stateless` in `web-app-sphinx` to prevent scoping issues

https://chatgpt.com/share/6939d42e-483c-800f-b0fc-be61caab615d
2025-12-10 21:12:15 +01:00
32 changed files with 623 additions and 456 deletions

.dockerignore

@@ -10,4 +10,5 @@ venv
 *tree.json
 roles/list.json
 *.pyc
+*.egg-info
+build
 .git

.github/workflows/lint-python.yml (new file, 38 lines)

@@ -0,0 +1,38 @@
name: Lint Python (ruff)

on:
  push:
    branches:
      - master
      - main
      - develop
      - "*"
  pull_request:

jobs:
  lint-python:
    name: lint-python
    runs-on: ubuntu-latest
    timeout-minutes: 10
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install ruff
        run: |
          python -m pip install --upgrade pip
          pip install ruff

      - name: Ruff (lint)
        run: |
          ruff check .

      - name: Ruff (format check)
        run: |
          ruff format --check .
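The same two ruff invocations can be reproduced locally before pushing; a minimal sketch using exactly the commands from the workflow above (ruff is installed unpinned, as in CI):

```bash
pip install ruff         # unpinned, matching the workflow
ruff check .             # lint rules
ruff format --check .    # formatting check, non-mutating
```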

.gitignore (2 lines changed)

@@ -10,3 +10,5 @@ venv
 *tree.json
 roles/list.json
 *.pyc
+*.egg-info
+build

CHANGELOG.md

@@ -1,3 +1,8 @@
+## [0.2.1] - 2025-12-10
+
+* restored full deployability of the Sphinx app by fixing the application_id scoping bug.
+
 ## [0.2.0] - 2025-12-10
 * Added full Nix installer integration with dynamic upstream SHA256 verification, OS-specific installation paths, template-driven configuration, and updated pkgmgr integration.

Dockerfile

@@ -1,60 +1,41 @@
-FROM archlinux:latest
-
-# 1) Packages incl. docker (so the docker CLI is available inside the container)
-RUN pacman -Syu --noconfirm \
-    base-devel \
-    git \
-    python \
-    python-pip \
-    python-setuptools \
-    alsa-lib \
-    go \
-    rsync \
-    docker \
-    && pacman -Scc --noconfirm
-
-# 2) Stub out systemctl & yay
-RUN printf '#!/bin/sh\nexit 0\n' > /usr/bin/systemctl \
-    && chmod +x /usr/bin/systemctl \
-    && printf '#!/bin/sh\nexit 0\n' > /usr/bin/yay \
-    && chmod +x /usr/bin/yay
-
-# 3) python-simpleaudio from the AUR
-RUN useradd -m aur_builder \
-    && su aur_builder -c "git clone https://aur.archlinux.org/python-simpleaudio.git /home/aur_builder/psa && \
-       cd /home/aur_builder/psa && \
-       makepkg --noconfirm --skippgpcheck" \
-    && pacman -U --noconfirm /home/aur_builder/psa/*.pkg.tar.zst \
-    && rm -rf /home/aur_builder/psa
-
-# 4) pkgmgr + venv
-ENV PKGMGR_REPO=/opt/package-manager \
-    PKGMGR_VENV=/root/.venvs/pkgmgr
-
-RUN git clone https://github.com/kevinveenbirkenbach/package-manager.git $PKGMGR_REPO \
-    && python -m venv $PKGMGR_VENV \
-    && $PKGMGR_VENV/bin/pip install --upgrade pip \
-    && $PKGMGR_VENV/bin/pip install --no-cache-dir -r $PKGMGR_REPO/requirements.txt ansible \
-    && printf '#!/bin/sh\n. %s/bin/activate\nexec python %s/main.py "$@"\n' \
-       "$PKGMGR_VENV" "$PKGMGR_REPO" > /usr/local/bin/pkgmgr \
-    && chmod +x /usr/local/bin/pkgmgr
-
-ENV PATH="$PKGMGR_VENV/bin:/root/.local/bin:${PATH}"
-
-# 6) Copy in Infinito.Nexus source
+# syntax=docker/dockerfile:1
+ARG DISTRO=arch
+ARG PKGMGR_IMAGE_OWNER=kevinveenbirkenbach
+ARG PKGMGR_IMAGE_TAG=stable
+ARG PKGMGR_IMAGE="ghcr.io/${PKGMGR_IMAGE_OWNER}/pkgmgr-${DISTRO}:${PKGMGR_IMAGE_TAG}"
+
+FROM ${PKGMGR_IMAGE} AS infinito
+SHELL ["/bin/bash", "-lc"]
+
+RUN cat /etc/os-release || true
+
+# ------------------------------------------------------------
+# Infinito.Nexus source in
+# ------------------------------------------------------------
 COPY . /opt/infinito-src

-# 7) Infinito via pkgmgr (shallow)
-RUN pkgmgr install infinito --clone-mode shallow
+# ------------------------------------------------------------
+# Install infinito via pkgmgr (shallow)
+# ------------------------------------------------------------
+RUN set -euo pipefail; \
+    pkgmgr install infinito --clone-mode shallow

-# 8) Override with local source
-RUN INFINITO_PATH=$(pkgmgr path infinito) && \
-    rm -rf "$INFINITO_PATH"/* && \
-    rsync -a --delete --exclude='.git' /opt/infinito-src/ "$INFINITO_PATH"/
+# ------------------------------------------------------------
+# Override with local source
+# ------------------------------------------------------------
+RUN set -euo pipefail; \
+    INFINITO_PATH="$(pkgmgr path infinito)"; \
+    rm -rf "${INFINITO_PATH:?}/"*; \
+    rsync -a --delete --exclude='.git' /opt/infinito-src/ "${INFINITO_PATH}/"

-# 9) Symlink
-RUN INFINITO_PATH=$(pkgmgr path infinito) && \
-    ln -sf "$INFINITO_PATH"/main.py /usr/local/bin/infinito && \
-    chmod +x /usr/local/bin/infinito
+# ------------------------------------------------------------
+# Symlink entry
+# ------------------------------------------------------------
+RUN set -euo pipefail; \
+    INFINITO_PATH="$(pkgmgr path infinito)"; \
+    ln -sf "${INFINITO_PATH}/main.py" /usr/local/bin/infinito; \
+    chmod +x /usr/local/bin/infinito

-CMD sh -c "infinito --help && exec tail -f /dev/null"
+CMD ["bash", "-lc", "infinito --help && exec tail -f /dev/null"]
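Since the base image is now parameterized via build args, the distro and pkgmgr image tag can be swapped at build time. A hedged sketch; the defaults shown are the ones declared above, and tags other than `stable` are an assumption:

```bash
# Default build (matches the Makefile's `build` target):
docker build --network=host -t infinito:latest .

# Hypothetical override of the parameterized pkgmgr base image:
docker build \
  --build-arg DISTRO=arch \
  --build-arg PKGMGR_IMAGE_TAG=stable \
  -t infinito:latest .
```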

MIRRORS (new file, 3 lines)

@@ -0,0 +1,3 @@
git@github.com:infinito-nexus/core.git
ssh://git@code.infinito.nexus:2201/infinito/nexus.git
git@github.com:kevinveenbirkenbach/infinito-nexus.git

Makefile (119 lines changed)

@@ -1,15 +1,27 @@
+SHELL := /usr/bin/env bash
+
+VENV ?= .venv
+PYTHON ?= python3
+PIP ?= $(PYTHON) -m pip
+
 ROLES_DIR := ./roles
 APPLICATIONS_OUT := ./group_vars/all/04_applications.yml
-APPLICATIONS_SCRIPT := ./cli/build/defaults/applications.py
+APPLICATIONS_SCRIPT := ./cli/setup/applications.py
+USERS_SCRIPT := ./cli/setup/users.py
 USERS_OUT := ./group_vars/all/03_users.yml
-USERS_SCRIPT := ./cli/build/defaults/users.py
 INCLUDES_SCRIPT := ./cli/build/role_include.py
-INCLUDE_GROUPS := $(shell python3 main.py meta categories invokable -s "-" --no-signal | tr '\n' ' ')

 # Directory where these include-files will be written
 INCLUDES_OUT_DIR := ./tasks/groups

+# --- Test filtering (unittest discover) ---
+TEST_PATTERN ?= test*.py
+LINT_TESTS_DIR ?= tests/lint
+UNIT_TESTS_DIR ?= tests/unit
+INTEGRATION_TESTS_DIR ?= tests/integration
+
+# Ensure repo root is importable (so module_utils/, filter_plugins/ etc. work)
+PYTHONPATH ?= .
+
 # Compute extra users as before
 RESERVED_USERNAMES := $(shell \
   find $(ROLES_DIR) -maxdepth 1 -type d -printf '%f\n' \

@@ -19,7 +31,10 @@ RESERVED_USERNAMES := $(shell \
   | paste -sd, - \
 )

-.PHONY: build install test
+.PHONY: \
+	deps setup setup-clean install \
+	test test-messy test-lint test-unit test-integration \
+	clean clean-keep-logs list tree mig dockerignore

 clean-keep-logs:
 	@echo "🧹 Cleaning ignored files but keeping logs/…"

@@ -30,56 +45,102 @@ clean:
 	git clean -fdX

 list:
-	@echo Generating the roles list
-	python3 main.py build roles_list
+	@echo "Generating the roles list"
+	$(PYTHON) main.py build roles_list

 tree:
-	@echo Generating Tree
-	python3 main.py build tree -D 2 --no-signal
+	@echo "Generating Tree"
+	$(PYTHON) main.py build tree -D 2 --no-signal

 mig: list tree
-	@echo Creating meta data for meta infinity graph
-	make
+	@echo "Creating meta data for meta infinity graph"
+
+build:
+	docker build --network=host -t infinito:latest .

 dockerignore:
-	@echo Create dockerignore
+	@echo "Create dockerignore"
 	cat .gitignore > .dockerignore
 	echo ".git" >> .dockerignore

-messy-build: dockerignore
+setup: dockerignore
 	@echo "🔧 Generating users defaults → $(USERS_OUT)"
-	python3 $(USERS_SCRIPT) \
+	$(PYTHON) $(USERS_SCRIPT) \
 	  --roles-dir $(ROLES_DIR) \
 	  --output $(USERS_OUT) \
 	  --reserved-usernames "$(RESERVED_USERNAMES)"
 	@echo "✅ Users defaults written to $(USERS_OUT)\n"
 	@echo "🔧 Generating applications defaults → $(APPLICATIONS_OUT)"
-	python3 $(APPLICATIONS_SCRIPT) \
+	$(PYTHON) $(APPLICATIONS_SCRIPT) \
 	  --roles-dir $(ROLES_DIR) \
 	  --output-file $(APPLICATIONS_OUT)
 	@echo "✅ Applications defaults written to $(APPLICATIONS_OUT)\n"
 	@echo "🔧 Generating role-include files for each group…"
 	@mkdir -p $(INCLUDES_OUT_DIR)
-	@$(foreach grp,$(INCLUDE_GROUPS), \
-	  out=$(INCLUDES_OUT_DIR)/$(grp)roles.yml; \
-	  echo "→ Building $$out (pattern: '$(grp)')…"; \
-	  python3 $(INCLUDES_SCRIPT) $(ROLES_DIR) \
-	    -p $(grp) -o $$out; \
-	  echo "$$out"; \
-	)
+	@INCLUDE_GROUPS="$$( $(PYTHON) main.py meta categories invokable -s "-" --no-signal | tr '\n' ' ' )"; \
+	for grp in $$INCLUDE_GROUPS; do \
+	  out="$(INCLUDES_OUT_DIR)/$${grp}roles.yml"; \
+	  echo "→ Building $$out (pattern: '$$grp')…"; \
+	  $(PYTHON) $(INCLUDES_SCRIPT) $(ROLES_DIR) -p $$grp -o $$out; \
+	  echo "$$out"; \
+	done

-messy-test:
-	@echo "🧪 Running Python tests…"
-	PYTHONPATH=. python -m unittest discover -s tests
+setup-clean: clean setup
+	@echo "Full build with cleanup before was executed."
+
+# --- Tests (separated) ---
+test-lint:
+	@if [ ! -d "$(LINT_TESTS_DIR)" ]; then \
+		echo " No lint tests directory found at $(LINT_TESTS_DIR) (skipping)."; \
+		exit 0; \
+	fi
+	@echo "🔎 Running lint tests (dir: $(LINT_TESTS_DIR), pattern: $(TEST_PATTERN))…"
+	@PYTHONPATH="$(PYTHONPATH)" $(PYTHON) -m unittest discover \
+		-s "$(LINT_TESTS_DIR)" \
+		-p "$(TEST_PATTERN)" \
+		-t "$(PYTHONPATH)"
+
+test-unit:
+	@if [ ! -d "$(UNIT_TESTS_DIR)" ]; then \
+		echo " No unit tests directory found at $(UNIT_TESTS_DIR) (skipping)."; \
+		exit 0; \
+	fi
+	@echo "🧪 Running unit tests (dir: $(UNIT_TESTS_DIR), pattern: $(TEST_PATTERN))…"
+	@PYTHONPATH="$(PYTHONPATH)" $(PYTHON) -m unittest discover \
+		-s "$(UNIT_TESTS_DIR)" \
+		-p "$(TEST_PATTERN)" \
+		-t "$(PYTHONPATH)"
+
+test-integration:
+	@if [ ! -d "$(INTEGRATION_TESTS_DIR)" ]; then \
+		echo " No integration tests directory found at $(INTEGRATION_TESTS_DIR) (skipping)."; \
+		exit 0; \
+	fi
+	@echo "🧪 Running integration tests (dir: $(INTEGRATION_TESTS_DIR), pattern: $(TEST_PATTERN))…"
+	@PYTHONPATH="$(PYTHONPATH)" $(PYTHON) -m unittest discover \
+		-s "$(INTEGRATION_TESTS_DIR)" \
+		-p "$(TEST_PATTERN)" \
+		-t "$(PYTHONPATH)"
+
+# Backwards compatible target (kept)
+test-messy: test-lint test-unit test-integration
 	@echo "📑 Checking Ansible syntax…"
 	ansible-playbook -i localhost, -c local $(foreach f,$(wildcard group_vars/all/*.yml),-e @$(f)) playbook.yml --syntax-check

-install: build
-	@echo "⚙️ Install complete."
+test: clean setup test-messy
+	@echo "✅ Full test (setup + tests) executed."

-build: clean messy-build
-	@echo "Full build with cleanup before was executed."
+deps:
+	@if [ ! -d "$(VENV)" ]; then \
+		echo "🐍 Creating virtualenv $(VENV)"; \
+		python3 -m venv "$(VENV)"; \
+	fi
+	@echo "📦 Installing Python dependencies"
+	@$(PIP) install --upgrade pip setuptools wheel
+	@$(PIP) install -e .

-test: build messy-test
-	@echo "Full test with build before was executed."
+install: deps
+	@echo "✅ Python environment installed (editable)."

@@ -1,4 +1,3 @@
-# cli/deploy/container.py
 import argparse
 import os
 import subprocess

@@ -1,4 +1,4 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 """

@@ -63,8 +63,8 @@ def run_ansible_playbook(
     # ---------------------------------------------------------
     # 2) Build Phase
     # ---------------------------------------------------------
     if not skip_build:
-        print("\n🛠️ Running project build (make messy-build)...\n")
-        subprocess.run(["make", "messy-build"], check=True)
+        print("\n🛠️ Running project build (make setup)...\n")
+        subprocess.run(["make", "setup"], check=True)
     else:
         print("\n🛠️ Build skipped (--skip-build)\n")

@@ -95,8 +95,8 @@ def run_ansible_playbook(
     # ---------------------------------------------------------
     # 4) Test Phase
     # ---------------------------------------------------------
     if not skip_tests:
-        print("\n🧪 Running tests (make messy-test)...\n")
-        subprocess.run(["make", "messy-test"], check=True)
+        print("\n🧪 Running tests (make test-messy)...\n")
+        subprocess.run(["make", "test-messy"], check=True)
     else:
         print("\n🧪 Tests skipped (--skip-tests)\n")

@@ -6,7 +6,7 @@ import time
 from pathlib import Path

 # Ensure project root on PYTHONPATH so module_utils is importable
-repo_root = Path(__file__).resolve().parent.parent.parent.parent
+repo_root = Path(__file__).resolve().parent.parent.parent
 sys.path.insert(0, str(repo_root))

 # Add lookup_plugins for application_gid

@@ -29,7 +29,7 @@ infinito --help
 To deploy Infinito.Nexus on a personal computer (e.g., a laptop), you can run:

 ```bash
-infinito playbook \
+infinito deploy dedicated \
   --limit hp-spectre-x360 \
   --host-type personal-computer \
   --update \

@@ -64,7 +64,7 @@ To avoid typing your vault password interactively, you can provide a file:
 ## 🔍 Full Command-Line Reference

-Here's a breakdown of all available parameters from `infinito playbook --help`:
+Here's a breakdown of all available parameters from `infinito deploy dedicated --help`:

 | Argument | Description |
 |----------|-------------|

@@ -87,7 +87,7 @@ Here's a breakdown of all available parameters from `infinito deploy dedicated --help`
 You can mix and match modes like this:

 ```bash
-infinito playbook --update --backup --cleanup pcs.yml
+infinito deploy dedicated --update --backup --cleanup pcs.yml
 ```

 This will update the system, create a backup, and clean up unnecessary files in one run.

@@ -209,7 +209,7 @@ def print_global_help(available, cli_dir):
         Fore.CYAN
     ))
     print(color_text(
-        "  corresponds to `cli/build/defaults/users.py`.",
+        "  corresponds to `cli/setup/users.py`.",
         Fore.CYAN
     ))
     print()

@@ -1,186 +1,227 @@
 import os
+import warnings

 class DummySound:
     @staticmethod
-    def play_start_sound(): pass
+    def play_start_sound() -> None:
+        pass
+
     @staticmethod
-    def play_infinito_intro_sound(): pass
+    def play_infinito_intro_sound() -> None:
+        pass
+
     @staticmethod
-    def play_finished_successfully_sound(): pass
+    def play_finished_successfully_sound() -> None:
+        pass
+
     @staticmethod
-    def play_finished_failed_sound(): pass
+    def play_finished_failed_sound() -> None:
+        pass
+
     @staticmethod
-    def play_warning_sound(): pass
+    def play_warning_sound() -> None:
+        pass

-_IN_DOCKER = os.path.exists('/.dockerenv')
-if _IN_DOCKER:
-    Sound = DummySound
-else:
-    try:
-        import numpy as np
-        import simpleaudio as sa
-        import shutil, subprocess, tempfile, wave as wavmod
-
-        class Sound:
-            """
-            Sound effects for the application with enhanced complexity.
-            Each sound uses at least 6 distinct tones and lasts no more than max_length seconds,
-            except the intro sound which is a detailed 26-second Berlin techno-style build-up, 12-second celebration with a descending-fifth chord sequence of 7 chords, and breakdown with melodic background.
-            Transitions between phases now crossfade over 3 seconds for smoother flow.
-            """
-            fs = 44100  # Sampling rate (samples per second)
-            complexity_factor = 10  # Number of harmonics to sum for richer timbres
-            max_length = 2.0  # Maximum total duration of any sound in seconds
-
-            @staticmethod
-            def _generate_complex_wave(frequency: float, duration: float, harmonics: int = None) -> np.ndarray:
-                if harmonics is None:
-                    harmonics = Sound.complexity_factor
-                t = np.linspace(0, duration, int(Sound.fs * duration), False)
-                wave = np.zeros_like(t)
-                for n in range(1, harmonics + 1):
-                    wave += (1 / n) * np.sin(2 * np.pi * frequency * n * t)
-                # ADSR envelope
-                attack = int(0.02 * Sound.fs)
-                release = int(0.05 * Sound.fs)
-                env = np.ones_like(wave)
-                env[:attack] = np.linspace(0, 1, attack)
-                env[-release:] = np.linspace(1, 0, release)
-                wave *= env
-                wave /= np.max(np.abs(wave))
-                return (wave * (2**15 - 1)).astype(np.int16)
-
-            @staticmethod
-            def _crossfade(w1: np.ndarray, w2: np.ndarray, fade_len: int) -> np.ndarray:
-                # Ensure fade_len less than each
-                fade_len = min(fade_len, len(w1), len(w2))
-                fade_out = np.linspace(1, 0, fade_len)
-                fade_in = np.linspace(0, 1, fade_len)
-                w1_end = w1[-fade_len:] * fade_out
-                w2_start = w2[:fade_len] * fade_in
-                middle = (w1_end + w2_start).astype(np.int16)
-                return np.concatenate([w1[:-fade_len], middle, w2[fade_len:]])
-
-            @staticmethod
-            def _play_via_system(wave: np.ndarray):
-                # Write a temp WAV and play it via available system player
-                with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
-                    fname = f.name
-                try:
-                    with wavmod.open(fname, "wb") as w:
-                        w.setnchannels(1)
-                        w.setsampwidth(2)
-                        w.setframerate(Sound.fs)
-                        w.writeframes(wave.tobytes())
-                    def run(cmd):
-                        return subprocess.run(
-                            cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
-                        ).returncode == 0
-                    # Preferred order: PipeWire → PulseAudio → ALSA → ffplay
-                    if shutil.which("pw-play") and run(["pw-play", fname]): return
-                    if shutil.which("paplay") and run(["paplay", fname]): return
-                    if shutil.which("aplay") and run(["aplay", "-q", fname]): return
-                    if shutil.which("ffplay") and run(["ffplay", "-autoexit", "-nodisp", fname]): return
-                    # Last resort if no system player exists: simpleaudio
-                    play_obj = sa.play_buffer(wave, 1, 2, Sound.fs)
-                    play_obj.wait_done()
-                finally:
-                    try: os.unlink(fname)
-                    except Exception: pass
-
-            @staticmethod
-            def _play(wave: np.ndarray):
-                # Switch via env: system | simpleaudio | auto (default)
-                backend = os.getenv("INFINITO_AUDIO_BACKEND", "auto").lower()
-                if backend == "system":
-                    return Sound._play_via_system(wave)
-                if backend == "simpleaudio":
-                    play_obj = sa.play_buffer(wave, 1, 2, Sound.fs)
-                    play_obj.wait_done()
-                    return
-                # auto: try simpleaudio first; if it fails, fall back to system
-                try:
-                    play_obj = sa.play_buffer(wave, 1, 2, Sound.fs)
-                    play_obj.wait_done()
-                except Exception:
-                    Sound._play_via_system(wave)
-
-            @classmethod
-            def play_infinito_intro_sound(cls):
-                # Phase durations
-                build_time = 10.0
-                celebr_time = 12.0
-                breakdown_time = 10.0
-                overlap = 3.0  # seconds of crossfade
-                bass_seg = 0.125  # 1/8s kick
-                melody_seg = 0.25  # 2/8s melody
-                bass_freq = 65.41  # C2 kick
-                melody_freqs = [261.63, 293.66, 329.63, 392.00, 440.00, 523.25]
-                # Build-up phase
-                steps = int(build_time / (bass_seg + melody_seg))
-                build_seq = []
-                for i in range(steps):
-                    amp = (i + 1) / steps
-                    b = cls._generate_complex_wave(bass_freq, bass_seg).astype(np.float32) * amp
-                    m = cls._generate_complex_wave(melody_freqs[i % len(melody_freqs)], melody_seg).astype(np.float32) * amp
-                    build_seq.append(b.astype(np.int16))
-                    build_seq.append(m.astype(np.int16))
-                build_wave = np.concatenate(build_seq)
-                # Celebration phase: 7 descending-fifth chords
-                roots = [523.25, 349.23, 233.08, 155.56, 103.83, 69.30, 46.25]
-                chord_time = celebr_time / len(roots)
-                celebr_seq = []
-                for root in roots:
-                    t = np.linspace(0, chord_time, int(cls.fs * chord_time), False)
-                    chord = sum(np.sin(2 * np.pi * f * t) for f in [root, root * 5/4, root * 3/2])
-                    chord /= np.max(np.abs(chord))
-                    celebr_seq.append((chord * (2**15 - 1)).astype(np.int16))
-                celebr_wave = np.concatenate(celebr_seq)
-                # Breakdown phase (mirror of build-up)
-                breakdown_wave = np.concatenate(list(reversed(build_seq)))
-                # Crossfade transitions
-                fade_samples = int(overlap * cls.fs)
-                bc = cls._crossfade(build_wave, celebr_wave, fade_samples)
-                full = cls._crossfade(bc, breakdown_wave, fade_samples)
-                cls._play(full)
-
-            @classmethod
-            def play_start_sound(cls):
-                freqs = [523.25, 659.26, 783.99, 880.00, 1046.50, 1174.66]
-                cls._prepare_and_play(freqs)
-
-            @classmethod
-            def play_finished_successfully_sound(cls):
-                freqs = [523.25, 587.33, 659.26, 783.99, 880.00, 987.77]
-                cls._prepare_and_play(freqs)
-
-            @classmethod
-            def play_finished_failed_sound(cls):
-                freqs = [880.00, 830.61, 783.99, 659.26, 622.25, 523.25]
-                durations = [0.4, 0.3, 0.25, 0.25, 0.25, 0.25]
-                cls._prepare_and_play(freqs, durations)
-
-            @classmethod
-            def play_warning_sound(cls):
-                freqs = [700.00, 550.00, 750.00, 500.00, 800.00, 450.00]
-                cls._prepare_and_play(freqs)
-
-            @classmethod
-            def _prepare_and_play(cls, freqs, durations=None):
-                count = len(freqs)
-                if durations is None:
-                    durations = [cls.max_length / count] * count
-                else:
-                    total = sum(durations)
-                    durations = [d * cls.max_length / total for d in durations]
-                waves = [cls._generate_complex_wave(f, d) for f, d in zip(freqs, durations)]
-                cls._play(np.concatenate(waves))
-    except Exception:
-        warnings.warn("Sound support disabled: numpy or simpleaudio could not be imported", RuntimeWarning)
-        Sound = DummySound
+try:
+    import numpy as np
+    import simpleaudio as sa
+    import shutil
+    import subprocess
+    import tempfile
+    import wave as wavmod
+
+    class Sound:
+        """
+        Sound effects for the application.
+        """
+
+        fs = 44100
+        complexity_factor = 10
+        max_length = 2.0
+
+        @staticmethod
+        def _generate_complex_wave(
+            frequency: float,
+            duration: float,
+            harmonics: int | None = None,
+        ) -> np.ndarray:
+            if harmonics is None:
+                harmonics = Sound.complexity_factor
+
+            t = np.linspace(0, duration, int(Sound.fs * duration), False)
+            wave = np.zeros_like(t)
+
+            for n in range(1, harmonics + 1):
+                wave += (1 / n) * np.sin(2 * np.pi * frequency * n * t)
+
+            # ADSR envelope
+            attack = int(0.02 * Sound.fs)
+            release = int(0.05 * Sound.fs)
+            env = np.ones_like(wave)
+            env[:attack] = np.linspace(0, 1, attack)
+            env[-release:] = np.linspace(1, 0, release)
+            wave *= env
+
+            wave /= np.max(np.abs(wave))
+            return (wave * (2**15 - 1)).astype(np.int16)
+
+        @staticmethod
+        def _crossfade(w1: np.ndarray, w2: np.ndarray, fade_len: int) -> np.ndarray:
+            fade_len = min(fade_len, len(w1), len(w2))
+            if fade_len <= 0:
+                return np.concatenate([w1, w2])
+            fade_out = np.linspace(1, 0, fade_len)
+            fade_in = np.linspace(0, 1, fade_len)
+            w1_end = w1[-fade_len:].astype(np.float32) * fade_out
+            w2_start = w2[:fade_len].astype(np.float32) * fade_in
+            middle = (w1_end + w2_start).astype(np.int16)
+            return np.concatenate([w1[:-fade_len], middle, w2[fade_len:]])
+
+        @staticmethod
+        def _play_via_system(wave: np.ndarray) -> None:
+            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
+                fname = f.name
+            try:
+                with wavmod.open(fname, "wb") as w:
+                    w.setnchannels(1)
+                    w.setsampwidth(2)
+                    w.setframerate(Sound.fs)
+                    w.writeframes(wave.tobytes())
+
+                def run(cmd: list[str]) -> bool:
+                    return (
+                        subprocess.run(
+                            cmd,
+                            stdout=subprocess.DEVNULL,
+                            stderr=subprocess.DEVNULL,
+                            check=False,
+                        ).returncode
+                        == 0
+                    )
+
+                if shutil.which("pw-play") and run(["pw-play", fname]):
+                    return
+                if shutil.which("paplay") and run(["paplay", fname]):
+                    return
+                if shutil.which("aplay") and run(["aplay", "-q", fname]):
+                    return
+                if shutil.which("ffplay") and run(["ffplay", "-autoexit", "-nodisp", fname]):
+                    return
+
+                play_obj = sa.play_buffer(wave, 1, 2, Sound.fs)
+                play_obj.wait_done()
+            finally:
+                try:
+                    os.unlink(fname)
+                except Exception:
+                    pass
+
+        @staticmethod
+        def _play(wave: np.ndarray) -> None:
+            backend = os.getenv("INFINITO_AUDIO_BACKEND", "auto").lower()
+
+            if backend == "system":
+                Sound._play_via_system(wave)
+                return
+
+            if backend == "simpleaudio":
+                play_obj = sa.play_buffer(wave, 1, 2, Sound.fs)
+                play_obj.wait_done()
+                return
+
+            # auto
+            try:
+                play_obj = sa.play_buffer(wave, 1, 2, Sound.fs)
+                play_obj.wait_done()
+            except Exception:
+                Sound._play_via_system(wave)
+
+        @classmethod
+        def play_infinito_intro_sound(cls) -> None:
+            build_time = 10.0
+            celebr_time = 12.0
+            breakdown_time = 10.0
+            overlap = 3.0
+
+            bass_seg = 0.125
+            melody_seg = 0.25
+            bass_freq = 65.41
+            melody_freqs = [261.63, 293.66, 329.63, 392.00, 440.00, 523.25]
+
+            steps = int(build_time / (bass_seg + melody_seg))
+            build_seq: list[np.ndarray] = []
+
+            for i in range(steps):
+                amp = (i + 1) / steps
+                b = cls._generate_complex_wave(bass_freq, bass_seg).astype(np.float32) * amp
+                m = cls._generate_complex_wave(
+                    melody_freqs[i % len(melody_freqs)], melody_seg
+                ).astype(np.float32) * amp
+                build_seq.append(b.astype(np.int16))
+                build_seq.append(m.astype(np.int16))
+
+            build_wave = np.concatenate(build_seq)
+
+            roots = [523.25, 349.23, 233.08, 155.56, 103.83, 69.30, 46.25]
+            chord_time = celebr_time / len(roots)
+            celebr_seq: list[np.ndarray] = []
+
+            for root in roots:
+                t = np.linspace(0, chord_time, int(cls.fs * chord_time), False)
+                chord = sum(np.sin(2 * np.pi * f * t) for f in [root, root * 5 / 4, root * 3 / 2])
+                chord /= np.max(np.abs(chord))
+                celebr_seq.append((chord * (2**15 - 1)).astype(np.int16))
+
+            celebr_wave = np.concatenate(celebr_seq)
+            breakdown_wave = np.concatenate(list(reversed(build_seq)))
+
+            fade_samples = int(overlap * cls.fs)
+            bc = cls._crossfade(build_wave, celebr_wave, fade_samples)
+            full = cls._crossfade(bc, breakdown_wave, fade_samples)
+
+            cls._play(full)
+
+        @classmethod
+        def play_start_sound(cls) -> None:
+            freqs = [523.25, 659.26, 783.99, 880.00, 1046.50, 1174.66]
+            cls._prepare_and_play(freqs)
+
+        @classmethod
+        def play_finished_successfully_sound(cls) -> None:
+            freqs = [523.25, 587.33, 659.26, 783.99, 880.00, 987.77]
+            cls._prepare_and_play(freqs)
+
+        @classmethod
+        def play_finished_failed_sound(cls) -> None:
+            freqs = [880.00, 830.61, 783.99, 659.26, 622.25, 523.25]
+            durations = [0.4, 0.3, 0.25, 0.25, 0.25, 0.25]
+            cls._prepare_and_play(freqs, durations)
+
+        @classmethod
+        def play_warning_sound(cls) -> None:
+            freqs = [700.00, 550.00, 750.00, 500.00, 800.00, 450.00]
+            cls._prepare_and_play(freqs)
+
+        @classmethod
+        def _prepare_and_play(cls, freqs: list[float], durations: list[float] | None = None) -> None:
+            count = len(freqs)
+            if durations is None:
+                durations = [cls.max_length / count] * count
+            else:
+                total = sum(durations)
+                durations = [d * cls.max_length / total for d in durations]
+            waves = [cls._generate_complex_wave(f, d) for f, d in zip(freqs, durations)]
+            cls._play(np.concatenate(waves))
+
+except ImportError as exc:
+    warnings.warn(f"Sound support disabled: {exc}", RuntimeWarning)
+    Sound = DummySound
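The playback backend can be forced through the `INFINITO_AUDIO_BACKEND` environment variable read in `_play()`; a small usage sketch (the `infinito --help` invocation is just an illustrative command that triggers the CLI):

```bash
# Force the system-player path (pw-play → paplay → aplay → ffplay):
INFINITO_AUDIO_BACKEND=system infinito --help

# Force simpleaudio, or leave unset for the default "auto" fallback chain:
INFINITO_AUDIO_BACKEND=simpleaudio infinito --help
```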

pyproject.toml (new file, 48 lines)

@@ -0,0 +1,48 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "infinito-nexus"
version = "0.0.0"
description = "Infinito.Nexus"
readme = "README.md"
requires-python = ">=3.10"
license = { file = "LICENSE.md" }
dependencies = [
"numpy",
"ansible",
"colorscheme-generator @ https://github.com/kevinveenbirkenbach/colorscheme-generator/archive/refs/tags/v0.3.0.zip",
"bcrypt",
"ruamel.yaml",
"PyYAML",
"tld",
"passlib",
"requests",
]
[project.optional-dependencies]
audio = [
"simpleaudio",
]
[tool.setuptools]
# Non-src layout: explicitly control packaged modules
packages = { find = { where = ["."], include = [
"cli*",
"filter_plugins*",
"lookup_plugins*",
"module_utils*",
"library*",
], exclude = [
"roles*",
"assets*",
"docs*",
"templates*",
"logs*",
"tasks*",
"tests*",
"__pycache__*",
] } }
include-package-data = true
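With `simpleaudio` moved to the `audio` extra, sound support becomes opt-in at install time; for example, using standard pip extras syntax:

```bash
# Core install (sound utilities fall back to DummySound):
pip install -e .

# Opt into real audio playback:
pip install -e ".[audio]"
```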

requirements.txt (deleted)

@@ -1,9 +0,0 @@
colorscheme-generator @ https://github.com/kevinveenbirkenbach/colorscheme-generator/archive/refs/tags/v0.3.0.zip
numpy
bcrypt
ruamel.yaml
tld
passlib
requests
ansible
pytest

requirements.yml

@@ -2,8 +2,3 @@ collections:
   - name: kewlfft.aur
   - name: community.general
   - name: hetzner.hcloud
-yay:
-  - python-simpleaudio
-  - python-numpy
-pacman:
-  - ansible

Installation.md (Akaunting role, deleted)

@@ -1,29 +0,0 @@
# Installation Guide
1. **Navigate to the Docker Compose Directory**
Change into the directory where the Docker Compose files reside.
```bash
cd {{ PATH_DOCKER_COMPOSE_INSTANCES }}akaunting/
```
2. **Set Environment Variables**
Ensure timeouts are increased to handle long operations:
```bash
export COMPOSE_HTTP_TIMEOUT=600
export DOCKER_CLIENT_TIMEOUT=600
```
3. **Start Akaunting Service**
Run the setup command with the `AKAUNTING_SETUP` variable:
```bash
AKAUNTING_SETUP=true docker-compose -p akaunting up -d
```
4. **Finalizing Setup**
After verifying that the web interface works, restart services:
```bash
docker-compose down
docker-compose -p akaunting up -d
```
For further details, visit the [Akaunting Documentation](https://akaunting.com/) and the [Akaunting GitHub Repository](https://github.com/akaunting/docker).

Administration.md (PeerTube role, deleted)

@@ -1,29 +0,0 @@
# Administration
## track docker container status
```bash
watch -n 2 "docker ps -a | grep peertube"
```
## clean rebuild
```bash
cd {{ PATH_DOCKER_COMPOSE_INSTANCES }}peertube/ &&
docker-compose down
docker volume rm peertube_assets peertube_config peertube_data peertube_database peertube_redis
docker-compose up -d
```
## access terminal
```bash
docker-compose exec -it application /bin/bash
```
## update config
```bash
apt update && apt install nano && nano ./config/default.yaml
```
## get root password
```bash
docker logs peertube-application-1 | grep -A1 root
```

@@ -5,4 +5,4 @@
 - name: "configure pgadmin servers"
   include_tasks: configuration.yml
-  when: applications | get_app_conf(application_id, 'server_mode', True) | bool
+  when: applications | get_app_conf(application_id, 'server_mode') | bool

@@ -3,6 +3,6 @@
     name: sys-stk-full-stateless
   vars:
     docker_compose_flush_handlers: true
     docker_git_repository_address: "https://github.com/kevinveenbirkenbach/roulette-wheel.git"
     docker_git_repository_pull: true
     docker_git_repository_branch: "master"

@@ -16,6 +16,8 @@
 - name: "load docker, proxy for '{{ application_id }}'"
   include_role:
     name: sys-stk-full-stateless
+  vars:
+    application_id: "web-app-sphinx"

 # Hack because it wasn't possible to fix a handler bug in pkgmgr install
 - name: „Trigger“ docker compose up

@@ -19,5 +19,5 @@ url: "{{ WEB_PROTOCOL }}://<< defaults_applications.web-svc-file.domains.canonic
 ```
 - The `<< ... >>` placeholders are resolved by the [`DictRenderer`](../../../utils/dict_renderer.py) helper class.
-- The CLI uses the [`DefaultsGenerator`](../../../cli/build/defaults/applications.py) class to merge all role configurations into a single YAML and then calls the renderer to substitute each `<< ... >>` occurrence.
+- The CLI uses the [`DefaultsGenerator`](../../../cli/setup/applications.py) class to merge all role configurations into a single YAML and then calls the renderer to substitute each `<< ... >>` occurrence.
 - Use the `--verbose` flag on the CLI script to log every replacement step, and rely on the builtin timeout (default: 10 seconds) to prevent infinite loops.

@@ -1,7 +1,7 @@
 import unittest
 from pathlib import Path
 import re
-from typing import Any, Iterable, Set, List
+from typing import Any, Iterable, Set, List, Dict, Tuple

 import yaml

@@ -10,6 +10,7 @@ class TestVarsPassedAreUsed(unittest.TestCase):
     Integration test:
       - Walk all *.yml/*.yaml and *.j2 files
       - Collect variable names passed via task-level `vars:`
+        AND remember where they were defined (file + line)
       - Consider a var "used" if it appears in ANY of:
           Jinja output blocks: {{ ... var_name ... }}
           Jinja statement blocks: {% ... var_name ... %}

@@ -52,19 +53,64 @@ class TestVarsPassedAreUsed(unittest.TestCase):
             for item in node:
                 yield from self._walk_mapping(item)

-    # ---------- Collect vars passed via `vars:` ----------
-    def _collect_vars_passed(self) -> Set[str]:
+    # ---------- Collect vars passed via `vars:` (with locations) ----------
+    def _collect_vars_passed_with_locations(self) -> Tuple[Set[str], Dict[str, Set[Tuple[Path, int]]]]:
+        """
+        Returns:
+          - a set of all var names passed via `vars:`
+          - a mapping var_name -> set of (path, line_number) where that var is defined under a vars: block
+
+        Line numbers are best-effort based on raw text scanning (not YAML AST),
+        because PyYAML doesn't preserve line info.
+        """
         collected: Set[str] = set()
+        locations: Dict[str, Set[Tuple[Path, int]]] = {}
+
+        # Regex-based scan for:
+        #   <indent>vars:
+        #     <more-indent>key:
+        vars_block_re = re.compile(r"^(\s*)vars:\s*$")
+        key_re = re.compile(r"^(\s*)([A-Za-z_][A-Za-z0-9_]*)\s*:")
+
         for yml in self._iter_files(self.YAML_EXTENSIONS):
-            docs = self._load_yaml_documents(yml)
-            for doc in docs:
-                for mapping in self._walk_mapping(doc):
-                    if "vars" in mapping and isinstance(mapping["vars"], dict):
-                        for k in mapping["vars"].keys():
-                            if isinstance(k, str) and k.strip():
-                                collected.add(k.strip())
-        return collected
+            try:
+                lines = yml.read_text(encoding="utf-8").splitlines()
+            except Exception:
+                continue
+
+            i = 0
+            while i < len(lines):
+                m = vars_block_re.match(lines[i])
+                if not m:
+                    i += 1
+                    continue
+
+                base_indent = len(m.group(1))
+                i += 1
+
+                while i < len(lines):
+                    line = lines[i]
+                    # allow blank lines inside vars block
+                    if not line.strip():
+                        i += 1
+                        continue
+
+                    indent = len(line) - len(line.lstrip(" "))
+                    # end of vars block when indentation drops back
+                    if indent <= base_indent:
+                        break
+
+                    km = key_re.match(line)
+                    if km:
+                        key = km.group(2).strip()
+                        if key:
+                            collected.add(key)
+                            locations.setdefault(key, set()).add((yml, i + 1))  # 1-based line number
+                    i += 1
+
+        return collected, locations

     # ---------- Gather text for Jinja usage scanning ----------

@@ -114,15 +160,12 @@ class TestVarsPassedAreUsed(unittest.TestCase):
         We use a tempered regex to avoid stopping at the first '}}'/'%}' and a negative lookahead
         `(?!\s*\()` after the token.
         """
-        # Word token not followed by '(' → real variable usage
         token = r"\b" + re.escape(var_name) + r"\b(?!\s*\()"

-        # Output blocks: {{ ... }}
         pat_output = re.compile(
             r"{{(?:(?!}}).)*" + token + r"(?:(?!}}).)*}}",
             re.DOTALL,
         )

-        # Statement blocks: {% ... %}
         pat_stmt = re.compile(
             r"{%(?:(?!%}).)*" + token + r"(?:(?!%}).)*%}",
             re.DOTALL,

@@ -140,7 +183,7 @@ class TestVarsPassedAreUsed(unittest.TestCase):
     # ---------- Test ----------
     def test_vars_passed_are_used_in_yaml_or_jinja(self):
-        vars_passed = self._collect_vars_passed()
+        vars_passed, vars_locations = self._collect_vars_passed_with_locations()

         self.assertTrue(
             vars_passed,
             "No variables passed via `vars:` were found. "

@@ -157,18 +200,34 @@ class TestVarsPassedAreUsed(unittest.TestCase):
                 or self._used_in_ansible_exprs(var_name, ansible_exprs)
             )
             if not used:
-                if var_name not in ['ansible_python_interpreter']:
+                if var_name not in ["ansible_python_interpreter"]:
                     unused.append(var_name)

         if unused:
-            msg = (
+            lines: List[str] = []
+            lines.append(
                 "The following variables are passed via `vars:` but never referenced in:\n"
                 "  • Jinja output/statement blocks ({{ ... }} / {% ... %}) OR\n"
-                "  • Ansible expressions (when/loop/with_*)\n\n"
-                + "\n".join(f"  - {v}" for v in unused)
-                + "\n\nNotes:\n"
-                "  • Function-like tokens (name followed by '(') are ignored intentionally.\n"
-                "  • If a var is only used in Python code or other file types, extend the test accordingly\n"
-                "    or remove the var if it's truly unused."
+                "  • Ansible expressions (when/loop/with_*)\n"
             )
-            self.fail(msg)
+
+            for v in unused:
+                lines.append(f"- {v}")
+                locs = sorted(
+                    vars_locations.get(v, set()),
+                    key=lambda t: (str(t[0]), t[1]),
+                )
+                if locs:
+                    for path, lineno in locs:
+                        rel = path.relative_to(self.REPO_ROOT)
+                        lines.append(f"{rel}:{lineno}")
+                else:
+                    lines.append("  • (location unknown)")
+
+            lines.append(
+                "\nNotes:\n"
+                "  • Function-like tokens (name followed by '(') are ignored intentionally.\n"
+                "  • If a var is only used in Python code or other file types, extend the test accordingly\n"
+                "    or remove the var if it's truly unused."
+            )
+
+            self.fail("\n".join(lines))

@@ -230,12 +230,12 @@ class TestRunAnsiblePlaybook(unittest.TestCase):
             "Expected 'make clean' when MODE_CLEANUP is true",
         )
         self.assertTrue(
-            any(call == ["make", "messy-build"] for call in calls),
-            "Expected 'make messy-build' when skip_build=False",
+            any(call == ["make", "setup"] for call in calls),
+            "Expected 'make setup' when skip_build=False",
         )
         self.assertTrue(
-            any(call == ["make", "messy-test"] for call in calls),
-            "Expected 'make messy-test' when skip_tests=False",
+            any(call == ["make", "test-messy"] for call in calls),
+            "Expected 'make test-messy' when skip_tests=False",
         )
         self.assertTrue(
             any(

@@ -329,8 +329,8 @@
         )

         # No cleanup, no build, no tests, no inventory validation
         self.assertFalse(any(call == ["make", "clean"] for call in calls))
-        self.assertFalse(any(call == ["make", "messy-build"] for call in calls))
-        self.assertFalse(any(call == ["make", "messy-test"] for call in calls))
+        self.assertFalse(any(call == ["make", "setup"] for call in calls))
+        self.assertFalse(any(call == ["make", "test-messy"] for call in calls))
         self.assertFalse(
             any(
                 isinstance(call, list)

@@ -10,7 +10,7 @@ import subprocess
 class TestGenerateDefaultApplications(unittest.TestCase):
     def setUp(self):
         # Path to the generator script under test
-        self.script_path = Path(__file__).resolve().parents[5] / "cli" / "build" / "defaults" / "applications.py"
+        self.script_path = Path(__file__).resolve().parents[4] / "cli" / "setup" / "applications.py"

         # Create temp role structure
         self.temp_dir = Path(tempfile.mkdtemp())
         self.roles_dir = self.temp_dir / "roles"

@@ -32,7 +32,7 @@
         shutil.rmtree(self.temp_dir)

     def test_script_generates_expected_yaml(self):
-        script_path = Path(__file__).resolve().parent.parent.parent.parent.parent.parent / "cli/build/defaults/applications.py"
+        script_path = Path(__file__).resolve().parent.parent.parent.parent.parent / "cli/setup/applications.py"
         result = subprocess.run(
             [

@@ -45,7 +45,7 @@
         When a users.yml exists with defined users, the script should inject a 'users'
         mapping in the generated YAML, mapping each username to a Jinja2 reference.
         """
-        script_path = Path(__file__).resolve().parents[5] / "cli" / "build/defaults/applications.py"
+        script_path = Path(__file__).resolve().parents[4] / "cli" / "setup/applications.py"
         result = subprocess.run([
             "python3", str(script_path),
             "--roles-dir", str(self.roles_dir),

@@ -1,59 +1,56 @@
-import os
-import sys
 import unittest
 import tempfile
 import shutil
+import os
 import yaml
 from collections import OrderedDict

-# Add cli/ to import path
-sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../..", "cli/build/defaults/")))
-import users
+from cli.setup import users


 class TestGenerateUsers(unittest.TestCase):
     def test_build_users_auto_increment_and_overrides(self):
         defs = {
-            'alice': {},
-            'bob': {'uid': 2000, 'email': 'bob@custom.com', 'description': 'Custom user'},
-            'carol': {}
+            "alice": {},
+            "bob": {"uid": 2000, "email": "bob@custom.com", "description": "Custom user"},
+            "carol": {},
         }
         build = users.build_users(
             defs=defs,
-            primary_domain='example.com',
+            primary_domain="example.com",
             start_id=1001,
-            become_pwd='pw'
+            become_pwd="pw",
         )
         # alice should get uid/gid 1001
-        self.assertEqual(build['alice']['uid'], 1001)
-        self.assertEqual(build['alice']['gid'], 1001)
-        self.assertEqual(build['alice']['email'], 'alice@example.com')
+        self.assertEqual(build["alice"]["uid"], 1001)
+        self.assertEqual(build["alice"]["gid"], 1001)
+        self.assertEqual(build["alice"]["email"], "alice@example.com")
         # bob overrides
-        self.assertEqual(build['bob']['uid'], 2000)
-        self.assertEqual(build['bob']['gid'], 2000)
-        self.assertEqual(build['bob']['email'], 'bob@custom.com')
-        self.assertIn('description', build['bob'])
+        self.assertEqual(build["bob"]["uid"], 2000)
+        self.assertEqual(build["bob"]["gid"], 2000)
+        self.assertEqual(build["bob"]["email"], "bob@custom.com")
+        self.assertIn("description", build["bob"])
         # carol should get next free id = 1002
-        self.assertEqual(build['carol']['uid'], 1002)
-        self.assertEqual(build['carol']['gid'], 1002)
+        self.assertEqual(build["carol"]["uid"], 1002)
+        self.assertEqual(build["carol"]["gid"], 1002)

     def test_build_users_default_lookup_password(self):
         """
         When no 'password' override is provided,
         the become_pwd lookup template string must be used as the password.
         """
-        defs = {'frank': {}}
+        defs = {"frank": {}}
         lookup_template = '{{ lookup("password", "/dev/null length=42 chars=ascii_letters,digits") }}'
         build = users.build_users(
             defs=defs,
-            primary_domain='example.com',
+            primary_domain="example.com",
             start_id=1001,
-            become_pwd=lookup_template
+            become_pwd=lookup_template,
         )
         self.assertEqual(
-            build['frank']['password'],
+            build["frank"]["password"],
             lookup_template,
-            "The lookup template string was not correctly applied as the default password"
+            "The lookup template string was not correctly applied as the default password",
         )

     def test_build_users_override_password(self):

@@ -61,72 +58,71 @@ class TestGenerateUsers(unittest.TestCase):
         When a 'password' override is provided,
         that custom password must be used instead of become_pwd.
         """
-        defs = {'eva': {'password': 'custompw'}}
+        defs = {"eva": {"password": "custompw"}}
         lookup_template = '{{ lookup("password", "/dev/null length=42 chars=ascii_letters,digits") }}'
         build = users.build_users(
             defs=defs,
-            primary_domain='example.com',
+            primary_domain="example.com",
             start_id=1001,
-            become_pwd=lookup_template
+            become_pwd=lookup_template,
         )
         self.assertEqual(
-            build['eva']['password'],
-            'custompw',
-            "The override password was not correctly applied"
+            build["eva"]["password"],
+            "custompw",
+            "The override password was not correctly applied",
        )

     def test_build_users_duplicate_override_uid(self):
         defs = {
-            'u1': {'uid': 1001},
-            'u2': {'uid': 1001}
+            "u1": {"uid": 1001},
+            "u2": {"uid": 1001},
         }
         with self.assertRaises(ValueError):
-            users.build_users(defs, 'ex.com', 1001, 'pw')
+            users.build_users(defs, "ex.com", 1001, "pw")

     def test_build_users_shared_gid_allowed(self):
         # Allow two users to share the same GID when one overrides gid and the other uses that as uid
         defs = {
-            'a': {'uid': 1500},
-            'b': {'gid': 1500}
+            "a": {"uid": 1500},
+            "b": {"gid": 1500},
         }
-        build = users.build_users(defs, 'ex.com', 1500, 'pw')
+        build = users.build_users(defs, "ex.com", 1500, "pw")
         # Both should have gid 1500
-        self.assertEqual(build['a']['gid'], 1500)
-        self.assertEqual(build['b']['gid'], 1500)
+        self.assertEqual(build["a"]["gid"], 1500)
+        self.assertEqual(build["b"]["gid"], 1500)

     def test_build_users_duplicate_username_email(self):
         defs = {
-            'u1': {'username': 'same', 'email': 'same@ex.com'},
-            'u2': {'username': 'same'}
+            "u1": {"username": "same", "email": "same@ex.com"},
+            "u2": {"username": "same"},
         }
         # second user with same username should raise
         with self.assertRaises(ValueError):
-            users.build_users(defs, 'ex.com', 1001, 'pw')
+            users.build_users(defs, "ex.com", 1001, "pw")

     def test_dictify_converts_ordereddict(self):
-        od = users.OrderedDict([('a', 1), ('b', {'c': 2})])
+        od = users.OrderedDict([("a", 1), ("b", {"c": 2})])
         result = users.dictify(OrderedDict(od))
         self.assertIsInstance(result, dict)
-        self.assertEqual(result, {'a': 1, 'b': {'c': 2}})
+        self.assertEqual(result, {"a": 1, "b": {"c": 2}})

     def test_load_user_defs_and_conflict(self):
         # create temp roles structure
         tmp = tempfile.mkdtemp()
         try:
-            os.makedirs(os.path.join(tmp, 'role1/users'))
-            os.makedirs(os.path.join(tmp, 'role2/users'))
+            os.makedirs(os.path.join(tmp, "role1/users"))
+            os.makedirs(os.path.join(tmp, "role2/users"))
             # role1 defines user x
-            with open(os.path.join(tmp, 'role1/users/main.yml'), 'w') as f:
-                yaml.safe_dump({'users': {'x': {'email': 'x@a'}}}, f)
+            with open(os.path.join(tmp, "role1/users/main.yml"), "w") as f:
+                yaml.safe_dump({"users": {"x": {"email": "x@a"}}}, f)
             # role2 defines same user x with same value
-            with open(os.path.join(tmp, 'role2/users/main.yml'), 'w') as f:
-                yaml.safe_dump({'users': {'x': {'email': 'x@a'}}}, f)
+            with open(os.path.join(tmp, "role2/users/main.yml"), "w") as f:
+                yaml.safe_dump({"users": {"x": {"email": "x@a"}}}, f)
             defs = users.load_user_defs(tmp)
-            self.assertIn('x', defs)
+            self.assertIn("x", defs)
             # now conflict definition
-            with open(os.path.join(tmp, 'role2/users/main.yml'), 'w') as f:
-                yaml.safe_dump({'users': {'x': {'email': 'x@b'}}}, f)
+            with open(os.path.join(tmp, "role2/users/main.yml"), "w") as f:
+                yaml.safe_dump({"users": {"x": {"email": "x@b"}}}, f)
             with self.assertRaises(ValueError):
                 users.load_user_defs(tmp)
         finally:

@@ -136,7 +132,6 @@ class TestGenerateUsers(unittest.TestCase):
         """
         Ensure that default_users keys are written in alphabetical order.
         """
-        import tempfile
         import subprocess
         from pathlib import Path

@@ -147,10 +142,10 @@ class TestGenerateUsers(unittest.TestCase):
             # Create multiple roles with users in unsorted order
             for role, users_map in [
                 ("role-zeta", {"zeta": {"email": "z@ex"}}),
                 ("role-alpha", {"alpha": {"email": "a@ex"}}),
                 ("role-mu", {"mu": {"email": "m@ex"}}),
                 ("role-beta", {"beta": {"email": "b@ex"}}),
             ]:
                 (roles_dir / role / "users").mkdir(parents=True, exist_ok=True)
                 with open(roles_dir / role / "users" / "main.yml", "w") as f:

@@ -158,15 +153,20 @@ class TestGenerateUsers(unittest.TestCase):
             out_file = tmpdir / "users.yml"

-            # Resolve script path like in other tests (relative to repo root)
-            script_path = Path(__file__).resolve().parents[5] / "cli" / "build" / "defaults" / "users.py"
+            # Always resolve the real script path from the imported module
+            script_path = Path(users.__file__).resolve()

-            # Run generator
             result = subprocess.run(
-                ["python3", str(script_path),
-                 "--roles-dir", str(roles_dir),
-                 "--output", str(out_file)],
-                capture_output=True, text=True
+                [
+                    "python3",
+                    str(script_path),
+                    "--roles-dir",
+                    str(roles_dir),
+                    "--output",
+                    str(out_file),
+                ],
+                capture_output=True,
+                text=True,
             )
             self.assertEqual(result.returncode, 0, msg=result.stderr)
             self.assertTrue(out_file.exists(), "Output file was not created.")

@@ -176,24 +176,21 @@ class TestGenerateUsers(unittest.TestCase):
             users_map = data["default_users"]
             keys_in_file = list(users_map.keys())

-            # Expect alphabetical order
             self.assertEqual(
-                keys_in_file, sorted(keys_in_file),
-                msg=f"Users are not sorted alphabetically: {keys_in_file}"
+                keys_in_file,
+                sorted(keys_in_file),
+                msg=f"Users are not sorted alphabetically: {keys_in_file}",
             )

-            # Sanity: all expected keys present
             for k in ["alpha", "beta", "mu", "zeta"]:
                 self.assertIn(k, users_map)
         finally:
             shutil.rmtree(tmpdir)

     def test_cli_users_sorting_stable_across_runs(self):
         """
         Running the generator multiple times yields identical content (stable sort).
         """
-        import tempfile
         import subprocess
         from pathlib import Path

@@ -202,7 +199,6 @@ class TestGenerateUsers(unittest.TestCase):
             roles_dir = tmpdir / "roles"
             roles_dir.mkdir()

-            # Unsorted creation order on purpose
             cases = [
                 ("role-d", {"duser": {"email": "d@ex"}}),
                 ("role-a", {"auser": {"email": "a@ex"}}),

@@ -215,35 +211,45 @@ class TestGenerateUsers(unittest.TestCase):
                     yaml.safe_dump({"users": users_map}, f)

             out_file = tmpdir / "users.yml"
-            script_path = Path(__file__).resolve().parents[5] / "cli" / "build" / "defaults" / "users.py"
+            script_path = Path(users.__file__).resolve()

-            # First run
             r1 = subprocess.run(
-                ["python3", str(script_path),
-                 "--roles-dir", str(roles_dir),
-                 "--output", str(out_file)],
-                capture_output=True, text=True
+                [
+                    "python3",
+                    str(script_path),
+                    "--roles-dir",
+                    str(roles_dir),
+                    "--output",
+                    str(out_file),
+                ],
+                capture_output=True,
+                text=True,
             )
             self.assertEqual(r1.returncode, 0, msg=r1.stderr)
             content1 = out_file.read_text()

-            # Touch dirs to shuffle filesystem mtimes
             for p in roles_dir.iterdir():
                 os.utime(p, None)

-            # Second run
             r2 = subprocess.run(
-                ["python3", str(script_path),
-                 "--roles-dir", str(roles_dir),
-                 "--output", str(out_file)],
-                capture_output=True, text=True
+                [
+                    "python3",
+                    str(script_path),
+                    "--roles-dir",
+                    str(roles_dir),
+                    "--output",
+                    str(out_file),
+                ],
+                capture_output=True,
+                text=True,
             )
             self.assertEqual(r2.returncode, 0, msg=r2.stderr)
             content2 = out_file.read_text()

             self.assertEqual(
-                content1, content2,
-                msg="Output differs between runs; user sorting should be stable."
+                content1,
+                content2,
+                msg="Output differs between runs; user sorting should be stable.",
             )
         finally:
             shutil.rmtree(tmpdir)

@@ -265,11 +271,8 @@ class TestGenerateUsers(unittest.TestCase):
                 become_pwd="pw",
             )

-            # Reserved user should carry the flag
             self.assertIn("reserved", build["admin"])
             self.assertTrue(build["admin"]["reserved"])
-
-            # Non-reserved user should not have the flag at all
             self.assertNotIn("reserved", build["bob"])

     def test_cli_reserved_usernames_flag_sets_reserved_field(self):

@@ -278,7 +281,6 @@ class TestGenerateUsers(unittest.TestCase):
         in the generated YAML, and that existing definitions are preserved
         (only 'reserved' is added).
         """
-        import tempfile
         import subprocess
         from pathlib import Path

@@ -287,7 +289,6 @@ class TestGenerateUsers(unittest.TestCase):
             roles_dir = tmpdir / "roles"
             roles_dir.mkdir()

-            # Role with an existing user definition "admin"
             (roles_dir / "role-base" / "users").mkdir(parents=True, exist_ok=True)
             with open(roles_dir / "role-base" / "users" / "main.yml", "w") as f:
                 yaml.safe_dump(

@@ -303,7 +304,7 @@ class TestGenerateUsers(unittest.TestCase):
             )

             out_file = tmpdir / "users.yml"
-            script_path = Path(__file__).resolve().parents[5] / "cli" / "build" / "defaults" / "users.py"
+            script_path = Path(users.__file__).resolve()

             result = subprocess.run(
                 [

@@ -326,12 +327,9 @@ class TestGenerateUsers(unittest.TestCase):
             self.assertIn("default_users", data)
             users_map = data["default_users"]

-            # "service" was created from the reserved list and must be reserved
             self.assertIn("service", users_map)
             self.assertTrue(users_map["service"].get("reserved", False))

-            # "admin" existed before; its fields must remain unchanged,
-            # but it must now be marked as reserved
             self.assertIn("admin", users_map)
             self.assertEqual(users_map["admin"]["email"], "admin@ex")
             self.assertEqual(users_map["admin"]["description"], "Admin from role")

@@ -340,5 +338,6 @@ class TestGenerateUsers(unittest.TestCase):
         finally:
             shutil.rmtree(tmpdir)

-if __name__ == '__main__':
+
+if __name__ == "__main__":
     unittest.main()

@@ -69,12 +69,12 @@ class TestMainHelpers(unittest.TestCase):
         """
         available = [
             (None, "deploy"),
-            ("build/defaults", "users"),
+            ("setup", "users"),
         ]
         main.show_full_help_for_all("/fake/cli", available)

-        expected_modules = {"cli.deploy", "cli.build.defaults.users"}
+        expected_modules = {"cli.deploy", "cli.setup.users"}
         invoked_modules = set()

         for call in mock_run.call_args_list: