mirror of
https://github.com/kevinveenbirkenbach/computer-playbook.git
synced 2025-05-16 01:37:19 +02:00
Refactored CSP j2 filter logic to python code
This commit is contained in:
parent
0ac9dac658
commit
ec5dbc7e80
95
filter_plugins/csp_filters.py
Normal file
95
filter_plugins/csp_filters.py
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
from ansible.errors import AnsibleFilterError
|
||||||
|
|
||||||
|
class FilterModule(object):
|
||||||
|
"""
|
||||||
|
Custom filters for Content Security Policy generation and CSP-related utilities.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def filters(self):
|
||||||
|
return {
|
||||||
|
'get_csp_whitelist': self.get_csp_whitelist,
|
||||||
|
'get_csp_flags': self.get_csp_flags,
|
||||||
|
'build_csp_header': self.build_csp_header,
|
||||||
|
}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_csp_whitelist(applications, application_id, directive):
|
||||||
|
"""
|
||||||
|
Return the list of extra hosts/URLs to whitelist for a given CSP directive.
|
||||||
|
"""
|
||||||
|
app = applications.get(application_id, {})
|
||||||
|
wl = app.get('csp', {}).get('whitelist', {}).get(directive, [])
|
||||||
|
if isinstance(wl, list):
|
||||||
|
return wl
|
||||||
|
if wl:
|
||||||
|
return [wl]
|
||||||
|
return []
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_csp_flags(applications, application_id, directive):
|
||||||
|
"""
|
||||||
|
Read 'unsafe_eval' and 'unsafe_inline' flags for a given CSP directive.
|
||||||
|
Returns a list of string tokens, e.g. ["'unsafe-eval'", "'unsafe-inline'"]
|
||||||
|
"""
|
||||||
|
app = applications.get(application_id, {})
|
||||||
|
flags_config = app.get('csp', {}).get('flags', {}).get(directive, {})
|
||||||
|
tokens = []
|
||||||
|
if flags_config.get('unsafe_eval', False):
|
||||||
|
tokens.append("'unsafe-eval'")
|
||||||
|
if flags_config.get('unsafe_inline', False):
|
||||||
|
tokens.append("'unsafe-inline'")
|
||||||
|
return tokens
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def is_feature_enabled(applications, feature, application_id):
|
||||||
|
"""
|
||||||
|
Check if a named feature is enabled for the given application.
|
||||||
|
"""
|
||||||
|
app = applications.get(application_id, {})
|
||||||
|
return bool(app.get('features', {}).get(feature, False))
|
||||||
|
|
||||||
|
def build_csp_header(self, applications, application_id, domains, web_protocol='https', matomo_feature_name='matomo'):
|
||||||
|
"""
|
||||||
|
Build the Content-Security-Policy header value dynamically based on application settings.
|
||||||
|
|
||||||
|
:param applications: dict of application configurations
|
||||||
|
:param application_id: the id of the application
|
||||||
|
:param domains: dict mapping names (e.g., 'matomo') to domain strings
|
||||||
|
:param web_protocol: protocol prefix for Matomo (default: 'https')
|
||||||
|
:param matomo_feature_name: feature flag name for Matomo (default: 'matomo')
|
||||||
|
:return: CSP header string, e.g. "default-src 'self'; script-src 'self' 'unsafe-eval' https://example.com; img-src * data: blob:;"
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
directives = [
|
||||||
|
'default-src',
|
||||||
|
'connect-src',
|
||||||
|
'frame-ancestors',
|
||||||
|
'frame-src',
|
||||||
|
'script-src',
|
||||||
|
'style-src',
|
||||||
|
'font-src'
|
||||||
|
]
|
||||||
|
parts = []
|
||||||
|
|
||||||
|
for directive in directives:
|
||||||
|
tokens = ["'self'"]
|
||||||
|
# unsafe flags
|
||||||
|
tokens += self.get_csp_flags(applications, application_id, directive)
|
||||||
|
# Matomo integration
|
||||||
|
if self.is_feature_enabled(applications, matomo_feature_name, application_id) \
|
||||||
|
and directive in ['script-src', 'connect-src']:
|
||||||
|
matomo_domain = domains.get('matomo')
|
||||||
|
if matomo_domain:
|
||||||
|
tokens.append(f"{web_protocol}://{matomo_domain}")
|
||||||
|
# whitelist
|
||||||
|
tokens += self.get_csp_whitelist(applications, application_id, directive)
|
||||||
|
parts.append(f"{directive} {' '.join(tokens)};")
|
||||||
|
|
||||||
|
# static img-src
|
||||||
|
parts.append("img-src * data: blob:;")
|
||||||
|
|
||||||
|
# join parts with space
|
||||||
|
return ' '.join(parts)
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
raise AnsibleFilterError(f"build_csp_header failed: {exc}")
|
@ -1,45 +1,2 @@
|
|||||||
{# Create a namespace to hold the accumulated CSP parts #}
|
add_header Content-Security-Policy "{{ applications | build_csp_header(application_id, domains) }}" always;
|
||||||
{% set ns = namespace(csp_parts=[]) %}
|
|
||||||
|
|
||||||
{# List of directives to build dynamically (except img-src) #}
|
|
||||||
{% set directives = [
|
|
||||||
'default-src',
|
|
||||||
'connect-src',
|
|
||||||
'frame-ancestors',
|
|
||||||
'frame-src',
|
|
||||||
'script-src',
|
|
||||||
'style-src',
|
|
||||||
'font-src'
|
|
||||||
] %}
|
|
||||||
|
|
||||||
{# Build each directive line #}
|
|
||||||
{% for directive in directives %}
|
|
||||||
{# Always start with 'self' #}
|
|
||||||
{% set tokens = ["'self'"] %}
|
|
||||||
|
|
||||||
{# Add any unsafe flags for this directive #}
|
|
||||||
{% for flag in applications | get_csp_flags(application_id, directive) %}
|
|
||||||
{% set tokens = tokens + [flag] %}
|
|
||||||
{% endfor %}
|
|
||||||
|
|
||||||
{# If Matomo is enabled, allow its script and connect endpoints #}
|
|
||||||
{% if applications | is_feature_enabled('matomo', application_id)
|
|
||||||
and directive in ['script-src', 'connect-src'] %}
|
|
||||||
{% set tokens = tokens + [web_protocol ~ '://' ~ domains.matomo] %}
|
|
||||||
{% endif %}
|
|
||||||
|
|
||||||
{# Append any extra whitelist URLs for this directive #}
|
|
||||||
{% for url in applications | get_csp_whitelist(application_id, directive) %}
|
|
||||||
{% set tokens = tokens + [url] %}
|
|
||||||
{% endfor %}
|
|
||||||
|
|
||||||
{# Store the completed directive line in the namespace #}
|
|
||||||
{% set ns.csp_parts = ns.csp_parts + [directive ~ ' ' ~ (tokens | join(' ')) ~ ';'] %}
|
|
||||||
{% endfor %}
|
|
||||||
|
|
||||||
{# Add the (static) img-src directive #}
|
|
||||||
{% set ns.csp_parts = ns.csp_parts + ['img-src * data: blob:;'] %}
|
|
||||||
|
|
||||||
{# Emit the final header and hide any upstream header #}
|
|
||||||
add_header Content-Security-Policy "{{ ns.csp_parts | join(' ') }}" always;
|
|
||||||
proxy_hide_header Content-Security-Policy;
|
proxy_hide_header Content-Security-Policy;
|
79
tests/unit/test_csp_filters.py
Normal file
79
tests/unit/test_csp_filters.py
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
import unittest
|
||||||
|
from filter_plugins.csp_filters import FilterModule
|
||||||
|
|
||||||
|
class TestCspFilters(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.filter = FilterModule()
|
||||||
|
self.apps = {
|
||||||
|
'app1': {
|
||||||
|
'features': {
|
||||||
|
'oauth2': True,
|
||||||
|
'matomo': True,
|
||||||
|
},
|
||||||
|
'csp': {
|
||||||
|
'whitelist': {
|
||||||
|
'script-src': ['https://cdn.example.com'],
|
||||||
|
'connect-src': 'https://api.example.com',
|
||||||
|
},
|
||||||
|
'flags': {
|
||||||
|
'script-src': {
|
||||||
|
'unsafe_eval': True,
|
||||||
|
'unsafe_inline': False,
|
||||||
|
},
|
||||||
|
'style-src': {
|
||||||
|
'unsafe_inline': True,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
'app2': {}
|
||||||
|
}
|
||||||
|
self.domains = {
|
||||||
|
'matomo': 'matomo.example.org'
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_get_csp_whitelist_list(self):
|
||||||
|
result = self.filter.get_csp_whitelist(self.apps, 'app1', 'script-src')
|
||||||
|
self.assertEqual(result, ['https://cdn.example.com'])
|
||||||
|
|
||||||
|
def test_get_csp_whitelist_string(self):
|
||||||
|
result = self.filter.get_csp_whitelist(self.apps, 'app1', 'connect-src')
|
||||||
|
self.assertEqual(result, ['https://api.example.com'])
|
||||||
|
|
||||||
|
def test_get_csp_whitelist_none(self):
|
||||||
|
result = self.filter.get_csp_whitelist(self.apps, 'app1', 'font-src')
|
||||||
|
self.assertEqual(result, [])
|
||||||
|
|
||||||
|
def test_get_csp_flags_eval(self):
|
||||||
|
result = self.filter.get_csp_flags(self.apps, 'app1', 'script-src')
|
||||||
|
self.assertIn("'unsafe-eval'", result)
|
||||||
|
self.assertNotIn("'unsafe-inline'", result)
|
||||||
|
|
||||||
|
def test_get_csp_flags_inline(self):
|
||||||
|
result = self.filter.get_csp_flags(self.apps, 'app1', 'style-src')
|
||||||
|
self.assertIn("'unsafe-inline'", result)
|
||||||
|
self.assertNotIn("'unsafe-eval'", result)
|
||||||
|
|
||||||
|
def test_get_csp_flags_none(self):
|
||||||
|
result = self.filter.get_csp_flags(self.apps, 'app1', 'connect-src')
|
||||||
|
self.assertEqual(result, [])
|
||||||
|
|
||||||
|
def test_build_csp_header_basic(self):
|
||||||
|
header = self.filter.build_csp_header(self.apps, 'app1', self.domains, web_protocol='https')
|
||||||
|
# Ensure core directives are present
|
||||||
|
self.assertIn("default-src 'self';", header)
|
||||||
|
self.assertIn("script-src 'self' 'unsafe-eval' https://matomo.example.org https://cdn.example.com;", header)
|
||||||
|
self.assertIn("connect-src 'self' https://matomo.example.org https://api.example.com;", header)
|
||||||
|
self.assertTrue(header.strip().endswith('img-src * data: blob:;'))
|
||||||
|
|
||||||
|
def test_build_csp_header_without_matomo_or_flags(self):
|
||||||
|
header = self.filter.build_csp_header(self.apps, 'app2', self.domains)
|
||||||
|
# default-src only contains 'self'
|
||||||
|
self.assertIn("default-src 'self';", header)
|
||||||
|
# no external URLs
|
||||||
|
self.assertNotIn('http', header)
|
||||||
|
# ends with img-src
|
||||||
|
self.assertTrue(header.strip().endswith('img-src * data: blob:;'))
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Loading…
x
Reference in New Issue
Block a user