# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of defining an swh-deposit client
"""
import hashlib
import logging
import os
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urljoin
import warnings
from xml.etree import ElementTree
import requests
from requests import Response
from requests.utils import parse_header_links
from swh.core.config import load_from_envvar
from swh.deposit import __version__ as swh_deposit_version
from swh.deposit.utils import NAMESPACES
logger = logging.getLogger(__name__)
[docs]
class MaintenanceError(ValueError):
    """Informational exception signalling a maintenance condition.

    It derives from ``ValueError`` so handlers written for that type also
    catch it.
    """
[docs]
def handle_deprecated_config(config: Dict) -> Tuple[str, Optional[Tuple[str, str]]]:
    """Translate the deprecated ``config`` dict into a ``(url, auth)`` pair.

    A ``DeprecationWarning`` is emitted on every call.

    Args:
        config: legacy configuration dict. It must hold a "url" entry and may
            hold an "auth" entry shaped as
            ``{"username": ..., "password": ...}``.

    Returns:
        A ``(url, auth)`` tuple where ``auth`` is ``(username, password)``, or
        ``None`` when ``config`` has no truthy "auth" entry.

    Raises:
        KeyError: if "url" is missing, or if "auth" is set but lacks the
            "username" or "password" key.
    """
    warnings.warn(
        '"config" argument is deprecated, please '
        'use "url" and "auth" arguments instead; note that "auth" '
        "expects now a couple (username, password) and not a dict.",
        DeprecationWarning,
        # Attribute the warning to the code calling this helper rather than to
        # the helper itself, so the reported location is actionable.
        stacklevel=2,
    )
    url: str = config["url"]
    auth: Optional[Tuple[str, str]] = None
    legacy_auth = config.get("auth")
    if legacy_auth:
        auth = (legacy_auth["username"], legacy_auth["password"])
    return (url, auth)
[docs]
class BaseApiDepositClient:
    """Common base of the deposit clients.

    It stores the normalized base url and the credentials, and owns the
    ``requests`` session used to issue the http queries.
    """

    def __init__(
        self,
        config: Optional[Dict] = None,
        url: Optional[str] = None,
        auth: Optional[Tuple[str, str]] = None,
    ):
        """Set up the base url, credentials and http session.

        Args:
            config: deprecated configuration dict; when truthy, its content
                takes precedence over ``url`` and ``auth``.
            url: base url of the deposit api.
            auth: optional ``(username, password)`` couple.

        When neither ``url`` nor ``config`` is provided, the configuration is
        read through ``load_from_envvar``.
        """
        if not (url or config):
            config = load_from_envvar()
        if config:
            url, auth = handle_deprecated_config(config)

        # needed to help mypy not be fooled by the Optional nature of url
        assert url is not None

        # Keep exactly one trailing slash on the base url.
        self.base_url = f"{url.strip('/')}/"
        self.auth = auth

        session = requests.Session()
        if auth:
            session.auth = auth
        session.headers.update({"user-agent": f"swh-deposit/{swh_deposit_version}"})
        self.session = session

    def do(self, method, url, *args, **kwargs):
        """Issue an http request through the client's session.

        Args:
            method (str): http method name handed over to the session
            url (str): url to query; its leading slashes are stripped before
                it is joined onto the client's base url

        Remaining positional and keyword arguments are forwarded as is to
        ``session.request``.

        Returns:
            The response returned by ``session.request``

        """
        relative = url.lstrip("/")
        target = urljoin(self.base_url, relative)
        return self.session.request(method, target, *args, **kwargs)
[docs]
class PrivateApiDepositClient(BaseApiDepositClient):
    """Private API deposit client to:

    - read a given deposit's archive(s)
    - read a given deposit's metadata
    - update a given deposit's status

    """

    # Size (in bytes) of the chunks read from the streamed archive response.
    # requests' ``iter_content`` defaults to 1-byte chunks, which makes
    # downloading an archive needlessly slow.
    _ARCHIVE_CHUNK_SIZE = 64 * 1024

    def archive_get(self, archive_update_url: str, archive: str) -> Optional[str]:
        """Retrieve the archive from the deposit to a local directory.

        Args:
            archive_update_url (str): The full deposit archive(s)'s raw content
                to retrieve locally
            archive (str): the local archive's path where to store
                the raw content

        Returns:
            The archive path to the local archive to load.

        Raises:
            ValueError: if the deposit server did not answer successfully.

        """
        response = self.do("get", archive_update_url, stream=True)
        if response.ok:
            with open(archive, "wb") as f:
                for chunk in response.iter_content(
                    chunk_size=self._ARCHIVE_CHUNK_SIZE
                ):
                    f.write(chunk)
            return archive

        msg = "Problem when retrieving deposit archive at %s" % (archive_update_url,)
        logger.error(msg)
        raise ValueError(msg)

    def status_update(
        self,
        update_status_url,
        status,
        status_detail=None,
        release_id=None,
        directory_id=None,
        origin_url=None,
    ):
        """Update the deposit's status.

        Only the optional values which are truthy are sent along the status.
        The response of the query is not inspected.

        Args:
            update_status_url (str): the full deposit's archive
            status (str): The status to update the deposit with
            status_detail (None by default): optional detail sent with the
                status
            release_id (str/None): the release's identifier to update to
            directory_id (str/None): the directory's identifier to update to
            origin_url (str/None): deposit's associated origin url

        """
        payload = {"status": status}
        if release_id:
            payload["release_id"] = release_id
        if directory_id:
            payload["directory_id"] = directory_id
        if origin_url:
            payload["origin_url"] = origin_url
        if status_detail:
            payload["status_detail"] = status_detail

        self.do("put", update_status_url, json=payload)

    def check(self, check_url):
        """Check the deposit's associated data (metadata, archive(s))

        Args:
            check_url (str): the full deposit's check url

        Returns:
            The "status" entry of the json response.

        Raises:
            ValueError: if the deposit server did not answer successfully.

        """
        r = self.do("get", check_url)
        if r.ok:
            data = r.json()
            return data["status"]

        msg = "Problem when checking deposit %s" % check_url
        logger.error(msg)
        raise ValueError(msg)
[docs]
class BaseDepositClient(BaseApiDepositClient):
    """Base Deposit client to access the public api."""

    def __init__(
        self, config=None, url=None, auth=None, error_msg=None, empty_result=None
    ):
        """Initialize the client.

        Args:
            config: deprecated configuration dict, forwarded to the parent class
            url: base url of the api, forwarded to the parent class
            auth: credentials, forwarded to the parent class
            error_msg: ``%``-format string taking ``(url, exception)``, used
                when the http query itself raises
            empty_result: default entries merged into the error results; it is
                copied so that the caller's dict is never shared nor mutated
        """
        super().__init__(url=url, auth=auth, config=config)
        self.error_msg = error_msg
        self.empty_result = dict(empty_result) if empty_result else {}

    def compute_url(self, *args, **kwargs):
        """Compute api url endpoint to query."""
        raise NotImplementedError

    def compute_method(self, *args, **kwargs):
        """Http method to use on the url"""
        raise NotImplementedError

    def compute_information(self, *args, **kwargs) -> Dict[str, Any]:
        """Compute the extra information handed over to :meth:`do_execute`.

        :meth:`execute` always calls this hook; by default there is nothing to
        add. Override it in subclasses whose :meth:`do_execute` reads ``info``.
        """
        return {}

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Given an xml result from the api endpoint, parse it and returns a
        dict.

        """
        raise NotImplementedError

    def parse_result_error(self, xml_content: str) -> Dict[str, Any]:
        """Given an error response in xml, parse it into a dict.

        Returns:
            dict with following keys:

                'summary': text of the atom:summary element (None if absent)
                'detail': stripped text of the detail element ('' if absent)
                'sword:verboseDescription': stripped text of the
                    sword:verboseDescription element ('' if absent)

        Raises:
            xml.etree.ElementTree.ParseError: if xml_content is not valid xml

        """
        data = ElementTree.fromstring(xml_content)
        return {
            "summary": data.findtext("atom:summary", namespaces=NAMESPACES),
            "detail": data.findtext("detail", "", namespaces=NAMESPACES).strip(),
            "sword:verboseDescription": data.findtext(
                "sword:verboseDescription", "", namespaces=NAMESPACES
            ).strip(),
        }

    def do_execute(self, method: str, url: str, info: Dict, **kwargs) -> Response:
        """Execute the http query to url using method and info information.

        By default, execute a simple query to url with the http method. Override this in
        subclass to improve the default behavior if needed.

        """
        return self.do(method, url, **kwargs)

    def compute_params(self, **kwargs) -> Dict[str, Any]:
        """Determine the params out of the kwargs"""
        return {}

    def execute(self, *args, **kwargs) -> Dict[str, Any]:
        """Main endpoint to prepare and execute the http query to the api.

        The url, http method, extra information and query params are computed
        through the ``compute_*`` hooks, then the query is run through
        :meth:`do_execute`.

        Raises:
            MaintenanceError if some api maintenance is happening.

        Returns:
            Dict of computed api data:

            - if the query raised: the empty result plus an "error" message
            - on a 204 response: only the "status" code
            - on any other successful response: the output of
              :meth:`parse_result_ok`
            - otherwise: the parsed error merged with the empty result, plus
              the "status" code

        """
        url = self.compute_url(*args, **kwargs)
        method = self.compute_method(*args, **kwargs)
        info = self.compute_information(*args, **kwargs)
        params = self.compute_params(**kwargs)

        try:
            response = self.do_execute(method, url, info, params=params)
        except Exception as e:
            # Fall back on a generic message so that an unset error_msg does
            # not raise a TypeError hiding the original failure.
            error_msg = self.error_msg or "Failure at %s: %s"
            # Work on a copy: mutating self.empty_result would leak this error
            # message into the results of the next calls.
            result = dict(self.empty_result)
            result["error"] = error_msg % (url, e)
            return result

        if response.ok:
            if int(response.status_code) == 204:  # 204 returns no body
                return {"status": response.status_code}
            headers = dict(response.headers) if response.headers else None
            return self.parse_result_ok(response.text, headers)

        try:
            error = self.parse_result_error(response.text)
        except ElementTree.ParseError:
            logger.warning(
                "Error message in response is not xml parsable: %s",
                response.text,
            )
            error = {}

        # The empty result entries take precedence over the parsed ones.
        error.update(self.empty_result)

        if response.status_code == 503:
            summary = error.get("summary")
            detail = error.get("sword:verboseDescription")
            # Maintenance error
            if summary and detail:
                raise MaintenanceError(f"{summary}: {detail}")

        error["status"] = response.status_code
        return error
