# Source code for swh.indexer.journal_client
# Copyright (C) 2026 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict
from swh.indexer.exception import ReportableError
from swh.indexer.indexer import BaseIndexer
from swh.journal.client import JournalClientBase
class IndexerJournalClient(JournalClientBase):
    """Journal client which hands every object it reads over to an indexer.

    The set of object types to consume is not configured separately: it is
    taken from the ``object_types`` attribute of the indexer passed at
    construction time.
    """

    # The indexer collaborator which does the indexation computation out of the
    # objects read from kafka
    indexer: BaseIndexer

    def __init__(self, indexer, *args, **kwargs):
        """Instantiate the journal client around ``indexer``.

        Args:
            indexer: The indexer to run on each object read; its
                ``object_types`` attribute is forwarded to the parent class as
                the ``object_types`` keyword argument (overriding any value the
                caller may have provided)
            *args: Positional arguments forwarded untouched to the parent class
            **kwargs: Keyword arguments forwarded to the parent class

        """
        # The indexer declares the object_types to read so let's provide it to
        # the journal client instantiation
        kwargs["object_types"] = indexer.object_types
        super().__init__(*args, **kwargs)
        self.indexer = indexer

    def process_one_object(self, decoded_object, decoded_object_type, raw_message):
        """Implementation is in charge of calling the run method of the indexer and
        trap any error to report it within the error_reporter.

        Objects whose type is not one of the indexer's ``object_types`` are
        ignored.

        Args:
            decoded_object: The object to index
            decoded_object_type: The object type of ``decoded_object``
            raw_message: The raw message ``decoded_object`` comes from; only used
                when an error must be reported

        Raises:
            ReportableError: when the indexer raised it and no error reporter is
                set on this client

        """
        if decoded_object_type not in self.indexer.object_types:
            return

        try:
            # Make the indexer index the object
            self.indexer.run([decoded_object])
        except ReportableError as exc:
            # If any reportable error is raised, let it be reported to the error
            # reporter if any, otherwise propagate the error to the main process
            # as usual
            if self.error_reporter is None:
                raise
            self.log_error_report(
                decoded_object,
                decoded_object_type,
                raw_message,
                exc,
                str(self.indexer.run),
            )

    def log_error_report(
        self,
        object_d: Dict,
        object_type: str,
        msg,
        exc: Exception,
        operation: str,
    ) -> None:
        """Method called when an issue must be reported without failing the index
        process.

        This always logs the issue in sentry and in the indexer's logger. This can
        also optionally log the issue in another reporter (when said reporter is
        declared in configuration).

        Args:
            object_d: The object dict which created a problem
            object_type: its associated object_type
            msg: The raw kafka message read from the topic
            exc: The exception raised and caught (it contains the id of the object
                as its second argument)
            operation: The operation method called which raised the issue

        Returns:
            None

        """
        # Imported locally, as in the original implementation, so that these
        # dependencies are only loaded when an error actually gets reported
        from msgpack import dumps
        import sentry_sdk

        from swh.model.hashutil import hash_to_hex

        # ReportableError's first argument is a string describing the issue
        # The second argument is actually the id of the object in error
        obj_id = exc.args[1]
        obj_id_str = hash_to_hex(obj_id)

        error_context: Dict[str, Any] = {
            "obj_id": obj_id_str,
            "object_type": object_type,
            "operation": operation,
            "exc": str(exc),
        }
        error_context["raw_message"] = {
            "key": msg.key(),  # bytes or None
            "value": msg.value(),  # bytes or None
            "partition": msg.partition(),  # int
            "topic": msg.topic(),  # str
            "timestamp": msg.timestamp(),
        }

        # Report in sentry; the exception is captured within the scope so the
        # extras set on that scope get attached to the event
        with sentry_sdk.push_scope() as scope:
            for k, v in error_context.items():
                scope.set_extra(k, v)
            sentry_sdk.capture_exception(exc)

        self.indexer.log.error(
            "Failed operation %(operation)s on %(obj_id)s exception: %(exc)s",
            error_context,
        )

        # The extra reporter is optional: an explicit check (instead of an
        # assert, which is stripped when running with -O) skips it when unset
        reporter = self.error_reporter
        if reporter is None:
            return

        oid = f"{object_type}:{obj_id_str}"
        # Distinct name so the raw kafka message `msg` is not shadowed
        payload = dumps(error_context)
        reporter(oid, payload)