Source code for swh.objstorage.interface

# Copyright (C) 2015-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information


from datetime import timedelta
from typing import (
    Any,
    Dict,
    FrozenSet,
    Iterable,
    Iterator,
    Mapping,
    Optional,
    Tuple,
    Union,
)

from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable

from swh.core.api import remote_api_endpoint
from swh.model.model import Sha1
from swh.objstorage.constants import DEFAULT_LIMIT

COMPOSITE_OBJID_KEYS: FrozenSet[
    Literal["sha1", "sha1_git", "sha256", "blake2s256"]
] = frozenset(("sha1", "sha1_git", "sha256", "blake2s256"))


[docs] class CompositeObjId(TypedDict, total=False): sha1: bytes sha1_git: bytes sha256: bytes blake2s256: bytes
[docs] def objid_from_dict(d: Dict[str, Any]) -> CompositeObjId: filtered: CompositeObjId = {} for key in COMPOSITE_OBJID_KEYS: value = d.get(key) if value is None: continue if not isinstance(value, bytes): raise TypeError( "value for %s is %s, not bytes" % (key, value.__class__.__name__) ) filtered[key] = value if not filtered: raise ValueError( "dict is missing at least one of %s" % ", ".join(COMPOSITE_OBJID_KEYS) ) return filtered
ObjId = Union[bytes, CompositeObjId] """Type of object ids, which should be ``{hash: value for hash in SUPPORTED_HASHES}``; but single sha1 hashes are supported for legacy clients"""
[docs] @runtime_checkable class ObjStorageInterface(Protocol): """High-level API to manipulate the Software Heritage object storage. Conceptually, the object storage offers the following methods: - check_config() check if the object storage is properly configured - __contains__() check if an object is present, by object id - add() add a new object, returning an object id - restore() same as add() but erase an already existed content - get() retrieve the content of an object, by object id - check() check the integrity of an object, by object id - delete() remove an object and the following attributes: - name name given to the object storage; useful e.g. for logging in composite object storagges (multiplexer) Each implementation of this interface can have a different behavior and its own way to store the contents. """ name: str def __init__( self, *, name: str = "", **kwargs, ): ...
[docs] @remote_api_endpoint("check_config") def check_config(self, *, check_write): """Check whether the object storage is properly configured. Args: check_write (bool): if True, check if writes to the object storage can succeed. Returns: True if the configuration check worked, False if 'check_write' is True and the object storage is actually read only, and an exception if the check failed. """ ...
@remote_api_endpoint("content/contains") def __contains__(self, obj_id: ObjId) -> bool: """Indicate if the given object is present in the storage. Args: obj_id: object identifier. Returns: True if and only if the object is present in the current object storage. """ ...
[docs] @remote_api_endpoint("content/add") def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None: """Add a new object to the object storage. Args: content: object's raw content to add in storage. obj_id: either dict of checksums, or single checksum of [bytes] using [ID_HASH_ALGO] algorithm. It is trusted to match the bytes. check_presence (bool): indicate if the presence of the content should be verified before adding the file. Returns: the id (bytes) of the object into the storage. """ ...
[docs] @remote_api_endpoint("content/add/batch") def add_batch( self, contents: Union[Mapping[Sha1, bytes], Iterable[Tuple[ObjId, bytes]]], check_presence: bool = True, ) -> Dict: """Add a batch of new objects to the object storage. Args: contents: either mapping from [ID_HASH_ALGO] checksums to object contents, or list of pairs of dict hashes and object contents Returns: the summary of objects added to the storage (count of object, count of bytes object) """ ...
[docs] def restore(self, content: bytes, obj_id: ObjId) -> None: """Restore a content that have been corrupted. This function is identical to add but does not check if the object id is already in the file system. The default implementation provided by the current class is suitable for most cases. Args: content: object's raw content to add in storage obj_id: dict of hashes of the content (or only the sha1, for legacy clients) """ ...
[docs] @remote_api_endpoint("content/get") def get(self, obj_id: ObjId) -> bytes: """Retrieve the content of a given object. Args: obj_id: object id. Returns: the content of the requested object as bytes. Raises: ObjNotFoundError: if the requested object is missing. """ ...
[docs] @remote_api_endpoint("content/get/batch") def get_batch(self, obj_ids: Iterable[ObjId]) -> Iterator[Optional[bytes]]: """Retrieve objects' raw content in bulk from storage. Note: This function does have a default implementation in ObjStorage that is suitable for most cases. For object storages that needs to do the minimal number of requests possible (ex: remote object storages), that method can be overridden to perform a more efficient operation. Args: obj_ids: list of object ids. Returns: list of resulting contents, or None if the content could not be retrieved. Do not raise any exception as a fail for one content will not cancel the whole request. """ ...
[docs] @remote_api_endpoint("content/download_url") def download_url( self, obj_id: ObjId, content_disposition: Optional[str] = None, expiry: Optional[timedelta] = None, ) -> Optional[str]: """Get a direct download link for the object if the obstorage backend supports such feature. Some objstorage backends, typically cloud based ones like azure or s3, can provide a direct download link for a stored object. Args: obj_id: object identifier content_disposition: set Content-Disposition header for the generated URL response if the objstorage backend supports it expiry: the duration after which the URL expires if the objstorage backend supports it, if not provided the URL expires 24 hours after its creation Returns: Direct download URL for the object or :const:`None` if the objstorage backend does not support such feature. """ ...
[docs] @remote_api_endpoint("content/check") def check(self, obj_id: ObjId) -> None: """Perform an integrity check for a given object. Verify that the file object is in place and that the content matches the object id. Args: obj_id: object identifier. Raises: ObjNotFoundError: if the requested object is missing. ObjCorruptedError: if the requested object is corrupted. """ ...
[docs] @remote_api_endpoint("content/delete") def delete(self, obj_id: ObjId): """Delete an object. Args: obj_id: object identifier. Raises: ObjNotFoundError: if the requested object is missing. """ ...
def __iter__(self) -> Iterator[CompositeObjId]: ...
[docs] def list_content( self, last_obj_id: Optional[ObjId] = None, limit: Optional[int] = DEFAULT_LIMIT ) -> Iterator[CompositeObjId]: """Generates known object ids. Args: last_obj_id: object id from which to iterate from (excluded). limit (int): max number of object ids to generate. If unset (None), generate all objects (behavior might not be guaranteed for all backends). Generates: obj_id: object ids. """ ...