swh.journal.writer.kafka module#

class swh.journal.writer.kafka.DeliveryTag(topic: str, kafka_key: bytes)[source]#

Bases: NamedTuple

Unique tag allowing us to check for a message delivery

Create new instance of DeliveryTag(topic, kafka_key)

topic: str#

Alias for field number 0

kafka_key: bytes#

Alias for field number 1
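Because it is a NamedTuple, a DeliveryTag is hashable and compares by value, so it can be used as a set member or dictionary key. The sketch below uses a local stand-in with the same two fields instead of importing the real class. The `pending` set and the topic name are illustrative assumptions, not the writer's actual bookkeeping.

```python
from typing import NamedTuple, Set


class DeliveryTag(NamedTuple):
    """Local stand-in mirroring the two documented fields."""

    topic: str
    kafka_key: bytes


# Hypothetical bookkeeping: tags of messages whose delivery is not yet confirmed.
pending: Set[DeliveryTag] = set()

tag = DeliveryTag(topic="swh.journal.objects.revision", kafka_key=b"\x01\x02")
pending.add(tag)

# An equal tag built later (for example in a delivery callback) matches the
# stored one, because named tuples compare field by field.
pending.discard(DeliveryTag("swh.journal.objects.revision", b"\x01\x02"))
delivered = not pending
```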

class swh.journal.writer.kafka.DeliveryFailureInfo(object_type: str, key: Dict[str, str] | Dict[str, bytes] | bytes, message: str, code: str)[source]#

Bases: NamedTuple

Verbose information for failed deliveries

Create new instance of DeliveryFailureInfo(object_type, key, message, code)

object_type: str#

Alias for field number 0

key: Dict[str, str] | Dict[str, bytes] | bytes#

Alias for field number 1

message: str#

Alias for field number 2

code: str#

Alias for field number 3

swh.journal.writer.kafka.get_object_type(topic: str) → str[source]#

Get the object type from a topic string
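Given the `{prefix}.{object_type}` topic layout documented for KafkaJournalWriter, the object type is the last dot-separated component of the topic. A minimal sketch of that idea, using a hypothetical helper name so it is not mistaken for the module's own implementation:

```python
def object_type_from_topic(topic: str) -> str:
    """Return the part of the topic after the last dot (hypothetical helper)."""
    # rsplit with maxsplit=1 keeps dots inside the prefix untouched.
    return topic.rsplit(".", 1)[-1]
```

The same logic applies to privileged topics, since they only change the prefix part of the name.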

exception swh.journal.writer.kafka.KafkaDeliveryError(message: str, delivery_failures: Iterable[DeliveryFailureInfo])[source]#

Bases: Exception

Delivery failed on some kafka messages.

pretty_failures() → str[source]#

class swh.journal.writer.kafka.KafkaJournalWriter(brokers: Iterable[str], prefix: str, client_id: str, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]], producer_config: Dict | None = None, flush_timeout: float = 120, producer_class: Type[Producer] = <class 'cimpl.Producer'>, anonymize: bool = False, auto_flush: bool = True)[source]#

Bases: object

This class writes serialized versions of value objects to a series of Kafka topics. The type parameter, which must implement ValueProtocol, is the type of values this writer will write. Typically, it will be swh.model.model.BaseModel.

Topics used to send object representations are built from a prefix plus the type of the object:

{prefix}.{object_type}

Objects can be sent as is, or can be anonymized. The anonymization feature, when activated, writes anonymized versions of value objects to the main topic and sends the stock (non-anonymized) objects to a dedicated (privileged) set of topics:

{prefix}_privileged.{object_type}

The anonymization of a value object is the result of calling its anonymize() method. An object is considered anonymizable if this method returns a (non-None) value.
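The routing described above can be sketched as a small standalone function. The `Person` and `Blob` classes and the `route` helper are hypothetical stand-ins for illustration. The handling of non-anonymizable objects (sent unchanged to the main topic only) is an assumption, since the text above does not spell it out.

```python
from typing import Any, List, Optional, Tuple


class Person:
    """Hypothetical value object whose anonymize() returns a redacted copy."""

    def __init__(self, name: str) -> None:
        self.name = name

    def anonymize(self) -> Optional["Person"]:
        return Person(name="<redacted>")


class Blob:
    """Hypothetical value object with nothing to anonymize."""

    def anonymize(self) -> None:
        return None


def route(
    prefix: str, object_type: str, obj: Any, anonymize: bool
) -> List[Tuple[str, Any]]:
    """Return (topic, object) pairs describing where each version would go."""
    main_topic = f"{prefix}.{object_type}"
    if anonymize:
        anonymized = obj.anonymize()
        if anonymized is not None:
            # Anonymizable: redacted copy in the main topic, original in the
            # privileged topic.
            privileged_topic = f"{prefix}_privileged.{object_type}"
            return [(main_topic, anonymized), (privileged_topic, obj)]
    # Assumption: non-anonymizable objects, and every object when the feature
    # is off, go to the main topic unchanged.
    return [(main_topic, obj)]
```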

Parameters:
  • brokers – list of broker addresses and ports.

  • prefix – the prefix used to build the topic names for objects.

  • client_id – the id of the writer sent to kafka.

  • value_sanitizer – a function that takes the object type and the dict representation of an object as arguments, and returns another dict that should actually be stored in the journal (e.g. removing keys that do not belong there)

  • producer_config – extra configuration keys passed to the Producer.

  • flush_timeout – timeout, in seconds, after which the flush operation will fail if some message deliveries are still pending.

  • producer_class – override for the kafka producer class.

  • anonymize – if True, activate the anonymization feature.

  • auto_flush – if True (default), flush the kafka producer in write_addition() and write_additions(). This should be set to False ONLY for testing purposes. DO NOT USE IN A PRODUCTION ENVIRONMENT.
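As an illustration of the value_sanitizer contract, here is a minimal sketch of a function with the documented shape: it receives the object type and the dict representation, and returns a new dict. The `DROPPED_KEYS` table and the key names in it are hypothetical examples, not the keys any real deployment removes.

```python
from typing import Any, Dict, Set

# Hypothetical per-type list of keys that should not reach the journal.
DROPPED_KEYS: Dict[str, Set[str]] = {"content": {"data", "ctime"}}


def drop_unwanted_keys(object_type: str, value: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of value without the keys listed for this object type."""
    unwanted = DROPPED_KEYS.get(object_type, set())
    # Build a new dict so the caller's object is left untouched.
    return {key: item for key, item in value.items() if key not in unwanted}
```

A function of this shape could then be passed as the `value_sanitizer` argument when constructing the writer.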

reliable_produce(topic: str, key: Dict[str, str] | Dict[str, bytes] | bytes, kafka_value: bytes | None)[source]#

send(topic: str, key: Dict[str, str] | Dict[str, bytes] | bytes, value)[source]#

delivery_error(message) → KafkaDeliveryError[source]#

Get all failed deliveries, and clear them

flush() → None[source]#

write_addition(object_type: str, object_: ValueProtocol) → None[source]#

Write a single object to the journal

write_additions(object_type: str, objects: Iterable[ValueProtocol]) → None[source]#

Write a set of objects to the journal

delete(object_type: str, object_keys: Iterable[Dict[str, str] | Dict[str, bytes] | bytes]) → None[source]#

Write a tombstone for the given keys.

For older data to be removed, the topic must be configured with cleanup.policy=compact. Please also consider setting:

  • max.compaction.lag.ms: delay between the appearance of a tombstone and the actual deletion of older values.

  • delete.retention.ms: how long the tombstones themselves must be kept. This is important as they enable journal clients to learn that a given key has been deleted and act accordingly.

Note that deletion won’t happen for keys located in the currently active log segment. It will only be possible once enough newer entries have been added, pushing older keys to “dirty” log segments that can be compacted.
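The topic settings mentioned above can be collected in one place, for example as a plain mapping. The durations below (one day and seven days) are illustrative assumptions only, and `as_config_string` is a hypothetical helper that renders the mapping as comma-separated key=value pairs.

```python
from typing import Dict

MS_PER_DAY = 24 * 60 * 60 * 1000

# Illustrative values only; pick durations that match your retention needs.
TOPIC_CONFIG: Dict[str, str] = {
    "cleanup.policy": "compact",
    "max.compaction.lag.ms": str(MS_PER_DAY),  # 1 day
    "delete.retention.ms": str(7 * MS_PER_DAY),  # 7 days
}


def as_config_string(config: Dict[str, str]) -> str:
    """Join settings as comma-separated key=value pairs (hypothetical helper)."""
    return ",".join(f"{key}={value}" for key, value in config.items())
```

Keeping delete.retention.ms comfortably larger than the time a journal client may stay offline gives that client a chance to see the tombstone before it is itself removed.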