# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from uuid import UUID
from django.conf import settings
from django.contrib.auth.models import Group
from django.http import HttpRequest, QueryDict
from django.urls import reverse as django_reverse
from swh.auth.django.models import OIDCUser
from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect
[docs]
def keycloak_uuid_to_django_id(keycloak_uuid: str) -> int:
"""Convert keycloak user uuid to its django integer id.
Args:
keycloak_uuid: UUID identifier of a Keycloak user
Returns:
Django integer identifier for the user
"""
return UUID(keycloak_uuid).int
[docs]
def django_id_to_keycloak_uuid(django_id: int) -> str:
"""Convert django user integer id to its keycloak uuid.
Args:
django_id: Integer identifier of a django user
Returns:
Keycloak UUID identifier for the user
"""
return str(UUID(int=django_id))
[docs]
def oidc_user_from_decoded_token(
decoded_token: Dict[str, Any], client_id: Optional[str] = None
) -> OIDCUser:
"""Create an OIDCUser out of a decoded token
Args:
decoded_token: Decoded token Dict
client_id: Optional client id of the keycloak client instance used to decode
the token. If not provided, the permissions will be empty.
Returns:
The OIDCUser instance
"""
# compute an integer user identifier for Django User model
# by concatenating all groups of the UUID4 user identifier
# generated by Keycloak and converting it from hex to decimal
user_id = keycloak_uuid_to_django_id(decoded_token["sub"])
# create a Django user that will not be saved to database
user = OIDCUser(
id=user_id,
username=decoded_token.get("preferred_username", ""),
password="",
first_name=decoded_token.get("given_name", ""),
last_name=decoded_token.get("family_name", ""),
email=decoded_token.get("email", ""),
)
# process keycloak groups
group_names = set()
if "groups" in decoded_token:
# set is_staff user property based on group membership
user.is_staff = "/staff" in decoded_token["groups"]
for group_name in decoded_token["groups"]:
# remove leading slash added by keycloak to group name
django_group_name = group_name.lstrip("/")
# ensure a corresponding django group exist
Group.objects.get_or_create(name=django_group_name)
group_names.add(django_group_name)
realm_access = decoded_token.get("realm_access", {})
permissions = realm_access.get("roles", [])
if client_id:
# extract user permissions if any
resource_access = decoded_token.get("resource_access", {})
client_resource_access = resource_access.get(client_id, {})
permissions += client_resource_access.get("roles", [])
# set user permissions and filter out default keycloak realm roles
user.permissions = set(permissions) - {"offline_access", "uma_authorization"}
# set user groups
user.group_names = group_names
# add user sub to custom User proxy model
user.sub = decoded_token["sub"]
return user
[docs]
def oidc_user_from_profile(
oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any]
) -> OIDCUser:
"""Initialize an OIDCUser out of an oidc profile dict.
Args:
oidc_client: KeycloakOpenIDConnect used to discuss with keycloak
oidc_profile: OIDC profile retrieved once connected to keycloak
Returns:
OIDCUser instance parsed out of the token received.
"""
# decode JWT token
try:
access_token = oidc_profile["access_token"]
decoded_token = oidc_client.decode_token(access_token)
# access token has expired
except ExpiredSignatureError:
# get a new access token from authentication provider
oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"])
# decode access token
decoded_token = oidc_client.decode_token(oidc_profile["access_token"])
# create OIDCUser from decoded token
user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id)
# get authentication init datetime
auth_datetime = datetime.fromtimestamp(decoded_token["iat"])
exp_datetime = datetime.fromtimestamp(decoded_token["exp"])
# compute OIDC tokens expiration date
oidc_profile["expires_at"] = exp_datetime
oidc_profile["refresh_expires_at"] = auth_datetime + timedelta(
seconds=oidc_profile["refresh_expires_in"]
)
# add OIDC profile data to custom User proxy model
for key, val in oidc_profile.items():
if hasattr(user, key):
setattr(user, key, val)
return user
[docs]
def oidc_profile_cache_key(oidc_client: KeycloakOpenIDConnect, user_id: int) -> str:
return f"oidc_user_{oidc_client.realm_name}_{oidc_client.client_id}_{user_id}"
[docs]
def keycloak_oidc_client() -> KeycloakOpenIDConnect:
"""
Instantiate a KeycloakOpenIDConnect class from the following django settings:
* SWH_AUTH_SERVER_URL
* SWH_AUTH_REALM_NAME
* SWH_AUTH_CLIENT_ID
Returns:
An object to ease the interaction with the Keycloak server
Raises:
ValueError: at least one mandatory django setting is not set
"""
server_url = getattr(settings, "SWH_AUTH_SERVER_URL", None)
realm_name = getattr(settings, "SWH_AUTH_REALM_NAME", None)
client_id = getattr(settings, "SWH_AUTH_CLIENT_ID", None)
if server_url is None or realm_name is None or client_id is None:
raise ValueError(
"SWH_AUTH_SERVER_URL, SWH_AUTH_REALM_NAME and SWH_AUTH_CLIENT_ID django "
"settings are mandatory to instantiate KeycloakOpenIDConnect class"
)
return KeycloakOpenIDConnect(
server_url=server_url, realm_name=realm_name, client_id=client_id
)
[docs]
def reverse(
viewname: str,
url_args: Optional[Dict[str, Any]] = None,
query_params: Optional[Dict[str, Any]] = None,
current_app: Optional[str] = None,
urlconf: Optional[str] = None,
request: Optional[HttpRequest] = None,
) -> str:
"""An override of django reverse function supporting query parameters.
Args:
viewname: the name of the django view from which to compute a url
url_args: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
current_app: the name of the django app tighten to the view
urlconf: url configuration module
request: build an absolute URI if provided
Returns:
str: the url of the requested view with processed arguments and
query parameters
"""
if url_args:
url_args = {k: v for k, v in url_args.items() if v is not None}
url = django_reverse(
viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
)
if query_params:
query_params = {k: v for k, v in query_params.items() if v is not None}
if query_params and len(query_params) > 0:
query_dict = QueryDict("", mutable=True)
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
url += "?" + query_dict.urlencode(safe="/;:")
if request is not None:
url = request.build_absolute_uri(url)
return url