swh.objstorage.replayer.replay module

exception swh.objstorage.replayer.replay.LengthMismatch(expected, received)

Bases: Exception

exception swh.objstorage.replayer.replay.HashMismatch(expected, received)

Bases: Exception

swh.objstorage.replayer.replay.format_obj_id(obj_id: CompositeObjId) → str
swh.objstorage.replayer.replay.hex_obj_id(obj_id: CompositeObjId) → Dict[str, str]
swh.objstorage.replayer.replay.logger_debug_obj_id(msg, args, **kwargs)
swh.objstorage.replayer.replay.is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=20)

Checks if the given hash is in the provided array. The array must be a sorted list of sha1 hashes, and contain nb_hashes hashes (so its size must be nb_hashes*hash_size bytes).

  • hash_ (bytes) – the hash to look for

  • array (bytes) – a sorted concatenated array of hashes (may be of any type supporting slice indexing, e.g. mmap.mmap)

  • nb_hashes (int) – number of hashes in the array

  • hash_size (int) – size of a hash (defaults to 20, for SHA1)


>>> import os
>>> hash1 = os.urandom(20)
>>> hash2 = os.urandom(20)
>>> hash3 = os.urandom(20)
>>> array = b''.join(sorted([hash1, hash2]))
>>> is_hash_in_bytearray(hash1, array, 2)
True
>>> is_hash_in_bytearray(hash2, array, 2)
True
>>> is_hash_in_bytearray(hash3, array, 2)
False
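
Because the array is sorted and every entry has the same size, the lookup described above can be done with a binary search over fixed-size slices. The following is a minimal sketch of that idea, not the module's actual implementation; the name hash_in_sorted_bytes and the sample data are hypothetical.

```python
def hash_in_sorted_bytes(hash_, array, nb_hashes, hash_size=20):
    """Binary-search a sorted, concatenated array of fixed-size hashes.

    Hypothetical helper for illustration; `array` only needs to support
    len-independent slice indexing (bytes, bytearray, mmap.mmap, ...).
    """
    if len(hash_) != hash_size:
        raise ValueError("hash has unexpected size")
    lo, hi = 0, nb_hashes
    while lo < hi:
        mid = (lo + hi) // 2
        # Slice out the mid-th hash and compare it lexicographically.
        candidate = bytes(array[mid * hash_size:(mid + 1) * hash_size])
        if candidate == hash_:
            return True
        if candidate < hash_:
            lo = mid + 1
        else:
            hi = mid
    return False


# Three deterministic 20-byte "hashes", sorted and concatenated.
hashes = sorted(bytes([i]) * 20 for i in (1, 5, 9))
array = b"".join(hashes)
print(hash_in_sorted_bytes(bytes([5]) * 20, array, 3))  # True
print(hash_in_sorted_bytes(bytes([7]) * 20, array, 3))  # False
```

Each step halves the search range, so a lookup inspects about log2(nb_hashes) slices rather than scanning the whole array.
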
exception swh.objstorage.replayer.replay.ReplayError(*, obj_id: CompositeObjId, exc)

Bases: Exception

An error occurred during the replay of an object

swh.objstorage.replayer.replay.log_replay_retry(retry_state: RetryCallState, sleep: float | None = None, last_result: Any = None) → None

Log a retry of the content replayer

swh.objstorage.replayer.replay.log_replay_error(obj_id: CompositeObjId, exc: Exception, operation: str, retries: int) → None
swh.objstorage.replayer.replay.retry_error_callback(retry_state: RetryCallState) → None

Log a replay error to sentry

class swh.objstorage.replayer.replay.retry_log_if_success

Bases: retry_base

Log in statsd the number of attempts required to succeed

swh.objstorage.replayer.replay.get_object(objstorage: ObjStorageInterface, obj_id: CompositeObjId) → bytes
swh.objstorage.replayer.replay.check_hashes(obj: bytes, obj_id: CompositeObjId)
swh.objstorage.replayer.replay.put_object(objstorage: ObjStorageInterface, obj_id: CompositeObjId, obj: bytes)
swh.objstorage.replayer.replay.copy_object(obj_id: CompositeObjId, obj_len: int, src: ObjStorageInterface, dst: ObjStorageInterface, check_src_hashes: bool = False) → int
swh.objstorage.replayer.replay.obj_in_objstorage(obj_id: CompositeObjId, dst: ObjStorageInterface) → bool

Check if an object is already in an objstorage, tenaciously
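
"Tenaciously" is read here as "retrying on transient errors", which is an assumption based on the retry-related signatures in this module. A minimal stdlib-only sketch of such a retried membership check follows; exists_with_retries, flaky_contains and the attempts/delay parameters are hypothetical and do not reflect the module's actual retry policy.

```python
import time


def exists_with_retries(contains, obj_id, attempts=3, delay=0.0):
    """Call contains(obj_id), retrying on any exception.

    Hypothetical helper: gives up and re-raises the last error once
    `attempts` calls have failed.
    """
    last_exc = None
    for attempt in range(1, attempts + 1):
        try:
            return bool(contains(obj_id))
        except Exception as exc:
            last_exc = exc
            if attempt < attempts:
                time.sleep(delay)  # back off before the next attempt
    raise last_exc


# A stand-in backend that fails twice before answering.
calls = {"n": 0}


def flaky_contains(obj_id):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return obj_id in {b"known"}


found = exists_with_retries(flaky_contains, b"known")
print(found, calls["n"])  # True 3
```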

class swh.objstorage.replayer.replay.ContentReplayer(src: Dict[str, Any], dst: Dict[str, Any], exclude_fn: Callable[[Dict[str, Any]], bool] | None = None, check_dst: bool = True, check_obj: bool = False, check_src_hashes: bool = False, concurrency: int = 16)

Bases: object

Helper class that takes a list of records from Kafka (see swh.journal.client.JournalClient.process()) and copies them from the src objstorage to the dst objstorage, if:

  • obj['status'] is 'visible'

  • exclude_fn(obj) is False (if exclude_fn is provided)

  • CompositeObjId(**obj) not in dst (if check_dst is True)

  • src – An object storage configuration dict (see swh.objstorage.get_objstorage())

  • dst – An object storage configuration dict (see swh.objstorage.get_objstorage())

  • exclude_fn – Determines whether an object should be excluded: an object is copied only if exclude_fn(obj) is False.

  • check_dst – Determines whether we should check the destination objstorage before copying.

  • check_obj – If check_dst is true, determines whether we should check that the object already present in the destination objstorage is valid; if it is not, the replayed object is put in its place.

  • check_src_hashes – Determines whether the object retrieved from the src objstorage is checked before sending it to the dst objstorage.

  • concurrency – Number of worker threads doing the replication process (retrieve, check, store).

See swh/objstorage/replayer/tests/test_replay.py for usage examples.
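
The three copy conditions listed above can be sketched as a plain predicate over journal records. This is an illustration only, not ContentReplayer's code: should_copy is a hypothetical name, the destination is modelled as a set of sha1 values instead of an objstorage queried with CompositeObjId(**obj), and the sample records are made up.

```python
def should_copy(obj, dst_ids, exclude_fn=None, check_dst=True):
    """Return True if a record passes all three copy conditions."""
    # 1. Only visible objects are replayed.
    if obj.get("status") != "visible":
        return False
    # 2. Skip objects the caller explicitly excludes.
    if exclude_fn is not None and exclude_fn(obj):
        return False
    # 3. Skip objects already present in the destination (assumed here
    #    to be identified by their "sha1" key).
    if check_dst and obj["sha1"] in dst_ids:
        return False
    return True


records = [
    {"sha1": b"a", "status": "visible", "length": 10},
    {"sha1": b"b", "status": "hidden", "length": 10},
    {"sha1": b"c", "status": "visible", "length": 10},
    {"sha1": b"d", "status": "visible", "length": 5000},
]
dst_ids = {b"c"}  # already stored in the destination


def too_big(obj):
    return obj["length"] > 1000


to_copy = [r["sha1"] for r in records
           if should_copy(r, dst_ids, exclude_fn=too_big)]
print(to_copy)  # [b'a']
```

With check_dst=False the third condition is skipped, so the record with sha1 b"c" would be copied again.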


Stop replayer’s worker threads

replay(all_objects: Dict[str, List[dict]])