# Source code for swh.counters.kafka_client
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from typing import Dict
from confluent_kafka import KafkaError
from swh.journal.client import EofBehavior, JournalClient, _error_cb
[docs]
class KeyOrientedJournalClient(JournalClient):
    """Journal client that only cares about message keys.

    Unlike :class:`swh.journal.client.JournalClient`, this client performs no
    deserialization of the message values. The raw values are nevertheless
    forwarded, untouched, to ``worker_fn`` so that it can deserialize and use
    them if it needs to.
    """

    def handle_messages(self, messages, worker_fn):
        """Group a batch of messages by object type and hand it to ``worker_fn``.

        Args:
            messages: iterable of consumed messages; each one must expose
                ``error()``, ``topic()``, ``partition()``, ``key()`` and
                ``value()``.
            worker_fn: callable receiving a plain ``dict`` mapping each object
                type to a ``{key: raw value}`` dict. It is only called when at
                least one message carried a payload.

        Returns:
            A ``(nb_processed, at_eof)`` tuple: the number of messages that
            carried a payload, and whether the client should stop because
            every assigned partition has reached its end.
        """
        by_type: Dict[str, Dict[bytes, bytes]] = defaultdict(dict)
        processed = 0

        for msg in messages:
            err = msg.error()
            if err is not None:
                if err.code() != KafkaError._PARTITION_EOF:
                    _error_cb(err)
                else:
                    # Remember which partitions have been fully consumed.
                    self.eof_reached.add((msg.topic(), msg.partition()))
                continue

            payload = msg.value()
            if payload is None:
                # ignore message with no payload, these can be generated in tests
                continue

            processed += 1
            # The object type is the last dot-separated component of the topic.
            kind = msg.topic().rsplit(".", 1)[-1]
            # A later message with the same key replaces the earlier one.
            by_type[kind][msg.key()] = payload

        if by_type:
            # Pass a regular dict so the worker does not see a defaultdict.
            worker_fn(dict(by_type))
        self.consumer.commit()

        if self.on_eof == EofBehavior.STOP:
            at_eof = all(
                (tp.topic, tp.partition) in self.eof_reached
                for tp in self.consumer.assignment()
            )
        else:
            at_eof = False

        return processed, at_eof