Compare commits

..

No commits in common. "7dd8fd4a5fcec44eb6d912d30a1b17ea31c02589" and "ceab517dfaf095c27f385c3192e6219d31827ea6" have entirely different histories.

11 changed files with 37 additions and 315 deletions

View File

@ -6,8 +6,6 @@ from typing import Dict
from utils.handler.yaml import YamlHandler from utils.handler.yaml import YamlHandler
from utils.handler.vault import VaultHandler, VaultScalar from utils.handler.vault import VaultHandler, VaultScalar
import string import string
import sys
import base64
class InventoryManager: class InventoryManager:
def __init__(self, role_path: Path, inventory_path: Path, vault_pw: str, overrides: Dict[str, str]): def __init__(self, role_path: Path, inventory_path: Path, vault_pw: str, overrides: Dict[str, str]):
@ -114,6 +112,4 @@ class InventoryManager:
return bcrypt.hashpw(pw, bcrypt.gensalt()).decode() return bcrypt.hashpw(pw, bcrypt.gensalt()).decode()
if algorithm == "alphanumeric": if algorithm == "alphanumeric":
return self.generate_secure_alphanumeric(64) return self.generate_secure_alphanumeric(64)
if algorithm == "base64_prefixed_32":
return "base64:" + base64.b64encode(secrets.token_bytes(32)).decode()
return "undefined" return "undefined"

69
main.py
View File

@ -7,8 +7,6 @@ import sys
import textwrap import textwrap
import threading import threading
import signal import signal
from datetime import datetime
import pty
from cli.sounds import Sound from cli.sounds import Sound
@ -48,10 +46,6 @@ def extract_description_via_help(cli_script_path):
except Exception: except Exception:
return "-" return "-"
def git_clean_repo():
"""Remove all Git-ignored files and directories in the current repository."""
subprocess.run(['git', 'clean', '-Xfd'], check=True)
def play_start_intro(): def play_start_intro():
Sound.play_start_sound() Sound.play_start_sound()
Sound.play_cymais_intro_sound() Sound.play_cymais_intro_sound()
@ -65,32 +59,18 @@ def failure_with_warning_loop():
except KeyboardInterrupt: except KeyboardInterrupt:
print("Warnings stopped by user.") print("Warnings stopped by user.")
from cli.sounds import Sound # ensure Sound imported
def _main():
# existing main block logic here
pass
if __name__ == "__main__": if __name__ == "__main__":
_main() # Parse --no-sound early and remove from args
# Parse --no-sound, --log and --git-clean early and remove from args
no_sound = False no_sound = False
log_enabled = False
git_clean = False
if '--no-sound' in sys.argv: if '--no-sound' in sys.argv:
no_sound = True no_sound = True
sys.argv.remove('--no-sound') sys.argv.remove('--no-sound')
if '--log' in sys.argv:
log_enabled = True
sys.argv.remove('--log')
if '--git-clean' in sys.argv:
git_clean = True
sys.argv.remove('--git-clean')
# Setup segfault handler to catch crashes # Setup segfault handler to catch crashes
def segv_handler(signum, frame): def segv_handler(signum, frame):
if not no_sound: if not no_sound:
Sound.play_finished_failed_sound() Sound.play_finished_failed_sound()
# Loop warning until interrupted
try: try:
while True: while True:
Sound.play_warning_sound() Sound.play_warning_sound()
@ -109,20 +89,14 @@ if __name__ == "__main__":
cli_dir = os.path.join(script_dir, "cli") cli_dir = os.path.join(script_dir, "cli")
os.chdir(script_dir) os.chdir(script_dir)
# If requested, clean git-ignored files
if git_clean:
git_clean_repo()
available_cli_commands = list_cli_commands(cli_dir) available_cli_commands = list_cli_commands(cli_dir)
# Handle help invocation # Handle help invocation
if len(sys.argv) == 1 or sys.argv[1] in ('-h', '--help'): if len(sys.argv) == 1 or sys.argv[1] in ('-h', '--help'):
print("CyMaIS CLI proxy to tools in ./cli/") print("CyMaIS CLI proxy to tools in ./cli/")
print("Usage: cymais [--no-sound] [--log] [--git-clean] <command> [options]") print("Usage: cymais [--no-sound] <command> [options]")
print("Options:") print("Options:")
print(" --no-sound Suppress all sounds during execution") print(" --no-sound Suppress all sounds during execution")
print(" --log Log all proxied command output to logfile.log")
print(" --git-clean Remove all Git-ignored files before running")
print(" -h, --help Show this help message and exit") print(" -h, --help Show this help message and exit")
print("Available commands:") print("Available commands:")
for cmd in available_cli_commands: for cmd in available_cli_commands:
@ -145,39 +119,10 @@ if __name__ == "__main__":
cmd_path = os.path.join(cli_dir, f"{args.cli_command}.py") cmd_path = os.path.join(cli_dir, f"{args.cli_command}.py")
full_cmd = [sys.executable, cmd_path] + args.cli_args full_cmd = [sys.executable, cmd_path] + args.cli_args
log_file = None
if log_enabled:
log_file_path = os.path.join(script_dir, 'logfile.log')
log_file = open(log_file_path, 'a', encoding='utf-8')
try: try:
if log_enabled: proc = subprocess.Popen(full_cmd)
# Use a pseudo-terminal to preserve color formatting proc.wait()
master_fd, slave_fd = pty.openpty() rc = proc.returncode
proc = subprocess.Popen(
full_cmd,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
text=True
)
os.close(slave_fd)
with os.fdopen(master_fd) as master:
for line in master:
ts = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
log_file.write(f"{ts} {line}")
log_file.flush()
print(line, end='')
proc.wait()
rc = proc.returncode
else:
proc = subprocess.Popen(full_cmd)
proc.wait()
rc = proc.returncode
if log_file:
log_file.close()
if rc != 0: if rc != 0:
print(f"Command '{args.cli_command}' failed with exit code {rc}.") print(f"Command '{args.cli_command}' failed with exit code {rc}.")
failure_with_warning_loop() failure_with_warning_loop()

