Source code for swh.auth.django.utils

# Copyright (C) 2020-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from uuid import UUID

from django.conf import settings
from django.contrib.auth.models import Group
from django.http import HttpRequest, QueryDict
from django.urls import reverse as django_reverse

from swh.auth.django.models import OIDCUser
from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect


[docs] def keycloak_uuid_to_django_id(keycloak_uuid: str) -> int: """Convert keycloak user uuid to its django integer id. Args: keycloak_uuid: UUID identifier of a Keycloak user Returns: Django integer identifier for the user """ return UUID(keycloak_uuid).int
[docs] def django_id_to_keycloak_uuid(django_id: int) -> str: """Convert django user integer id to its keycloak uuid. Args: django_id: Integer identifier of a django user Returns: Keycloak UUID identifier for the user """ return str(UUID(int=django_id))
[docs] def oidc_user_from_decoded_token( decoded_token: Dict[str, Any], client_id: Optional[str] = None ) -> OIDCUser: """Create an OIDCUser out of a decoded token Args: decoded_token: Decoded token Dict client_id: Optional client id of the keycloak client instance used to decode the token. If not provided, the permissions will be empty. Returns: The OIDCUser instance """ # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = keycloak_uuid_to_django_id(decoded_token["sub"]) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token.get("preferred_username", ""), password="", first_name=decoded_token.get("given_name", ""), last_name=decoded_token.get("family_name", ""), email=decoded_token.get("email", ""), ) # process keycloak groups group_names = set() if "groups" in decoded_token: # set is_staff user property based on group membership user.is_staff = "/staff" in decoded_token["groups"] for group_name in decoded_token["groups"]: # remove leading slash added by keycloak to group name django_group_name = group_name.lstrip("/") # ensure a corresponding django group exist Group.objects.get_or_create(name=django_group_name) group_names.add(django_group_name) realm_access = decoded_token.get("realm_access", {}) permissions = realm_access.get("roles", []) if client_id: # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(client_id, {}) permissions += client_resource_access.get("roles", []) # set user permissions and filter out default keycloak realm roles user.permissions = set(permissions) - {"offline_access", "uma_authorization"} # set user groups user.group_names = group_names # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user
[docs] def oidc_user_from_profile( oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any] ) -> OIDCUser: """Initialize an OIDCUser out of an oidc profile dict. Args: oidc_client: KeycloakOpenIDConnect used to discuss with keycloak oidc_profile: OIDC profile retrieved once connected to keycloak Returns: OIDCUser instance parsed out of the token received. """ # decode JWT token try: access_token = oidc_profile["access_token"] decoded_token = oidc_client.decode_token(access_token) # access token has expired except ExpiredSignatureError: # get a new access token from authentication provider oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) # decode access token decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) return user
[docs] def oidc_profile_cache_key(oidc_client: KeycloakOpenIDConnect, user_id: int) -> str: return f"oidc_user_{oidc_client.realm_name}_{oidc_client.client_id}_{user_id}"
[docs] def keycloak_oidc_client() -> KeycloakOpenIDConnect: """ Instantiate a KeycloakOpenIDConnect class from the following django settings: * SWH_AUTH_SERVER_URL * SWH_AUTH_REALM_NAME * SWH_AUTH_CLIENT_ID Returns: An object to ease the interaction with the Keycloak server Raises: ValueError: at least one mandatory django setting is not set """ server_url = getattr(settings, "SWH_AUTH_SERVER_URL", None) realm_name = getattr(settings, "SWH_AUTH_REALM_NAME", None) client_id = getattr(settings, "SWH_AUTH_CLIENT_ID", None) if server_url is None or realm_name is None or client_id is None: raise ValueError( "SWH_AUTH_SERVER_URL, SWH_AUTH_REALM_NAME and SWH_AUTH_CLIENT_ID django " "settings are mandatory to instantiate KeycloakOpenIDConnect class" ) return KeycloakOpenIDConnect( server_url=server_url, realm_name=realm_name, client_id=client_id )
[docs] def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[Dict[str, Any]] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url