# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict, cast
import uuid
from django.contrib.auth import authenticate, login, logout
from django.core.cache import cache
from django.http import HttpRequest
from django.http.response import (
HttpResponse,
HttpResponseBadRequest,
HttpResponseRedirect,
HttpResponseServerError,
)
from django.urls import re_path as url
from swh.auth.django.models import OIDCUser
from swh.auth.django.utils import keycloak_oidc_client, oidc_profile_cache_key, reverse
from swh.auth.keycloak import KeycloakError, keycloak_error_message
from swh.auth.utils import gen_oidc_pkce_codes
[docs]
def oidc_login_view(request: HttpRequest, redirect_uri: str, scope: str = "openid"):
"""
Helper view function that initiates a login process using OIDC authorization
code flow with PKCE.
OIDC session scope can be modified using the dedicated parameter.
"""
# generate a CSRF token
state = str(uuid.uuid4())
code_verifier, code_challenge = gen_oidc_pkce_codes()
request.session["login_data"] = {
"code_verifier": code_verifier,
"state": state,
"redirect_uri": redirect_uri,
"next": request.GET.get("next", ""),
}
authorization_url_params = {
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"scope": scope,
}
try:
oidc_client = keycloak_oidc_client()
authorization_url = oidc_client.authorization_url(
redirect_uri, **authorization_url_params
)
except KeycloakError as ke:
return HttpResponseServerError(keycloak_error_message(ke))
return HttpResponseRedirect(authorization_url)
[docs]
def get_oidc_login_data(request: HttpRequest) -> Dict[str, Any]:
"""
Check and get login data stored in django session.
"""
if "login_data" not in request.session:
raise Exception("Login process has not been initialized.")
login_data = request.session["login_data"]
if "code" not in request.GET or "state" not in request.GET:
raise ValueError("Missing query parameters for authentication.")
# get CSRF token returned by OIDC server
state = request.GET["state"]
if state != login_data["state"]:
raise ValueError("Wrong CSRF token, aborting login process.")
return login_data
[docs]
def oidc_login(request: HttpRequest) -> HttpResponse:
"""
Django view to initiate login process using OpenID Connect authorization
code flow with PKCE.
"""
redirect_uri = reverse("oidc-login-complete", request=request)
return oidc_login_view(request, redirect_uri=redirect_uri)
[docs]
def oidc_login_complete(request: HttpRequest) -> HttpResponse:
"""
Django view to finalize login process using OpenID Connect authorization
code flow with PKCE.
"""
if "error" in request.GET:
return HttpResponseServerError(request.GET["error"])
try:
login_data = get_oidc_login_data(request)
except ValueError as ve:
return HttpResponseBadRequest(str(ve))
except Exception as e:
return HttpResponseServerError(str(e))
next = login_data["next"] or request.build_absolute_uri("/")
user = authenticate(
request=request,
code=request.GET["code"],
code_verifier=login_data["code_verifier"],
redirect_uri=login_data["redirect_uri"],
)
if user is None:
return HttpResponseServerError("User authentication failed.")
login(request, user)
return HttpResponseRedirect(next)
[docs]
def oidc_logout(request: HttpRequest) -> HttpResponse:
"""
Django view to logout using OpenID Connect.
"""
user = request.user
logout(request)
if hasattr(user, "refresh_token"):
user = cast(OIDCUser, user)
refresh_token = cast(str, user.refresh_token)
try:
# end OpenID Connect session
oidc_client = keycloak_oidc_client()
oidc_client.logout(refresh_token)
except KeycloakError as ke:
return HttpResponseServerError(keycloak_error_message(ke))
# remove user data from cache
cache.delete(oidc_profile_cache_key(oidc_client, user.id))
return HttpResponseRedirect(request.GET.get("next", "/"))
urlpatterns = [
url(r"^oidc/login/$", oidc_login, name="oidc-login"),
url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"),
url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"),
]