[docs]
class ServiceDocumentDepositClient(BaseDepositClient):
    """Client retrieving the service document of the deposit api."""

    def __init__(self, config=None, url=None, auth=None):
        """Configure the error message and empty result of this endpoint."""
        super().__init__(
            url=url,
            auth=auth,
            config=config,
            error_msg="Service document failure at %s: %s",
            empty_result={"collection": None},
        )

    def compute_url(self, *args, **kwargs):
        """Return the (fixed) service document endpoint."""
        return "/servicedocument/"

    def compute_method(self, *args, **kwargs):
        """The service document is read with a get query."""
        return "get"

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Turn the service document xml into a nested dict.

        Each ``app:workspace/app:collection`` element becomes one
        ``{"app:collection": {...}}`` entry. Single-valued keys map to the text
        of their first matching element (None if absent), multi-valued keys to
        the list of texts of all their matching elements.

        """
        single_keys = (
            "atom:title",
            "sword:collectionPolicy",
            "dc:abstract",
            "sword:treatment",
            "sword:mediation",
            "sword:metadataRelevantHeader",
            "sword:service",
            "sword:name",
        )
        multi_keys = (
            "app:accept",
            "sword:acceptPackaging",
        )

        root = ElementTree.fromstring(xml_content)
        collections = root.findall(
            "app:workspace/app:collection", namespaces=NAMESPACES
        )

        workspace: List[Dict[str, Any]] = []
        for collection in collections:
            described: Dict[str, Any] = {}
            for key in single_keys:
                described[key] = collection.findtext(key, namespaces=NAMESPACES)
            for key in multi_keys:
                matches = collection.findall(key, namespaces=NAMESPACES)
                described[key] = [elt.text for elt in matches]
            workspace.append({"app:collection": described})

        return {"app:service": {"app:workspace": workspace}}

    def parse_result_error(self, xml_content: str) -> Dict[str, Any]:
        """Keep only the summary of the parent's parsed error, as "error"."""
        parsed = super().parse_result_error(xml_content)
        return {"error": parsed["summary"]}
