swh.storage.replay module#

class swh.storage.replay.ModelObjectDeserializer(validate: bool = True, raise_on_error: bool = False, reporter: Optional[Callable[[str, bytes], None]] = None)[source]#

Bases: object

A swh.journal object deserializer that checks object validity and reports invalid objects

The deserializer will directly produce BaseModel objects from journal objects representations.

If validation is activated and the object is hashable, it will check if the computed hash matches the identifier of the object.

If the object is invalid and a ‘reporter’ function is given, it will be called with 2 arguments:

reporter(object_id, journal_msg)

Where ‘object_id’ is a string representation of the object identifier (from the journal message), and ‘journal_msg’ is the row message (bytes) retrieved from the journal.

If ‘raise_on_error’ is True, a ‘StorageArgumentException’ exception is raised.

Typical usage:

deserializer = ModelObjectDeserializer(validate=True, reporter=reporter_cb)
client = get_journal_client(
    cls="kafka", value_deserializer=deserializer, **cfg)
convert(object_type: str, msg: bytes) Optional[BaseModel][source]#
report_failure(msg: bytes, obj: Union[BaseModel, Tuple[str, Dict[str, Any]]])[source]#
swh.storage.replay.process_replay_objects(all_objects: Dict[str, List[BaseModel]], *, storage: StorageInterface) None[source]#
swh.storage.replay.collision_aware_content_add(contents: List[ContentType], content_add_fn: Callable[[List[ContentType]], Dict[str, int]]) Dict[str, int][source]#
Add contents to storage. If a hash collision is detected, an error is

logged. Then this adds the other non colliding contents to the storage.

Parameters:
  • content_add_fn – Storage content callable

  • contents – List of contents or skipped contents to add to storage