swh.objstorage.replayer.replay module
-
swh.objstorage.replayer.replay.
is_hash_in_bytearray
(hash_, array, nb_hashes, hash_size=20) Checks whether the given hash is in the provided array. The array must be a sorted list of sha1 hashes and contain nb_hashes hashes (so its size must be nb_hashes*hash_size bytes).
- Parameters
hash_ (bytes) – the hash to look for
array (bytes) – a sorted, concatenated array of hashes (may be of any type supporting slice indexing, e.g. mmap.mmap)
nb_hashes (int) – number of hashes in the array
hash_size (int) – size of a hash in bytes (defaults to 20, for SHA1)
Example:
>>> import os
>>> hash1 = os.urandom(20)
>>> hash2 = os.urandom(20)
>>> hash3 = os.urandom(20)
>>> array = b''.join(sorted([hash1, hash2]))
>>> is_hash_in_bytearray(hash1, array, 2)
True
>>> is_hash_in_bytearray(hash2, array, 2)
True
>>> is_hash_in_bytearray(hash3, array, 2)
False
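Because the array is sorted and every entry has the same size, a lookup of this kind can be done with a binary search over fixed-width slices. The following is a minimal sketch under that assumption, not necessarily how the library implements it; the name is_hash_in_sorted_array and the ValueError on a length mismatch are illustrative choices, not part of the documented API.

```python
import hashlib


def is_hash_in_sorted_array(hash_, array, nb_hashes, hash_size=20):
    """Binary search for hash_ in a sorted concatenation of fixed-size hashes.

    Hypothetical stand-in for illustration; `array` only needs to support
    slice indexing (bytes, bytearray, mmap.mmap, ...).
    """
    if len(hash_) != hash_size:
        # Assumption of this sketch: reject hashes of the wrong length.
        raise ValueError("hash_ must be exactly hash_size bytes long")
    lo, hi = 0, nb_hashes  # search window in units of hashes; hi is exclusive
    while lo < hi:
        mid = (lo + hi) // 2
        candidate = bytes(array[mid * hash_size:(mid + 1) * hash_size])
        if candidate == hash_:
            return True
        if candidate < hash_:
            lo = mid + 1  # target, if present, lies in the upper half
        else:
            hi = mid  # target, if present, lies in the lower half
    return False


# Build a sorted array of 100 SHA1 digests and look one of them up.
digests = sorted(hashlib.sha1(bytes([i])).digest() for i in range(100))
packed = b"".join(digests)
found = is_hash_in_sorted_array(digests[42], packed, len(digests))
missing = is_hash_in_sorted_array(hashlib.sha1(b"absent").digest(), packed, len(digests))
```

Each probe reads only hash_size bytes, so with a memory-mapped file the lookup touches about log2(nb_hashes) small slices rather than loading the whole array.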
-
exception
swh.objstorage.replayer.replay.
ReplayError
(operation, *, obj_id, exc) Bases: Exception
An error occurred during the replay of an object.
-
swh.objstorage.replayer.replay.
log_replay_retry
(retry_obj, sleep, last_result) Log a retry of the content replayer.
-
swh.objstorage.replayer.replay.
obj_in_objstorage
(obj_id, dst) Check if an object is already in an objstorage, tenaciously.
-
swh.objstorage.replayer.replay.
process_replay_objects_content
(all_objects: Dict[str, List[dict]], *, src: swh.objstorage.objstorage.ObjStorage, dst: swh.objstorage.objstorage.ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True) Takes a list of records from Kafka (see swh.journal.client.JournalClient.process()) and copies them from the src objstorage to the dst objstorage, if:
obj['status'] is 'visible'
exclude_fn(obj) is False (if exclude_fn is provided)
obj['sha1'] not in dst (if check_dst is True)
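The three copy conditions listed above can be read as a single predicate applied to each record. The sketch below only illustrates that logic; should_copy is a hypothetical helper, not a function of this module, and it models dst as any container supporting the in operator.

```python
from typing import Callable, Container, Optional


def should_copy(
    obj: dict,
    dst: Container[bytes],
    exclude_fn: Optional[Callable[[dict], bool]] = None,
    check_dst: bool = True,
) -> bool:
    """Return True if a content record passes all three copy conditions."""
    if obj["status"] != "visible":
        return False  # only visible contents are copied
    if exclude_fn is not None and exclude_fn(obj):
        return False  # explicitly excluded by the caller
    if check_dst and obj["sha1"] in dst:
        return False  # already present in the destination
    return True


# A plain set stands in for the destination objstorage here.
already_copied = {b"id-a"}
records = [
    {"sha1": b"id-a", "status": "visible"},
    {"sha1": b"id-b", "status": "visible"},
    {"sha1": b"id-c", "status": "hidden"},
]
to_copy = [r["sha1"] for r in records if should_copy(r, already_copied)]
```

With check_dst=False the membership test is skipped entirely, so a record already present in the destination would still be selected.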
- Parameters
all_objects – Objects passed by the Kafka client. Most importantly, all_objects['content'][*]['sha1'] is the sha1 hash of each content.
src – An object storage (see swh.objstorage.get_objstorage())
dst – An object storage (see swh.objstorage.get_objstorage())
exclude_fn – Optional predicate called on each object; objects for which it returns True are not copied.
check_dst – Determines whether we should check the destination objstorage before copying.
Example:
>>> from swh.objstorage.factory import get_objstorage
>>> src = get_objstorage('memory')
>>> dst = get_objstorage('memory')
>>> id1 = src.add(b'foo bar')
>>> id2 = src.add(b'baz qux')
>>> kafka_partitions = {
...     'content': [
...         {
...             'sha1': id1,
...             'status': 'visible',
...         },
...         {
...             'sha1': id2,
...             'status': 'visible',
...         },
...     ]
... }
>>> process_replay_objects_content(
...     kafka_partitions, src=src, dst=dst,
...     exclude_fn=lambda obj: obj['sha1'] == id1)
>>> id1 in dst
False
>>> id2 in dst
True