swh.journal.client module#
- class swh.journal.client.EofBehavior(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
Enum
Possible behaviors when reaching the end of the log
- CONTINUE = 'continue'#
- STOP = 'stop'#
- RESTART = 'restart'#
- swh.journal.client.get_journal_client(cls: str, **kwargs: Any)[source]#
Factory function to instantiate a journal client object.
Currently, only the “kafka” journal client is supported.
- class swh.journal.client.JournalClient(brokers: str | List[str], group_id: str, prefix: str | None = None, object_types: List[str] | None = None, privileged: bool = False, stop_after_objects: int | None = None, batch_size: int = 200, process_timeout: float | None = None, auto_offset_reset: str = 'earliest', stop_on_eof: bool | None = None, on_eof: EofBehavior | str | None = None, value_deserializer: Callable[[str, bytes], Any] | None = None, create_topics: bool = False, **kwargs)[source]#
Bases:
object
A base client for the Software Heritage journal.
The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the prefix argument is None (default value), it will take the default value ‘swh.journal.objects’.
Clients subscribe to events specific to each object type as listed in the
object_types
argument (if unset, defaults to all existing kafka topic under the prefix).Clients can be sharded by setting the
group_id
to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id.Messages are processed by the
worker_fn
callback passed to the process method, in batches of maximumbatch_size
messages (defaults to 200).The objects passed to the
worker_fn
callback are the result of the kafka message converted by thevalue_deserializer
function. By default (if this argument is not given), it will produce dicts (using thekafka_to_value
function). This signature of the function is:value_deserializer(object_type: str, kafka_msg: bytes) -> Any
If the value returned by
value_deserializer
is None, it is ignored and not passed theworker_fn
function.- Parameters:
stop_after_objects – If set, the processing stops after processing this number of messages in total.
on_eof – What to do when reaching the end of each partition (keep consuming, stop, or restart from earliest offsets); defaults to continuing. This can be either a
EofBehavior
variant or a string containing the name of one of the variants.stop_on_eof – (deprecated) equivalent to passing
on_eof=EofBehavior.STOP
auto_offset_reset – sets the behavior of the client when the consumer group initializes:
'earliest'
(the default) processes all objects since the inception of the topics;''
create_topics – Create kafka topics if they do not exist, not for production, this flag should only be enabled in development environments.
Any other named argument is passed directly to KafkaConsumer().
- subscribe()[source]#
Subscribe to topics listed in self.subscription
This can be overridden if you need, for instance, to manually assign partitions.
- process(worker_fn: Callable[[Dict[str, List[dict]]], None])[source]#
Polls Kafka for a batch of messages, and calls the worker_fn with these messages.
- Parameters:
worker_fn – Function called with the messages as argument.