Merge pull request #1859 from eyJhb/proxyauthheader

adds support for auth proxy header
This commit is contained in:
Roland Geider
2025-05-11 18:02:43 +02:00
committed by GitHub
12 changed files with 502 additions and 24 deletions

View File

@@ -102,6 +102,15 @@ WGER_SETTINGS["SYNC_OFF_DAILY_DELTA_CELERY"] = env.bool("SYNC_OFF_DAILY_DELTA_CE
WGER_SETTINGS["USE_RECAPTCHA"] = env.bool("USE_RECAPTCHA", False)
WGER_SETTINGS["USE_CELERY"] = env.bool("USE_CELERY", False)
#
# Auth Proxy Authentication
# https://wger.readthedocs.io/en/latest/administration/auth_proxy.html
AUTH_PROXY_HEADER = env.str("AUTH_PROXY_HEADER", '')
AUTH_PROXY_TRUSTED_IPS = env.list("AUTH_PROXY_TRUSTED_IPS", default=[])
AUTH_PROXY_CREATE_UNKNOWN_USER = env.bool("AUTH_PROXY_CREATE_UNKNOWN_USER", False)
AUTH_PROXY_USER_EMAIL_HEADER = env.str("AUTH_PROXY_USER_EMAIL_HEADER", '')
AUTH_PROXY_USER_NAME_HEADER = env.str("AUTH_PROXY_USER_NAME_HEADER", '')
# Cache
if os.environ.get("DJANGO_CACHE_BACKEND"):
CACHES = {

80
wger/core/backends.py Normal file
View File

@@ -0,0 +1,80 @@
# This file is part of wger Workout Manager.
#
# wger Workout Manager is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# wger Workout Manager is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# Standard Library
import logging
# Django
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import BaseBackend
logger = logging.getLogger(__name__)
User = get_user_model()
class AuthProxyUserBackend(BaseBackend):
"""
Authenticates against a username passed in the request meta (header).
Relies on the middleware to ensure the header comes from a trusted source.
"""
def authenticate(
self,
request,
username: str | None = None,
email: str | None = None,
name: str | None = None,
):
"""
Authenticate the user based on the username provided.
The middleware ensures this is only called when the source is trusted.
"""
if not username:
# This backend requires a username passed explicitly
return None
create_unknown_user = getattr(settings, 'AUTH_PROXY_CREATE_UNKNOWN_USER', False)
user = None
try:
user = User.objects.get(username=username)
logger.debug(f"AuthProxy: Found existing user '{username}'")
except User.DoesNotExist:
if create_unknown_user:
try:
user = User.objects.create_user(username=username, email=email, first_name=name)
logger.info(f"AuthProxy: Created new user '{username}'")
except Exception as e:
logger.error(f"AuthProxy: Failed to create user '{username}': {e}")
return None
else:
logger.warning(
f"AuthProxy: User '{username}' not found and auto-creation is disabled."
)
return None
except Exception as e:
logger.error(f"AuthProxy: Error fetching user '{username}': {e}")
return None
return user
def get_user(self, user_id):
"""
Standard Django method to retrieve a user by ID.
"""
try:
return User.objects.get(pk=user_id)
except User.DoesNotExist:
return None

136
wger/core/middleware.py Normal file
View File

@@ -0,0 +1,136 @@
# This file is part of wger Workout Manager.
#
# wger Workout Manager is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# wger Workout Manager is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# Standard Library
import logging
# Django
from django.conf import settings
from django.contrib.auth import (
authenticate,
get_user_model,
login,
logout,
)
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.deprecation import MiddlewareMixin
# wger
from wger.utils.helpers import remove_language_code
logger = logging.getLogger(__name__)
User = get_user_model()
class AuthProxyHeaderMiddleware(MiddlewareMixin):
"""
Middleware to authenticate users based on a header set by a trusted proxy.
Relies on settings:
- AUTH_PROXY_HEADER: The request.META key containing the username.
- AUTH_PROXY_TRUSTED_IPS: List of IPs allowed to set the header.
"""
def __init__(self, get_response=None):
super().__init__(get_response)
self.login_url_path = remove_language_code(reverse('core:user:login'))
def process_request(self, request):
header_key = getattr(settings, 'AUTH_PROXY_HEADER', None)
user_email_key = getattr(settings, 'AUTH_PROXY_USER_EMAIL_HEADER', None)
user_name_key = getattr(settings, 'AUTH_PROXY_USER_NAME_HEADER', None)
trusted_ips = set(getattr(settings, 'AUTH_PROXY_TRUSTED_IPS', []))
# Skip processing if not configured
if not header_key or not trusted_ips:
# logger.debug(
# 'AuthProxyMiddleware: AUTH_PROXY_HEADER or AUTH_PROXY_TRUSTED_IPS not configured.'
# )
return None
# Only handle requests to the login page.
# Here the user will be logged in using the proxy headers and redirected to the original page.
if remove_language_code(request.path_info) != self.login_url_path:
# logger.debug(f'AuthProxyMiddleware: not request to login page. Skipping.')
return None
# Get the client IP address.
# Use REMOTE_ADDR as it's the direct connection IP (should be the proxy).
client_ip = request.META.get('REMOTE_ADDR')
# Check if the request comes from a trusted IP
if not client_ip or client_ip not in trusted_ips:
# If the header *is* present but the IP is not trusted, log a warning
# as this might indicate a misconfiguration or security probing.
if header_key in request.META:
logger.warning(
f"AuthProxyMiddleware: Header '{header_key}' received from "
f"untrusted IP '{client_ip}'. Ignoring header."
)
# Not a trusted IP, do nothing.
return None
username = request.META.get(header_key)
email = request.META.get(user_email_key, '') if user_email_key else None
name = request.META.get(user_name_key, '') if user_name_key else None
if not username:
# Trusted IP, but no header. Could mean proxy auth failed upstream.
# Log, but otherwise do nothing.
logger.debug(
f"AuthProxyMiddleware: No username found in header '{header_key}' from "
f"trusted IP '{client_ip}'."
)
return None
# If user is already authenticated and matches the header, do nothing.
if request.user.is_authenticated:
if request.user.get_username() == username:
return None
# Logged in as someone else? This shouldn't usually happen if the
# proxy is forcing the user, but we should log out the old session
# and log in the header user for consistency.
else:
logger.warning(
f"AuthProxyMiddleware: User mismatch. Session user '{request.user.get_username()}' "
f"differs from proxy header user '{username}'. Logging out old user."
)
logout(request)
# Authenticate using our custom backend
user = authenticate(request, username=username, email=email, name=name)
if user:
# Authentication successful, log the user in.
login(request, user)
logger.info(
f"AuthProxyMiddleware: User '{username}' authenticated via header from "
f"trusted IP '{client_ip}'."
)
next_url = request.GET.get('next', reverse('core:dashboard'))
if request.user.is_authenticated:
return redirect(next_url)
else:
# Authentication failed (e.g., user couldn't be found/created by backend)
logger.error(
f"AuthProxyMiddleware: Authentication failed for username '{username}' "
f"from header '{header_key}' (Trusted IP: {client_ip})."
)
# Explicitly clear any potentially lingering user object
request.user = None
return None

View File

@@ -0,0 +1,181 @@
# This file is part of wger Workout Manager.
#
# wger Workout Manager is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# wger Workout Manager is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# Django
from django.contrib.auth import get_user_model
from django.test import (
Client,
TestCase,
override_settings,
)
from django.urls import reverse
User = get_user_model()
TRUSTED_IP = '192.0.2.1'
UNTRUSTED_IP = '198.51.100.5'
PROXY_HEADER_KEY = 'HTTP_X_REMOTE_USER'
PROXY_EMAIL_HEADER_KEY = 'HTTP_X_REMOTE_USER_EMAIL'
PROXY_NAME_HEADER_KEY = 'HTTP_X_REMOTE_USER_NAME'
USERNAME = 'admin'
NEW_USER_VALUE = 'auth_proxy_user'
class AuthProxyMiddlewareTests(TestCase):
fixtures = (
'test-languages',
'gym_config',
)
def setUp(self):
self.client = Client()
self.existing_user = User.objects.create_user(
username=USERNAME,
password='password123',
)
self.protected_url = reverse('core:dashboard')
self.login_url = reverse('core:user:login')
# Helper to make requests with specific IP and header
def make_request(
self,
ip_addr: str,
proxy_header_value: str | None = None,
email_header_value: str | None = None,
name_header_value: str | None = None,
):
headers = {}
if proxy_header_value:
headers[PROXY_HEADER_KEY] = proxy_header_value
if email_header_value:
headers[PROXY_EMAIL_HEADER_KEY] = email_header_value
if name_header_value:
headers[PROXY_NAME_HEADER_KEY] = name_header_value
return self.client.get(self.protected_url, REMOTE_ADDR=ip_addr, follow=True, **headers)
@override_settings(
AUTH_PROXY_TRUSTED_IPS=[TRUSTED_IP],
AUTH_PROXY_HEADER=PROXY_HEADER_KEY,
WGER_SETTINGS={'ALLOW_GUEST_USERS': False},
)
def test_success_trusted_ip_existing_user(self):
response = self.make_request(TRUSTED_IP, USERNAME)
self.assertEqual(response.status_code, 200)
# Check if the correct user is logged into the session
self.assertEqual(int(self.client.session.get('_auth_user_id', 0)), self.existing_user.pk)
@override_settings(
AUTH_PROXY_TRUSTED_IPS=[TRUSTED_IP],
AUTH_PROXY_HEADER=PROXY_HEADER_KEY,
AUTH_PROXY_CREATE_UNKNOWN_USER=True,
AUTH_PROXY_USER_EMAIL_HEADER=PROXY_EMAIL_HEADER_KEY,
AUTH_PROXY_USER_NAME_HEADER=PROXY_NAME_HEADER_KEY,
WGER_SETTINGS={'ALLOW_GUEST_USERS': False},
)
def test_success_trusted_ip_new_user_created(self):
self.assertFalse(User.objects.filter(username=NEW_USER_VALUE).exists())
response = self.make_request(
TRUSTED_IP,
proxy_header_value=NEW_USER_VALUE,
email_header_value='admin@google.com',
name_header_value='Admin User',
)
self.assertEqual(response.status_code, 200)
# Verify the user was created with the correct values
new_user = User.objects.filter(username=NEW_USER_VALUE).first()
self.assertIsNotNone(new_user)
self.assertEqual(new_user.email, 'admin@google.com')
self.assertEqual(new_user.first_name, 'Admin User')
self.assertEqual(int(self.client.session['_auth_user_id']), new_user.pk)
@override_settings(
AUTH_PROXY_HEADER=PROXY_HEADER_KEY,
AUTH_PROXY_TRUSTED_IPS=[TRUSTED_IP],
WGER_SETTINGS={'ALLOW_GUEST_USERS': False},
)
def test_failure_untrusted_ip_header_present(self):
"""Should redirect to login because the middleware shouldn't authenticate"""
response = self.make_request(UNTRUSTED_IP, USERNAME)
self.assertEqual(response.status_code, 200)
self.assertTrue(response.request['PATH_INFO'].startswith(self.login_url))
self.assertNotIn('_auth_user_id', self.client.session)
@override_settings(
AUTH_PROXY_HEADER=PROXY_HEADER_KEY,
AUTH_PROXY_TRUSTED_IPS=[TRUSTED_IP],
WGER_SETTINGS={'ALLOW_GUEST_USERS': False},
)
def test_failure_trusted_ip_header_missing(self):
"""Should redirect to login"""
response = self.make_request(TRUSTED_IP, proxy_header_value=None)
self.assertEqual(response.status_code, 200)
self.assertTrue(response.request['PATH_INFO'].startswith(self.login_url))
self.assertNotIn('_auth_user_id', self.client.session)
@override_settings(
AUTH_PROXY_HEADER=PROXY_HEADER_KEY,
AUTH_PROXY_TRUSTED_IPS=[TRUSTED_IP],
AUTH_PROXY_CREATE_UNKNOWN_USER=False,
WGER_SETTINGS={'ALLOW_GUEST_USERS': False},
)
def test_failure_trusted_ip_new_user_creation_disabled(self):
self.assertFalse(User.objects.filter(username=NEW_USER_VALUE).exists())
response = self.make_request(TRUSTED_IP, NEW_USER_VALUE)
# Should redirect to login
self.assertEqual(response.status_code, 200)
self.assertTrue(response.request['PATH_INFO'].startswith(self.login_url))
# Verify user was NOT created
self.assertFalse(User.objects.filter(username=NEW_USER_VALUE).exists())
self.assertNotIn('_auth_user_id', self.client.session)
@override_settings(
AUTH_PROXY_TRUSTED_IPS=[TRUSTED_IP],
AUTH_PROXY_HEADER='HTTP_X_DIFFERENT_USER',
WGER_SETTINGS={'ALLOW_GUEST_USERS': False},
)
def test_alternate_header_name(self):
# Request using the *correctly* configured header name
response = self.client.get(
self.protected_url,
REMOTE_ADDR=TRUSTED_IP,
HTTP_X_DIFFERENT_USER=USERNAME,
follow=True,
)
self.assertEqual(response.status_code, 200)
self.assertEqual(int(self.client.session['_auth_user_id']), self.existing_user.pk)
# Clear session before next request
self.client.logout()
# Request using the *default/wrong* header name should fail
response_wrong1 = self.client.get(
self.protected_url,
REMOTE_ADDR=TRUSTED_IP,
HTTP_X_REMOTE_USER=USERNAME,
follow=True,
)
response_wrong2 = self.client.get(self.protected_url, follow=True)
self.assertEqual(response_wrong1.status_code, 200)
self.assertEqual(response_wrong2.status_code, 200)
self.assertNotIn('_auth_user_id', self.client.session)

View File

@@ -34,7 +34,6 @@ from wger.core.views import (
weight_units,
)
# sub patterns for languages
patterns_language = [
path(
@@ -68,7 +67,10 @@ patterns_language = [
patterns_user = [
path(
'login',
views.LoginView.as_view(template_name='user/login.html', authentication_form=UserLoginForm),
user.WgerLoginView.as_view(
template_name='user/login.html',
authentication_form=UserLoginForm,
),
name='login',
),
path('logout', user.logout, name='logout'),

View File

@@ -13,7 +13,7 @@
# You should have received a copy of the GNU Affero General Public License
# Django
from django.http import HttpResponseForbidden
from django.contrib.auth.views import redirect_to_login
from django.views.generic import TemplateView
@@ -39,6 +39,6 @@ class ReactView(TemplateView):
Only logged-in users are allowed to access this page
"""
if self.login_required and not request.user.is_authenticated:
return HttpResponseForbidden('You are not allowed to access this page')
return redirect_to_login(request.path)
return super().dispatch(request, *args, **kwargs)

View File

@@ -17,6 +17,15 @@
# Standard Library
import logging
# Third Party
from crispy_forms.helper import FormHelper
from crispy_forms.layout import (
ButtonHolder,
Column,
Layout,
Row,
Submit,
)
# Django
from django.conf import settings
from django.contrib import messages
@@ -44,6 +53,7 @@ from django.http import (
)
from django.shortcuts import (
get_object_or_404,
redirect,
render,
)
from django.template.context_processors import csrf
@@ -62,16 +72,6 @@ from django.views.generic import (
RedirectView,
UpdateView,
)
# Third Party
from crispy_forms.helper import FormHelper
from crispy_forms.layout import (
ButtonHolder,
Column,
Layout,
Row,
Submit,
)
from django_email_verification import send_email
from rest_framework.authtoken.models import Token
@@ -81,7 +81,6 @@ from wger.core.forms import (
PasswordConfirmationForm,
RegistrationForm,
RegistrationFormNoCaptcha,
UserLoginForm,
UserPersonalInformationForm,
UserPreferencesForm,
)
@@ -104,7 +103,6 @@ from wger.utils.generic_views import (
from wger.utils.language import load_language
from wger.weight.models import WeightEntry
logger = logging.getLogger(__name__)
@@ -536,8 +534,8 @@ class UserDetailView(LoginRequiredMixin, WgerMultiplePermissionRequiredMixin, De
)
context['routine_data'] = out
context['weight_entries'] = WeightEntry.objects.filter(user=self.object).order_by('-date')[
:5
]
:5
]
context['nutrition_plans'] = NutritionPlan.objects.filter(user=self.object).order_by(
'-creation_date'
)[:5]
@@ -648,3 +646,18 @@ def confirm_email(request):
)
return HttpResponseRedirect(reverse('core:dashboard'))
class WgerLoginView(LoginView):
"""
If the user is already logged in and there's a "next" parameter in the URL,
redirect there. Otherwise, proceed with the normal login logic from Django
"""
def dispatch(self, request, *args, **kwargs):
next_url = request.GET.get('next', reverse('core:dashboard'))
if request.user.is_authenticated:
return redirect(next_url)
# Proceed with the normal login page logic
return super().dispatch(request, *args, **kwargs)

View File

@@ -30,7 +30,6 @@ from wger.nutrition.views import (
unit_ingredient,
)
# sub patterns for nutritional plans
patterns_plan = [
path(
@@ -136,7 +135,7 @@ patterns_unit_ingredient = [
patterns_bmi = [
path(
'',
ReactView.as_view(),
ReactView.as_view(login_required=True),
name='view',
),
]

View File

@@ -23,7 +23,6 @@ from datetime import timedelta
from wger.utils.constants import DOWNLOAD_INGREDIENT_WGER
from wger.version import get_version
"""
This file contains the global settings that don't usually need to be changed.
For a full list of options, visit:
@@ -127,6 +126,9 @@ MIDDLEWARE = [
# Django Admin
'django.contrib.auth.middleware.AuthenticationMiddleware',
# Auth proxy middleware
'wger.core.middleware.AuthProxyHeaderMiddleware',
# Javascript Header. Sends helper headers for AJAX
'wger.utils.middleware.JavascriptAJAXRedirectionMiddleware',
@@ -151,6 +153,8 @@ MIDDLEWARE = [
AUTHENTICATION_BACKENDS = (
'axes.backends.AxesStandaloneBackend', # should be the first one in the list
'wger.core.backends.AuthProxyUserBackend',
'django.contrib.auth.backends.ModelBackend',
'wger.utils.helpers.EmailAuthBackend',
)
@@ -551,6 +555,18 @@ WGER_SETTINGS = {
'WGER_INSTANCE': 'https://wger.de',
}
#
# Auth Proxy Authentication
#
# Please read the documentation before enabling this feature:
# https://wger.readthedocs.io/en/latest/administration/auth_proxy.html
#
AUTH_PROXY_HEADER = ''
AUTH_PROXY_USER_EMAIL_HEADER = ''
AUTH_PROXY_USER_NAME_HEADER = ''
AUTH_PROXY_TRUSTED_IPS = []
AUTH_PROXY_CREATE_UNKNOWN_USER = False
#
# Prometheus metrics
#

View File

@@ -36,8 +36,8 @@ def processor(request):
# yapf: disable
context = {
'mastodon': settings.WGER_SETTINGS['MASTODON'],
'twitter': settings.WGER_SETTINGS['TWITTER'],
'mastodon': settings.WGER_SETTINGS.get('MASTODON', ''),
'twitter': settings.WGER_SETTINGS.get('TWITTER', ''),
# Languages
'i18n_language':

View File

@@ -21,6 +21,7 @@ import json
import logging
import os
import random
import re
import string
from decimal import Decimal
from functools import wraps
@@ -34,7 +35,6 @@ from django.shortcuts import get_object_or_404
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
logger = logging.getLogger(__name__)
@@ -192,3 +192,12 @@ class BaseImage:
if not generate_uuid:
image.uuid = json_data['uuid']
return image
def remove_language_code(path):
"""
Removes optional language code at the start of a path
"""
pattern = r'^/[a-z]{2}(?=/)'
return re.sub(pattern, '', path)

View File

@@ -0,0 +1,33 @@
# This file is part of wger Workout Manager.
#
# wger Workout Manager is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# wger Workout Manager is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# Django
from django.test import TestCase
# wger
from wger.utils.helpers import remove_language_code
class TestRemoveLanguageCode(TestCase):
def test_remove_code(self):
self.assertEqual(
remove_language_code('/de/some/url/'),
'/some/url/',
)
def test_no_language(self):
self.assertEqual(
remove_language_code('/api/v2/endpoint'),
'/api/v2/endpoint',
)