[docs]
class StatusDepositClient(BaseDepositClient):
    """Client reading the status of a deposit."""

    def __init__(self, config=None, url=None, auth=None):
        """Configure the error message and empty result of this endpoint."""
        no_status = {
            "deposit_status": None,
            "deposit_status_detail": None,
            "deposit_swh_id": None,
        }
        super().__init__(
            url=url,
            auth=auth,
            config=config,
            error_msg="Status check failure at %s: %s",
            empty_result=no_status,
        )

    def compute_url(self, collection, deposit_id):
        """Return the status endpoint of the deposit within the collection."""
        return f"/{collection}/{deposit_id}/status/"

    def compute_method(self, *args, **kwargs):
        """The status is read with a get query."""
        return "get"

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Extract the deposit fields out of the status xml.

        Each returned key maps to the text of the ``swh:<key>`` element, or
        None when the element is absent.

        """
        root = ElementTree.fromstring(xml_content)
        deposit: Dict[str, Any] = {}
        for key in (
            "deposit_id",
            "deposit_status",
            "deposit_status_detail",
            "deposit_swh_id",
            "deposit_swh_id_context",
            "deposit_external_id",
        ):
            deposit[key] = root.findtext("swh:" + key, namespaces=NAMESPACES)
        return deposit
[docs]
class CollectionListDepositClient(BaseDepositClient):
    """Client listing the deposits of a collection (owned by a user)."""

    def __init__(self, config=None, url=None, auth=None):
        """Configure the error message and empty result of this endpoint."""
        super().__init__(
            url=url,
            auth=auth,
            config=config,
            error_msg="List deposits failure at %s: %s",
            empty_result={},
        )

    def compute_url(self, collection, **kwargs):
        """Return the endpoint of the collection."""
        return "/" + f"{collection}" + "/"

    def compute_method(self, *args, **kwargs):
        """The listing is read with a get query."""
        return "get"

    def compute_params(self, **kwargs) -> Dict[str, Any]:
        """Keep as query params the keyword arguments whose value is not None
        (e.g. page, page_size)

        """
        params: Dict[str, Any] = {}
        for name, value in kwargs.items():
            if value is not None:
                params[name] = value
        return params

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Turn the listing xml and the response headers into a dict.

        Returns:
            dict holding the "count" (stripped text of swh:count, "0" if
            absent), the "deposits" (one dict per atom:entry, restricted to the
            swh: fields present in that entry), and one ``rel -> url`` entry
            per link parsed from the "Link" header.

        """
        raw_links = headers.get("Link", "") if headers else ""
        links = parse_header_links(raw_links)

        root = ElementTree.fromstring(xml_content)
        count = root.findtext("swh:count", "0", namespaces=NAMESPACES).strip()

        fields = (
            "id",
            "reception_date",
            "complete_date",
            "external_id",
            "swhid",
            "status",
            "status_detail",
            "swhid_context",
            "origin_url",
        )
        deposits = []
        for entry in root.findall("atom:entry", namespaces=NAMESPACES):
            deposit = {}
            for field in fields:
                path = f"swh:{field}"
                # Fields missing from the entry are left out of the result.
                if entry.find(path, namespaces=NAMESPACES) is not None:
                    deposit[field] = entry.findtext(path, namespaces=NAMESPACES)
            deposits.append(deposit)

        listing: Dict[str, Any] = {"count": count, "deposits": deposits}
        for link in links:
            listing[link["rel"]] = link["url"]
        return listing
