swh.journal.client module

class swh.journal.client.EofBehavior(value)

Bases: Enum

Possible behaviors when reaching the end of the log

CONTINUE = 'continue'
STOP = 'stop'
RESTART = 'restart'

swh.journal.client.get_journal_client(cls: str, **kwargs: Any)

Factory function to instantiate a journal client object.

Currently, only the “kafka” journal client is supported.
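A minimal sketch of calling the factory, assuming that the keyword arguments are forwarded to the client constructor documented below, and that the swh.journal package and a reachable Kafka broker are available. The helper names (build_journal_config, make_client), the broker address, the group name and the object types are placeholders chosen for illustration.

```python
from typing import Any, Dict


def build_journal_config() -> Dict[str, Any]:
    """Return keyword arguments for get_journal_client (placeholder values)."""
    return {
        "cls": "kafka",  # the only journal client class currently supported
        "brokers": ["localhost:9092"],  # placeholder broker address
        "group_id": "example-consumer-group",  # placeholder consumer group
        "prefix": "swh.journal.objects",  # documented default prefix
        "object_types": ["origin", "revision"],  # placeholder object types
    }


def make_client(config: Dict[str, Any]):
    """Instantiate the client; requires swh.journal and a running broker."""
    # Imported lazily so the configuration can be built without swh.journal.
    from swh.journal.client import get_journal_client

    return get_journal_client(**config)


config = build_journal_config()
# client = make_client(config)  # uncomment where swh.journal is installed
```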

class swh.journal.client.JournalClientBase(brokers: str | List[str], group_id: str, prefix: str | None = None, object_types: List[str] | None = None, privileged: bool = False, stop_after_objects: int | None = None, batch_size: int = 200, process_timeout: float | None = None, auto_offset_reset: str = 'earliest', stop_on_eof: bool | None = None, on_eof: EofBehavior | str | None = None, value_deserializer: Callable[[str, bytes], Any] | None = None, create_topics: bool = False, error_reporter: Dict | None = None, **kwargs)

Bases: object

A base client for the Software Heritage journal.

The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the prefix argument is None (default value), it will take the default value ‘swh.journal.objects’.

Clients subscribe to events specific to each object type listed in the object_types argument (if unset, it defaults to all existing Kafka topics under the prefix).

Clients can be sharded by setting the group_id to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id.

Messages are processed in batches of at most batch_size messages, one message at a time, through the process_one_object method. That method must be implemented by a subclass. It is up to the implementation to catch issues and report them to the error reporter, if one is configured.

The objects passed to the process_one_object method are the result of converting the Kafka message with the value_deserializer function. By default (if this argument is not given), it produces dicts (using the kafka_to_value function). The signature of this function is:

value_deserializer(object_type: str, kafka_msg: bytes) -> Any

If the value returned by value_deserializer is None, it is ignored and not passed to the method.
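As an illustration of the signature above, here is a sketch of a custom value_deserializer that returns None to skip some messages. The factory make_filtering_deserializer and its decode and keep parameters are hypothetical names, and json.loads is only a stand-in decoder that keeps the example self-contained; a real deployment would presumably wrap the library's own kafka_to_value instead.

```python
import json
from typing import Any, Callable, Optional


def make_filtering_deserializer(
    decode: Callable[[bytes], Any],
    keep: Callable[[str, Any], bool],
) -> Callable[[str, bytes], Optional[Any]]:
    """Build a value_deserializer that drops objects rejected by `keep`."""

    def value_deserializer(object_type: str, kafka_msg: bytes) -> Optional[Any]:
        value = decode(kafka_msg)
        # Returning None means the object is ignored by the client.
        return value if keep(object_type, value) else None

    return value_deserializer


# Keep only objects that carry a "url" key (arbitrary example predicate).
deserializer = make_filtering_deserializer(
    json.loads, lambda object_type, value: "url" in value
)
kept = deserializer("origin", b'{"url": "https://example.org/repo"}')
skipped = deserializer("origin", b'{"id": "abc"}')
```

The resulting function could then be passed as the value_deserializer argument of the client constructor.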

Parameters:
  • stop_after_objects – If set, the processing stops after processing this number of messages in total.

  • on_eof – What to do when reaching the end of each partition (keep consuming, stop, or restart from earliest offsets); defaults to continuing. This can be either an EofBehavior variant or a string containing the name of one of the variants.

  • stop_on_eof – (deprecated) equivalent to passing on_eof=EofBehavior.STOP

  • auto_offset_reset – sets the behavior of the client when the consumer group initializes: 'earliest' (the default) processes all objects since the inception of the topics; ''

  • create_topics – Create Kafka topics if they do not exist. Not for production: this flag should only be enabled in development environments.

  • error_reporter – (Optional) configuration dict for an error reporter to instantiate. Implementation-wise, it is up to the journal client to decide whether to use the error_reporter (to trap errors and continue) or not.

Any other named argument is passed directly to KafkaConsumer().
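The interplay between on_eof and the deprecated stop_on_eof can be sketched as follows. This is not the library's code: the enum is a local mirror of EofBehavior so that the example runs standalone, resolve_on_eof is a hypothetical helper, and matching strings case-insensitively against the enum values is an assumption.

```python
from enum import Enum
from typing import Optional, Union


class EofBehavior(Enum):
    """Local mirror of the documented enum, so the sketch runs standalone."""

    CONTINUE = "continue"
    STOP = "stop"
    RESTART = "restart"


def resolve_on_eof(
    on_eof: Union[EofBehavior, str, None] = None,
    stop_on_eof: Optional[bool] = None,
) -> EofBehavior:
    """Hypothetical helper: normalize the two EOF-related arguments."""
    if on_eof is None:
        # Deprecated flag, documented as equivalent to on_eof=EofBehavior.STOP;
        # without it, the documented default is to keep consuming.
        return EofBehavior.STOP if stop_on_eof else EofBehavior.CONTINUE
    if isinstance(on_eof, EofBehavior):
        return on_eof
    # Assumption: strings are matched case-insensitively against the values.
    return EofBehavior(on_eof.lower())
```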

subscribe()

Subscribe to topics listed in self.subscription

This can be overridden if you need, for instance, to manually assign partitions.

process_one_object(decoded_object, decoded_object_type, raw_message)

Process decoded (unserialized) object of type decoded_object_type.

Non-critical errors can be passed to self.error_reporter with the raw Kafka message (topic, partition and offset) as argument.

Currently implemented in this method so various journal clients can implement their own trap-and-continue policy. We may decide to implement it generically in self.handle_message_batch method instead in the future.
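A trap-and-continue policy of this kind might look like the sketch below. ToyClient is a stand-in that does not inherit from JournalClientBase and involves no Kafka. The reporter interface (a callable taking an error text and the raw message) and the tuple used as raw message are assumptions made for the example, not the library's actual error reporter API.

```python
from typing import Any, Callable, List, Optional


class ToyClient:
    """Stand-in for a JournalClientBase subclass; no Kafka involved."""

    def __init__(
        self, error_reporter: Optional[Callable[[str, Any], None]] = None
    ) -> None:
        # Hypothetical reporter interface: callable(error_text, raw_message).
        self.error_reporter = error_reporter
        self.processed: List[Any] = []

    def process_one_object(self, decoded_object, decoded_object_type, raw_message):
        try:
            if "id" not in decoded_object:
                raise ValueError(f"{decoded_object_type} object has no id")
            self.processed.append(decoded_object["id"])
        except ValueError as exc:
            if self.error_reporter is None:
                raise  # no reporter configured: let the error propagate
            # Non-critical error: report it with the raw message and continue.
            self.error_reporter(str(exc), raw_message)


reports: List[Any] = []
client = ToyClient(error_reporter=lambda err, msg: reports.append((err, msg)))
# Raw messages are modelled here as (topic, partition, offset) tuples.
client.process_one_object({"id": 1}, "origin", ("topic", 0, 0))
client.process_one_object({}, "origin", ("topic", 0, 1))
```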

commit_batch()

Commit the batch of objects read. This should take care of the offset commits.

process_messages()

Polls Kafka for a batch of messages, and calls the worker_fn with these messages.

Parameters:

worker_fn – Function called with the messages as argument.

handle_message_batch(messages) → Tuple[int, bool]

deserialize_message(message, object_type=None)

close()

class swh.journal.client.JournalClient(*args, **kwargs)

Bases: JournalClientBase

Existing JournalClient implementation kept for backward-compatibility.

Messages are still processed by the worker_fn callback function passed to the process method of the journal client.

The objects passed to the worker_fn callback are the result of converting the Kafka message with the value_deserializer function. By default (if this argument is not given), it produces dicts (using the kafka_to_value function). The signature of this function is:

value_deserializer(object_type: str, kafka_msg: bytes) -> Any

If the value returned by value_deserializer is None, it is ignored and not passed to the worker_fn function.

Note: The process_one_object and commit_batch methods are implemented to reflect the existing behavior of the current journal client implementations ([obj]storage-replayer, indexers…).

Note2: It is possible to slightly adapt the existing journal client implementations so that they use the self.error_reporter object to trap known issues, report them with the error reporter, and continue instead of crashing.

process_one_object(decoded_object, decoded_object_type, raw_message)

Process decoded (unserialized) object of type decoded_object_type.

Non-critical errors can be passed to self.error_reporter with the raw Kafka message (topic, partition and offset) as argument.

Currently implemented in this method so various journal clients can implement their own trap-and-continue policy. We may decide to implement it generically in self.handle_message_batch method instead in the future.

commit_batch()

Commit the batch of objects read. This should take care of the offset commits.

process(worker_fn: Callable[[Dict[str, List[dict]]], None])

Retro-compatible method to expose the worker_fn callback to previous implementations.

This stores worker_fn in the class attribute of the same name and delegates the work to super().process_messages, which in turn calls worker_fn in due time.
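A callback matching the worker_fn type above could be written as in this sketch. The per-type counter is only an example of what a worker might do, and the sample batch is made up to show the shape of the argument (a dict mapping each object type to a list of dicts).

```python
from collections import Counter
from typing import Dict, List

# Running total of objects seen, keyed by object type.
seen: Counter = Counter()


def worker_fn(all_objects: Dict[str, List[dict]]) -> None:
    """Count the objects received for each object type."""
    for object_type, objects in all_objects.items():
        seen[object_type] += len(objects)


# Sample batch shaped like the callback's documented argument type.
worker_fn({"origin": [{"url": "https://example.org/a"}], "revision": [{}, {}]})

# With a real client (assumption: `client` is a JournalClient instance):
# client.process(worker_fn)
```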

handle_messages(messages, worker_fn)

Retro-compatible method to expose the worker_fn callback to previous implementations.

This delegates the call to super().handle_message_batch, which internally uses self.worker_fn.