Source code for swh.web.alter.utils

# Copyright (C) 2025  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from __future__ import annotations

from collections import namedtuple
from functools import wraps
import itertools
from typing import TYPE_CHECKING, Any, Optional

from django.conf import settings
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.translation import gettext as _

from .models import Alteration, Origin

if TYPE_CHECKING:
    from uuid import UUID

    from django.http import HttpRequest, HttpResponse

Step = namedtuple("Step", ["url", "active", "disabled"])

ProcessState = dict[str, Step]
STEPS_NAMES = ["email", "category", "origins", "reasons", "summary"]


[docs] def get_django_group_emails(group_name: str) -> list[str]: """Get `group_name` members email addresses from django auth system. Args: group_name: the name of the Group Returns: a list of email addresses """ User = get_user_model() return [user.email for user in User.objects.filter(groups__name=group_name)]
[docs] def get_keycloak_group_emails(group_name: str) -> list[str]: """Get the mail alias for `group_name`. For now we use email aliases defined in the config file. Args: group_name: the name of the Group Returns: a list containing a single email alias for `group_name` """ return [settings.ALTER_SETTINGS[f"{group_name}_mail_alias"]]
[docs] def get_group_emails(group_name: str) -> list[str]: """Get a list of recipients for `group_name`. The method used to fetch the emails depends on the auth backend. Args: group_name: the name of the Group Returns: a list of email addresses """ if settings.SWH_AUTH_SERVER_URL: return get_keycloak_group_emails(group_name) else: return get_django_group_emails(group_name)
SESSION_ALTERATION_IDS = "alteration_ids" """Session key to store the alteration ids current user has access to.""" SESSION_VERIFIED_EMAIL = "alteration_email" """Session key to store an email address validated by the current user."""
[docs] def has_access(request: HttpRequest, pk: UUID) -> bool: """Check if the current user has access to an `Alteration`. Args: request: an HttpRequest pk: an `Alteration` id Returns: True if the user is allowed. """ return str(pk) in request.session.get(SESSION_ALTERATION_IDS, [])
[docs] def set_access(request: HttpRequest, pk: UUID) -> None: """Store `pk` in the user's session. Args: request: an HttpRequest pk: an `Alteration` id """ if SESSION_ALTERATION_IDS not in request.session: request.session[SESSION_ALTERATION_IDS] = [str(pk)] else: request.session[SESSION_ALTERATION_IDS].append(str(pk)) request.session.modified = True
[docs] def set_verified_email(request: HttpRequest, email: str) -> None: """Store a verified email in the user's session. Args: request: an HttpRequest email: an email address """ request.session[SESSION_VERIFIED_EMAIL] = email
[docs] def verified_email(request: HttpRequest) -> str: """A validated email for the current user. Value is set by the `assistant_email_verification` view. Args: request: an HttpRequest Returns: A verified email address or an empty string if no email has been verified or if the verified email has been blocked meanwhile """ from .models import BlockList email = request.session.get(SESSION_VERIFIED_EMAIL) if not email or BlockList.is_blocked(email): return "" return email
[docs] def requestors_restricted(view_func): """A decorator to protect views from unauthorized access. Requires a view with a `pk` keyword argument matching an `Alteration` id. The wrapper checks if the current user has access it or send a redirect to the `alteration_access` view. """ @wraps(view_func) def wrap(request: HttpRequest, *args, **kwargs): pk = kwargs["pk"] if has_access(request, pk): return view_func(request, *args, **kwargs) else: messages.warning(request, _("Access to this page is restricted.")) return redirect("alteration-access", pk=pk) return wrap
[docs] def generate_origin_changelog(old_url: str, old_values: dict[str, Any]) -> str: """Generate a short changelog after updating an Origin. Args: old_url: the original origin url old_values: the list of changed fields (e.g. Form.changed_data) Returns: A text describing the changes """ return ( generate_changelog( Origin(**old_values), _("Origin %(url)s was modified, changes:" % {"url": old_url}), old_values, ) if old_values else "" )
[docs] def generate_alteration_changelog(old_values: dict[str, Any]) -> str: """Generate a short changelog after updating an Alteration. Args: old_values: the list of changed fields (e.g. Form.changed_data) Returns: A text describing the changes """ return ( generate_changelog( Alteration(**old_values), _("Alteration request was modified, changes:"), old_values, ) if old_values else "" )
[docs] def generate_changelog( instance: Alteration | Origin, introduction: str, old_values: dict[str, Any] ) -> str: """Generic changelog generator. Args: instance: a model instance introduction: a sentence introducing the changes old_values: a dict of changed values (field: old value) Returns: A text listing the fields impacted and their previous values """ parts = [introduction] for fieldname, value in old_values.items(): label = getattr(type(instance), fieldname).field.verbose_name display_method = f"get_{fieldname}_display" # available on choice fields if hasattr(instance, display_method): value = getattr(instance, display_method)() parts.append(f"- {label}: {value}") return "\n".join(parts)
[docs] def process_state(request) -> ProcessState: """Describe the alteration request process state. The alteration request assistant is a multi-step form, where the user is able to go back and forward in the process as long as required data has been filled. This method builds a dict of each steps as keys (email verification, category chooser, etc.) and step state as values (url, current active step or not, available step or not) using session vars & `request` path. Args: request: an ``HttpRequest`` Returns: The alteration request process state """ steps: ProcessState = {} for name in STEPS_NAMES: url = reverse(f"alteration-{name}") active = True if url == request.path else False disabled = False if f"alteration_{name}" in request.session else True steps[name] = Step(url, active, disabled) return steps
[docs] def redirect_to_step(request, current_step: str) -> Optional[HttpResponse]: """Redirect the user to the proper step. Prevent a user skipping steps in the process. It redirects the user to the first missing step of the form (ie. you try to access /reasons before choosing a category you are redirected to /category) and displays a message to explain why. Args: request: an ``HttpRequest`` current_step: the name of current step Returns: A redirect to a previous step if some data is missing """ warnings = { "email": _( "Please confirm your email address before accessing the alteration " "request form." ), "category": _("Please choose a category for your alteration request."), "origins": _("Please select origins for your alteration request."), "reasons": _("Please fill the reasons for your alteration request."), } previous_steps = list(itertools.takewhile(lambda s: s != current_step, STEPS_NAMES)) steps = process_state(request) for name in previous_steps: if steps[name].disabled: # a disabled step has not been filled messages.warning(request, warnings[name]) return redirect(steps[name].url) return None
[docs] def cleanup_session(request: HttpRequest) -> None: """Remove alteration request details from the session. Called after creating the Alteration request in the db, all sessions keys starting with `alteration_` are deleted, except `alteration_email`. Args: request: an ``HttpRequest`` """ to_remove = [ key for key in request.session.keys() if key.startswith("alteration_") and key != "alteration_email" ] for key in to_remove: request.session.pop(key)
[docs] def tunnel_step(current_step: str): """A decorator to forbid skipping steps in the alteration request tunnel. The user will be redirect to the first missing step before `current_step`. Args: current_step: the name of the current step """ def factory(view_func): @wraps(view_func) def wrap(request: HttpRequest, *args, **kwargs): if redirect_ := redirect_to_step(request, current_step): return redirect_ return view_func(request, *args, **kwargs) return wrap return factory