# Source code for swh.deposit.auth
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from typing import Optional
from django.core.cache import cache
from django.utils import timezone
from rest_framework import status
from rest_framework.authentication import BasicAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.permissions import BasePermission
from sentry_sdk import capture_exception
from swh.auth.django.models import OIDCUser
from swh.auth.django.utils import oidc_user_from_profile
from swh.auth.keycloak import (
KeycloakError,
KeycloakOpenIDConnect,
keycloak_error_message,
)
from swh.deposit.errors import UNAUTHORIZED, make_error_response
from swh.deposit.models import DepositClient
logger = logging.getLogger(__name__)
OIDC_DEPOSIT_CLIENT_ID = "swh-deposit"
DEPOSIT_PERMISSION = "swh.deposit.api"
[docs]
def convert_response(request, content):
    """Rewrite a drf basic-authentication error body as an swh-deposit error.

    Args:
        request (Request): Request used to build the new response
        content (bytes): utf-8 encoded JSON body of the drf answer

    Returns:
        The response built by ``make_error_response`` for ``UNAUTHORIZED``,
        with a ``WWW-Authenticate`` basic challenge header set on it.

    """
    import json

    fallback_message = "API is protected by basic authentication"

    payload = json.loads(content.decode("utf-8"))
    summary = payload.get("detail")

    # A detail provided by drf becomes the summary and the generic message is
    # demoted to the verbose description; without detail, the generic message
    # is the summary and there is no verbose description.
    if summary:
        verbose = fallback_message
    else:
        summary, verbose = fallback_message, None

    error_response = make_error_response(
        request, UNAUTHORIZED, summary=summary, verbose_description=verbose
    )
    error_response["WWW-Authenticate"] = 'Basic realm=""'
    return error_response
[docs]
class WrapBasicAuthenticationResponseMiddleware:
    """Middleware to capture potential authentication error and convert
    them to standard deposit response.

    A response is converted (through ``convert_response``) only when its status
    code is 401 and its content type is exactly ``application/json``; any other
    response is returned untouched.

    This is to be installed in django's settings.py module.

    """

    def __init__(self, get_response):
        """Store the next callable of the middleware chain.

        Args:
            get_response: Callable taking a request and returning a response

        """
        super().__init__()
        self.get_response = get_response

    def __call__(self, request):
        """Run the wrapped handler and convert its JSON 401 answer, if any.

        Args:
            request: Incoming request, forwarded to ``get_response``

        Returns:
            Either the converted deposit error response or the original
            response.

        """
        response = self.get_response(request)

        # Compare by value: ``is`` only tests object identity, which is not
        # guaranteed for two equal integers such as 401.
        if response.status_code == status.HTTP_401_UNAUTHORIZED:
            content_type = response.get("content-type")
            if content_type == "application/json":
                return convert_response(request, response.content)

        return response
[docs]
class HasDepositPermission(BasePermission):
    """Grants access only to users holding the DEPOSIT_PERMISSION."""

    def has_permission(self, request, view):
        """Tell whether the request's user holds DEPOSIT_PERMISSION.

        Args:
            request: Request whose ``user`` is checked
            view: Unused

        Returns:
            The result of ``has_perm(DEPOSIT_PERMISSION)`` on the oidc user
            attached to the request's user.

        """
        deposit_client = request.user
        # The user is expected to be a DepositClient at this point; presumably
        # set by the authentication class (to be confirmed against settings).
        assert isinstance(deposit_client, DepositClient)
        oidc_user = deposit_client.oidc_user
        return oidc_user.has_perm(DEPOSIT_PERMISSION)
[docs]
class KeycloakBasicAuthentication(BasicAuthentication):
    """Keycloak authentication against username/password.

    Deposit users keep sending `Basic authentication` queries to the deposit
    server. Transparently, the deposit server no longer authenticates the users
    itself: it delegates the authentication queries to the keycloak instance.

    Technically, this reuses :class:`rest_framework.BasicAuthentication` and
    overrides the :func:`authenticate_credentials` method to discuss with
    keycloak.

    As an implementation detail, the oidc profile obtained on a successful login
    is stored in the django cache and can be read back with :meth:`get_user`.
    :meth:`authenticate_credentials` itself always checks the credentials
    against keycloak.

    """

    # Keycloak client, instantiated on first access to the ``client`` property
    _client: Optional[KeycloakOpenIDConnect] = None

    @property
    def client(self) -> KeycloakOpenIDConnect:
        """Keycloak client, built from the configuration file on first access."""
        if self._client is None:
            self._client = KeycloakOpenIDConnect.from_configfile(
                client_id=OIDC_DEPOSIT_CLIENT_ID
            )
        return self._client

    def _cache_key(self, user_id: str) -> str:
        """Internal key to use to store user id token."""
        return f"oidc_user_{self.client.realm_name}_{self.client.client_id}_{user_id}"

    def get_user(self, user_id: str) -> Optional[OIDCUser]:
        """Retrieve user from cache if any.

        Args:
            user_id: Identifier the oidc profile was cached under

        Returns:
            The oidc user rebuilt from the cached profile, None when nothing is
            cached or when the rebuild fails.

        """
        oidc_profile = cache.get(self._cache_key(user_id))
        if oidc_profile:
            try:
                return oidc_user_from_profile(self.client, oidc_profile)
            except Exception as e:
                # Best effort: a failure to rebuild the user is logged and
                # reported, then treated as a cache miss.
                logger.warning("Error during cache token retrieval: %s", e)
                capture_exception(e)
        return None

    def authenticate_credentials(self, user_id, password, request):
        """Authenticate the user_id/password against keycloak.

        Args:
            user_id: Login of the deposit client
            password: Password sent along the basic authentication query
            request: Unused

        Raises:
            AuthenticationFailed in case of authentication failure, unknown
            deposit client or deactivated deposit client

        Returns:
            Tuple of deposit_client, None.

        """
        try:
            oidc_profile = self.client.login(user_id, password)
        except KeycloakError as e:
            logger.debug("KeycloakError: e: %s", e)
            error_msg = keycloak_error_message(e)
            raise AuthenticationFailed(error_msg) from e

        oidc_user = oidc_user_from_profile(self.client, oidc_profile)
        # Number of seconds left before the refresh token expires
        ttl = int(oidc_user.refresh_expires_at.timestamp() - timezone.now().timestamp())

        # Making sure the associated deposit client is correctly configured in backend
        try:
            deposit_client = DepositClient.objects.get(username=user_id)
        except DepositClient.DoesNotExist as e:
            raise AuthenticationFailed(f"Unknown user {user_id}") from e

        if not deposit_client.is_active:
            raise AuthenticationFailed(f"Deactivated user {user_id}")

        deposit_client.oidc_user = oidc_user

        if ttl > 0:
            # cache the oidc_profile user while it's valid; an already expired
            # profile is not cached at all
            cache.set(
                self._cache_key(user_id),
                oidc_profile,
                timeout=ttl,
            )

        return (deposit_client, None)