swh.search.journal_client module

swh.search.journal_client.fetch_last_revision_release_date(snapshot_id: bytes, storage: StorageInterface) → Dict[str, str]
swh.search.journal_client.convert_origin(origin, storage: StorageInterface)
swh.search.journal_client.convert_origin_visit_status(visit_status, storage: StorageInterface)
swh.search.journal_client.convert_from_origin_metadata_dict(origin_dict: Dict, storage: StorageInterface) → Dict
swh.search.journal_client.convert_journal_object(object_type: str, object_data: Dict, storage: StorageInterface) → Dict | None

Convert object_data of type object_type into a dict ready to be pushed to Elasticsearch.
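The conversion step can be pictured as a dispatch on object_type. The following is a minimal, self-contained sketch of that idea and not the module's actual code: the helper names (convert_object_sketch, _convert_origin, _convert_visit_status), the output field names, and the choice to return None for unhandled types are all assumptions made for illustration, and the storage argument is left out.

```python
from typing import Any, Callable, Dict, Optional


# Hypothetical converters: the field names are illustrative only and are
# not taken from the swh.search source.
def _convert_origin(data: Dict[str, Any]) -> Dict[str, Any]:
    return {"url": data["url"]}


def _convert_visit_status(data: Dict[str, Any]) -> Dict[str, Any]:
    return {"url": data["origin"], "visit_status": data.get("status")}


# Map each handled object type to the function that converts it.
_CONVERTERS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "origin": _convert_origin,
    "origin_visit_status": _convert_visit_status,
}


def convert_object_sketch(
    object_type: str, object_data: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """Return a document ready for indexing, or None for unhandled types.

    Returning None here is an assumption suggested by the ``Dict | None``
    return annotation, not confirmed behavior.
    """
    converter = _CONVERTERS.get(object_type)
    if converter is None:
        return None
    return converter(object_data)
```

With these assumed helpers, convert_object_sketch("origin", {"url": "https://example.org/repo"}) yields a one-key dict holding the URL, while an unregistered type yields None.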

class swh.search.journal_client.SearchJournalClient(search: SearchInterface, storage: StorageInterface, *args, **kwargs)

Bases: JournalClientBase

Journal Client implementation

object_types = {'origin', 'origin_extrinsic_metadata', 'origin_intrinsic_metadata', 'origin_visit_status'}
process_one_object(decoded_object, decoded_object_type, raw_message)

Process decoded (unserialized) object of type decoded_object_type.

Non-critical errors can be passed to self.error_reporter, with the raw Kafka message (topic, partition and offset) as argument.

This error handling is currently implemented in this method so that each journal client can define its own trap-and-continue policy. It may later be implemented generically in self.handle_message_batch instead.
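A trap-and-continue policy of the kind described above might look like the sketch below. It is a standalone illustration, not SearchJournalClient itself: the class name TrapAndContinueClient, the (message, raw_message) signature of error_reporter, the "url" validity check, and the tuple used in place of a real Kafka message are all assumptions.

```python
from typing import Any, Callable, Dict, List, Tuple

# Stand-in for a raw Kafka message: (topic, partition, offset).
RawMessage = Tuple[str, int, int]


class TrapAndContinueClient:
    """Hypothetical client that reports non-critical errors and keeps going."""

    def __init__(self, error_reporter: Callable[[str, RawMessage], None]) -> None:
        # The reporter signature is assumed for this sketch.
        self.error_reporter = error_reporter
        self.indexed: List[Dict[str, Any]] = []

    def process_one_object(
        self,
        decoded_object: Dict[str, Any],
        decoded_object_type: str,
        raw_message: RawMessage,
    ) -> None:
        try:
            # Illustrative validity check; a real client would convert
            # and index the object here.
            if "url" not in decoded_object:
                raise ValueError(f"{decoded_object_type} object has no 'url'")
            self.indexed.append({"url": decoded_object["url"]})
        except ValueError as exc:
            # Non-critical: report with the message coordinates, then continue.
            self.error_reporter(str(exc), raw_message)


# Usage: one malformed object is reported, the valid one is still processed.
errors: List[Tuple[str, RawMessage]] = []
client = TrapAndContinueClient(lambda msg, raw: errors.append((msg, raw)))
client.process_one_object({}, "origin", ("swh.journal.objects.origin", 0, 41))
client.process_one_object(
    {"url": "https://example.org/repo"}, "origin", ("swh.journal.objects.origin", 0, 42)
)
```

Keeping the try/except inside process_one_object lets each client decide which exceptions count as non-critical, which is the flexibility the note above refers to.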