swh.objstorage.replayer.replay module
- exception swh.objstorage.replayer.replay.LengthMismatch(expected, received)
- swh.objstorage.replayer.replay.is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=20)
Checks whether the given hash is in the provided array. The array must be a sorted sequence of sha1 hashes and contain nb_hashes hashes (so its size must be nb_hashes*hash_size bytes).
- Parameters:
>>> import os
>>> hash1 = os.urandom(20)
>>> hash2 = os.urandom(20)
>>> hash3 = os.urandom(20)
>>> array = b''.join(sorted([hash1, hash2]))
>>> is_hash_in_bytearray(hash1, array, 2)
True
>>> is_hash_in_bytearray(hash2, array, 2)
True
>>> is_hash_in_bytearray(hash3, array, 2)
False
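The lookup described above can be pictured as a binary search over fixed-size slices of a packed byte string. The following is only a minimal sketch under that assumption, not the module's actual implementation: the function name hash_in_sorted_array is hypothetical, and raising ValueError on a size mismatch is an illustrative choice rather than documented behaviour.

```python
def hash_in_sorted_array(hash_: bytes, array: bytes, nb_hashes: int, hash_size: int = 20) -> bool:
    """Binary search for hash_ in a sorted, packed array of fixed-size hashes.

    Hypothetical helper written for illustration only.
    """
    # Assumption: malformed inputs are rejected with ValueError.
    if len(hash_) != hash_size:
        raise ValueError(f"expected a {hash_size}-byte hash, got {len(hash_)} bytes")
    if len(array) != nb_hashes * hash_size:
        raise ValueError("array size must be nb_hashes * hash_size bytes")

    lo, hi = 0, nb_hashes
    while lo < hi:
        mid = (lo + hi) // 2
        # Slice out the mid-th hash; bytes compare lexicographically,
        # which matches the order produced by sorted() on bytes objects.
        candidate = array[mid * hash_size:(mid + 1) * hash_size]
        if candidate == hash_:
            return True
        if candidate < hash_:
            lo = mid + 1
        else:
            hi = mid
    return False
```

Because the array is sorted, each lookup inspects about log2(nb_hashes) slices instead of scanning the whole array.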
- exception swh.objstorage.replayer.replay.ReplayError(*, obj_id: ObjId, exc)
An error occurred during the replay of an object
- swh.objstorage.replayer.replay.log_replay_retry(retry_state: RetryCallState, sleep: float | None = None, last_result: Any = None) → None
Log a retry of the content replayer
- swh.objstorage.replayer.replay.log_replay_error(obj_id: ObjId, exc: Exception, operation: str, retries: int) → None
- swh.objstorage.replayer.replay.retry_error_callback(retry_state: RetryCallState) → None
Log a replay error to Sentry
- class swh.objstorage.replayer.replay.retry_log_if_success
Log in statsd the number of attempts required to succeed
- swh.objstorage.replayer.replay.get_object(objstorage: ObjStorageInterface, obj_id: ObjId) → bytes
- swh.objstorage.replayer.replay.put_object(objstorage: ObjStorageInterface, obj_id: ObjId, obj: bytes)
- swh.objstorage.replayer.replay.copy_object(obj_id: ObjId, obj_len: int, src: ObjStorageInterface, dst: ObjStorageInterface, check_src_hashes: bool = False) → int
- swh.objstorage.replayer.replay.obj_in_objstorage(obj_id: ObjId, dst: ObjStorageInterface) → bool
Check if an object is already in an objstorage, tenaciously
- class swh.objstorage.replayer.replay.ContentReplayer(src: Dict[str, Any], dst: Dict[str, Any], exclude_fn: Callable[[Dict[str, Any]], bool] | None = None, check_dst: bool = True, check_obj: bool = False, check_src_hashes: bool = False, concurrency: int = 16)
Helper class that takes a list of records from Kafka and copies them from the src objstorage to the dst objstorage, if:
obj['status'] is 'visible'
exclude_fn(obj) is False (if exclude_fn is provided)
CompositeObjId(**obj) not in dst (if check_dst is True)
- Parameters:
src – An object storage configuration dict.
dst – An object storage configuration dict.
exclude_fn – Determines whether an object should be skipped: objects for which exclude_fn(obj) returns true are not copied.
check_dst – Determines whether we should check the destination objstorage before copying.
check_obj – If check_dst is true, determines whether we should also check that the existing object in the destination objstorage is valid; if it is not, the replayed object is put in its place.
check_src_hashes – Checks the object before sending it to the dst objstorage.
concurrency – Number of worker threads doing the replication process (retrieve, check, store).
See swh/objstorage/replayer/tests/test_replay.py for usage examples.
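The three copy conditions listed for ContentReplayer can be illustrated with a small, self-contained predicate. This is a hedged sketch and not the class's real code: should_copy is a hypothetical helper, the destination is modelled as a plain set of identifiers instead of an objstorage, and keying records on a "sha1" field is an assumption made for the example.

```python
from typing import Any, Callable, Dict, Optional, Set


def should_copy(
    obj: Dict[str, Any],
    dst_ids: Set[bytes],
    exclude_fn: Optional[Callable[[Dict[str, Any]], bool]] = None,
    check_dst: bool = True,
) -> bool:
    """Return True when a record passes all three documented conditions.

    Hypothetical helper: dst_ids stands in for the destination objstorage,
    and obj["sha1"] stands in for the object identifier (both assumptions).
    """
    # Condition 1: only visible contents are replayed.
    if obj.get("status") != "visible":
        return False
    # Condition 2: skip objects that the optional exclude_fn rejects.
    if exclude_fn is not None and exclude_fn(obj):
        return False
    # Condition 3: skip objects already in the destination, if asked to check.
    if check_dst and obj["sha1"] in dst_ids:
        return False
    return True
```

With check_dst=False the third test is skipped entirely, so an object already present in the destination would be copied again.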