Source code for swh.coarnotify.server.utils

# Copyright (C) 2025 - 2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Utils."""

import json
from typing import Any
import uuid

from django.conf import settings
from pyld import jsonld
from rdflib import Graph, Literal, URIRef
from rest_framework import serializers
from rest_framework.request import Request

from swh.coarnotify.namespaces import AS, CN, LDP, RDF
from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_oneshot_task

from .models import Handlers, InboundNotification, OutboundNotification, Statuses

# Context URLs of the ActivityStreams and COAR Notify vocabularies; the same two
# URLs form the ``@context`` of the JSON-LD frame in ``_build_reply_to``.
DEFAULT_CONTEXTS = ["https://www.w3.org/ns/activitystreams", "https://coar-notify.net"]


def send_cn(notification: OutboundNotification) -> None:
    """Schedule the sending of a COAR Notification.

    A one-shot ``coarnotify-send-outbound`` task is created in the scheduler
    configured under ``settings.SWH_CONF["scheduler"]``; the id of the created
    task is stored in ``notification.send_task_id`` and the notification is saved.

    Args:
        notification: an OutboundNotification
    """
    scheduler = get_scheduler(**settings.SWH_CONF["scheduler"])
    task = create_oneshot_task("coarnotify-send-outbound", str(notification.id))
    created_tasks = scheduler.create_tasks([task])
    notification.send_task_id = created_tasks[0].id
    # Persist the task id: without a save the assignment above only exists on this
    # in-memory instance, and the callers in this module do not save afterwards.
    notification.save()
def _build_reply_to(
    cn: InboundNotification, types: list[URIRef], summary: str | None = None
) -> OutboundNotification:
    """Build an Outbound CN in reply to a CN.

    The notification is built with ``rdflib`` and framed with ``pyld``.

    Args:
        cn: an InboundNotification to reply to
        types: a list of types for the notification
        summary: An optional message to include in the notification

    Raises:
        ValueError: the original notification has no ``origin``, or that origin
            has no ``inbox``, so there is nowhere to send the reply

    Returns:
        An ``OutboundNotification``
    """
    pk = uuid.uuid4()
    root_id = URIRef(f"urn:uuid:{pk}")
    # The original notification is referenced by both ``inReplyTo`` and ``object``.
    original_id = URIRef(f"urn:uuid:{cn.pk}")
    reply = Graph(identifier=root_id)

    # type is REQUIRED.
    for type_ in types:
        reply.add((root_id, URIRef(RDF.type), type_))

    # inReplyTo is REQUIRED. It takes the URI which identifies the activity for which
    # this is a response.
    reply.add((root_id, URIRef(AS.inReplyTo), original_id))

    # origin is REQUIRED. It describes the system which has sent the notification.
    origin_id = URIRef(settings.CN_ORIGIN["id"])
    reply.add((root_id, URIRef(AS.origin), origin_id))
    reply.add((origin_id, URIRef(LDP.inbox), URIRef(settings.CN_ORIGIN["inbox"])))

    # actor is RECOMMENDED. It describes the party responsible for this activity
    actor_id = URIRef(settings.CN_ACTOR["id"])
    reply.add((root_id, URIRef(AS.actor), actor_id))
    reply.add((actor_id, URIRef(RDF.type), URIRef(AS[settings.CN_ACTOR["type"]])))
    reply.add((actor_id, URIRef(AS.name), Literal(settings.CN_ACTOR["name"])))

    # summary is OPTIONAL (except for ``UnprocessableNotification``). When present, it
    # SHOULD provide a brief, free-text summary of the reason for rejecting the
    # previous Offer.
    if summary:
        reply.add((root_id, URIRef(AS.summary), Literal(summary)))

    # object is REQUIRED. It is the complete payload of the Offer activity to which
    # this pattern is a response.
    reply.add((root_id, URIRef(AS.object), original_id))
    reply.parse(data=json.dumps(cn.raw_payload), format="json-ld")

    # target is REQUIRED. It describes the system which is intended to receive the
    # notification, we reuse the values from `origin` in the original notification
    target_id = reply.value(subject=original_id, predicate=URIRef(AS.origin))
    if not target_id:
        # An explicit exception instead of ``assert``: assertions are stripped when
        # Python runs with -O.
        raise ValueError(
            f"Unable to reply to notification {cn.pk}: it has no origin"
        )
    reply.add((root_id, URIRef(AS.target), target_id))
    target_inbox = reply.value(subject=target_id, predicate=URIRef(LDP.inbox))
    if not target_inbox:
        # Without this check ``str(None)`` would be stored as the literal inbox
        # address "None".
        raise ValueError(
            f"Unable to reply to notification {cn.pk}: its origin has no inbox"
        )

    # Serialize the graph to JSON-LD and deserialize it to a python list,
    document = json.loads(reply.serialize(format="json-ld"))

    # At this point the JSON-LD is `expanded` and contains multiple subjects: our
    # ``root_id`` but also ``target``, ``object``, etc. We need to frame the JSON-LD to
    # explain we want ``root_id`` as the main focus of the document. As a side effect
    # the content of the original CN might be embedded in ``inReplyTo`` instead of
    # ``object`` as they target the same ID, we specify we want ``object`` embedded,
    # not ``inReplyTo``.
    frame = {
        "@context": [
            "https://www.w3.org/ns/activitystreams",
            "https://coar-notify.net",
        ],
        "@id": str(root_id),
        "object": {"@embed": True},
        "inReplyTo": {"@embed": False},
    }
    payload = jsonld.frame(document, frame)

    return OutboundNotification.objects.create(
        id=pk, payload=payload, in_reply_to=cn, inbox=str(target_inbox)
    )
