swh.journal.replay module

swh.journal.replay.process_replay_objects(all_objects, *, storage)
swh.journal.replay.collision_aware_content_add(content_add_fn: Callable[[Iterable[Any]], None], contents: List[swh.model.model.BaseContent]) → None
Add contents to storage. If a hash collision is detected, an error is logged, and the remaining non-colliding contents are still added to the storage.

  • content_add_fn – Storage method called to add the contents (for example, a storage's content_add or skipped_content_add)

  • contents – List of contents or skipped contents to add to storage
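The collision handling described above can be sketched as a loop that drops the colliding contents and retries with the rest. This is a minimal illustration under stated assumptions, not the module's code: HashCollision, its colliding_hashes attribute, the dict-shaped contents with a "sha1" key, and collision_aware_add_sketch are all hypothetical.

```python
import logging
from typing import Callable, Dict, List

logger = logging.getLogger(__name__)


class HashCollision(Exception):
    """Hypothetical error: raised by a storage when some contents collide."""

    def __init__(self, colliding_hashes: List[bytes]):
        super().__init__(colliding_hashes)
        self.colliding_hashes = colliding_hashes


def collision_aware_add_sketch(
    content_add_fn: Callable[[List[Dict]], None], contents: List[Dict]
) -> List[bytes]:
    """Hypothetical helper: add contents; on a collision, log it, drop the
    colliding contents and retry with the rest.

    Returns the sha1s that were skipped because they collided.
    """
    remaining = list(contents)
    skipped: List[bytes] = []
    while remaining:
        try:
            content_add_fn(remaining)
        except HashCollision as exc:
            colliding = set(exc.colliding_hashes)
            kept = [c for c in remaining if c["sha1"] not in colliding]
            if len(kept) == len(remaining):
                raise  # nothing left to drop: avoid retrying forever
            logger.error("Hash collision on %s", sorted(h.hex() for h in colliding))
            skipped.extend(c["sha1"] for c in remaining if c["sha1"] in colliding)
            remaining = kept
        else:
            break  # the whole remaining batch was added
    return skipped


# Usage with a fake storage that rejects one specific sha1.
BAD = b"\x01" * 20
stored: List[Dict] = []


def fake_content_add(batch: List[Dict]) -> None:
    bad = [c["sha1"] for c in batch if c["sha1"] == BAD]
    if bad:
        raise HashCollision(bad)
    stored.extend(batch)


contents = [{"sha1": b"\x00" * 20}, {"sha1": BAD}, {"sha1": b"\x02" * 20}]
skipped = collision_aware_add_sketch(fake_content_add, contents)
```

The guard that re-raises when no content can be dropped keeps the loop from spinning forever if a storage reports a collision on hashes that are not in the batch.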

swh.journal.replay.is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=20)

Checks if the given hash is in the provided array. The array must be a sorted concatenation of sha1 hashes containing nb_hashes hashes (so its size must be nb_hashes*hash_size bytes).

  • hash_ (bytes) – the hash to look for

  • array (bytes) – a sorted concatenated array of hashes (may be of any type supporting slice indexing, e.g. mmap.mmap)

  • nb_hashes (int) – number of hashes in the array

  • hash_size (int) – size of a hash (defaults to 20, for SHA1)


>>> import os
>>> from swh.journal.replay import is_hash_in_bytearray
>>> hash1 = os.urandom(20)
>>> hash2 = os.urandom(20)
>>> hash3 = os.urandom(20)
>>> array = b''.join(sorted([hash1, hash2]))
>>> is_hash_in_bytearray(hash1, array, 2)
True
>>> is_hash_in_bytearray(hash2, array, 2)
True
>>> is_hash_in_bytearray(hash3, array, 2)
False

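Requiring a sorted array is what allows a binary search over fixed-width slices. A minimal sketch of such a lookup, assuming the same argument conventions as above; hash_in_sorted_array is a hypothetical name and not necessarily how is_hash_in_bytearray is implemented:

```python
def hash_in_sorted_array(hash_, array, nb_hashes, hash_size=20):
    """Hypothetical helper: binary search for hash_ in a sorted
    concatenation of fixed-size hashes."""
    lo, hi = 0, nb_hashes - 1
    while lo <= hi:
        mid = (lo + hi) // 2
        # Slice out the mid-th hash; slicing works the same for bytes and mmap.mmap.
        candidate = array[mid * hash_size:(mid + 1) * hash_size]
        if candidate == hash_:
            return True
        if candidate < hash_:
            lo = mid + 1  # target sorts after candidate: search the upper half
        else:
            hi = mid - 1  # target sorts before candidate: search the lower half
    return False


# Usage: 100 distinct, sorted 20-byte "hashes" (even byte values only).
hashes = sorted(bytes([i]) * 20 for i in range(0, 200, 2))
array = b"".join(hashes)
all_found = all(hash_in_sorted_array(h, array, len(hashes)) for h in hashes)
missing_found = hash_in_sorted_array(bytes([1]) * 20, array, len(hashes))
```

Each lookup touches O(log nb_hashes) slices, which is why a large memory-mapped file of hashes can be queried without loading it fully.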
exception swh.journal.replay.ReplayError(operation, *, obj_id, exc)

Bases: Exception

An error occurred during the replay of an object.
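From the signature, a ReplayError carries the failed operation, the object identifier and the underlying exception. A minimal sketch of an exception shaped like that; ReplayErrorSketch, replay_one and the __str__ format are hypothetical illustrations:

```python
class ReplayErrorSketch(Exception):
    """Hypothetical stand-in: wraps an exception raised while replaying an object."""

    def __init__(self, operation, *, obj_id, exc):
        self.operation = operation  # name of the failed operation
        self.obj_id = obj_id        # identifier of the object being replayed
        self.exc = exc              # the original exception
        super().__init__(operation, obj_id, exc)

    def __str__(self):
        return "ReplayError(%s, %s, %s)" % (self.operation, self.obj_id, self.exc)


def replay_one(obj_id, src):
    """Hypothetical replay step: fetch obj_id from src, wrapping any failure."""
    try:
        return src[obj_id]
    except KeyError as e:
        raise ReplayErrorSketch("get", obj_id=obj_id, exc=e) from e


try:
    replay_one("abc123", {})
except ReplayErrorSketch as err:
    caught = err
```

Chaining with "from e" keeps the original traceback available as __cause__ while the wrapper adds the replay context.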

swh.journal.replay.log_replay_retry(retry_obj, sleep, last_result)

Log a retry of the content replayer.


Log a replay error to Sentry.
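log_replay_retry is a hook meant to run between attempts of a retried operation. To show where such a hook fits, here is a standard-library-only retry loop with exponential backoff that calls a logging callback before each sleep. The module itself presumably delegates retrying to a retry library, and its hook takes (retry_obj, sleep, last_result); retry_with_logging, log_retry (with a simplified (attempt, sleep, exc) signature) and flaky are hypothetical.

```python
import logging
import time

logger = logging.getLogger(__name__)
retry_log = []  # records (attempt, sleep, error) for inspection


def log_retry(attempt, sleep, exc):
    """Hypothetical hook: log one failed attempt before sleeping."""
    retry_log.append((attempt, sleep, repr(exc)))
    logger.warning("Retry %d in %.2fs after error: %r", attempt, sleep, exc)


def retry_with_logging(fn, max_attempts=3, base_delay=0.01, before_sleep=log_retry):
    """Hypothetical helper: call fn(), retrying with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_attempts:
                raise  # out of attempts: propagate the last error
            delay = base_delay * 2 ** (attempt - 1)
            before_sleep(attempt, delay, exc)
            time.sleep(delay)


calls = {"n": 0}


def flaky():
    """Fails twice with a transient error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


result = retry_with_logging(flaky)
```

Here flaky fails twice and succeeds on the third call, so the hook is invoked twice before result is returned.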

swh.journal.replay.copy_object(obj_id, src, dst)
swh.journal.replay.obj_in_objstorage(obj_id, dst)

Check if an object is already in an objstorage, tenaciously (that is, retrying on failure).
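A minimal sketch of how a presence check retried on errors combines with copying, using plain dicts as hypothetical stand-ins for the src and dst objstorages; in_storage_with_retry, copy_if_missing and FlakyStore are illustrative names, not the module's copy_object or obj_in_objstorage:

```python
import time


def in_storage_with_retry(obj_id, dst, attempts=3, delay=0.01):
    """Hypothetical helper: evaluate `obj_id in dst`, retrying on OSError."""
    for attempt in range(1, attempts + 1):
        try:
            return obj_id in dst
        except OSError:  # covers ConnectionError, TimeoutError, etc.
            if attempt == attempts:
                raise
            time.sleep(delay)


def copy_if_missing(obj_id, src, dst):
    """Hypothetical helper: copy obj_id from src to dst unless already present."""
    if in_storage_with_retry(obj_id, dst):
        return False
    dst[obj_id] = src[obj_id]
    return True


class FlakyStore(dict):
    """Dict whose first membership test fails, to exercise the retry path."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.failures_left = 1

    def __contains__(self, key):
        if self.failures_left:
            self.failures_left -= 1
            raise ConnectionError("transient failure")
        return super().__contains__(key)


src = {b"id1": b"foo bar", b"id2": b"baz qux"}
dst = {b"id1": b"foo bar"}
copied = [copy_if_missing(k, src, dst) for k in sorted(src)]  # [False, True]

flaky_dst = FlakyStore()
flaky_copied = copy_if_missing(b"id2", src, flaky_dst)
```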

swh.journal.replay.process_replay_objects_content(all_objects: Dict[str, List[dict]], *, src: swh.objstorage.objstorage.ObjStorage, dst: swh.objstorage.objstorage.ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True)

Takes the records received from Kafka, grouped by object type (see swh.journal.client.JournalClient.process()), and copies them from the src objstorage to the dst objstorage if:

  • obj['status'] is 'visible'

  • exclude_fn(obj) is False (if exclude_fn is provided)

  • obj['sha1'] not in dst (if check_dst is True)

  • all_objects – Objects passed by the Kafka client. Most importantly, all_objects['content'][*]['sha1'] is the sha1 hash of each content.

  • src – An object storage (see swh.objstorage.get_objstorage())

  • dst – An object storage (see swh.objstorage.get_objstorage())

  • exclude_fn – Called on each object; objects for which it returns True are not copied.

  • check_dst – Determines whether we should check the destination objstorage before copying.


>>> from swh.objstorage import get_objstorage
>>> src = get_objstorage('memory', {})
>>> dst = get_objstorage('memory', {})
>>> id1 = src.add(b'foo bar')
>>> id2 = src.add(b'baz qux')
>>> kafka_partitions = {
...     'content': [
...         {
...             'sha1': id1,
...             'status': 'visible',
...         },
...         {
...             'sha1': id2,
...             'status': 'visible',
...         },
...     ]
... }
>>> process_replay_objects_content(
...     kafka_partitions, src=src, dst=dst,
...     exclude_fn=lambda obj: obj['sha1'] == id1)
>>> id1 in dst
False
>>> id2 in dst
True
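The three copy conditions listed above amount to a filter over all_objects['content']. A minimal sketch, with a dict standing in for the dst objstorage; select_contents_to_copy is hypothetical and only computes which sha1s would be copied, leaving out the actual transfer and anything else the real function does:

```python
from typing import Callable, Dict, List, Optional


def select_contents_to_copy(
    all_objects: Dict[str, List[dict]],
    dst,
    exclude_fn: Optional[Callable[[dict], bool]] = None,
    check_dst: bool = True,
) -> List[bytes]:
    """Hypothetical helper: return the sha1s that pass the copy conditions."""
    selected = []
    for obj in all_objects.get("content", []):
        if obj["status"] != "visible":
            continue  # e.g. hidden contents are never copied
        if exclude_fn is not None and exclude_fn(obj):
            continue  # explicitly excluded by the caller
        if check_dst and obj["sha1"] in dst:
            continue  # already present in the destination
        selected.append(obj["sha1"])
    return selected


records = {
    "content": [
        {"sha1": b"a" * 20, "status": "visible"},
        {"sha1": b"b" * 20, "status": "hidden"},
        {"sha1": b"c" * 20, "status": "visible"},
        {"sha1": b"d" * 20, "status": "visible"},
    ]
}
dst = {b"d" * 20: b"already there"}
to_copy = select_contents_to_copy(
    records, dst, exclude_fn=lambda obj: obj["sha1"] == b"a" * 20
)
```

With these records only the "c" content is selected: "a" is excluded, "b" is not visible, and "d" is already in dst. Passing check_dst=False skips the destination lookup, trading possible redundant copies for fewer reads on dst.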