Compare commits

...

3 Commits

11 changed files with 315 additions and 37 deletions

View File

@ -6,6 +6,8 @@ from typing import Dict
from utils.handler.yaml import YamlHandler
from utils.handler.vault import VaultHandler, VaultScalar
import string
import sys
import base64
class InventoryManager:
def __init__(self, role_path: Path, inventory_path: Path, vault_pw: str, overrides: Dict[str, str]):
@ -112,4 +114,6 @@ class InventoryManager:
return bcrypt.hashpw(pw, bcrypt.gensalt()).decode()
if algorithm == "alphanumeric":
return self.generate_secure_alphanumeric(64)
if algorithm == "base64_prefixed_32":
return "base64:" + base64.b64encode(secrets.token_bytes(32)).decode()
return "undefined"

69
main.py
View File

@ -7,6 +7,8 @@ import sys
import textwrap
import threading
import signal
from datetime import datetime
import pty
from cli.sounds import Sound
@ -46,6 +48,10 @@ def extract_description_via_help(cli_script_path):
except Exception:
return "-"
def git_clean_repo():
"""Remove all Git-ignored files and directories in the current repository."""
subprocess.run(['git', 'clean', '-Xfd'], check=True)
def play_start_intro():
Sound.play_start_sound()
Sound.play_cymais_intro_sound()
@ -59,18 +65,32 @@ def failure_with_warning_loop():
except KeyboardInterrupt:
print("Warnings stopped by user.")
from cli.sounds import Sound # ensure Sound imported
def _main():
# existing main block logic here
pass
if __name__ == "__main__":
# Parse --no-sound early and remove from args
_main()
# Parse --no-sound, --log and --git-clean early and remove from args
no_sound = False
log_enabled = False
git_clean = False
if '--no-sound' in sys.argv:
no_sound = True
sys.argv.remove('--no-sound')
if '--log' in sys.argv:
log_enabled = True
sys.argv.remove('--log')
if '--git-clean' in sys.argv:
git_clean = True
sys.argv.remove('--git-clean')
# Setup segfault handler to catch crashes
def segv_handler(signum, frame):
if not no_sound:
Sound.play_finished_failed_sound()
# Loop warning until interrupted
try:
while True:
Sound.play_warning_sound()
@ -89,14 +109,20 @@ if __name__ == "__main__":
cli_dir = os.path.join(script_dir, "cli")
os.chdir(script_dir)
# If requested, clean git-ignored files
if git_clean:
git_clean_repo()
available_cli_commands = list_cli_commands(cli_dir)
# Handle help invocation
# Handle help invocation
if len(sys.argv) == 1 or sys.argv[1] in ('-h', '--help'):
print("CyMaIS CLI proxy to tools in ./cli/")
print("Usage: cymais [--no-sound] <command> [options]")
print("Usage: cymais [--no-sound] [--log] [--git-clean] <command> [options]")
print("Options:")
print(" --no-sound Suppress all sounds during execution")
print(" --log Log all proxied command output to logfile.log")
print(" --git-clean Remove all Git-ignored files before running")
print(" -h, --help Show this help message and exit")
print("Available commands:")
for cmd in available_cli_commands:
@ -119,10 +145,39 @@ if __name__ == "__main__":
cmd_path = os.path.join(cli_dir, f"{args.cli_command}.py")
full_cmd = [sys.executable, cmd_path] + args.cli_args
log_file = None
if log_enabled:
log_file_path = os.path.join(script_dir, 'logfile.log')
log_file = open(log_file_path, 'a', encoding='utf-8')
try:
proc = subprocess.Popen(full_cmd)
proc.wait()
rc = proc.returncode
if log_enabled:
# Use a pseudo-terminal to preserve color formatting
master_fd, slave_fd = pty.openpty()
proc = subprocess.Popen(
full_cmd,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
text=True
)
os.close(slave_fd)
with os.fdopen(master_fd) as master:
for line in master:
ts = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
log_file.write(f"{ts} {line}")
log_file.flush()
print(line, end='')
proc.wait()
rc = proc.returncode
else:
proc = subprocess.Popen(full_cmd)
proc.wait()
rc = proc.returncode
if log_file:
log_file.close()
if rc != 0:
print(f"Command '{args.cli_command}' failed with exit code {rc}.")
failure_with_warning_loop()

