# Copyright (C) 2020-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import UUID
from django.conf import settings
from django.http import HttpRequest, QueryDict
from django.urls import reverse as django_reverse
from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect
if TYPE_CHECKING:
from swh.auth.django.models import OIDCUser
[docs]
def keycloak_uuid_to_django_id(keycloak_uuid: str) -> int:
    """Map a Keycloak user UUID to the integer id used for the django user.

    Args:
        keycloak_uuid: textual UUID identifying a Keycloak user

    Returns:
        The integer value of that UUID, used as django user identifier
    """
    # the UUID is interpreted as a single 128-bit integer
    parsed_uuid = UUID(keycloak_uuid)
    return parsed_uuid.int
[docs]
def django_id_to_keycloak_uuid(django_id: int) -> str:
    """Map a django user integer id back to its Keycloak UUID string.

    Args:
        django_id: integer identifier of a django user

    Returns:
        The Keycloak UUID of the user, in its canonical hyphenated form
    """
    # rebuild the UUID from its 128-bit integer value
    rebuilt_uuid = UUID(int=django_id)
    return str(rebuilt_uuid)
[docs]
def oidc_user_from_decoded_token(
    decoded_token: Dict[str, Any], client_id: Optional[str] = None
) -> OIDCUser:
    """Create an OIDCUser out of a decoded token.

    The returned user object is built in memory and is not saved to the
    database by this function. A django ``Group`` is however looked up or
    created for every group listed in the token.

    The ``decoded_token`` dict is only read, never modified.

    Args:
        decoded_token: Decoded token Dict. The ``sub`` entry is mandatory;
            ``preferred_username``, ``given_name``, ``family_name``, ``email``,
            ``groups``, ``realm_access`` and ``resource_access`` are optional.
        client_id: Optional client id of the keycloak client instance used to
            decode the token. If not provided, only the realm roles found in
            the token are used as permissions; client roles are ignored.

    Returns:
        The OIDCUser instance

    Raises:
        KeyError: the ``sub`` entry is missing from the decoded token
        ValueError: the ``sub`` entry is not a well-formed UUID string
    """
    from django.contrib.auth.models import Group

    from swh.auth.django.models import OIDCUser

    # compute an integer user identifier for Django User model
    # by concatenating all groups of the UUID4 user identifier
    # generated by Keycloak and converting it from hex to decimal
    user_id = keycloak_uuid_to_django_id(decoded_token["sub"])

    # create a Django user that will not be saved to database
    user = OIDCUser(
        id=user_id,
        username=decoded_token.get("preferred_username", ""),
        password="",
        first_name=decoded_token.get("given_name", ""),
        last_name=decoded_token.get("family_name", ""),
        email=decoded_token.get("email", ""),
    )

    # process keycloak groups
    group_names = set()
    if "groups" in decoded_token:
        # set is_staff user property based on group membership
        user.is_staff = "/staff" in decoded_token["groups"]

        for group_name in decoded_token["groups"]:
            # remove leading slash added by keycloak to group name
            django_group_name = group_name.lstrip("/")
            # ensure a corresponding django group exist
            Group.objects.get_or_create(name=django_group_name)
            group_names.add(django_group_name)

    # Copy the realm roles: extending the list stored in the token in place
    # would leak client roles into decoded_token["realm_access"]["roles"]
    # and make repeated calls on the same token accumulate entries.
    realm_access = decoded_token.get("realm_access", {})
    permissions = list(realm_access.get("roles", []))

    if client_id:
        # extract user permissions if any
        resource_access = decoded_token.get("resource_access", {})
        client_resource_access = resource_access.get(client_id, {})
        permissions.extend(client_resource_access.get("roles", []))

    # set user permissions and filter out default keycloak realm roles
    user.permissions = set(permissions) - {"offline_access", "uma_authorization"}

    # set user groups
    user.group_names = group_names

    # add user sub to custom User proxy model
    user.sub = decoded_token["sub"]

    return user
