swh.journal.client module

swh.journal.client.get_journal_client(cls: str, **kwargs: Any)[source]

Factory function to instantiate a journal client object.

Currently, only the “kafka” journal client is supported.

class swh.journal.client.JournalClient(brokers: Union[str, List[str]], group_id: str, prefix: Optional[str] = None, object_types: Optional[List[str]] = None, privileged: bool = False, stop_after_objects: Optional[int] = None, batch_size: int = 200, process_timeout: Optional[float] = None, auto_offset_reset: str = 'earliest', stop_on_eof: bool = False, **kwargs)[source]

Bases: object

A base client for the Software Heritage journal.

The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the prefix argument is None (default value), it will take the default value ‘swh.journal.objects’.

Clients subscribe to events specific to each object type as listed in the object_types argument (if unset, defaults to all existing kafka topic under the prefix).

Clients can be sharded by setting the group_id to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id.

Messages are processed by the worker_fn callback passed to the process method, in batches of maximum batch_size messages (defaults to 200).

If set, the processing stops after processing stop_after_objects messages in total.

stop_on_eof stops the processing when the client has reached the end of each partition in turn.

auto_offset_reset sets the behavior of the client when the consumer group initializes: ‘earliest’ (the default) processes all objects since the inception of the topics; ‘’

Any other named argument is passed directly to KafkaConsumer().

subscribe()[source]

Subscribe to topics listed in self.subscription

This can be overridden if you need, for instance, to manually assign partitions.

process(worker_fn)[source]

Polls Kafka for a batch of messages, and calls the worker_fn with these messages.

Parameters

Callable[Dict[str, List[dict]]] (worker_fn) – Function called with the messages as argument.

handle_messages(messages, worker_fn)[source]
deserialize_message(message)[source]
close()[source]