Source code for swh.fuse.backends.objstorage

# Copyright (C) 2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
import typing

from swh.core.api import RemoteException
from swh.core.statsd import Statsd, TimedContextManagerDecorator
from swh.fuse import LOGGER_NAME
from swh.model.swhids import CoreSWHID
from swh.objstorage.factory import get_objstorage
from swh.objstorage.interface import objid_from_dict
from swh.storage import get_storage

if typing.TYPE_CHECKING:
    from swh.objstorage.interface import ObjStorageInterface
    from swh.storage.interface import StorageInterface

from . import ContentBackend


[docs] class ObjStorageBackend(ContentBackend): """ This content backend relies on an ``swh-storage`` service, and maybe an ``swh-objstorage`` directly. See :ref:`Configuring files' download`. """ def __init__(self, conf: dict): self.logger = logging.getLogger(LOGGER_NAME) try: self.storage: StorageInterface = get_storage(**conf["content"]["storage"]) except KeyError: raise ValueError( "`content` configuration block should contain at least a `storage` object" ) try: self.objstorage: ObjStorageInterface | None = get_objstorage( **conf["content"]["objstorage"] ) except KeyError: self.objstorage = None self.storage_tracker = TimedContextManagerDecorator( Statsd(), "storage_response_time" ) self.objstorage_tracker = TimedContextManagerDecorator( Statsd(), "objstorage_response_time" )
[docs] def shutdown(self) -> None: self.logger.info( "Spent %f ms waiting for storage", self.storage_tracker.total_elapsed, ) self.logger.info( "Spent %f ms waiting for objstorage backend", self.objstorage_tracker.total_elapsed, )
[docs] async def get_blob(self, swhid: CoreSWHID) -> bytes: """ Fetch the content of a ``cnt`` object. """ hashes = None with self.storage_tracker: found = self.storage.content_get([swhid.object_id], algo="sha1_git") if found and found[0]: hashes = objid_from_dict(found[0].hashes()) if hashes: self.logger.debug(f"downloading {hashes} from objstorage") try: if self.objstorage is not None: with self.objstorage_tracker: return self.objstorage.get(hashes) else: with self.storage_tracker: content = self.storage.content_get_data(hashes) if content: return content raise ValueError(f"SWH-storage Cannot find object {swhid}") except RemoteException as e: if e.response: self.logger.error( f"Failed to fetch {hashes} from objstorage: {e.response.text}" ) raise e else: raise ValueError(f"SWH-storage Cannot find object {swhid}")