swh.storage.replay module

swh.storage.replay.process_replay_objects(all_objects: Dict[str, List[Dict[str, Any]]], *, storage: swh.storage.interface.StorageInterface) → None

swh.storage.replay.collision_aware_content_add(content_add_fn: Callable[[List[Any]], Dict[str, int]], contents: List[swh.model.model.BaseContent]) → None

Add contents to storage. If a hash collision is detected, an error is logged, and the remaining non-colliding contents are then added to the storage.

Parameters:

  • content_add_fn – Storage callable used to add a list of contents

  • contents – List of contents or skipped contents to add to storage
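
The behaviour described above (log the collision, then add the remaining contents) can be illustrated with a minimal, self-contained sketch. It does not reproduce the swh.storage implementation: CollisionError, add_skipping_collisions, and fake_content_add are hypothetical names introduced here, and plain strings stand in for content objects.

```python
import logging
from typing import Any, Callable, Dict, List, Set

logger = logging.getLogger(__name__)


class CollisionError(Exception):
    """Hypothetical stand-in for a storage hash-collision exception."""

    def __init__(self, colliding: Set[str]) -> None:
        super().__init__(f"collision on {sorted(colliding)}")
        self.colliding = colliding


def add_skipping_collisions(
    content_add_fn: Callable[[List[Any]], Dict[str, int]],
    contents: List[str],
) -> None:
    """Call content_add_fn, dropping colliding items after each failure."""
    remaining = list(contents)
    while remaining:
        try:
            content_add_fn(remaining)
        except CollisionError as exc:
            logger.error("Collision detected: %s", sorted(exc.colliding))
            filtered = [c for c in remaining if c not in exc.colliding]
            if len(filtered) == len(remaining):
                # Nothing could be dropped, so retrying would loop forever.
                raise
            remaining = filtered
        else:
            # The batch was accepted, so there is nothing left to retry.
            break


stored: List[str] = []


def fake_content_add(batch: List[str]) -> Dict[str, int]:
    """Toy backend (an assumption) that rejects batches containing "bad"."""
    if "bad" in batch:
        raise CollisionError({"bad"})
    stored.extend(batch)
    return {"content:add": len(batch)}


add_skipping_collisions(fake_content_add, ["a", "bad", "b"])
```

In this sketch the first call fails on "bad", the collision is logged, and the second call stores the two remaining items. Retrying the whole batch keeps the add callable itself unaware of collision handling.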

swh.storage.replay.dict_key_dropper(d: Dict, keys_to_drop: Container) → Dict

Returns a copy of the dict d without any of the keys listed in keys_to_drop.
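
As an illustration of the documented behaviour, a function with this contract could look like the sketch below. The name dict_key_dropper_sketch is hypothetical, and the shallow copy is an assumption; the actual implementation may differ.

```python
from typing import Any, Container, Dict


def dict_key_dropper_sketch(
    d: Dict[str, Any], keys_to_drop: Container[str]
) -> Dict[str, Any]:
    """Return a shallow copy of d without the keys in keys_to_drop."""
    # Assumption: values are not deep-copied, only the mapping is rebuilt.
    return {k: v for k, v in d.items() if k not in keys_to_drop}


original = {"id": 1, "name": "x", "tmp": True}
trimmed = dict_key_dropper_sketch(original, {"tmp"})
```

The input dict is left unmodified, and keys named in keys_to_drop that are absent from d are simply ignored.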