swh.objstorage.replayer.replay module

swh.objstorage.replayer.replay.is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=20)

Checks whether the given hash is in the provided array. The array must be a sorted concatenation of hashes (SHA1 by default, see hash_size) containing nb_hashes hashes, so its size must be nb_hashes*hash_size bytes.

Parameters:
  • hash_ (bytes) – the hash to look for

  • array (bytes) – a sorted concatenated array of hashes (may be of any type supporting slice indexing, e.g. mmap.mmap)

  • nb_hashes (int) – number of hashes in the array

  • hash_size (int) – size of a hash (defaults to 20, for SHA1)

Example:

>>> import os
>>> hash1 = os.urandom(20)
>>> hash2 = os.urandom(20)
>>> hash3 = os.urandom(20)
>>> array = b''.join(sorted([hash1, hash2]))
>>> is_hash_in_bytearray(hash1, array, 2)
True
>>> is_hash_in_bytearray(hash2, array, 2)
True
>>> is_hash_in_bytearray(hash3, array, 2)
False
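Because the array is sorted and every hash has the same width, membership can be decided by a binary search over fixed-width slices. The following is a minimal sketch of that idea, assuming plain lexicographic byte order; `is_hash_in_sorted_array` is a hypothetical stand-in written for illustration, and the module's actual implementation may differ. The demo maps a temporary file with `mmap.mmap` to show that any sliceable buffer works.

```python
import hashlib
import mmap
import tempfile


def is_hash_in_sorted_array(hash_: bytes, array, nb_hashes: int, hash_size: int = 20) -> bool:
    """Binary search for hash_ in a sorted concatenation of fixed-width hashes.

    Hypothetical re-implementation for illustration; not the module's code.
    """
    lo, hi = 0, nb_hashes  # candidate indices form the half-open range [lo, hi)
    while lo < hi:
        mid = (lo + hi) // 2
        # Slice out the mid-th hash; slicing works on bytes, bytearray and mmap.mmap
        candidate = bytes(array[mid * hash_size:(mid + 1) * hash_size])
        if candidate == hash_:
            return True
        if candidate < hash_:  # bytes objects compare lexicographically
            lo = mid + 1
        else:
            hi = mid
    return False


if __name__ == "__main__":
    # Deterministic sample: SHA1 digests of "0".."999", sorted and concatenated
    hashes = sorted(hashlib.sha1(str(i).encode()).digest() for i in range(1000))
    absent = hashlib.sha1(b"not in the array").digest()
    with tempfile.TemporaryFile() as f:
        f.write(b"".join(hashes))
        f.flush()
        # Map the file read-only, as one would for a large on-disk index
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            print(is_hash_in_sorted_array(hashes[123], mm, len(hashes)))  # True
            print(is_hash_in_sorted_array(absent, mm, len(hashes)))  # False
```

Each probe reads a single slice of hash_size bytes, so a lookup in an mmap-ed file touches only O(log nb_hashes) hashes instead of loading the whole array into memory.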
exception swh.objstorage.replayer.replay.ReplayError(*, obj_id, exc)

Bases: Exception

An error occurred during the replay of an object
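The constructor takes keyword-only arguments: the identifier of the object whose replay failed and the underlying exception. The sketch below shows how a caller might wrap a low-level error in such an exception; `ReplayErrorSketch` and `fetch` are hypothetical stand-ins, and the attributes they store and the string format are assumptions, not the module's code.

```python
class ReplayErrorSketch(Exception):
    """Hypothetical stand-in mirroring ReplayError(*, obj_id, exc)."""

    def __init__(self, *, obj_id: bytes, exc: BaseException):
        super().__init__(obj_id, exc)
        self.obj_id = obj_id  # identifier of the object being replayed
        self.exc = exc  # the original exception

    def __str__(self) -> str:
        return f"ReplayError({self.obj_id.hex()}, {self.exc!r})"


def fetch(storage: dict, obj_id: bytes) -> bytes:
    """Hypothetical getter that wraps any backend error in ReplayErrorSketch."""
    try:
        return storage[obj_id]
    except Exception as exc:
        raise ReplayErrorSketch(obj_id=obj_id, exc=exc) from exc


if __name__ == "__main__":
    try:
        fetch({}, b"\x01\x02")
    except ReplayErrorSketch as err:
        print(err)  # ReplayError(0102, KeyError(b'\x01\x02'))
```

Making the arguments keyword-only forces call sites to spell out which value is the object id and which is the cause, which avoids silently swapping them.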

swh.objstorage.replayer.replay.log_replay_retry(retry_state, sleep=None, last_result=None)

Log a retry of the content replayer

swh.objstorage.replayer.replay.log_replay_error(retry_state)

Log a replay error to Sentry

class swh.objstorage.replayer.replay.retry_log_if_success

Bases: retry_base

Log to statsd the number of attempts required to succeed
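log_replay_retry, log_replay_error and retry_log_if_success look like hooks for the tenacity retry library (retry_base is tenacity's base class for retry strategies): presumably a callback run before sleeping between attempts, a callback run once all attempts have failed, and a retry predicate that reports, on success, how many attempts were needed. The sketch below reproduces that division of labour with a hand-written retry loop instead of tenacity, so it has no dependencies; `call_with_retries` and the `attempt_metrics` counter (standing in for statsd) are hypothetical names, not tenacity's or the module's API.

```python
import time
from collections import Counter
from typing import Callable, TypeVar

T = TypeVar("T")

# Hypothetical stand-in for a statsd client: successes counted per attempt number.
attempt_metrics: Counter = Counter()


def call_with_retries(
    fn: Callable[[], T],
    max_attempts: int = 3,
    wait: float = 0.0,
    before_sleep: Callable[[int, BaseException], None] = lambda n, e: None,
    on_give_up: Callable[[BaseException], None] = lambda e: None,
) -> T:
    """Simplified retry loop mimicking tenacity-style hooks (not tenacity's API)."""
    attempt = 0
    while True:
        attempt += 1
        try:
            result = fn()
        except Exception as exc:
            if attempt >= max_attempts:
                on_give_up(exc)  # like an error callback: report, then re-raise
                raise
            before_sleep(attempt, exc)  # like log_replay_retry: log the retry
            time.sleep(wait)
        else:
            # like retry_log_if_success: record how many attempts success took
            attempt_metrics[attempt] += 1
            return result


if __name__ == "__main__":
    calls = {"n": 0}

    def flaky() -> str:
        """Fails twice with a transient error, then succeeds."""
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("transient")
        return "ok"

    print(call_with_retries(
        flaky, before_sleep=lambda n, e: print(f"retry after attempt {n}: {e}")
    ))
    print(attempt_metrics)  # Counter({3: 1})
```

Recording the attempt number only on success gives a histogram of how flaky each operation is, without adding noise for calls that eventually fail (those go through the give-up hook instead).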

swh.objstorage.replayer.replay.get_object(objstorage, obj_id)
swh.objstorage.replayer.replay.put_object(objstorage, obj_id, obj)
swh.objstorage.replayer.replay.copy_object(obj_id, src, dst)
swh.objstorage.replayer.replay.obj_in_objstorage(obj_id, dst)

Check whether an object is already in an objstorage, retrying on failure
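Together, obj_in_objstorage and copy_object let the replayer skip objects the destination already holds and copy the rest from src to dst. A minimal sketch of that flow, assuming a toy dict-backed storage and leaving out retries; `InMemoryObjStorage`, its `put` method, `copy_object_sketch` and `replicate` are all hypothetical and do not reproduce the swh.objstorage API.

```python
import hashlib


class InMemoryObjStorage:
    """Hypothetical dict-backed objstorage, for illustration only."""

    def __init__(self) -> None:
        self._objects: dict = {}

    def __contains__(self, obj_id: bytes) -> bool:
        return obj_id in self._objects

    def get(self, obj_id: bytes) -> bytes:
        return self._objects[obj_id]  # raises KeyError if missing

    def put(self, obj_id: bytes, content: bytes) -> None:
        self._objects[obj_id] = content


def copy_object_sketch(obj_id: bytes, src: InMemoryObjStorage, dst: InMemoryObjStorage) -> int:
    """Copy one object from src to dst; return the number of bytes copied."""
    content = src.get(obj_id)
    dst.put(obj_id, content)
    return len(content)


def replicate(obj_ids, src, dst, check_dst: bool = True) -> int:
    """Copy every object missing from dst (if check_dst); return how many were copied."""
    copied = 0
    for obj_id in obj_ids:
        if check_dst and obj_id in dst:  # the role played by obj_in_objstorage
            continue
        copy_object_sketch(obj_id, src, dst)
        copied += 1
    return copied


if __name__ == "__main__":
    src, dst = InMemoryObjStorage(), InMemoryObjStorage()
    blobs = [b"foo", b"bar", b"baz"]
    ids = [hashlib.sha1(blob).digest() for blob in blobs]
    for obj_id, blob in zip(ids, blobs):
        src.put(obj_id, blob)
    dst.put(ids[0], blobs[0])  # already replicated earlier
    print(replicate(ids, src, dst))  # 2
```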

class swh.objstorage.replayer.replay.ContentReplayer(src: Dict[str, Any], dst: Dict[str, Any], exclude_fn: Callable[[dict], bool] | None = None, check_dst: bool = True, check_obj: bool = False, concurrency: int = 16)

Bases: object

Helper class that takes a list of records from Kafka (see swh.journal.client.JournalClient.process()) and copies the corresponding objects from the src objstorage to the dst objstorage if:

  • obj['status'] is 'visible'

  • exclude_fn(obj) is False (if exclude_fn is provided)

  • obj['sha1'] is not in dst (if check_dst is True)

Parameters:
  • src – An object storage configuration dict (see swh.objstorage.get_objstorage())

  • dst – An object storage configuration dict (see swh.objstorage.get_objstorage())

  • exclude_fn – A callable that takes an object dict and returns True if the object should be excluded (not copied).

  • check_dst – Determines whether we should check the destination objstorage before copying.

  • check_obj – If check_dst is True, determines whether the object already present in the destination objstorage should be checked for validity; if that copy is invalid, the replayed object is put into the destination.

  • concurrency – Number of worker threads doing the replication process (retrieve, check, store).

See swh/objstorage/replayer/tests/test_replay.py for usage examples.
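The copy conditions listed at the top of this class description amount to a filter over incoming records. A minimal sketch, assuming each record is a dict with at least `sha1` and `status` keys and modelling the destination as a set of ids; `should_copy` and the size-based exclusion rule in the demo are hypothetical, not part of the module.

```python
from typing import Callable, Container, Optional


def should_copy(
    obj: dict,
    dst_ids: Container[bytes],
    exclude_fn: Optional[Callable[[dict], bool]] = None,
    check_dst: bool = True,
) -> bool:
    """Return True if obj satisfies every copy condition of ContentReplayer."""
    if obj.get("status") != "visible":
        return False
    if exclude_fn is not None and exclude_fn(obj):  # exclude_fn returns True to skip
        return False
    if check_dst and obj["sha1"] in dst_ids:  # already present in the destination
        return False
    return True


if __name__ == "__main__":
    records = [
        {"sha1": b"\x01" * 20, "status": "visible", "length": 10},
        {"sha1": b"\x02" * 20, "status": "hidden", "length": 10},
        {"sha1": b"\x03" * 20, "status": "visible", "length": 10**9},
        {"sha1": b"\x04" * 20, "status": "visible", "length": 10},
    ]
    in_dst = {b"\x04" * 20}

    def too_big(obj: dict) -> bool:
        """Hypothetical exclusion rule: skip objects over 1 MB."""
        return obj["length"] > 10**6

    selected = [r["sha1"] for r in records if should_copy(r, in_dst, too_big)]
    print(len(selected))  # 1
```

The cheap status and exclusion checks run first, so the destination lookup (a network round trip for a real objstorage) happens only for objects that would otherwise be copied.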

stop()

Stop the replayer's worker threads
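The concurrency parameter and stop() suggest a pool of worker threads consuming a shared work queue. The sketch below shows one common standard-library way to build such a pool: workers pull items from a `queue.Queue`, record failures instead of dying, and exit when they receive a sentinel. `WorkerPool` and its methods are hypothetical; the real class may be organised quite differently.

```python
import queue
import threading
from typing import Callable, List, Tuple

_STOP = object()  # sentinel telling a worker thread to exit


class WorkerPool:
    """Hypothetical pool of threads applying `handle` to queued items."""

    def __init__(self, handle: Callable[[object], None], concurrency: int = 16) -> None:
        self._handle = handle
        self._queue: "queue.Queue[object]" = queue.Queue()
        self.errors: List[Tuple[object, Exception]] = []
        self._threads = [
            threading.Thread(target=self._run, daemon=True) for _ in range(concurrency)
        ]
        for t in self._threads:
            t.start()

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            try:
                if item is _STOP:
                    return
                try:
                    self._handle(item)  # e.g. retrieve from src, check, store in dst
                except Exception as exc:  # keep the worker alive; record the failure
                    self.errors.append((item, exc))
            finally:
                self._queue.task_done()

    def submit(self, item: object) -> None:
        self._queue.put(item)

    def wait(self) -> None:
        """Block until every submitted item has been handled."""
        self._queue.join()

    def stop(self) -> None:
        """Send one sentinel per worker, then wait for all threads to exit."""
        for _ in self._threads:
            self._queue.put(_STOP)
        for t in self._threads:
            t.join()


if __name__ == "__main__":
    done: List[int] = []
    lock = threading.Lock()

    def handle(item: object) -> None:
        with lock:
            done.append(item)

    pool = WorkerPool(handle, concurrency=4)
    for i in range(100):
        pool.submit(i)
    pool.wait()
    pool.stop()
    print(sorted(done) == list(range(100)))  # True
```

Sending exactly one sentinel per thread guarantees that every worker wakes up and exits, even if it is blocked on an empty queue when stop() is called.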

replay(all_objects: Dict[str, List[dict]])