import logging
from abc import ABC, abstractmethod
import jwt
from fastapi.security import HTTPAuthorizationCredentials, SecurityScopes
from pydantic import HttpUrl
from lomas_core.constants import Scopes
from lomas_core.error_handler import UnauthorizedAccessException
from lomas_core.models.collections import UserId
from lomas_server.constants import KCAttributeNames
logger = logging.getLogger(__name__)
[docs]
class UserAuthenticator(ABC):
"""Abstract base class for providing user authentification methods."""
[docs]
@abstractmethod
def get_user_id(
self,
security_scopes: SecurityScopes,
auth_creds: HTTPAuthorizationCredentials,
) -> UserId:
"""Extracts user id from bearer token.
Args:
security_scopes (SecurityScopes): The required scopes for the endpoint.
auth_creds (HTTPAuthorizationCredentials): Authorization credentials.
Returns:
UserId: The UserId object containing user infos.
"""
[docs]
class FreePassAuthenticator(UserAuthenticator):
"""Authenticator class that simply extracts user information from.
the provided bearer.
! No verification is performed!
"""
[docs]
def get_user_id(
self,
security_scopes: SecurityScopes,
auth_creds: HTTPAuthorizationCredentials,
) -> UserId:
"""Parses the HTTP bearer token as a json string to construct a UserId.
!Does NOT perform any verification!
Args:
security_scopes (SecurityScopes): The required scopes for the endpoint.
auth_creds (HTTPAuthorizationCredentials): Authorization credentials.
Returns:
UserId: The parsed UserId.
"""
try:
if Scopes.ADMIN in security_scopes.scopes:
# Admins don't come with proper user id, so we create a dummy one.
user = UserId(name="admin", email="admin@example.com")
else:
user = UserId.model_validate_json(auth_creds.credentials)
except Exception as e:
raise UnauthorizedAccessException("Failed bearer token verification.") from e
logger.debug(f"Authenticated user {user.name}")
return user
[docs]
class JWTAuthenticator(UserAuthenticator):
"""Authenticator class that identifies users by validating the provided JWT token."""
def __init__(self, keycloak_url: HttpUrl, realm: str) -> None:
"""Constructor method.
Initializes instance PyJWKClient with caching.
Args:
keycloak_address (str): The keycloak address for this app instance.
keycloak_port (int): The keycloak port
keycloak_use_tls (str): Whether to use tls or not for interacting with keycloak.
realm (str): The realm name for this app instance.
"""
self.jwk_client = jwt.PyJWKClient(
f"{keycloak_url}/realms/{realm}/protocol/openid-connect/certs",
cache_keys=True,
)
[docs]
def get_user_id(
self,
security_scopes: SecurityScopes,
auth_creds: HTTPAuthorizationCredentials,
) -> UserId:
"""Parses the JWT bearer token to construct a UserId.
The JWT is verified against the certificates provided by the Id Provider.
! Does not verify scopes yet !
Args:
security_scopes (SecurityScopes): The required scopes for the endpoint.
auth_creds (HTTPAuthorizationCredentials): Authorization credentials.
Returns:
UserId: The parsed UserId.
"""
try:
# Extracts kid from JWT and fetches corresponding key from keycloak (or cache).
key = self.jwk_client.get_signing_key_from_jwt(auth_creds.credentials)
# Decodes and validates JWT
token_content = jwt.decode(auth_creds.credentials, key=key)
if Scopes.ADMIN in security_scopes.scopes:
# We use only one generic admin for now
if (
token_content["client_id"] != "lomas_admin"
): # TODO need to add admin role/scope see issue 399
raise UnauthorizedAccessException("Only admin user can query this endpoint.")
user = UserId(name="admin", email="noemailexample.com")
else:
user = UserId(
name=token_content[KCAttributeNames.USER_NAME],
email=token_content[KCAttributeNames.USER_EMAIL],
)
except UnauthorizedAccessException as e:
raise e
except Exception as e:
# TODO problematic to add e into error message to client?
raise UnauthorizedAccessException("Failed bearer token verification.") from e
logger.debug(f"Authenticated user {user.name}")
return user