swh.search.journal_client module
- swh.search.journal_client.fetch_last_revision_release_date(snapshot_id: bytes, storage: StorageInterface) → Dict[str, str]
- swh.search.journal_client.convert_origin(origin, storage: StorageInterface)
- swh.search.journal_client.convert_origin_visit_status(visit_status, storage: StorageInterface)
- swh.search.journal_client.convert_from_origin_metadata_dict(origin_dict: Dict, storage: StorageInterface) → Dict
- swh.search.journal_client.convert_journal_object(object_type: str, object_data: Dict, storage: StorageInterface) → Dict | None
Convert object_data of type object_type into a dict ready to be pushed to Elasticsearch.
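The type-based conversion can be pictured as a dispatch table from object type to converter. The sketch below is illustrative only: the converter functions, the field names they read and write, and the choice to return None for unhandled types (suggested by the `Dict | None` return type) are assumptions. They are not the module's actual implementation, and unlike the real functions they do not query a StorageInterface.

```python
from typing import Callable, Dict, Optional


# Hypothetical converters standing in for convert_origin and
# convert_origin_visit_status; the field names are assumptions.
def _convert_origin(data: Dict) -> Dict:
    return {"url": data["url"]}


def _convert_visit_status(data: Dict) -> Dict:
    return {"url": data["origin"], "visit_status": data["status"]}


# Dispatch table: object type -> function producing the document to index.
CONVERTERS: Dict[str, Callable[[Dict], Dict]] = {
    "origin": _convert_origin,
    "origin_visit_status": _convert_visit_status,
}


def convert_journal_object_sketch(object_type: str, object_data: Dict) -> Optional[Dict]:
    """Return a dict ready for indexing, or None if the type is not handled."""
    converter = CONVERTERS.get(object_type)
    if converter is None:
        return None  # assumption: None means "nothing to push"
    return converter(object_data)


print(convert_journal_object_sketch("origin", {"url": "https://example.org/repo.git"}))
# {'url': 'https://example.org/repo.git'}
```

Keeping the mapping in a dict makes adding a new object type a one-line change, and unknown types fall through to None instead of raising.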
- class swh.search.journal_client.SearchJournalClient(search: SearchInterface, storage: StorageInterface, *args, **kwargs)
Bases: JournalClientBase
Journal Client implementation
- object_types = {'origin', 'origin_extrinsic_metadata', 'origin_intrinsic_metadata', 'origin_visit_status'}
- process_one_object(decoded_object, decoded_object_type, raw_message)
Process a decoded (deserialized) object of type decoded_object_type.
Non-critical errors can be reported by passing them to self.error_reporter along with the raw Kafka message (its topic, partition and offset).
This is currently implemented in this method so that each journal client can implement its own trap-and-continue policy. It may instead be implemented generically in the self.handle_message_batch method in the future.
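A trap-and-continue policy can be sketched as follows: non-critical errors are caught inside the per-object method and handed to an error reporter together with the raw message coordinates, while any other exception still propagates. Everything here is hypothetical and simplified: `RawMessage`, `NonCriticalError`, `TrapAndContinueClient`, the `error_reporter(exc, raw_message)` signature and the `handle_batch` loop are illustrative stand-ins, not the swh.search or swh.journal APIs (in particular, `handle_batch` is not the real `handle_message_batch`).

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple


@dataclass
class RawMessage:
    # Hypothetical stand-in for a Kafka message: only its coordinates matter here.
    topic: str
    partition: int
    offset: int


class NonCriticalError(Exception):
    """Hypothetical error type that should be reported rather than raised."""


@dataclass
class TrapAndContinueClient:
    handler: Callable[[Dict[str, Any], str], None]
    reported: List[Tuple[str, int, int, str]] = field(default_factory=list)

    def error_reporter(self, exc: Exception, raw_message: RawMessage) -> None:
        # Record where the failing message lives so it can be inspected or replayed.
        self.reported.append(
            (raw_message.topic, raw_message.partition, raw_message.offset, str(exc))
        )

    def process_one_object(self, decoded_object, decoded_object_type, raw_message):
        try:
            self.handler(decoded_object, decoded_object_type)
        except NonCriticalError as exc:
            # Trap: report the error and continue with the rest of the batch.
            self.error_reporter(exc, raw_message)
        # Any other exception propagates and aborts the batch.

    def handle_batch(self, batch):
        # Simplified batch loop (hypothetical; not handle_message_batch).
        for obj, obj_type, raw in batch:
            self.process_one_object(obj, obj_type, raw)


def index_origin(obj: Dict[str, Any], obj_type: str) -> None:
    # Hypothetical handler: a missing "url" is treated as non-critical.
    if "url" not in obj:
        raise NonCriticalError(f"{obj_type} without url")


client = TrapAndContinueClient(handler=index_origin)
client.handle_batch([
    ({"url": "https://example.org/a"}, "origin", RawMessage("example-topic", 0, 10)),
    ({}, "origin", RawMessage("example-topic", 0, 11)),
])
print(client.reported)  # [('example-topic', 0, 11, 'origin without url')]
```

Placing the try/except in the per-object method, as the docstring describes, lets each client decide which errors are survivable, at the cost of repeating that logic in every client rather than once in the batch handler.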