swh.objstorage.objstorage module#

swh.objstorage.objstorage.objid_to_default_hex(obj_id: bytes | CompositeObjId, algo: Literal['sha1', 'sha256'] = 'sha1') str[source]#

Converts hashes and multi-hashes to the hexadecimal representation of the hash selected by algo (SHA1 by default).

swh.objstorage.objstorage.compute_hashes(content: bytes, hash_names: Iterable[str] = {'blake2s256', 'sha1', 'sha1_git', 'sha256'}) Dict[str, bytes][source]#

Compute the content’s hashes.

Parameters:
  • content – The raw content to hash

  • hash_names – Names of the hash algorithms to compute

Returns:

A dict mapping algo name to hash value

swh.objstorage.objstorage.compute_hash(content: bytes, algo: str = 'sha1') bytes[source]#

Compute the content’s hash.

Parameters:
  • content – The raw content to hash

  • algo – Name of the hash algorithm to use

Returns:

The computed hash for the content

class swh.objstorage.objstorage.NullCompressor[source]#

Bases: object

compress(data)[source]#
flush()[source]#
class swh.objstorage.objstorage.NullDecompressor[source]#

Bases: object

decompress(data: bytes) bytes[source]#
property unused_data: bytes#
class swh.objstorage.objstorage.ObjStorage(*, allow_delete=False, **kwargs)[source]#

Bases: object

PRIMARY_HASH: Literal['sha1', 'sha256'] = 'sha1'#
compression: Literal['bz2', 'lzma', 'gzip', 'zlib', 'none'] = 'none'#
add_batch(contents: Mapping[bytes, bytes] | Iterable[Tuple[bytes | CompositeObjId, bytes]], check_presence: bool = True) Dict[source]#
restore(content: bytes, obj_id: bytes | CompositeObjId) None[source]#
get_batch(obj_ids: Iterable[bytes | CompositeObjId]) Iterator[bytes | None][source]#
abstract delete(obj_id: bytes | CompositeObjId)[source]#
list_content(last_obj_id: bytes | CompositeObjId | None = None, limit: int | None = 10000) Iterator[CompositeObjId][source]#
download_url(obj_id: bytes | CompositeObjId, content_disposition: str | None = None, expiry: timedelta | None = None) str | None[source]#
abstract get(obj_id: bytes | CompositeObjId) bytes[source]#
check(obj_id: bytes | CompositeObjId) None[source]#

Check if a content is found and recompute its hash to check integrity.

compress(data: bytes) bytes[source]#
decompress(data: bytes, hex_obj_id: str) bytes[source]#