View File

@ -15,7 +15,6 @@ This role deploys Moodle using Docker, automating the setup of both the Moodle a
- **Scalable Deployment:** Leverage Docker for a portable and scalable installation that adapts as your user base grows. - **Scalable Deployment:** Leverage Docker for a portable and scalable installation that adapts as your user base grows.
- **Robust Data Management:** Secure and reliable storage of both the Moodle application and user data through Docker volumes. - **Robust Data Management:** Secure and reliable storage of both the Moodle application and user data through Docker volumes.
- **Secure Web Access:** Configured to work seamlessly behind an Nginx reverse proxy for enhanced security and performance. - **Secure Web Access:** Configured to work seamlessly behind an Nginx reverse proxy for enhanced security and performance.
* **Single Sign-On (SSO) / OpenID Connect (OIDC):** Seamless integration with external identity providers for centralized authentication.
## Additional Resources ## Additional Resources

View File

@ -2,22 +2,30 @@
## Description ## Description
Pixelfed is a decentralized image-sharing platform that champions creativity and privacy. It offers a secure, community-driven alternative to centralized social networks by enabling federated communication and seamless content sharing through a modern web interface. Pixelfed is a decentralized image sharing platform that champions creativity and privacy. It offers a secure, communitydriven alternative to centralized social media networks by enabling federated communication and robust content sharing through a modern web interface.
## Overview ## Overview
This Docker Compose deployment automates the installation and operation of a Pixelfed instance. This Docker Compose deployment automates the installation and management of a Pixelfed instance
## Features ## Features
* **Decentralized Content Sharing:** Empower users to share photos and visual content across an interoperable, federated network with enhanced privacy controls. - **Decentralized Content Sharing:**
* **Modern, Responsive Web Interface:** Access an intuitive and adaptive UI for effortless browsing, administration, and content management. Empower users to share photos and visual content on an interoperable, federated network with enhanced privacy controls.
* **Robust Scalability & Performance:** Leverage integrated Redis caching and a reliable database (MariaDB or PostgreSQL) for smooth scaling and high performance.
* **Flexible Configuration:** Customize cache sizes, domain settings, and authentication options via environment variables and templated configuration files. - **Modern, Responsive Web Interface:**
* **Maintenance & Administration Tools:** Built-in CLI and web-based tools to clear caches, manage the database, and monitor application health. Access an intuitive and dynamic user interface designed for effortless browsing, administration, and content management.
* **Single Sign-On (SSO) / OpenID Connect (OIDC):** Seamless integration with external identity providers for centralized authentication.
- **Robust Scalability & Performance:**
Leverage integrated Redis caching and a secure database (MariaDB or PostgreSQL) to ensure smooth scaling and high performance.
- **Flexible Configuration:**
Easily customize settings such as cache sizes, domain settings, and authentication options with environment variables and templated configuration files.
- **Maintenance & Administration Tools:**
Includes a suite of CLI commands and webbased management tools to clear cache, manage the database, and monitor application status.
## Other Resources ## Other Resources
* [Official Pixelfed website](https://pixelfed.org/) - [Pixelfed GitHub Repository](https://github.com/pixelfed/pixelfed)
* [Pixelfed GitHub repository](https://github.com/pixelfed/pixelfed) - [OIDC Plugin Installation Guide](https://chat.openai.com/share/67a4f448-4be8-800f-8639-4c15cb2fb44e)

View File

@ -0,0 +1,2 @@
# Todo
- [Integrate OIDC as soon as possible](https://github.com/pixelfed/pixelfed/pull/5608)

View File

@ -1,5 +1,5 @@
credentials: credentials:
app_key: app_key:
description: "Generic 32-byte base64 key with base64: prefix" description: "Application key used for encryption in Pixelfed (.env APP_KEY)"
algorithm: base64_prefixed_32 algorithm: "plain"
validation: '^base64:[A-Za-z0-9+/]{43}=$' validation: "^base64:[A-Za-z0-9+/=]{40,}$"

View File

@ -149,6 +149,6 @@ PF_OIDC_USERNAME_FIELD="{{oidc.attributes.username}}"
PF_OIDC_FIELD_ID="{{oidc.attributes.username}}" PF_OIDC_FIELD_ID="{{oidc.attributes.username}}"
PF_OIDC_CLIENT_SECRET={{oidc.client.secret}} PF_OIDC_CLIENT_SECRET={{oidc.client.secret}}
PF_OIDC_CLIENT_ID={{oidc.client.id}} PF_OIDC_CLIENT_ID={{oidc.client.id}}
PF_OIDC_SCOPES="openid profile email" PF_OIDC_SCOPES="openid,profile,email"
{% endif %} {% endif %}

View File

@ -1,18 +1,16 @@
titel: "Pictures on {{primary_domain}}" titel: "Pictures on {{primary_domain}}"
#version: "latest" #version: "latest"
images: images:
pixelfed: "zknt/pixelfed:latest" pixelfed: "ghcr.io/pixelfed/pixelfed:latest"
features: features:
matomo: true matomo: true
css: false # Needs to be reactivated css: true
portfolio_iframe: false portfolio_iframe: false
central_database: true central_database: true
oidc: true
csp: csp:
flags: flags:
script-src: script-src:
unsafe-eval: true unsafe-eval: true
unsafe-inline: true
script-src-elem: script-src-elem:
unsafe-inline: true unsafe-inline: true
unsafe-eval: true unsafe-eval: true

View File

@ -1,5 +1,5 @@
credentials: credentials:
app_key: app_key:
description: "Generic 32-byte base64 key with base64: prefix" description: "Application encryption key for Snipe-IT (.env APP_KEY)"
algorithm: base64_prefixed_32 algorithm: "plain"
validation: '^base64:[A-Za-z0-9+/]{43}=$' validation: "^base64:[A-Za-z0-9+/=]{40,}$"

View File

@ -1,143 +0,0 @@
import unittest
import sys
import os
import tempfile
import shutil
from pathlib import Path
from unittest.mock import patch
# Ensure the cli package is on sys.path
sys.path.insert(
0,
os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../cli")
),
)
from utils.handler.yaml import YamlHandler
from utils.handler.vault import VaultHandler, VaultScalar
from cli.utils.manager.inventory import InventoryManager
class TestInventoryManager(unittest.TestCase):
def setUp(self):
# Create a temporary directory for role and inventory files
self.tmpdir = Path(tempfile.mkdtemp())
# Patch YamlHandler.load_yaml
self.load_yaml_patcher = patch.object(
YamlHandler,
'load_yaml',
side_effect=self.fake_load_yaml
)
self.load_yaml_patcher.start()
# Patch VaultHandler.encrypt_string with correct signature
self.encrypt_patcher = patch.object(
VaultHandler,
'encrypt_string',
new=lambda self, plain, key: f"{key}: !vault |\n encrypted_{plain}"
)
self.encrypt_patcher.start()
def tearDown(self):
# Stop patchers
patch.stopall()
# Remove temporary directory
shutil.rmtree(self.tmpdir)
def fake_load_yaml(self, path):
path = Path(path)
# Return schema for meta/schema.yml
if path.match("*/meta/schema.yml"):
return {
"credentials": {
"plain_cred": {"description": "desc", "algorithm": "plain", "validation": {}},
"nested": {"inner": {"description": "desc2", "algorithm": "sha256", "validation": {}}}
}
}
# Return application_id for vars/main.yml
if path.match("*/vars/main.yml"):
return {"application_id": "testapp"}
# Return feature flags for vars/configuration.yml
if path.match("*/vars/configuration.yml"):
return {"features": {"central_database": True}}
# Return empty inventory for inventory.yml
if path.name == "inventory.yml":
return {}
raise FileNotFoundError(f"Unexpected load_yaml path: {path}")
def test_load_application_id_missing(self):
"""Loading application_id without it should raise SystemExit."""
role_dir = self.tmpdir / "role"
(role_dir / "vars").mkdir(parents=True)
(role_dir / "vars" / "main.yml").write_text("{}")
with patch.object(YamlHandler, 'load_yaml', return_value={}):
with self.assertRaises(SystemExit):
InventoryManager(role_dir, self.tmpdir / "inventory.yml", "pw", {}).load_application_id(role_dir)
def test_generate_value_algorithms(self):
"""Verify generate_value produces outputs of the expected form."""
# Bypass __init__ to avoid YAML loading
im = InventoryManager.__new__(InventoryManager)
# random_hex → 64 bytes hex = 128 chars
hex_val = im.generate_value("random_hex")
self.assertEqual(len(hex_val), 128)
self.assertTrue(all(c in "0123456789abcdef" for c in hex_val))
# sha256 → 64 hex chars
sha256_val = im.generate_value("sha256")
self.assertEqual(len(sha256_val), 64)
# sha1 → 40 hex chars
sha1_val = im.generate_value("sha1")
self.assertEqual(len(sha1_val), 40)
# bcrypt → starts with bcrypt prefix
bcrypt_val = im.generate_value("bcrypt")
self.assertTrue(bcrypt_val.startswith("$2"))
# alphanumeric → 64 chars
alnum = im.generate_value("alphanumeric")
self.assertEqual(len(alnum), 64)
self.assertTrue(alnum.isalnum())
# base64_prefixed_32 → starts with "base64:"
b64 = im.generate_value("base64_prefixed_32")
self.assertTrue(b64.startswith("base64:"))
def test_apply_schema_and_recurse(self):
"""
apply_schema should inject central_database password and vault nested.inner
"""
# Setup role directory
role_dir = self.tmpdir / "role"
(role_dir / "meta").mkdir(parents=True)
(role_dir / "vars").mkdir(parents=True)
# Create empty inventory.yml
inv_file = self.tmpdir / "inventory.yml"
inv_file.write_text(" ")
# Provide override for plain_cred to avoid SystemExit
overrides = {'credentials.plain_cred': 'OVERRIDE_PLAIN'}
# Instantiate manager with overrides
mgr = InventoryManager(role_dir, inv_file, "pw", overrides=overrides)
# Patch generate_value locally for predictable values
with patch.object(InventoryManager, 'generate_value', lambda self, alg: f"GEN_{alg}"):
result = mgr.apply_schema()
apps = result["applications"]["testapp"]
# central_database entry
self.assertEqual(apps["credentials"]["database_password"], "GEN_alphanumeric")
# plain_cred vaulted from override
self.assertIsInstance(apps["credentials"]["plain_cred"], VaultScalar)
# nested.inner should not be vaulted due to code's prefix check
self.assertEqual(
apps["credentials"]["nested"]["inner"],
{"description": "desc2", "algorithm": "sha256", "validation": {}},
)

View File

@ -1,83 +0,0 @@
# tests/unit/test_main.py
import os
import sys
import stat
import tempfile
import unittest
from unittest import mock
# Insert project root into import path so we can import main.py
sys.path.insert(
0,
os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
)
import main # assumes main.py lives at the project root
class TestMainHelpers(unittest.TestCase):
def test_format_command_help_basic(self):
name = "cmd"
description = "A basic description"
output = main.format_command_help(
name, description,
indent=2, col_width=20, width=40
)
# Should start with two spaces and the command name
self.assertTrue(output.startswith(" cmd"))
# Description should appear somewhere in the wrapped text
self.assertIn("A basic description", output)
def test_list_cli_commands_filters_and_sorts(self):
# Create a temporary directory with sample files
with tempfile.TemporaryDirectory() as tmpdir:
open(os.path.join(tmpdir, "one.py"), "w").close()
open(os.path.join(tmpdir, "__init__.py"), "w").close()
open(os.path.join(tmpdir, "ignore.txt"), "w").close()
open(os.path.join(tmpdir, "two.py"), "w").close()
# Only 'one' and 'two' should be returned, in sorted order
commands = main.list_cli_commands(tmpdir)
self.assertEqual(commands, ["one", "two"])
def test_git_clean_repo_invokes_git_clean(self):
with mock.patch('main.subprocess.run') as mock_run:
main.git_clean_repo()
mock_run.assert_called_once_with(['git', 'clean', '-Xfd'], check=True)
def test_extract_description_via_help_with_description(self):
# Create a dummy script that prints a help description
with tempfile.TemporaryDirectory() as tmpdir:
script_path = os.path.join(tmpdir, "dummy.py")
with open(script_path, "w") as f:
f.write(
"#!/usr/bin/env python3\n"
"import sys\n"
"if '--help' in sys.argv:\n"
" print('usage: dummy.py [options]')\n"
" print()\n"
" print('This is a help description.')\n"
)
# Make it executable
mode = os.stat(script_path).st_mode
os.chmod(script_path, mode | stat.S_IXUSR)
description = main.extract_description_via_help(script_path)
self.assertEqual(description, "This is a help description.")
def test_extract_description_via_help_without_description(self):
# Script that has no help description
with tempfile.TemporaryDirectory() as tmpdir:
script_path = os.path.join(tmpdir, "empty.py")
with open(script_path, "w") as f:
f.write(
"#!/usr/bin/env python3\n"
"print('no help here')\n"
)
description = main.extract_description_via_help(script_path)
self.assertEqual(description, "-")
if __name__ == "__main__":
unittest.main()