Source code for swh.objstorage.backends.seaweedfs.objstorage

# Copyright (C) 2019-2025  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import timedelta
import io
import logging
from typing import Optional
from urllib.parse import urlparse

from swh.objstorage.backends.pathslicing import PathSlicer
from swh.objstorage.constants import LiteralPrimaryHash
from swh.objstorage.exc import ObjNotFoundError
from swh.objstorage.interface import HashDict
from swh.objstorage.objstorage import (
    CompressionFormat,
    ObjStorage,
    objid_to_default_hex,
    timed,
)

from .http import HttpFiler

LOGGER = logging.getLogger(__name__)


[docs] class SeaweedFilerObjStorage(ObjStorage): """ObjStorage with seaweedfs abilities, using the Filer API. https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API """ primary_hash: LiteralPrimaryHash = "sha1" name: str = "seaweedfs" def __init__( self, *, url: str = "", compression: CompressionFormat | None = None, slicing: str = "", pool_maxsize: int = 100, **kwargs, ): super().__init__(**kwargs) if compression is None: LOGGER.warning( "Deprecated: compression is undefined. " "Defaulting to none, but please set it explicitly." ) compression = "none" self.compression = compression self.wf = HttpFiler(url, pool_maxsize=pool_maxsize) self.root_path = urlparse(url).path if not self.root_path.endswith("/"): self.root_path += "/" self.slicer = PathSlicer(self.root_path, slicing, self.primary_hash)
[docs] def check_config(self, *, check_write): """Check the configuration for this object storage""" # FIXME: hopefully this blew up during instantiation return True
@timed def __contains__(self, obj_id: HashDict) -> bool: return self.wf.exists(self._path(obj_id))
[docs] @timed def add( self, content: bytes, obj_id: HashDict, check_presence: bool = True ) -> None: if check_presence and obj_id in self: return self.wf.put(io.BytesIO(self.compress(content)), self._path(obj_id))
[docs] @timed def restore(self, content: bytes, obj_id: HashDict) -> None: return self.add(content, obj_id, check_presence=False)
[docs] @timed def get(self, obj_id: HashDict) -> bytes: try: obj = self.wf.get(self._path(obj_id)) except Exception as exc: LOGGER.info("Failed to get object %s: %r", self._path(obj_id), exc) raise ObjNotFoundError(obj_id) return self.decompress(obj, objid_to_default_hex(obj_id, self.primary_hash))
[docs] def download_url( self, obj_id: HashDict, content_disposition: Optional[str] = None, expiry: Optional[timedelta] = None, ) -> Optional[str]: path = self._path(obj_id) if not self.wf.exists(path): raise ObjNotFoundError(obj_id) return self.wf.build_url(path)
[docs] def delete(self, obj_id: HashDict): super().delete(obj_id) # Check delete permission if obj_id not in self: raise ObjNotFoundError(obj_id) self.wf.delete(self._path(obj_id)) return True
# internal methods def _path(self, obj_id: HashDict): """Compute the backend path for the given obj id Given an object is, return the path part of the url to query the backend seaweedfs filer service with, according the configured path slicing. """ return self.slicer.get_path(objid_to_default_hex(obj_id, self.primary_hash))