[docs]
def oidc_user_from_profile(
    oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any]
) -> OIDCUser:
    """Build an OIDCUser from an OIDC profile dict.

    The access token of the profile is decoded; if its signature has expired,
    a new profile is requested with the refresh token and its access token is
    decoded instead. The ``expires_at`` and ``refresh_expires_at`` entries are
    written into the profile dict in use (the one passed by the caller unless
    it was replaced by a refreshed one).

    Args:
        oidc_client: KeycloakOpenIDConnect used to discuss with keycloak
        oidc_profile: OIDC profile retrieved once connected to keycloak

    Returns:
        OIDCUser instance parsed out of the token received.
    """
    try:
        token_payload = oidc_client.decode_token(oidc_profile["access_token"])
    except ExpiredSignatureError:
        # expired access token: ask the authentication provider for a fresh
        # profile, then decode the new access token
        oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"])
        token_payload = oidc_client.decode_token(oidc_profile["access_token"])

    oidc_user = oidc_user_from_decoded_token(
        token_payload, client_id=oidc_client.client_id
    )

    # token issue and expiration datetimes
    issued_at = datetime.fromtimestamp(token_payload["iat"])
    expires_at = datetime.fromtimestamp(token_payload["exp"])

    # record the expiration dates of both tokens in the profile
    oidc_profile["expires_at"] = expires_at
    refresh_lifetime = timedelta(seconds=oidc_profile["refresh_expires_in"])
    oidc_profile["refresh_expires_at"] = issued_at + refresh_lifetime

    # copy onto the user every profile entry matching an existing attribute
    for attr_name, attr_value in oidc_profile.items():
        if not hasattr(oidc_user, attr_name):
            continue
        setattr(oidc_user, attr_name, attr_value)

    return oidc_user
[docs]
def oidc_profile_cache_key(oidc_client: KeycloakOpenIDConnect, user_id: int) -> str:
    """Build the cache key under which the OIDC profile of a user is stored.

    Args:
        oidc_client: client whose realm name and client id scope the key
        user_id: identifier of the user

    Returns:
        A key combining the realm name, the client id and the user id
    """
    realm = oidc_client.realm_name
    client = oidc_client.client_id
    return f"oidc_user_{realm}_{client}_{user_id}"
[docs]
def keycloak_oidc_client() -> KeycloakOpenIDConnect:
    """
    Build a KeycloakOpenIDConnect instance configured from the django settings:

        * SWH_AUTH_SERVER_URL
        * SWH_AUTH_REALM_NAME
        * SWH_AUTH_CLIENT_ID

    Returns:
        An object to ease the interaction with the Keycloak server

    Raises:
        ValueError: at least one mandatory django setting is not set
    """
    setting_names = (
        "SWH_AUTH_SERVER_URL",
        "SWH_AUTH_REALM_NAME",
        "SWH_AUTH_CLIENT_ID",
    )
    # a setting that is not defined is read as None
    values = [getattr(settings, setting_name, None) for setting_name in setting_names]

    if any(value is None for value in values):
        raise ValueError(
            "SWH_AUTH_SERVER_URL, SWH_AUTH_REALM_NAME and SWH_AUTH_CLIENT_ID django "
            "settings are mandatory to instantiate KeycloakOpenIDConnect class"
        )

    server_url, realm_name, client_id = values
    return KeycloakOpenIDConnect(
        server_url=server_url, realm_name=realm_name, client_id=client_id
    )
[docs]
def reverse(
    viewname: str,
    url_args: Optional[Dict[str, Any]] = None,
    query_params: Optional[Dict[str, Any]] = None,
    current_app: Optional[str] = None,
    urlconf: Optional[str] = None,
    request: Optional[HttpRequest] = None,
) -> str:
    """Wrapper around django ``reverse`` that also handles query parameters.

    Entries of ``url_args`` and ``query_params`` whose value is ``None`` are
    ignored. Query parameters are appended sorted by name.

    Args:
        viewname: the name of the django view from which to compute a url
        url_args: dictionary of url arguments indexed by their names
        query_params: dictionary of query parameters to append to the
            reversed url
        current_app: the name of the django app tied to the view
        urlconf: url configuration module
        request: build an absolute URI if provided

    Returns:
        str: the url of the requested view with processed arguments and
        query parameters
    """
    # drop unset url arguments (an empty or missing dict is passed through)
    view_kwargs = url_args
    if view_kwargs:
        view_kwargs = {
            name: value for name, value in view_kwargs.items() if value is not None
        }

    url = django_reverse(
        viewname, urlconf=urlconf, kwargs=view_kwargs, current_app=current_app
    )

    # drop unset query parameters
    params: Dict[str, Any] = {}
    if query_params:
        params = {
            name: value for name, value in query_params.items() if value is not None
        }

    if params:
        query_dict = QueryDict("", mutable=True)
        for name in sorted(params):
            query_dict[name] = params[name]
        # "/", ";" and ":" are left unescaped in the query string
        url = url + "?" + query_dict.urlencode(safe="/;:")

    if request is None:
        return url
    return request.build_absolute_uri(url)