def create_reject_cn(cn: InboundNotification) -> OutboundNotification:
    """Build the ``Reject`` reply to a rejected inbound CN.

    https://coar-notify.net/specification/1.0.1/reject/

    Args:
        cn: the inbound CN being rejected; its ``error_message`` is used as the
            summary of the reply

    Returns:
        the outbound CN replying to ``cn``
    """
    reply_types = [URIRef(AS.Reject)]
    return _build_reply_to(cn, types=reply_types, summary=cn.error_message)
def create_accept_cn(
    cn: InboundNotification, summary: str | None = None
) -> OutboundNotification:
    """Build the ``Accept`` reply to an acceptable inbound CN.

    https://coar-notify.net/specification/1.0.1/accept/

    Args:
        cn: the inbound notification being accepted
        summary: an optional free-text summary to include in the reply

    Returns:
        the outbound CN replying to ``cn``
    """
    reply_types = [URIRef(AS.Accept)]
    return _build_reply_to(cn, types=reply_types, summary=summary)
def create_unprocessable_cn(cn: InboundNotification) -> OutboundNotification:
    """Build the ``UnprocessableNotification`` reply to an inbound CN.

    https://coar-notify.net/specification/1.0.1/unprocessable/

    Args:
        cn: the inbound CN that could not be processed; its ``error_message`` is
            used as the summary of the reply

    Returns:
        the outbound CN replying to ``cn``
    """
    # The reply carries two types: ``Flag`` and ``UnprocessableNotification``.
    reply_types = [URIRef(AS.Flag), URIRef(CN.UnprocessableNotification)]
    return _build_reply_to(cn, types=reply_types, summary=cn.error_message)
def uuid_from_urn(urn: str) -> uuid.UUID:
    """Extract a UUID from a URN.

    Args:
        urn: a UUID URN (urn:uuid:xxx)

    Raises:
        ValueError: URN is not a valid UUID URN

    Returns:
        a uuid
    """
    try:
        scheme, nid, nss = urn.split(":")
    except (ValueError, AttributeError) as exc:
        # ValueError: not exactly three colon-separated parts.
        # AttributeError: ``urn`` has no ``split`` method (e.g. ``None``).
        # Chain the original error so the root cause stays visible in tracebacks.
        raise ValueError("Expecting URN rendered in URI syntax") from exc
    if scheme != "urn":
        raise ValueError("Not a URN")
    if nid != "uuid":
        raise ValueError("Not a UUID URN")
    # ``uuid.UUID`` raises ValueError itself when ``nss`` is not a valid UUID.
    return uuid.UUID(nss)
