swh.storage.replay module#

class swh.storage.replay.ModelObjectDeserializer(validate: bool = True, raise_on_error: bool = False, reporter: Callable[[str, bytes], None] | None = None, known_mismatched_hashes: Tuple[Tuple[str, bytes, bytes]] | None = None)[source]#

Bases: object

A swh.journal object deserializer that checks object validity and reports invalid objects

The deserializer will directly produce BaseModel objects from journal objects representations.

If validation is activated and the object is hashable, it will check if the computed hash matches the identifier of the object.

If the object is invalid and a ‘reporter’ function is given, it will be called with 2 arguments:

reporter(object_id, journal_msg)

Where ‘object_id’ is a string representation of the object identifier (from the journal message), and ‘journal_msg’ is the row message (bytes) retrieved from the journal.

If ‘raise_on_error’ is True, a ‘StorageArgumentException’ exception is raised.

If ‘known_mismatched_hashes’ is given, it must be a tuple of triplets (object_type, object_id, expected_id) listing objects that store invalid hash (object_id) instead of the computed expected_id, but should not be discarded (i.e. they should be replicated by the replayer despite being invalid).

Typical usage:

deserializer = ModelObjectDeserializer(validate=True, reporter=reporter_cb)
client = get_journal_client(
    cls="kafka", value_deserializer=deserializer, **cfg)
convert(object_type: str, msg: bytes) BaseModel | None[source]#
report_failure(msg: bytes, obj: BaseModel | Tuple[str, Dict[str, Any]])[source]#
swh.storage.replay.process_replay_objects(all_objects: Dict[str, List[BaseModel]], *, storage: StorageInterface) None[source]#
swh.storage.replay.collision_aware_content_add(contents: List[ContentType], content_add_fn: Callable[[List[ContentType]], Dict[str, int]]) Dict[str, int][source]#
Add contents to storage. If a hash collision is detected, an error is

logged. Then this adds the other non colliding contents to the storage.

Parameters:
  • content_add_fn – Storage content callable

  • contents – List of contents or skipped contents to add to storage