Source code for swh.objstorage.api.client
# Copyright (C) 2015-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict, Iterator, Optional
import msgpack
from swh.core.api import RPCClient
from swh.objstorage.constants import DEFAULT_LIMIT
from swh.objstorage.exc import (
Error,
NoBackendsLeftError,
ObjCorruptedError,
ObjNotFoundError,
ObjStorageAPIError,
)
from swh.objstorage.interface import CompositeObjId, ObjId, ObjStorageInterface
from swh.objstorage.objstorage import objid_to_default_hex, timed
[docs]
class RemoteObjStorage(RPCClient):
"""Proxy to a remote object storage.
This class allows to connect to an object storage server via
http protocol.
Attributes:
url (string): The url of the server to connect. Must end
with a '/'
session: The session to send requests.
"""
api_exception = ObjStorageAPIError
reraise_exceptions = [
ObjNotFoundError,
Error,
ObjCorruptedError,
NoBackendsLeftError,
PermissionError,
]
backend_class = ObjStorageInterface
name: str = "remote"
[docs]
def restore(self: ObjStorageInterface, content: bytes, obj_id: ObjId) -> None:
return self.add(content, obj_id, check_presence=False)
def __iter__(self) -> Iterator[CompositeObjId]:
yield from self.list_content()
[docs]
def list_content(
self,
last_obj_id: Optional[ObjId] = None,
limit: Optional[int] = DEFAULT_LIMIT,
) -> Iterator[CompositeObjId]:
params: Dict[str, Any] = {}
if limit:
params["limit"] = limit
if last_obj_id:
params["last_obj_id"] = objid_to_default_hex(last_obj_id)
response = self.raw_verb(
"get",
"content",
headers={"accept": "application/x-msgpack"},
params=params,
stream=True,
)
yield from msgpack.Unpacker(response.raw, raw=False)
# XXX Maybe there is a better way of doing this, but according the automagic
# way this class is built (via the MetaRPCClient metaclass), one cannot easily
# just overload the methods in the class definition.
RemoteObjStorage.get = timed(RemoteObjStorage.get) # type: ignore
RemoteObjStorage.add = timed(RemoteObjStorage.add) # type: ignore
RemoteObjStorage.__contains__ = timed(RemoteObjStorage.__contains__) # type: ignore