swh.objstorage.replayer.replay module

swh.objstorage.replayer.replay.is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=20)[source]

Checks whether the given hash is in the provided array. The array must be a sorted concatenation of sha1 hashes, and contain nb_hashes hashes (so its size must be nb_hashes*hash_size bytes).

  • hash_ (bytes) – the hash to look for

  • array (bytes) – a sorted concatenated array of hashes (may be any type supporting slice indexing, e.g. mmap.mmap)

  • nb_hashes (int) – number of hashes in the array

  • hash_size (int) – size of a hash (defaults to 20, for SHA1)


>>> import os
>>> hash1 = os.urandom(20)
>>> hash2 = os.urandom(20)
>>> hash3 = os.urandom(20)
>>> array = b''.join(sorted([hash1, hash2]))
>>> is_hash_in_bytearray(hash1, array, 2)
True
>>> is_hash_in_bytearray(hash2, array, 2)
True
>>> is_hash_in_bytearray(hash3, array, 2)
False
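For illustration, the lookup can be implemented as a binary search over fixed-size slices of the concatenated array. This is a sketch under the documented contract, not the module's actual code (the function name here is hypothetical):

```python
def hash_in_sorted_array(hash_, array, nb_hashes, hash_size=20):
    # Binary search: `array` is nb_hashes fixed-size hashes, sorted
    # lexicographically, so whole slices can be compared directly.
    left, right = 0, nb_hashes
    while left < right:
        middle = (left + right) // 2
        pivot = array[middle * hash_size:(middle + 1) * hash_size]
        if pivot == hash_:
            return True
        if pivot < hash_:
            left = middle + 1
        else:
            right = middle
    return False
```

Because each probe compares one hash-sized slice, the lookup costs O(log nb_hashes) comparisons and never copies the whole array, which is why any slice-indexable type (such as mmap.mmap) works.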
exception swh.objstorage.replayer.replay.ReplayError(*, obj_id, exc)[source]

Bases: Exception

An error occurred during the replay of an object

swh.objstorage.replayer.replay.log_replay_retry(retry_state, sleep=None, last_result=None)[source]

Log a retry of the content replayer


swh.objstorage.replayer.replay.log_replay_error(retry_state)[source]

Log a replay error to sentry

class swh.objstorage.replayer.replay.retry_log_if_success[source]

Bases: tenacity.retry.retry_base

Log in statsd the number of attempts required to succeed
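The behaviour can be sketched as a tenacity-style predicate: tenacity calls it after every attempt, and on success it records the attempt count and declines further retries. This is a hypothetical reimplementation for illustration (the real class subclasses tenacity.retry.retry_base, and the counter here stands in for statsd):

```python
from types import SimpleNamespace

attempt_counts = []  # stand-in for a statsd counter/histogram

class RetryLogIfSuccess:
    # Hypothetical sketch: tenacity calls the predicate with the current
    # retry state after each attempt; returning True means "try again".
    def __call__(self, retry_state):
        if not retry_state.outcome.failed:
            # The call succeeded: record how many attempts it took,
            # then tell tenacity to stop retrying.
            attempt_counts.append(retry_state.attempt_number)
            return False
        return True

# Simulate: the first attempt fails, the second succeeds.
pred = RetryLogIfSuccess()
failed = SimpleNamespace(outcome=SimpleNamespace(failed=True), attempt_number=1)
ok = SimpleNamespace(outcome=SimpleNamespace(failed=False), attempt_number=2)
print(pred(failed), pred(ok), attempt_counts)  # True False [2]
```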

swh.objstorage.replayer.replay.get_object(objstorage, obj_id)[source]
swh.objstorage.replayer.replay.put_object(objstorage, obj_id, obj)[source]
swh.objstorage.replayer.replay.copy_object(obj_id, src, dst)[source]
swh.objstorage.replayer.replay.obj_in_objstorage(obj_id, dst)[source]

Check if an object is already in an objstorage, tenaciously
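Taken together, get_object, put_object, and copy_object form a simple fetch-then-store pipeline. A sketch of the semantics, with plain dicts standing in for ObjStorage instances (function names suffixed with _sketch to mark them as illustrations; the real functions also wrap failures with retry handling):

```python
def get_object_sketch(objstorage, obj_id):
    # Fetch the raw bytes of one object from the source storage.
    return objstorage[obj_id]

def put_object_sketch(objstorage, obj_id, obj):
    # Store the bytes under the same id in the destination storage.
    objstorage[obj_id] = obj

def copy_object_sketch(obj_id, src, dst):
    # Copy a single object and report how many bytes were transferred.
    obj = get_object_sketch(src, obj_id)
    put_object_sketch(dst, obj_id, obj)
    return len(obj)

src = {b'id-1': b'foo bar'}
dst = {}
print(copy_object_sketch(b'id-1', src, dst))  # 7
```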

swh.objstorage.replayer.replay.process_replay_objects_content(all_objects: Dict[str, List[dict]], *, src: swh.objstorage.objstorage.ObjStorage, dst: swh.objstorage.objstorage.ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True, concurrency: int = 16)[source]

Takes a list of records from Kafka (see swh.journal.client.JournalClient.process()) and copies them from the src objstorage to the dst objstorage, if:

  • obj['status'] is 'visible'

  • exclude_fn(obj) is False (if exclude_fn is provided)

  • obj['sha1'] not in dst (if check_dst is True)

  • all_objects – Objects passed by the Kafka client. Most importantly, all_objects['content'][*]['sha1'] is the sha1 hash of each content.

  • src – An object storage (see swh.objstorage.get_objstorage())

  • dst – An object storage (see swh.objstorage.get_objstorage())

  • exclude_fn – If provided, objects for which exclude_fn(obj) returns True are not copied.

  • check_dst – Determines whether we should check the destination objstorage before copying.


>>> from swh.objstorage.factory import get_objstorage
>>> src = get_objstorage('memory')
>>> dst = get_objstorage('memory')
>>> id1 = src.add(b'foo bar')
>>> id2 = src.add(b'baz qux')
>>> kafka_partitions = {
...     'content': [
...         {
...             'sha1': id1,
...             'status': 'visible',
...         },
...         {
...             'sha1': id2,
...             'status': 'visible',
...         },
...     ]
... }
>>> process_replay_objects_content(
...     kafka_partitions, src=src, dst=dst,
...     exclude_fn=lambda obj: obj['sha1'] == id1)
>>> id1 in dst
False
>>> id2 in dst
True
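The three copy conditions listed above can be condensed into a single predicate. This is an illustrative reimplementation, not the module's code; dst is anything supporting the `in` operator:

```python
def should_copy(obj, dst, exclude_fn=None, check_dst=True):
    # An object is copied only when all three documented conditions hold.
    if obj.get('status') != 'visible':
        return False  # hidden or absent contents are skipped
    if exclude_fn is not None and exclude_fn(obj):
        return False  # caller-provided exclusion
    if check_dst and obj['sha1'] in dst:
        return False  # already present in the destination
    return True

dst = {b'already-there'}
print(should_copy({'sha1': b'new', 'status': 'visible'}, dst))            # True
print(should_copy({'sha1': b'already-there', 'status': 'visible'}, dst))  # False
print(should_copy({'sha1': b'new', 'status': 'hidden'}, dst))             # False
```

Setting check_dst=False skips the membership probe on the destination, trading possible redundant copies for one fewer round-trip per object.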