Source code for swh.auth.django.utils

# Copyright (C) 2020-2025  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from __future__ import annotations

from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import UUID

from django.conf import settings
from django.http import HttpRequest, QueryDict
from django.urls import reverse as django_reverse

from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect

if TYPE_CHECKING:
    from swh.auth.django.models import OIDCUser


def keycloak_uuid_to_django_id(keycloak_uuid: str) -> int:
    """Map a Keycloak user UUID to the integer id used on the Django side.

    The integer is the 128-bit value of the UUID, so the mapping is
    reversible.

    Args:
        keycloak_uuid: UUID identifier of a Keycloak user

    Returns:
        Django integer identifier for the user
    """
    parsed_uuid = UUID(hex=keycloak_uuid)
    return int(parsed_uuid)
def django_id_to_keycloak_uuid(django_id: int) -> str:
    """Map a Django integer user id back to its Keycloak UUID string.

    Args:
        django_id: Integer identifier of a django user

    Returns:
        Keycloak UUID identifier for the user
    """
    user_uuid = UUID(int=django_id)
    return str(user_uuid)
def oidc_user_from_decoded_token(
    decoded_token: Dict[str, Any], client_id: Optional[str] = None
) -> OIDCUser:
    """Create an OIDCUser out of a decoded token

    The returned user is built in memory only and is not saved to the
    database. As a side effect, a Django ``Group`` is created for every
    group name found in the token that does not exist yet.

    ``decoded_token`` is only read: its content is never modified.

    Args:
        decoded_token: Decoded token Dict
        client_id: Optional client id of the keycloak client instance used
            to decode the token. If not provided, only the realm roles found
            in the token are used as permissions (client roles are ignored).

    Returns:
        The OIDCUser instance

    Raises:
        KeyError: the decoded token has no ``sub`` entry
        ValueError: the ``sub`` entry is not a valid UUID string
    """
    # imported lazily, django apps must be loaded before models can be used
    from django.contrib.auth.models import Group

    from swh.auth.django.models import OIDCUser

    # compute an integer user identifier for Django User model
    # by concatenating all groups of the UUID4 user identifier
    # generated by Keycloak and converting it from hex to decimal
    user_id = keycloak_uuid_to_django_id(decoded_token["sub"])

    # create a Django user that will not be saved to database
    user = OIDCUser(
        id=user_id,
        username=decoded_token.get("preferred_username", ""),
        password="",
        first_name=decoded_token.get("given_name", ""),
        last_name=decoded_token.get("family_name", ""),
        email=decoded_token.get("email", ""),
    )

    # process keycloak groups
    group_names = set()
    if "groups" in decoded_token:
        # set is_staff user property based on group membership
        user.is_staff = "/staff" in decoded_token["groups"]

        for group_name in decoded_token["groups"]:
            # remove leading slash added by keycloak to group name
            django_group_name = group_name.lstrip("/")
            # ensure a corresponding django group exist
            Group.objects.get_or_create(name=django_group_name)
            group_names.add(django_group_name)

    # Collect roles in a fresh set: extending the list returned by
    # realm_access.get("roles") in place would add the client roles to the
    # caller's decoded token.
    realm_access = decoded_token.get("realm_access", {})
    permissions = set(realm_access.get("roles", []))
    if client_id:
        # extract user permissions if any
        resource_access = decoded_token.get("resource_access", {})
        client_resource_access = resource_access.get(client_id, {})
        permissions.update(client_resource_access.get("roles", []))

    # set user permissions and filter out default keycloak realm roles
    user.permissions = permissions - {"offline_access", "uma_authorization"}

    # set user groups
    user.group_names = group_names

    # add user sub to custom User proxy model
    user.sub = decoded_token["sub"]

    return user
