# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from itertools import chain
import os
import re
from shutil import get_unpack_formats
import tarfile
from typing import Dict, Optional, Tuple
from xml.etree import ElementTree
import zipfile
from rest_framework import status
from rest_framework.request import Request
from swh.deposit.api.checks import check_metadata
from swh.deposit.api.common import APIGet
from swh.deposit.api.private import APIPrivateView, DepositReadMixin
from swh.deposit.config import (
    ARCHIVE_TYPE,
    DEPOSIT_STATUS_REJECTED,
    DEPOSIT_STATUS_VERIFIED,
)
from swh.deposit.models import Deposit, DepositRequest
from swh.scheduler.utils import create_oneshot_task
MANDATORY_ARCHIVE_UNREADABLE = (
    "At least one of its associated archives is not readable"  # noqa
)
MANDATORY_ARCHIVE_INVALID = (
    "Mandatory archive is invalid (i.e. contains only one archive)"  # noqa
)
MANDATORY_ARCHIVE_UNSUPPORTED = "Mandatory archive type is not supported"
MANDATORY_ARCHIVE_MISSING = "Deposit without archive is rejected"
ARCHIVE_EXTENSIONS = [
    "zip",
    "tar",
    "tar.gz",
    "xz",
    "tar.xz",
    "bz2",
    "tar.bz2",
    "Z",
    "tar.Z",
    "tgz",
    "7z",
]
PATTERN_ARCHIVE_EXTENSION = re.compile(r".*\.(%s)$" % "|".join(ARCHIVE_EXTENSIONS))
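

# ``known_archive_format`` is used by ``_check_archive`` below; this is a minimal
# sketch, assuming a filename is considered supported when it ends with one of the
# extensions shutil knows how to unpack (zip, tar, gztar, bztar, xztar), which is
# why ``chain`` and ``get_unpack_formats`` are imported above.
def known_archive_format(filename: str) -> bool:
    return any(
        filename.endswith(t) for t in chain(*(x[1] for x in get_unpack_formats()))
    )
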
class APIChecks(APIPrivateView, APIGet, DepositReadMixin):
    """Dedicated class to trigger the deposit checks on deposit archives and metadata.

    Only GET is supported.

    """

    def _check_deposit_archives(
        self, deposit: Deposit
    ) -> Tuple[bool, Optional[Dict]]:
        """Given a deposit, check each of its deposit requests of type archive.

        Args:
            deposit: The deposit whose archives are checked

        Returns:
            tuple (status, details): (True, None) if all archives
            are ok, (False, <detailed-error>) otherwise.

        """
        requests = list(self._deposit_requests(deposit, request_type=ARCHIVE_TYPE))
        requests.reverse()
        if len(requests) == 0:  # a deposit without any archive is rejected
            return False, {
                "archive": [
                    {
                        "summary": MANDATORY_ARCHIVE_MISSING,
                    }
                ]
            }

        errors = []
        for archive_request in requests:
            check, error_message = self._check_archive(archive_request)
            if not check:
                errors.append(
                    {"summary": error_message, "fields": [archive_request.id]}
                )

        if not errors:
            return True, None
        return False, {"archive": errors}

    def _check_archive(
        self, archive_request: DepositRequest
    ) -> Tuple[bool, Optional[str]]:
        """Check that a deposit's associated archive is ok:

        - readable
        - supported archive format
        - valid content: the archive does not consist of a single archive file

        If any of these checks fails, return the corresponding error.

        Args:
            archive_request: Archive request to check

        Returns:
            (True, None) if the archive passes the checks, (False,
            <detail-error>) otherwise.

        """
        archive = archive_request.archive
        archive_name = os.path.basename(archive.name)
        if not known_archive_format(archive_name):
            return False, MANDATORY_ARCHIVE_UNSUPPORTED

        try:
            # Use Python's File API, which is consistent across the different types
            # of storage backends (e.g. file, azure, ...).
            # No other workaround was found for the Azure blob storage backend; this
            # call is a no-op for the other backends.
            reset_content_settings_if_needed(archive)
            # FIXME: implement a better way (after digging into django-storages[azure])
            with archive.open("rb") as archive_fp:
                try:
                    with zipfile.ZipFile(archive_fp) as zip_fp:
                        files = zip_fp.namelist()
                except Exception:
                    try:
                        # rewind since the first read attempt may have moved the
                        # cursor
                        archive_fp.seek(0)
                        with tarfile.open(fileobj=archive_fp) as tar_fp:
                            files = tar_fp.getnames()
                    except Exception:
                        return False, MANDATORY_ARCHIVE_UNSUPPORTED
        except Exception:
            return False, MANDATORY_ARCHIVE_UNREADABLE

        if len(files) > 1:
            return True, None

        element = files[0]
        if PATTERN_ARCHIVE_EXTENSION.match(element):
            # archive in archive!
            return False, MANDATORY_ARCHIVE_INVALID

        return True, None

    def process_get(
        self, req: Request, collection_name: str, deposit: Deposit
    ) -> Tuple[int, Dict, str]:
        """Trigger the checks on the deposit archives and then on the deposit metadata.

        If any problems (or warnings) are raised, the deposit status and status detail
        are updated accordingly. If all checks are ok, the deposit status is set to
        'verified' (details updated with warnings, if any) and a loading task is
        scheduled for the deposit to be ingested. Otherwise, the deposit is marked as
        'rejected' with the error details. A json response is returned to the caller
        with the deposit checks.

        Args:
            req: Client request
            collection_name: Collection owning the deposit
            deposit: Deposit being checked

        Returns:
            Tuple (status, json response, content-type)

        """
        raw_metadata = self._metadata_get(deposit)
        details_dict: Dict = {}
        # check each of the deposit's associated requests (both of type
        # archive and metadata) for errors
        archives_status_ok, details = self._check_deposit_archives(deposit)
        if not archives_status_ok:
            assert details is not None
            details_dict.update(details)

        if raw_metadata is None:
            metadata_status_ok = False
            details_dict["metadata"] = [{"summary": "Missing Atom document"}]
        else:
            metadata_tree = ElementTree.fromstring(raw_metadata)
            metadata_status_ok, details = check_metadata(metadata_tree)
            # in case of error, ensure we do have the rejection details
            assert metadata_status_ok or (
                not metadata_status_ok and details is not None
            )
            # we can have warnings even if checks are ok (e.g. missing suggested field)
            details_dict.update(details or {})

        deposit_status_ok = archives_status_ok and metadata_status_ok
        # if any check details were collected, the deposit is rejected
        deposit.status = (
            DEPOSIT_STATUS_VERIFIED if deposit_status_ok else DEPOSIT_STATUS_REJECTED
        )
        response: Dict = {
            "status": deposit.status,
        }
        if details_dict:
            deposit.status_detail = details_dict
            response["details"] = details_dict

        # Deposit is ok: schedule the deposit loading task (if not already done)
        if deposit_status_ok and not deposit.load_task_id and self.config["checks"]:
            url = deposit.origin_url
            task = create_oneshot_task(
                "load-deposit", url=url, deposit_id=deposit.id, retries_left=3
            )
            load_task_id = self.scheduler.create_tasks([task])[0].id
            deposit.load_task_id = str(load_task_id)

        deposit.save()
        return status.HTTP_200_OK, response, "application/json"

def reset_content_settings_if_needed(archive) -> None:
    """Reset the content_settings of the blob backing ``archive`` when it is stored
    in an Azure blob storage: a content_encoding set on the blob prevents the file
    from being read correctly and makes the checks fail for no good reason.

    This is a no-op for any other storage backend.

    """
    try:
        from storages.backends.azure_storage import AzureStorage
    except ImportError:
        return None

    if not isinstance(archive.storage, AzureStorage):
        return None

    from azure.storage.blob import ContentSettings

    blob_client = archive.storage.client.get_blob_client(archive.name)
    # Get the existing blob properties
    properties = blob_client.get_blob_properties()
    # reset the content encoding in the settings
    content_settings = dict(properties.content_settings)
    content_settings["content_encoding"] = ""
    # Set the headers back on the blob, populating them from the existing
    # properties (with content_encoding cleared)
    blob_headers = ContentSettings(**content_settings)
    blob_client.set_http_headers(blob_headers)
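

# A minimal usage sketch (hypothetical, for illustration only): the "archive in
# archive" rule rejects an upload whose sole member is itself an archive, i.e. a
# single name matching PATTERN_ARCHIVE_EXTENSION, which is what makes
# _check_archive return MANDATORY_ARCHIVE_INVALID.
if __name__ == "__main__":
    import io

    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as zip_fp:
        # a zip containing nothing but another (fake) tarball
        zip_fp.writestr("foo.tar.gz", b"not actually a tarball")
    with zipfile.ZipFile(io.BytesIO(buffer.getvalue())) as zip_fp:
        files = zip_fp.namelist()
    assert len(files) == 1
    assert PATTERN_ARCHIVE_EXTENSION.match(files[0])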