[docs]
class BaseCreateDepositClient(BaseDepositClient):
    """Base class of the clients posting a new deposit."""

    def __init__(self, config=None, url=None, auth=None):
        """Configure the error message and empty result of this endpoint."""
        no_deposit = {
            "swh:deposit_id": None,
            "swh:deposit_status": None,
        }
        super().__init__(
            url=url,
            auth=auth,
            config=config,
            error_msg="Post Deposit failure at %s: %s",
            empty_result=no_deposit,
        )

    def compute_url(self, collection, *args, **kwargs):
        """Return the endpoint of the collection."""
        return f"/{collection}/"

    def compute_method(self, *args, **kwargs):
        """A deposit is created with a post query."""
        return "post"

    def parse_result_ok(
        self, xml_content: str, headers: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Extract the deposit fields out of the response xml.

        Each returned key maps to the text of the ``swh:<key>`` element, or
        None when the element is absent.

        """
        root = ElementTree.fromstring(xml_content)
        deposit: Dict[str, Any] = {}
        for key in (
            "deposit_id",
            "deposit_status",
            "deposit_status_detail",
            "deposit_date",
        ):
            deposit[key] = root.findtext("swh:" + key, namespaces=NAMESPACES)
        return deposit

    def do_execute(self, method, url, info, **kwargs):
        """Send the content of the file at ``info["filepath"]`` as the body of
        the query, along with the ``info["headers"]`` headers.

        The extra keyword arguments are not forwarded to the query.

        """
        request_headers = info["headers"]
        with open(info["filepath"], "rb") as payload:
            return self.do(method, url, data=payload, headers=request_headers)
[docs]
class CreateArchiveDepositClient(BaseCreateDepositClient):
    """Deposit client posting an archive (binary).

    Nothing is overridden in this class body: all the behavior comes from its
    parent class.
    """
[docs]
class UpdateArchiveDepositClient(CreateArchiveDepositClient):
    """Deposit client updating (adding/replacing) an archive (binary)."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        """Return the media endpoint of the deposit within the collection."""
        return f"/{collection}/{deposit_id}/media/"

    def compute_method(self, *args, replace=False, **kwargs):
        """Use a put query to replace, a post query otherwise."""
        if replace:
            return "put"
        return "post"
[docs]
class CreateMultipartDepositClient(BaseCreateDepositClient):
    """Create a multipart deposit client."""

    def _multipart_info(self, info, info_meta):
        """Build the multipart ``files`` and the http headers of the query.

        Args:
            info: archive information; the "filename", "filepath",
                "content-type", "md5sum" and "in_progress" keys are read, as
                well as the optional "slug" one
            info_meta: metadata information; the "filename" and "filepath"
                keys are read

        Returns:
            A ``(files, headers)`` tuple. ``files`` holds file objects opened
            in binary mode which are left open on purpose: they must remain
            readable until the query is sent. :meth:`do_execute` closes them.

        """
        opened = []
        try:
            archive_name = info["filename"]
            archive_file = open(info["filepath"], "rb")
            opened.append(archive_file)
            archive_type = info["content-type"]

            metadata_name = info_meta["filename"]
            metadata_file = open(info_meta["filepath"], "rb")
            opened.append(metadata_file)

            headers = {
                "CONTENT_MD5": info["md5sum"],
                "IN-PROGRESS": str(info["in_progress"]),
            }
            if "slug" in info:
                headers["SLUG"] = info["slug"]
        except BaseException:
            # Do not leak the handles already opened when the preparation
            # fails midway (e.g. missing metadata file or missing key).
            for handle in opened:
                handle.close()
            raise

        files = [
            ("file", (archive_name, archive_file, archive_type)),
            ("atom", (metadata_name, metadata_file, "application/atom+xml")),
        ]
        return files, headers

    @staticmethod
    def _close_files(files):
        """Close every file object found in a multipart ``files`` value."""
        entries = files.items() if isinstance(files, dict) else files
        for _field, spec in entries:
            # spec is either the file object itself or a tuple whose second
            # item is the file object: (filename, fileobj, ...)
            handle = spec[1] if isinstance(spec, (tuple, list)) else spec
            close = getattr(handle, "close", None)
            if callable(close):
                close()

    def do_execute(self, method, url, info, **kwargs):
        """Send the multipart query, then close the files it carried.

        ``info["files"]`` and ``info["headers"]`` are handed over to the
        query. The extra keyword arguments are not forwarded.

        """
        files = info["files"]
        try:
            return self.do(method, url, files=files, headers=info["headers"])
        finally:
            # The request is over (sent or failed): the handles are no longer
            # needed and nothing else would close them.
            self._close_files(files)
[docs]
class UpdateMultipartDepositClient(CreateMultipartDepositClient):
    """Deposit client updating a deposit through a multipart query."""

    def compute_url(self, collection, *args, deposit_id=None, **kwargs):
        """Return the metadata endpoint of the deposit within the collection."""
        return f"/{collection}/{deposit_id}/metadata/"

    def compute_method(self, *args, replace=False, **kwargs):
        """Use a put query to replace, a post query otherwise."""
        if replace:
            return "put"
        return "post"
[docs]
class PublicApiDepositClient(BaseApiDepositClient):
    """Client of the public deposit api.

    Each method instantiates a dedicated client with this client's base url
    and credentials, and runs its ``execute`` method.
    """

    def service_document(self):
        """Retrieve service document endpoint's information."""
        client = ServiceDocumentDepositClient(url=self.base_url, auth=self.auth)
        return client.execute()

    def deposit_status(self, collection: str, deposit_id: int):
        """Retrieve status information on a deposit."""
        client = StatusDepositClient(url=self.base_url, auth=self.auth)
        return client.execute(collection, deposit_id)

    def deposit_list(
        self,
        collection: str,
        page: Optional[int] = None,
        page_size: Optional[int] = None,
    ):
        """List deposits from the collection"""
        client = CollectionListDepositClient(url=self.base_url, auth=self.auth)
        return client.execute(collection, page=page, page_size=page_size)

    def deposit_create(
        self,
        collection: str,
        slug: Optional[str],
        archive: Optional[str] = None,
        metadata: Optional[str] = None,
        in_progress: bool = False,
    ):
        """Create a new deposit (archive, metadata, both as multipart).

        An archive only or a metadata only deposit goes through its dedicated
        client; any other combination goes through the multipart client.

        """
        if archive and not metadata:
            archive_client = CreateArchiveDepositClient(
                url=self.base_url, auth=self.auth
            )
            return archive_client.execute(
                collection, in_progress, slug, archive_path=archive
            )

        if metadata and not archive:
            metadata_client = CreateMetadataDepositClient(
                url=self.base_url, auth=self.auth
            )
            return metadata_client.execute(
                collection, in_progress, slug, metadata_path=metadata
            )

        multipart_client = CreateMultipartDepositClient(
            url=self.base_url, auth=self.auth
        )
        return multipart_client.execute(
            collection,
            in_progress,
            slug,
            archive_path=archive,
            metadata_path=metadata,
        )

    def deposit_update(
        self,
        collection: str,
        deposit_id: int,
        slug: Optional[str],
        archive: Optional[str] = None,
        metadata: Optional[str] = None,
        in_progress: bool = False,
        replace: bool = False,
        swhid: Optional[str] = None,
    ):
        """Update (add/replace) existing deposit (archive, metadata, both).

        The status of the deposit is read first: without ``swhid`` the deposit
        must be 'partial', with a ``swhid`` it must be 'done'. Otherwise an
        error dict is returned and no update is attempted.

        Returns:
            The error dict when the status read, the status check or the
            update failed; the freshly read status of the deposit otherwise.

        """
        current = self.deposit_status(collection, deposit_id)
        if "error" in current:
            return current

        status = current["deposit_status"]
        if swhid is None:
            expected_status = "partial"
            reason = "You can only act on deposit with status 'partial'"
        else:
            expected_status = "done"
            reason = "You can only update metadata on deposit with status 'done'"
        if status != expected_status:
            return {
                "error": reason,
                "detail": f"The deposit {deposit_id} has status '{status}'",
                "deposit_status": status,
                "deposit_id": deposit_id,
            }

        # Select the client and the keyword arguments matching what is updated.
        options: Dict[str, Any] = {"deposit_id": deposit_id}
        if archive and not metadata:
            client_cls = UpdateArchiveDepositClient
            options.update(archive_path=archive, replace=replace)
        elif metadata and not archive:
            if swhid is None:
                client_cls = UpdateMetadataOnPartialDepositClient
                options.update(metadata_path=metadata, replace=replace)
            else:
                client_cls = UpdateMetadataOnDoneDepositClient
                options.update(metadata_path=metadata, swhid=swhid)
        else:
            client_cls = UpdateMultipartDepositClient
            options.update(
                archive_path=archive, metadata_path=metadata, replace=replace
            )

        updater = client_cls(url=self.base_url, auth=self.auth)
        result = updater.execute(collection, in_progress, slug, **options)
        if "error" in result:
            return result
        return self.deposit_status(collection, deposit_id)