def oidc_user_from_profile(
    oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any]
) -> OIDCUser:
    """Build an OIDCUser from an OIDC profile dict.

    When the access token of the profile has expired, a new profile is
    requested from Keycloak using the refresh token and that new profile is
    used instead. The profile dict in use (the one passed in, unless it was
    refreshed) is updated in place with the ``expires_at`` and
    ``refresh_expires_at`` entries.

    Args:
        oidc_client: KeycloakOpenIDConnect used to discuss with keycloak
        oidc_profile: OIDC profile retrieved once connected to keycloak

    Returns:
        OIDCUser instance parsed out of the token received.
    """
    try:
        token_payload = oidc_client.decode_token(oidc_profile["access_token"])
    except ExpiredSignatureError:
        # access token has expired: ask the authentication provider for a
        # fresh profile, then decode the renewed access token
        oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"])
        token_payload = oidc_client.decode_token(oidc_profile["access_token"])

    oidc_user = oidc_user_from_decoded_token(
        token_payload, client_id=oidc_client.client_id
    )

    # token issue and expiration instants, as naive local datetimes
    issued_at = datetime.fromtimestamp(token_payload["iat"])
    expires_at = datetime.fromtimestamp(token_payload["exp"])

    # record the expiration dates of both tokens in the profile
    oidc_profile["expires_at"] = expires_at
    refresh_lifetime = timedelta(seconds=oidc_profile["refresh_expires_in"])
    oidc_profile["refresh_expires_at"] = issued_at + refresh_lifetime

    # copy onto the user every profile entry matching one of its attributes
    for attr_name, attr_value in oidc_profile.items():
        if not hasattr(oidc_user, attr_name):
            continue
        setattr(oidc_user, attr_name, attr_value)

    return oidc_user
def oidc_profile_cache_key(oidc_client: KeycloakOpenIDConnect, user_id: int) -> str:
    """Compute the cache key under which a user OIDC profile is stored.

    Args:
        oidc_client: client whose ``realm_name`` and ``client_id`` are
            embedded in the key
        user_id: integer identifier of the user

    Returns:
        A string of the form ``oidc_user_<realm_name>_<client_id>_<user_id>``
    """
    key_parts = ("oidc_user", oidc_client.realm_name, oidc_client.client_id, user_id)
    return "_".join(f"{part}" for part in key_parts)
def keycloak_oidc_client() -> KeycloakOpenIDConnect:
    """
    Build a KeycloakOpenIDConnect instance configured from the following
    django settings:

        * SWH_AUTH_SERVER_URL
        * SWH_AUTH_REALM_NAME
        * SWH_AUTH_CLIENT_ID

    Returns:
        An object to ease the interaction with the Keycloak server

    Raises:
        ValueError: at least one mandatory django setting is not set
    """
    setting_names = (
        "SWH_AUTH_SERVER_URL",
        "SWH_AUTH_REALM_NAME",
        "SWH_AUTH_CLIENT_ID",
    )
    # a missing setting is read as None, same as one explicitly set to None
    server_url, realm_name, client_id = [
        getattr(settings, name, None) for name in setting_names
    ]

    if any(value is None for value in (server_url, realm_name, client_id)):
        raise ValueError(
            "SWH_AUTH_SERVER_URL, SWH_AUTH_REALM_NAME and SWH_AUTH_CLIENT_ID django "
            "settings are mandatory to instantiate KeycloakOpenIDConnect class"
        )

    return KeycloakOpenIDConnect(
        server_url=server_url, realm_name=realm_name, client_id=client_id
    )
def reverse(
    viewname: str,
    url_args: Optional[Dict[str, Any]] = None,
    query_params: Optional[Dict[str, Any]] = None,
    current_app: Optional[str] = None,
    urlconf: Optional[str] = None,
    request: Optional[HttpRequest] = None,
) -> str:
    """A wrapper around django reverse function adding query parameters support.

    URL arguments and query parameters whose value is ``None`` are dropped.
    Query parameters are appended sorted by name.

    Args:
        viewname: the name of the django view from which to compute a url
        url_args: dictionary of url arguments indexed by their names
        query_params: dictionary of query parameters to append to the
            reversed url
        current_app: forwarded as ``current_app`` to django reverse
        urlconf: url configuration module
        request: build an absolute URI if provided

    Returns:
        str: the url of the requested view with processed arguments and
        query parameters
    """
    if url_args:
        url_args = {
            name: value for name, value in url_args.items() if value is not None
        }

    url = django_reverse(
        viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
    )

    # keep only the query parameters that have a value
    kept_params = {
        name: value
        for name, value in (query_params or {}).items()
        if value is not None
    }

    if kept_params:
        query_dict = QueryDict("", mutable=True)
        for name in sorted(kept_params):
            query_dict[name] = kept_params[name]
        encoded_query = query_dict.urlencode(safe="/;:")
        url = f"{url}?{encoded_query}"

    if request is not None:
        url = request.build_absolute_uri(url)

    return url