# Source code for swh.deposit.api.private.deposit_read
# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from contextlib import contextmanager
import os
from pathlib import Path
import shutil
import tempfile
from typing import Any, Dict, Iterator, List, Optional, Tuple
from xml.etree import ElementTree

from rest_framework import status

from swh.core import tarball
from swh.deposit.api.common import APIGet
from swh.deposit.api.private import APIPrivateView, DepositReadMixin
from swh.deposit.config import ARCHIVE_TYPE, SWH_PERSON
from swh.deposit.models import Deposit
from swh.deposit.utils import NAMESPACES, normalize_date
from swh.model.hashutil import hash_to_hex
from swh.model.model import MetadataAuthorityType
from swh.model.swhids import CoreSWHID


@contextmanager
def aggregate_tarballs(extraction_dir: str, archives: List) -> Iterator[str]:
    """Aggregate multiple tarballs into a single one and yield the new
    archive's path.

    Args:
        extraction_dir: Path to use for the tarballs computation
        archives: Deposit's archives to aggregate

    Yields:
        Path to the aggregated tarball; the temporary directory holding it
        is cleaned up when the context manager exits.

    """
    # rebuild one tar archive from (possibly) multiple ones
    os.makedirs(extraction_dir, 0o755, exist_ok=True)
    dir_path = tempfile.mkdtemp(prefix="swh.deposit-", dir=extraction_dir)

    # root folder to build an aggregated tarball
    aggregated_tarball_rootdir = os.path.join(dir_path, "aggregate")
    download_tarball_rootdir = os.path.join(dir_path, "download")

    # uncompress all of the client's deposit archives in a temporary location
for archive in archives:
with archive.open("rb") as archive_fp:
            try:
                # for storages that support direct path access, use it
                archive_path = archive.path
            except NotImplementedError:
                # otherwise, for remote backends that do not support it,
                # download the tarball locally first
tarball_path = Path(archive.name)
tarball_path_dir = Path(download_tarball_rootdir) / tarball_path.parent
tarball_path_dir.mkdir(0o755, parents=True, exist_ok=True)
archive_path = str(tarball_path_dir / tarball_path.name)
with open(archive_path, "wb") as f:
while chunk := archive_fp.read(10 * 1024 * 1024):
f.write(chunk)

            tarball.uncompress(archive_path, aggregated_tarball_rootdir)

    # aggregate the multiple smaller tarballs into one big tarball
    temp_tarpath = shutil.make_archive(
        aggregated_tarball_rootdir, "tar", aggregated_tarball_rootdir
    )
    # the aggregated root directory can be cleaned up right away
    shutil.rmtree(aggregated_tarball_rootdir)

    try:
        yield temp_tarpath
    finally:
        shutil.rmtree(dir_path)
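

# Usage sketch (illustrative only, not part of this module): the archives are
# assumed to behave like Django FieldFile objects, i.e. expose open(), name
# and, for local storage backends, path.
#
#     with aggregate_tarballs("/tmp/swh-deposit-cache", archives) as tarpath:
#         stream_to_client(tarpath)  # hypothetical consumer
#     # the temporary directory holding tarpath is removed on exit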


class APIReadArchives(APIPrivateView, APIGet, DepositReadMixin):
"""Dedicated class to read a deposit's raw archives content.
Only GET is supported.
"""
def __init__(self):
super().__init__()
self.extraction_dir = self.config["extraction_dir"]
if not os.path.exists(self.extraction_dir):
os.makedirs(self.extraction_dir)
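
    # "extraction_dir" comes from the deposit server configuration; a minimal
    # sketch of the relevant entry (the path value is an assumption):
    #
    #     extraction_dir: /tmp/swh-deposit/archive/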

    def process_get(
self, request, collection_name: str, deposit: Deposit
) -> Tuple[int, Any, str]:
"""Build a unique tarball from the multiple received and stream that
content to the client.
Args:
request (Request):
collection_name: Collection owning the deposit
deposit: Deposit concerned by the reading
Returns:
Tuple status, stream of content, content-type
"""
archives = [
r.archive
for r in self._deposit_requests(deposit, request_type=ARCHIVE_TYPE)
]
return (
status.HTTP_200_OK,
aggregate_tarballs(self.extraction_dir, archives),
"swh/generator",
)
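

# Note (sketch, not part of this module): "swh/generator" is an internal
# content-type convention; the APIGet base class is assumed to enter the
# context manager returned above and stream the aggregated tarball back to
# the client, roughly:
#
#     with content as tarpath:
#         response = FileResponse(
#             open(tarpath, "rb"), content_type="application/tar"
#         )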


class APIReadMetadata(APIPrivateView, APIGet, DepositReadMixin):
    """Class in charge of aggregating metadata on a deposit."""

def _parse_dates(
self, deposit: Deposit, metadata: ElementTree.Element
) -> Tuple[dict, dict]:
"""Normalize the date to use as a tuple of author date, committer date
from the incoming metadata.
Returns:
Tuple of author date, committer date. Those dates are
swh normalized.
"""
        commit_date_elt = metadata.find("codemeta:datePublished", namespaces=NAMESPACES)
        author_date_elt = metadata.find("codemeta:dateCreated", namespaces=NAMESPACES)

        author_date: Any
        commit_date: Any

        if author_date_elt is None and commit_date_elt is None:
            author_date = commit_date = deposit.complete_date
        elif commit_date_elt is None:
            author_date = commit_date = author_date_elt.text  # type: ignore
        elif author_date_elt is None:
            author_date = commit_date = commit_date_elt.text
        else:
            author_date = author_date_elt.text
            commit_date = commit_date_elt.text

        return (normalize_date(author_date), normalize_date(commit_date))
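
    # For reference (sketch, not in the source): normalize_date() is expected
    # to return a swh.model-compatible date dict; the exact shape depends on
    # the installed swh.model version, roughly:
    #
    #     normalize_date("2017-10-12")
    #     # -> {"timestamp": {"seconds": 1507766400, "microseconds": 0},
    #     #     "offset_bytes": b"+0000"}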

    def metadata_read(self, deposit: Deposit) -> Dict[str, Any]:
        """Read and aggregate a deposit's information into one unified dictionary.

        Args:
            deposit: Deposit to retrieve information from

        Returns:
            Dictionary of deposit information read by the deposit loader, with the
            following keys:

                **origin** (Dict): Information about the origin

                **raw_metadata** (str): Raw metadata received for the deposit

                **provider** (Dict): The metadata provider information about the
                deposit client

                **tool** (Dict): Tool information

                **deposit** (Dict): Deposit information relevant to build the
                revision (author_date, committer_date, etc.)

        """
        raw_metadata = self._metadata_get(deposit)

        author_date: Optional[dict]
        commit_date: Optional[dict]

        if raw_metadata:
            metadata_tree = ElementTree.fromstring(raw_metadata)
            author_date, commit_date = self._parse_dates(deposit, metadata_tree)
            release_notes_elements = metadata_tree.findall(
                "codemeta:releaseNotes", namespaces=NAMESPACES
            )
        else:
            author_date = commit_date = None
            release_notes_elements = []

        if deposit.parent and deposit.parent.swhid:
            parent_swhid = deposit.parent.swhid
            assert parent_swhid is not None
            swhid = CoreSWHID.from_string(parent_swhid)
            parent_revision = hash_to_hex(swhid.object_id)
            parents = [parent_revision]
        else:
            parents = []

        release_notes: Optional[str]
        if release_notes_elements:
            release_notes = "\n\n".join(
                element.text for element in release_notes_elements if element.text
            )
        else:
            release_notes = None

        return {
"origin": {"type": "deposit", "url": deposit.origin_url},
"provider": {
"provider_name": deposit.client.last_name,
"provider_url": deposit.client.provider_url,
"provider_type": MetadataAuthorityType.DEPOSIT_CLIENT.value,
"metadata": {},
},
"tool": self.tool,
"raw_metadata": raw_metadata,
"deposit": {
"id": deposit.id,
"client": deposit.client.username,
"collection": deposit.collection.name,
"author": SWH_PERSON,
"author_date": author_date,
"committer": SWH_PERSON,
"committer_date": commit_date,
"revision_parents": parents,
"release_notes": release_notes,
},
}

    def process_get(
        self, request, collection_name: str, deposit: Deposit
    ) -> Tuple[int, Dict, str]:
        """Return the unified metadata dictionary as a JSON response."""
data = self.metadata_read(deposit)
return status.HTTP_200_OK, data if data else {}, "application/json"
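

# End-to-end sketch (illustrative): the deposit loader is expected to fetch
# this JSON document from the private API and use it to build the revision;
# the URL layout below is a hypothetical example:
#
#     response = requests.get(f"{private_api_url}/{deposit_id}/meta/")
#     info = response.json()
#     assert info["origin"]["type"] == "deposit"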