Source code for swh.coarnotify.server.views

# Copyright (C) 2025 - 2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Views."""

from http import HTTPStatus
import json

from django.conf import settings
from django.db.models import QuerySet
from django.http import HttpResponse
from django.shortcuts import get_object_or_404, render
from rdflib import Graph, URIRef
from rest_framework.decorators import api_view, permission_classes
from rest_framework.exceptions import NotAuthenticated, ParseError, PermissionDenied
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response

from swh.coarnotify.parsers import safe_jsonld_expander
from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_oneshot_task

from .models import HANDLER_BY_TYPES, InboundNotification
from .utils import get_in_reply_to, get_notification_urn, inbox_headers, uuid_from_urn
from .validators import validate_notification


[docs] class Receipt(Response): """A 201 HTTP Response containing the url of the created notification.""" def __init__(self, notification: InboundNotification, request: Request): """Init the Response. Args: notification: the notification we created request: the HTTP request """ headers = { "Location": request.build_absolute_uri(notification.get_absolute_url()) } super().__init__(status=HTTPStatus.CREATED, headers=headers)
[docs] class Discover(Response): """A 200 HTTP Response containing the url of our inbox.""" def __init__(self, request: Request): """Init the Response. Args: request: the HTTP request """ super().__init__(status=HTTPStatus.OK, headers=inbox_headers(request))
[docs] class UserInbox(Response): """A 200 HTTP Response containing the url of our notifications sent by a user.""" def __init__( self, request: Request, notifications: "QuerySet[InboundNotification]" ): """Init the Response. Args: request: the HTTP request notifications: a queryset if notifications """ data = { "@context": "http://www.w3.org/ns/ldp", "@id": request.build_absolute_uri(), "contains": [ request.build_absolute_uri(n.get_absolute_url()) for n in notifications ], } super().__init__(data=data, status=HTTPStatus.OK)
[docs] def process_notification(request: Request) -> InboundNotification: """Process an inbound COAR Notification. - structural payload validation - store the notification in the db - schedule processing of the notification Args: request: an HTTP request Raises: ParseError: the notification is invalid NotAuthenticated: missing request.user Returns: An HTTP response with an error code if the COAR Notification is structurally invalid, or a simple JSON response with a message key containing the outcome of the process. """ if not request.user.is_authenticated: raise NotAuthenticated() # We expand the JSON-LD by safely loading ``@context`` URLs try: document = safe_jsonld_expander(request.data) except Exception as exc: raise ParseError(f"Unable to process json-ld: {exc}") from exc # The notification's URN ID is needed to access the graph property properly and # the UUID will be used as its ID in our database notification_urn = get_notification_urn(document) notification_id = uuid_from_urn(notification_urn) # Then we use rdflib to parse the expanded JSON-LD and validate the notification graph = Graph() try: graph.parse(data=json.dumps(document), format="json-ld") except Exception as exc: raise ParseError(str(exc)) from exc # Validate the notification types & shape types = validate_notification( graph, URIRef(notification_urn), request.build_absolute_uri() ) # At this point it looks like we can handle the notification, we store it in the db # and trigger a handling task notification = InboundNotification.objects.create( id=notification_id, in_reply_to=get_in_reply_to(graph, URIRef(notification_urn)), payload=document, raw_payload=request.data, sender=request.user.organization, handler=HANDLER_BY_TYPES.get(types), # type: ignore ) scheduler = get_scheduler(**settings.SWH_CONF["scheduler"]) task = create_oneshot_task("coarnotify-process-inbound", str(notification.id)) process_task_id = scheduler.create_tasks([task])[0].id notification.process_task_id = process_task_id notification.save() return notification
[docs] @api_view(["GET", "POST", "HEAD"]) def inbox(request) -> Response | HttpResponse: """Main inbox API endpoint. The response depends on the method used by the client: - HEAD: returns LDN inbox discovery headers - GET: - unauthenticated: an HTML response - authenticated: user's inbox - POST: - unauthenticated: an exception - authenticated: the result of the payload processing Args: request: an HTTP request Raises: PermissionDenied: unable to auth user Returns: An HTTP response """ if request.method == "HEAD": return Discover(request) if request.method == "GET" and not request.user.is_authenticated: response = render(request, "index.html") # include LDN inbox discovery headers for k, v in inbox_headers(request).items(): response[k] = v return response if not request.user.is_authenticated: raise PermissionDenied() if request.method == "GET": return UserInbox( request, InboundNotification.objects.filter(sender=request.user.organization), ) notification = process_notification(request) return Receipt(notification, request)
[docs] @api_view(["GET"]) @permission_classes([IsAuthenticated]) def read_notification(request, pk) -> Response: notification = get_object_or_404(InboundNotification, pk=pk) if notification.sender != request.user.organization: raise PermissionDenied() return Response(notification.raw_payload)