swh.journal.writer.kafka module

class swh.journal.writer.kafka.DeliveryTag(topic: str, kafka_key: bytes)[source]

Bases: tuple

Unique tag allowing us to check for a message delivery

property topic

Alias for field number 0

property kafka_key

Alias for field number 1

class swh.journal.writer.kafka.DeliveryFailureInfo(object_type: str, key: Union[Dict[str, str], Dict[str, bytes], bytes], message: str, code: str)[source]

Bases: tuple

Verbose information for failed deliveries

property object_type

Alias for field number 0

property key

Alias for field number 1

property message

Alias for field number 2

property code

Alias for field number 3

swh.journal.writer.kafka.get_object_type(topic: str) → str[source]

Get the object type from a topic string

exception swh.journal.writer.kafka.KafkaDeliveryError(message: str, delivery_failures: Iterable[swh.journal.writer.kafka.DeliveryFailureInfo])[source]

Bases: Exception

Delivery failed on some kafka messages.

pretty_failures() → str[source]
class swh.journal.writer.kafka.KafkaJournalWriter(brokers: Iterable[str], prefix: str, client_id: str, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]], producer_config: Optional[Dict] = None, flush_timeout: float = 120, producer_class: Type[cimpl.Producer] = <class 'cimpl.Producer'>, anonymize: bool = False)[source]

Bases: Generic[swh.journal.writer.kafka.TValue]

This class is used to write serialized versions of value objects to a series of Kafka topics. The type parameter TValue, which must implement the ValueProtocol, is the type of values this writer will write. Typically, TValue will be swh.model.model.BaseModel.

Topics used to send objects representations are built from a prefix plus the type of the object:

{prefix}.{object_type}

Objects can be sent as is, or can be anonymized. The anonymization feature, when activated, will write anonymized versions of value objects in the main topic, and stock (non-anonymized) objects will be sent to a dedicated (privileged) set of topics:

{prefix}_privileged.{object_type}

The anonymization of a value object is the result of calling its anonymize() method. An object is considered anonymizable if this method returns a (non-None) value.

Parameters
  • brokers – list of broker addresses and ports.

  • prefix – the prefix used to build the topic names for objects.

  • client_id – the id of the writer sent to kafka.

  • value_sanitizer – a function that takes the object type and the dict representation of an object as argument, and returns an other dict that should be actually stored in the journal (eg. removing keys that do no belong there)

  • producer_config – extra configuration keys passed to the Producer.

  • flush_timeout – timeout, in seconds, after which the flush operation will fail if some message deliveries are still pending.

  • producer_class – override for the kafka producer class.

  • anonymize – if True, activate the anonymization feature.

send(topic: str, key: Union[Dict[str, str], Dict[str, bytes], bytes], value)[source]
delivery_error(message)swh.journal.writer.kafka.KafkaDeliveryError[source]

Get all failed deliveries, and clear them

flush()[source]
write_addition(object_type: str, object_: TValue) → None[source]

Write a single object to the journal

write_update(object_type: str, object_: TValue) → None

Write a single object to the journal

write_additions(object_type: str, objects: Iterable[TValue]) → None[source]

Write a set of objects to the journal