View File

@ -15,6 +15,7 @@ This role deploys Moodle using Docker, automating the setup of both the Moodle a
- **Scalable Deployment:** Leverage Docker for a portable and scalable installation that adapts as your user base grows.
- **Robust Data Management:** Secure and reliable storage of both the Moodle application and user data through Docker volumes.
- **Secure Web Access:** Configured to work seamlessly behind an Nginx reverse proxy for enhanced security and performance.
* **Single Sign-On (SSO) / OpenID Connect (OIDC):** Seamless integration with external identity providers for centralized authentication.
## Additional Resources

View File

@ -2,30 +2,22 @@
## Description
Pixelfed is a decentralized image sharing platform that champions creativity and privacy. It offers a secure, communitydriven alternative to centralized social media networks by enabling federated communication and robust content sharing through a modern web interface.
Pixelfed is a decentralized image-sharing platform that champions creativity and privacy. It offers a secure, community-driven alternative to centralized social networks by enabling federated communication and seamless content sharing through a modern web interface.
## Overview
This Docker Compose deployment automates the installation and management of a Pixelfed instance
This Docker Compose deployment automates the installation and operation of a Pixelfed instance.
## Features
## Features
- **Decentralized Content Sharing:**
Empower users to share photos and visual content on an interoperable, federated network with enhanced privacy controls.
- **Modern, Responsive Web Interface:**
Access an intuitive and dynamic user interface designed for effortless browsing, administration, and content management.
- **Robust Scalability & Performance:**
Leverage integrated Redis caching and a secure database (MariaDB or PostgreSQL) to ensure smooth scaling and high performance.
- **Flexible Configuration:**
Easily customize settings such as cache sizes, domain settings, and authentication options with environment variables and templated configuration files.
- **Maintenance & Administration Tools:**
Includes a suite of CLI commands and webbased management tools to clear cache, manage the database, and monitor application status.
* **Decentralized Content Sharing:** Empower users to share photos and visual content across an interoperable, federated network with enhanced privacy controls.
* **Modern, Responsive Web Interface:** Access an intuitive and adaptive UI for effortless browsing, administration, and content management.
* **Robust Scalability & Performance:** Leverage integrated Redis caching and a reliable database (MariaDB or PostgreSQL) for smooth scaling and high performance.
* **Flexible Configuration:** Customize cache sizes, domain settings, and authentication options via environment variables and templated configuration files.
* **Maintenance & Administration Tools:** Built-in CLI and web-based tools to clear caches, manage the database, and monitor application health.
* **Single Sign-On (SSO) / OpenID Connect (OIDC):** Seamless integration with external identity providers for centralized authentication.
## Other Resources
- [Pixelfed GitHub Repository](https://github.com/pixelfed/pixelfed)
- [OIDC Plugin Installation Guide](https://chat.openai.com/share/67a4f448-4be8-800f-8639-4c15cb2fb44e)
* [Official Pixelfed website](https://pixelfed.org/)
* [Pixelfed GitHub repository](https://github.com/pixelfed/pixelfed)

View File

@ -1,2 +0,0 @@
# Todo
- [Integrate OIDC as soon as possible](https://github.com/pixelfed/pixelfed/pull/5608)

View File

@ -1,5 +1,5 @@
credentials:
app_key:
description: "Application key used for encryption in Pixelfed (.env APP_KEY)"
algorithm: "plain"
validation: "^base64:[A-Za-z0-9+/=]{40,}$"
description: "Generic 32-byte base64 key with base64: prefix"
algorithm: base64_prefixed_32
validation: '^base64:[A-Za-z0-9+/]{43}=$'

View File

@ -149,6 +149,6 @@ PF_OIDC_USERNAME_FIELD="{{oidc.attributes.username}}"
PF_OIDC_FIELD_ID="{{oidc.attributes.username}}"
PF_OIDC_CLIENT_SECRET={{oidc.client.secret}}
PF_OIDC_CLIENT_ID={{oidc.client.id}}
PF_OIDC_SCOPES="openid,profile,email"
PF_OIDC_SCOPES="openid profile email"
{% endif %}

View File

@ -1,16 +1,18 @@
titel: "Pictures on {{primary_domain}}"
#version: "latest"
images:
pixelfed: "ghcr.io/pixelfed/pixelfed:latest"
pixelfed: "zknt/pixelfed:latest"
features:
matomo: true
css: true
css: false # Needs to be reactivated
portfolio_iframe: false
central_database: true
oidc: true
csp:
flags:
script-src:
unsafe-eval: true
unsafe-inline: true
script-src-elem:
unsafe-inline: true
unsafe-eval: true

View File

@ -1,5 +1,5 @@
credentials:
app_key:
description: "Application encryption key for Snipe-IT (.env APP_KEY)"
algorithm: "plain"
validation: "^base64:[A-Za-z0-9+/=]{40,}$"
description: "Generic 32-byte base64 key with base64: prefix"
algorithm: base64_prefixed_32
validation: '^base64:[A-Za-z0-9+/]{43}=$'

View File

@ -0,0 +1,143 @@
import unittest
import sys
import os
import tempfile
import shutil
from pathlib import Path
from unittest.mock import patch
# Ensure the cli package is on sys.path
sys.path.insert(
0,
os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../cli")
),
)
from utils.handler.yaml import YamlHandler
from utils.handler.vault import VaultHandler, VaultScalar
from cli.utils.manager.inventory import InventoryManager
class TestInventoryManager(unittest.TestCase):
def setUp(self):
# Create a temporary directory for role and inventory files
self.tmpdir = Path(tempfile.mkdtemp())
# Patch YamlHandler.load_yaml
self.load_yaml_patcher = patch.object(
YamlHandler,
'load_yaml',
side_effect=self.fake_load_yaml
)
self.load_yaml_patcher.start()
# Patch VaultHandler.encrypt_string with correct signature
self.encrypt_patcher = patch.object(
VaultHandler,
'encrypt_string',
new=lambda self, plain, key: f"{key}: !vault |\n encrypted_{plain}"
)
self.encrypt_patcher.start()
def tearDown(self):
# Stop patchers
patch.stopall()
# Remove temporary directory
shutil.rmtree(self.tmpdir)
def fake_load_yaml(self, path):
path = Path(path)
# Return schema for meta/schema.yml
if path.match("*/meta/schema.yml"):
return {
"credentials": {
"plain_cred": {"description": "desc", "algorithm": "plain", "validation": {}},
"nested": {"inner": {"description": "desc2", "algorithm": "sha256", "validation": {}}}
}
}
# Return application_id for vars/main.yml
if path.match("*/vars/main.yml"):
return {"application_id": "testapp"}
# Return feature flags for vars/configuration.yml
if path.match("*/vars/configuration.yml"):
return {"features": {"central_database": True}}
# Return empty inventory for inventory.yml
if path.name == "inventory.yml":
return {}
raise FileNotFoundError(f"Unexpected load_yaml path: {path}")
def test_load_application_id_missing(self):
"""Loading application_id without it should raise SystemExit."""
role_dir = self.tmpdir / "role"
(role_dir / "vars").mkdir(parents=True)
(role_dir / "vars" / "main.yml").write_text("{}")
with patch.object(YamlHandler, 'load_yaml', return_value={}):
with self.assertRaises(SystemExit):
InventoryManager(role_dir, self.tmpdir / "inventory.yml", "pw", {}).load_application_id(role_dir)
def test_generate_value_algorithms(self):
"""Verify generate_value produces outputs of the expected form."""
# Bypass __init__ to avoid YAML loading
im = InventoryManager.__new__(InventoryManager)
# random_hex → 64 bytes hex = 128 chars
hex_val = im.generate_value("random_hex")
self.assertEqual(len(hex_val), 128)
self.assertTrue(all(c in "0123456789abcdef" for c in hex_val))
# sha256 → 64 hex chars
sha256_val = im.generate_value("sha256")
self.assertEqual(len(sha256_val), 64)
# sha1 → 40 hex chars
sha1_val = im.generate_value("sha1")
self.assertEqual(len(sha1_val), 40)
# bcrypt → starts with bcrypt prefix
bcrypt_val = im.generate_value("bcrypt")
self.assertTrue(bcrypt_val.startswith("$2"))
# alphanumeric → 64 chars
alnum = im.generate_value("alphanumeric")
self.assertEqual(len(alnum), 64)
self.assertTrue(alnum.isalnum())
# base64_prefixed_32 → starts with "base64:"
b64 = im.generate_value("base64_prefixed_32")
self.assertTrue(b64.startswith("base64:"))
def test_apply_schema_and_recurse(self):
"""
apply_schema should inject central_database password and vault nested.inner
"""
# Setup role directory
role_dir = self.tmpdir / "role"
(role_dir / "meta").mkdir(parents=True)
(role_dir / "vars").mkdir(parents=True)
# Create empty inventory.yml
inv_file = self.tmpdir / "inventory.yml"
inv_file.write_text(" ")
# Provide override for plain_cred to avoid SystemExit
overrides = {'credentials.plain_cred': 'OVERRIDE_PLAIN'}
# Instantiate manager with overrides
mgr = InventoryManager(role_dir, inv_file, "pw", overrides=overrides)
# Patch generate_value locally for predictable values
with patch.object(InventoryManager, 'generate_value', lambda self, alg: f"GEN_{alg}"):
result = mgr.apply_schema()
apps = result["applications"]["testapp"]
# central_database entry
self.assertEqual(apps["credentials"]["database_password"], "GEN_alphanumeric")
# plain_cred vaulted from override
self.assertIsInstance(apps["credentials"]["plain_cred"], VaultScalar)
# nested.inner should not be vaulted due to code's prefix check
self.assertEqual(
apps["credentials"]["nested"]["inner"],
{"description": "desc2", "algorithm": "sha256", "validation": {}},
)

83
tests/unit/test_main.py Normal file
View File

@ -0,0 +1,83 @@
# tests/unit/test_main.py
import os
import sys
import stat
import tempfile
import unittest
from unittest import mock
# Insert project root into import path so we can import main.py
sys.path.insert(
0,
os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
)
import main # assumes main.py lives at the project root
class TestMainHelpers(unittest.TestCase):
def test_format_command_help_basic(self):
name = "cmd"
description = "A basic description"
output = main.format_command_help(
name, description,
indent=2, col_width=20, width=40
)
# Should start with two spaces and the command name
self.assertTrue(output.startswith(" cmd"))
# Description should appear somewhere in the wrapped text
self.assertIn("A basic description", output)
def test_list_cli_commands_filters_and_sorts(self):
# Create a temporary directory with sample files
with tempfile.TemporaryDirectory() as tmpdir:
open(os.path.join(tmpdir, "one.py"), "w").close()
open(os.path.join(tmpdir, "__init__.py"), "w").close()
open(os.path.join(tmpdir, "ignore.txt"), "w").close()
open(os.path.join(tmpdir, "two.py"), "w").close()
# Only 'one' and 'two' should be returned, in sorted order
commands = main.list_cli_commands(tmpdir)
self.assertEqual(commands, ["one", "two"])
def test_git_clean_repo_invokes_git_clean(self):
with mock.patch('main.subprocess.run') as mock_run:
main.git_clean_repo()
mock_run.assert_called_once_with(['git', 'clean', '-Xfd'], check=True)
def test_extract_description_via_help_with_description(self):
# Create a dummy script that prints a help description
with tempfile.TemporaryDirectory() as tmpdir:
script_path = os.path.join(tmpdir, "dummy.py")
with open(script_path, "w") as f:
f.write(
"#!/usr/bin/env python3\n"
"import sys\n"
"if '--help' in sys.argv:\n"
" print('usage: dummy.py [options]')\n"
" print()\n"
" print('This is a help description.')\n"
)
# Make it executable
mode = os.stat(script_path).st_mode
os.chmod(script_path, mode | stat.S_IXUSR)
description = main.extract_description_via_help(script_path)
self.assertEqual(description, "This is a help description.")
def test_extract_description_via_help_without_description(self):
# Script that has no help description
with tempfile.TemporaryDirectory() as tmpdir:
script_path = os.path.join(tmpdir, "empty.py")
with open(script_path, "w") as f:
f.write(
"#!/usr/bin/env python3\n"
"print('no help here')\n"
)
description = main.extract_description_via_help(script_path)
self.assertEqual(description, "-")
if __name__ == "__main__":
unittest.main()