mirror of
https://github.com/rommapp/romm.git
synced 2026-02-18 23:42:07 +01:00
359 lines
12 KiB
Python
359 lines
12 KiB
Python
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
from authlib.integrations.starlette_client.apps import StarletteOAuth2App
|
|
from fastapi import HTTPException
|
|
from joserfc.jwt import Token
|
|
|
|
from handler.auth.base_handler import OpenIDHandler
|
|
from models.user import Role
|
|
|
|
# Mock constants
# Issuer URL placed in the "iss" claim of the reference JWT payloads built by
# the tests in this module.
OIDC_SERVER_APPLICATION_URL: str = "http://mock-oidc-server"
# NOTE(review): not referenced by the tests visible in this file, which patch
# handler.auth.base_handler.OIDC_ENABLED through fixtures instead.
OIDC_ENABLED: bool = True
|
|
|
|
|
|
@pytest.fixture
def mock_oidc_disabled(mocker):
    """Patch ``handler.auth.base_handler.OIDC_ENABLED`` to ``False`` for one test."""
    mocker.patch(target="handler.auth.base_handler.OIDC_ENABLED", new=False)
|
|
|
|
|
|
@pytest.fixture
def mock_oidc_enabled(mocker):
    """Patch ``handler.auth.base_handler.OIDC_ENABLED`` to ``True`` for one test."""
    mocker.patch(target="handler.auth.base_handler.OIDC_ENABLED", new=True)
|
|
|
|
|
|
@pytest.fixture
def mock_token():
    """Build a fresh token-response dict with a nested ``userinfo`` claim set.

    A new dict is created on every call, so tests are free to mutate the
    returned value (for example by adding ``roles`` or removing
    ``email_verified``) without affecting each other.
    """
    # Claims describing the authenticated user; opaque identifiers are left
    # empty.
    userinfo_claims = {
        "iss": "http://localhost:9000/application/o/romm/",
        "sub": "",
        "aud": "",
        "exp": 1735397871,
        "iat": 1735397571,
        "auth_time": 1735397571,
        "acr": "goauthentik.io/providers/oauth2/default",
        "amr": ["pwd"],
        "nonce": "",
        "sid": "",
        "email": "test@example.com",
        "email_verified": True,
        "name": "Test User",
        "given_name": "Test User",
        "preferred_username": "testuser",
        "nickname": "testuser",
        "groups": ["Default Users"],
    }
    return {
        "access_token": "",
        "token_type": "Bearer",
        "expires_in": 300,
        "id_token": "",
        "expires_at": 1735397872,
        "userinfo": userinfo_claims,
    }
|
|
|
|
|
|
@pytest.fixture
def mock_openid_configuration():
    """Build a fresh OpenID provider metadata dict used as mocked server metadata.

    The tests hand this value to the patched
    ``StarletteOAuth2App.load_server_metadata``. Every call returns new
    dict/list objects, so a test may modify its copy freely.
    """
    response_types = [
        "code",
        "id_token",
        "id_token token",
        "code token",
        "code id_token",
        "code id_token token",
    ]
    grant_types = [
        "authorization_code",
        "refresh_token",
        "implicit",
        "client_credentials",
        "password",
        "urn:ietf:params:oauth:grant-type:device_code",
    ]
    token_auth_methods = [
        "client_secret_post",
        "client_secret_basic",
    ]
    # Claims the mocked provider advertises; tests look at "email_verified".
    claims = [
        "sub",
        "iss",
        "aud",
        "exp",
        "iat",
        "auth_time",
        "acr",
        "amr",
        "nonce",
        "email",
        "email_verified",
        "name",
        "given_name",
        "preferred_username",
        "nickname",
        "groups",
    ]
    return {
        "issuer": "https://authentik.example.com/application/o/romm/",
        "authorization_endpoint": "https://authentik.example.com/application/o/authorize/",
        "token_endpoint": "https://authentik.example.com/application/o/token/",
        "userinfo_endpoint": "https://authentik.example.com/application/o/userinfo/",
        "end_session_endpoint": "https://authentik.example.com/application/o/romm/end-session/",
        "introspection_endpoint": "https://authentik.example.com/application/o/introspect/",
        "revocation_endpoint": "https://authentik.example.com/application/o/revoke/",
        "device_authorization_endpoint": "https://authentik.example.com/application/o/device/",
        "response_types_supported": response_types,
        "response_modes_supported": ["query", "fragment", "form_post"],
        "jwks_uri": "https://authentik.example.com/application/o/romm/jwks/",
        "grant_types_supported": grant_types,
        "id_token_signing_alg_values_supported": ["RS256"],
        "subject_types_supported": ["public"],
        "token_endpoint_auth_methods_supported": token_auth_methods,
        "acr_values_supported": ["goauthentik.io/providers/oauth2/default"],
        "scopes_supported": ["openid", "email", "profile"],
        "request_parameter_supported": False,
        "claims_supported": claims,
        "claims_parameter_supported": False,
        "code_challenge_methods_supported": ["plain", "S256"],
    }
