Source code for swh.auth.django.utils

# Copyright (C) 2020-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import datetime, timedelta
from typing import Any, Dict, Optional

from django.conf import settings
from django.http import HttpRequest, QueryDict
from django.urls import reverse as django_reverse

from swh.auth.django.models import OIDCUser
from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect

[docs]def oidc_user_from_decoded_token( decoded_token: Dict[str, Any], client_id: Optional[str] = None ) -> OIDCUser: """Create an OIDCUser out of a decoded token Args: decoded_token: Decoded token Dict client_id: Optional client id of the keycloak client instance used to decode the token. If not provided, the permissions will be empty. Returns: The OIDCUser instance """ # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token.get("preferred_username", ""), password="", first_name=decoded_token.get("given_name", ""), last_name=decoded_token.get("family_name", ""), email=decoded_token.get("email", ""), ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] realm_access = decoded_token.get("realm_access", {}) permissions = realm_access.get("roles", []) if client_id: # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) client_resource_access = resource_access.get(client_id, {}) permissions += client_resource_access.get("roles", []) # set user permissions and filter out default keycloak realm roles user.permissions = set(permissions) - {"offline_access", "uma_authorization"} # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user
[docs]def oidc_user_from_profile( oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any] ) -> OIDCUser: """Initialize an OIDCUser out of an oidc profile dict. Args: oidc_client: KeycloakOpenIDConnect used to discuss with keycloak oidc_profile: OIDC profile retrieved once connected to keycloak Returns: OIDCUser instance parsed out of the token received. """ # decode JWT token try: access_token = oidc_profile["access_token"] decoded_token = oidc_client.decode_token(access_token) # access token has expired except ExpiredSignatureError: # get a new access token from authentication provider oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) # decode access token decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) return user
[docs]def oidc_profile_cache_key(oidc_client: KeycloakOpenIDConnect, user_id: int) -> str: return f"oidc_user_{oidc_client.realm_name}_{oidc_client.client_id}_{user_id}"
[docs]def keycloak_oidc_client() -> KeycloakOpenIDConnect: """ Instantiate a KeycloakOpenIDConnect class from the following django settings: * SWH_AUTH_SERVER_URL * SWH_AUTH_REALM_NAME * SWH_AUTH_CLIENT_ID Returns: An object to ease the interaction with the Keycloak server Raises: ValueError: at least one mandatory django setting is not set """ server_url = getattr(settings, "SWH_AUTH_SERVER_URL", None) realm_name = getattr(settings, "SWH_AUTH_REALM_NAME", None) client_id = getattr(settings, "SWH_AUTH_CLIENT_ID", None) if server_url is None or realm_name is None or client_id is None: raise ValueError( "SWH_AUTH_SERVER_URL, SWH_AUTH_REALM_NAME and SWH_AUTH_CLIENT_ID django " "settings are mandatory to instantiate KeycloakOpenIDConnect class" ) return KeycloakOpenIDConnect( server_url=server_url, realm_name=realm_name, client_id=client_id )
[docs]def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[Dict[str, Any]] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url