def get_in_reply_to(graph: Graph, root_id: str) -> OutboundNotification | None:
    """Look up the notification referenced by ``inReplyTo``, if any.

    Args:
        graph: RDF graph
        root_id: the document ID

    Returns:
        The OutboundNotification whose id matches the ``inReplyTo`` UUID URN, or
        ``None`` when there is no ``inReplyTo`` value, when it is not a UUID URN,
        or when no such notification exists
    """
    replied_urn = graph.value(URIRef(root_id), AS.inReplyTo)
    if not replied_urn:
        return None
    try:
        replied_pk = uuid_from_urn(str(replied_urn))
    except ValueError:
        # Not a ``urn:uuid:`` reference, so it cannot match one of our ids.
        return None
    else:
        return OutboundNotification.objects.filter(id=replied_pk).first()
def get_notification_urn(document: list[dict]) -> str:
    """Get the notification ID from an expanded JSON-LD.

    Args:
        document: an expanded JSON-LD

    Raises:
        serializers.ValidationError: missing ID, invalid ID or duplicate

    Returns:
        A UUID URN
    """
    try:
        notification_urn = document[0]["@id"]
    except (IndexError, KeyError, TypeError) as exc:
        # IndexError: empty document; KeyError: no "@id" entry; TypeError: the
        # document (or its first item) does not support this kind of lookup.
        raise serializers.ValidationError(
            {"id": "The notification must have an ID"}
        ) from exc
    try:
        notification_id = uuid_from_urn(notification_urn)
    except ValueError as exc:
        raise serializers.ValidationError(
            {"id": f"The notification ID is invalid: {exc}"}
        ) from exc
    # A notification id can only be handled once.
    if InboundNotification.objects.filter(id=notification_id).exists():
        raise serializers.ValidationError(
            {
                "id": f"A COAR Notification with the id {notification_id} has already "
                "been handled."
            }
        )
    return notification_urn
def get_handler_from_types(graph: Graph, root_id: str) -> Handlers | None:
    """Pick the handler matching the types declared by a notification.

    For now only mentions (Announce + RelationshipAction) are handled.

    Args:
        graph: RDF graph
        root_id: the document ID

    Returns:
        ``Handlers.MENTION`` when the types of ``root_id`` are exactly
        ``Announce`` and ``RelationshipAction``, ``None`` otherwise
    """
    declared_types = set(graph.objects(URIRef(root_id), URIRef(RDF.type)))
    mention_types = {AS.Announce, CN.RelationshipAction}
    # Exact match: any extra or missing type means no handler.
    return Handlers.MENTION if declared_types == mention_types else None
def reject(notification: InboundNotification, error_message: str) -> None:
    """Flag a notification as rejected and schedule the Reject reply.

    Args:
        notification: an InboundNotification
        error_message: a reason for the rejection
    """
    notification.error_message = error_message
    notification.status = Statuses.REJECTED
    notification.save()
    # The Reject CN is built from the notification updated above.
    send_cn(create_reject_cn(notification))
def unprocessable(notification: InboundNotification, error_message: str) -> None:
    """Flag a notification as unprocessable and schedule the Unprocessable reply.

    Args:
        notification: an InboundNotification
        error_message: the reason why the notification cannot be processed
    """
    notification.error_message = error_message
    notification.status = Statuses.UNPROCESSABLE
    notification.save()
    # The Unprocessable CN is built from the notification updated above.
    send_cn(create_unprocessable_cn(notification))
def inbox_headers(request: Request) -> dict[str, str]:
    """Headers to signpost our Inbox address.

    The ``Link`` value has the form ``<URI>; rel="relation-type"``, with the
    relation type given as a complete double-quoted string.

    Args:
        request: an HTTP request

    Returns:
        dict[str, str]: headers to include on our views
    """
    # NOTE(review): ``build_absolute_uri()`` is called without a location, so the
    # advertised URL is presumably that of the current request - confirm this is
    # the inbox URL on every view using these headers.
    inbox_url = request.build_absolute_uri()
    return {"Link": f'<{inbox_url}>; rel="http://www.w3.org/ns/ldp#inbox"'}
def context_processor(request: Request) -> dict[str, Any]:
    """Expose shared variables to every template.

    Args:
        request: an HTTP request (not used to compute the variables)

    Returns:
        A dict of variables accessible in all templates; currently only
        ``CN_VERSION``, read from the settings
    """
    template_vars: dict[str, Any] = {}
    template_vars["CN_VERSION"] = settings.CN_VERSION
    return template_vars