|
|
|
|
|
|
async def test_oidc_disabled(mock_oidc_disabled, mock_token):
    """With OIDC patched off, the handler returns ``(None, None)`` for a token."""
    handler = OpenIDHandler()
    result = await handler.get_current_active_user_from_openid_token(mock_token)
    user, userinfo = result
    assert user is None
    assert userinfo is None
|
|
|
|
|
|
async def test_oidc_valid_token_decoding(
    mocker, mock_oidc_enabled, mock_token, mock_openid_configuration
):
    """A valid token resolves to the user found by email, with matching userinfo."""
    # Reference payload: only its "email" claim is compared further down.
    expected_payload = Token(
        header={"alg": "RS256"},
        claims={"iss": OIDC_SERVER_APPLICATION_URL, "email": "test@example.com"},
    )
    existing_user = MagicMock(enabled=True, role=Role.VIEWER)
    mocker.patch(
        "handler.database.db_user_handler.get_user_by_email",
        return_value=existing_user,
    )
    mocker.patch.object(
        StarletteOAuth2App,
        "load_server_metadata",
        return_value=mock_openid_configuration,
    )

    handler = OpenIDHandler()
    result = await handler.get_current_active_user_from_openid_token(mock_token)
    user, userinfo = result

    assert user is not None
    assert userinfo is not None

    assert user == existing_user
    assert userinfo.get("email") == expected_payload.claims.get("email")
|
|
|
|
|
|
@pytest.mark.parametrize(
    "config_override,token_roles,romm_role",
    [
        # Without OIDC role mapping.
        pytest.param(
            {"OIDC_CLAIM_ROLES": ""}, ["admin", "editor"], Role.VIEWER, id="no-mapping"
        ),
        # OIDC role mapping.
        pytest.param({}, ["admin", "editor", "viewer"], Role.ADMIN, id="admin-role"),
        pytest.param({}, ["editor", "viewer"], Role.EDITOR, id="editor-role"),
        pytest.param({}, ["viewer"], Role.VIEWER, id="viewer-role"),
        # OIDC viewer role fallback.
        pytest.param({"OIDC_ROLE_VIEWER": "*"}, [], Role.VIEWER, id="viewer-fallback"),
    ],
)
async def test_oidc_valid_add_user(
    mocker,
    mock_oidc_enabled,
    mock_token,
    mock_openid_configuration,
    config_override,
    token_roles,
    romm_role,
):
    """A first login creates a user whose role follows the OIDC role settings.

    ``config_override`` replaces individual role-mapping settings for the case;
    any setting it does not mention falls back to the default listed below.
    """
    # (patch target, override key, default value), applied in this order.
    role_settings = (
        ("handler.auth.base_handler.OIDC_CLAIM_ROLES", "OIDC_CLAIM_ROLES", "roles"),
        ("handler.auth.base_handler.OIDC_ROLE_ADMIN", "OIDC_ROLE_ADMIN", "admin"),
        ("handler.auth.base_handler.OIDC_ROLE_EDITOR", "OIDC_ROLE_EDITOR", "editor"),
        ("handler.auth.base_handler.OIDC_ROLE_VIEWER", "OIDC_ROLE_VIEWER", "viewer"),
    )
    for target, setting_name, default_value in role_settings:
        mocker.patch(target, config_override.get(setting_name, default_value))

    mock_token["userinfo"]["roles"] = token_roles
    created_user = MagicMock(enabled=True, role=Role.VIEWER)
    add_user_mock = mocker.patch(
        "handler.database.db_user_handler.add_user", return_value=created_user
    )
    mocker.patch.object(
        StarletteOAuth2App,
        "load_server_metadata",
        return_value=mock_openid_configuration,
    )

    handler = OpenIDHandler()
    await handler.get_current_active_user_from_openid_token(mock_token)

    add_user_mock.assert_called_once()
    # The user object handed to add_user is its first positional argument.
    new_user = add_user_mock.call_args.args[0]
    claims = mock_token["userinfo"]
    assert new_user.username == claims["preferred_username"]
    assert new_user.email == claims["email"]
    assert new_user.enabled
    assert new_user.role == romm_role
|
|
|
|
|
|
async def test_oidc_valid_edit_user_role(
    mocker,
    mock_oidc_enabled,
    mock_token,
    mock_openid_configuration,
):
    """An existing viewer whose token carries the admin role is updated to admin."""
    mocker.patch("handler.auth.base_handler.OIDC_CLAIM_ROLES", "roles")
    mocker.patch("handler.auth.base_handler.OIDC_ROLE_ADMIN", "admin")
    mock_token["userinfo"]["roles"] = ["admin"]

    stored_user = MagicMock(enabled=True, role=Role.VIEWER)
    mocker.patch(
        "handler.database.db_user_handler.get_user_by_email",
        return_value=stored_user,
    )
    promoted_user = MagicMock(enabled=True, role=Role.ADMIN)
    update_user_mock = mocker.patch(
        "handler.database.db_user_handler.update_user",
        return_value=promoted_user,
    )
    mocker.patch.object(
        StarletteOAuth2App,
        "load_server_metadata",
        return_value=mock_openid_configuration,
    )

    handler = OpenIDHandler()
    user, _ = await handler.get_current_active_user_from_openid_token(mock_token)

    # The handler must return whatever update_user produced, and must have
    # requested exactly one role change for the stored user.
    assert user == promoted_user
    update_user_mock.assert_called_once_with(stored_user.id, {"role": Role.ADMIN})
