Source code for swh.deposit.api.collection

# Copyright (C) 2017-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from typing import Optional, Tuple

from django.shortcuts import render
from rest_framework import status
from rest_framework.generics import ListAPIView

from swh.deposit.api.common import (
    ACCEPT_ARCHIVE_CONTENT_TYPES,
    APIPost,
    ParsedRequestHeaders,
    Receipt,
    get_collection_by_name,
)
from swh.deposit.api.utils import DefaultPagination, DepositSerializer
from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS, EDIT_IRI
from swh.deposit.models import Deposit
from swh.deposit.parsers import (
    SWHAtomEntryParser,
    SWHFileUploadTarParser,
    SWHFileUploadZipParser,
    SWHMultiPartParser,
)


class CollectionAPI(ListAPIView, APIPost):
    """Collection endpoint of the sword deposit api.

    This is the 'Col-IRI' of the sword specification.

    HTTP verbs supported: GET and POST

    """

    # Parsers tried on incoming request bodies (multipart, zip, tar, atom).
    parser_classes = (
        SWHMultiPartParser,
        SWHFileUploadZipParser,
        SWHFileUploadTarParser,
        SWHAtomEntryParser,
    )
    serializer_class = DepositSerializer
    pagination_class = DefaultPagination

    def get(self, request, *args, **kwargs):
        """Render one page of the user's deposits as an XML document.

        The request is first validated through ``self.checks`` against the
        collection named in the url. The paginated listing itself is
        delegated to the parent class, and its "next"/"previous" urls (when
        present) are exposed through the ``Link`` response header. The
        header is always set and is empty when there is no such url.

        """
        self.checks(request, kwargs["collection_name"])
        page = super().get(request, *args, **kwargs).data

        # Keep the "next" then "previous" ordering of the header entries.
        candidates = [(rel, page.get(rel)) for rel in ("next", "previous")]
        link_entries = [
            f'<{url}>; rel="{rel}"' for rel, url in candidates if url is not None
        ]

        template_context = {
            "count": page["count"],
            "results": list(map(dict, page["results"])),
        }
        response = render(
            request,
            "deposit/collection_list.xml",
            context=template_context,
            content_type="application/xml",
            status=status.HTTP_200_OK,
        )
        response["Link"] = ",".join(link_entries)
        return response

    def get_queryset(self):
        """Return the authenticated user's deposits, ordered by id.

        Pagination is not done here: it is handled by the
        `pagination_class` class attribute.

        """
        user_deposits = Deposit.objects.filter(client=self.request.user.id)
        return user_deposits.order_by("id")

    def process_post(
        self,
        req,
        headers: ParsedRequestHeaders,
        collection_name: str,
        deposit: Optional[Deposit] = None,
    ) -> Tuple[int, str, Receipt]:
        """Create a first deposit, whose kind depends on the content type:

        - archive deposit (1 zip)
        - multipart (1 zip + 1 atom entry)
        - atom entry

        Args:
            req (Request): the request holding the information to parse
                and inject in db
            headers: the parsed request headers; ``headers.slug`` is used
                as the external identifier of the new deposit
            collection_name (str): the associated client
            deposit: must be None, as this endpoint always creates the
                deposit itself

        Returns:
            A (status code, iri key, receipt) tuple. If everything is ok,
            the status code is 201 (created), the iri key is ``EDIT_IRI``
            and the receipt is the deposit receipt.

        Raises:
            - archive deposit:
                - 400 (bad request) if the request is not providing an external
                  identifier
                - 403 (forbidden) if the length of the archive exceeds the
                  max size configured
                - 412 (precondition failed) if the length or hash provided
                  mismatch the reality of the archive.
                - 415 (unsupported media type) if a wrong media type is
                  provided

            - multipart deposit:
                - 400 (bad request) if the request is not providing an external
                  identifier
                - 412 (precondition failed) if the potentially md5 hash
                  provided mismatch the reality of the archive
                - 415 (unsupported media type) if a wrong media type is
                  provided

            - Atom entry deposit:
                - 400 (bad request) if the request is not providing an external
                  identifier
                - 400 (bad request) if the request's body is empty
                - 415 (unsupported media type) if a wrong media type is
                  provided

        """
        assert deposit is None

        new_deposit = self._deposit_create(
            req, collection_name, external_id=headers.slug
        )

        # Pick the upload routine matching the request's content type;
        # anything that is neither an archive nor multipart is handled as
        # an atom entry.
        content_type = req.content_type
        if content_type in ACCEPT_ARCHIVE_CONTENT_TYPES:
            handle_upload = self._binary_upload
        elif content_type.startswith("multipart/"):
            handle_upload = self._multipart_upload
        else:
            handle_upload = self._atom_entry

        receipt = handle_upload(req, headers, collection_name, new_deposit)
        return status.HTTP_201_CREATED, EDIT_IRI, receipt

    def _deposit_create(
        self, request, collection_name: str, external_id: Optional[str]
    ) -> Deposit:
        """Build a new Deposit object for the collection and requesting client.

        When an external id is provided, the most recent deposit of the
        same client with that external id and a successful load status (if
        any) is set as the parent of the new deposit.

        The returned object is only instantiated here; this method does
        not call ``save()`` on it.

        """
        collection = get_collection_by_name(collection_name)
        client = self.get_client(request)

        parent: Optional[Deposit] = None
        if external_id:
            # TODO: delete this when clients stopped relying on the slug
            successful_siblings = Deposit.objects.filter(
                client=client,
                external_id=external_id,
                status=DEPOSIT_STATUS_LOAD_SUCCESS,
            )
            latest_first = successful_siblings.order_by("-id")
            try:
                # most recent successfully loaded deposit with that external id
                parent = latest_first[0:1].get()
            except Deposit.DoesNotExist:
                # no previous deposit to attach to: parent stays None
                pass

        return Deposit(
            collection=collection,
            external_id=external_id or "",
            client=client,
            parent=parent,
        )