|
|
|
|
|
|
async def test_oidc_valid_no_edit_user_role_if_mapping_disabled(
    mocker,
    mock_oidc_enabled,
    mock_token,
    mock_openid_configuration,
):
    """Test that role is not changed for existing user on login if OIDC role mapping is disabled.

    The token carries an ``admin`` role while the stored user is an editor;
    with no roles claim configured, ``update_user`` must never be called.
    """
    # Disable role mapping explicitly (same empty value the "no-mapping" case
    # of test_oidc_valid_add_user uses) so the outcome does not depend on
    # whatever OIDC_CLAIM_ROLES happens to be in the test environment.
    mocker.patch("handler.auth.base_handler.OIDC_CLAIM_ROLES", "")
    mock_token["userinfo"]["roles"] = ["admin"]
    mock_user = MagicMock(enabled=True, role=Role.EDITOR)
    mocker.patch(
        "handler.database.db_user_handler.get_user_by_email", return_value=mock_user
    )
    mock_edit_user = mocker.patch(
        "handler.database.db_user_handler.update_user", return_value=mock_user
    )
    mocker.patch.object(
        StarletteOAuth2App,
        "load_server_metadata",
        return_value=mock_openid_configuration,
    )

    oidc_handler = OpenIDHandler()
    await oidc_handler.get_current_active_user_from_openid_token(mock_token)

    mock_edit_user.assert_not_called()
|
|
|
|
|
|
async def test_oidc_invalid_user_no_roles(
    mocker,
    mock_oidc_enabled,
    mock_token,
    mock_openid_configuration,
):
    """A token whose only role is unmapped is rejected with an HTTPException."""
    mocker.patch("handler.auth.base_handler.OIDC_CLAIM_ROLES", "roles")
    mock_token["userinfo"]["roles"] = ["not-mapped"]
    mocker.patch.object(
        StarletteOAuth2App,
        "load_server_metadata",
        return_value=mock_openid_configuration,
    )

    handler = OpenIDHandler()
    expected_error = pytest.raises(
        HTTPException, match="has not been granted any roles"
    )
    with expected_error:
        await handler.get_current_active_user_from_openid_token(mock_token)
|
|
|
|
|
|
async def test_oidc_token_unverified_email(
    mocker, mock_oidc_enabled, mock_token, mock_openid_configuration
):
    """A token whose ``email_verified`` claim is ``False`` raises HTTPException."""
    mocker.patch.object(
        StarletteOAuth2App,
        "load_server_metadata",
        return_value=mock_openid_configuration,
    )

    # The fixture value is private to this test, so flip the claim in place.
    mock_token["userinfo"]["email_verified"] = False

    handler = OpenIDHandler()
    with pytest.raises(HTTPException):
        await handler.get_current_active_user_from_openid_token(mock_token)
|
|
|
|
|
|
async def test_oidc_token_without_email_verified_claim(
    mocker, mock_oidc_enabled, mock_token, mock_openid_configuration
):
    """Test token decoding with server not supporting email_verified claim.

    The mocked provider metadata does not list ``email_verified`` among its
    supported claims and the token's userinfo omits it too; the handler is
    expected to still return the user found by email.
    """
    # Reference payload: only its "email" claim is compared further down.
    mock_jwt_payload = Token(
        header={"alg": "RS256"},
        claims={"iss": OIDC_SERVER_APPLICATION_URL, "email": "test@example.com"},
    )
    mock_user = MagicMock(enabled=True, role=Role.VIEWER)
    mocker.patch(
        "handler.database.db_user_handler.get_user_by_email", return_value=mock_user
    )

    # list.remove() mutates in place and returns None, so assigning its result
    # would replace the whole claim list with None. Build a filtered copy so
    # the metadata keeps every other supported claim and drops only this one.
    openid_conf = {
        **mock_openid_configuration,
        "claims_supported": [
            claim
            for claim in mock_openid_configuration["claims_supported"]
            if claim != "email_verified"
        ],
    }
    mocker.patch.object(
        StarletteOAuth2App, "load_server_metadata", return_value=openid_conf
    )

    # The userinfo must not carry the claim either.
    del mock_token["userinfo"]["email_verified"]

    oidc_handler = OpenIDHandler()
    user, userinfo = await oidc_handler.get_current_active_user_from_openid_token(
        mock_token
    )

    assert user is not None
    assert userinfo is not None

    assert user == mock_user
    assert userinfo.get("email") == mock_jwt_payload.claims.get("email")
|
|
|
|
|
|
async def test_oidc_invalid_token_signature(mock_oidc_enabled):
    """A token dict holding only a bogus ``id_token`` raises HTTPException."""
    bogus_token = {"id_token": "invalid_signature_token"}
    handler = OpenIDHandler()
    with pytest.raises(HTTPException):
        await handler.get_current_active_user_from_openid_token(bogus_token)
|