swh.journal.writer.kafka module#
- class swh.journal.writer.kafka.DeliveryTag(topic: str, kafka_key: bytes)[source]#
Bases:
NamedTuple
Unique tag allowing us to check for a message delivery
Create new instance of DeliveryTag(topic, kafka_key)
- class swh.journal.writer.kafka.DeliveryFailureInfo(object_type: str, key: Dict[str, str] | Dict[str, bytes] | bytes, message: str, code: str)[source]#
Bases:
NamedTuple
Verbose information for failed deliveries
Create new instance of DeliveryFailureInfo(object_type, key, message, code)
- swh.journal.writer.kafka.get_object_type(topic: str) str [source]#
Get the object type from a topic string
- exception swh.journal.writer.kafka.KafkaDeliveryError(message: str, delivery_failures: Iterable[DeliveryFailureInfo])[source]#
Bases:
Exception
Delivery failed on some kafka messages.
- class swh.journal.writer.kafka.KafkaJournalWriter(brokers: ~typing.Iterable[str], prefix: str, client_id: str, value_sanitizer: ~typing.Callable[[str, ~typing.Dict[str, ~typing.Any]], ~typing.Dict[str, ~typing.Any]], producer_config: ~typing.Dict | None = None, flush_timeout: float = 120, producer_class: ~typing.Type[~cimpl.Producer] = <class 'cimpl.Producer'>, anonymize: bool = False, auto_flush: bool = True)[source]#
Bases:
object
This class is used to write serialized versions of value objects to a series of Kafka topics. The type parameter of value objects, which must implement the ValueProtocol, is the type of values this writer will write. Typically, ValueProtocol will be swh.model.model.BaseModel.
Topics used to send objects representations are built from a
prefix
plus the type of the object:{prefix}.{object_type}
Objects can be sent as is, or can be anonymized. The anonymization feature, when activated, will write anonymized versions of value objects in the main topic, and stock (non-anonymized) objects will be sent to a dedicated (privileged) set of topics:
{prefix}_privileged.{object_type}
The anonymization of a value object is the result of calling its
anonymize()
method. An object is considered anonymizable if this method returns a (non-None) value.- Parameters:
brokers – list of broker addresses and ports.
prefix – the prefix used to build the topic names for objects.
client_id – the id of the writer sent to kafka.
value_sanitizer – a function that takes the object type and the dict representation of an object as argument, and returns an other dict that should be actually stored in the journal (eg. removing keys that do no belong there)
producer_config – extra configuration keys passed to the Producer.
flush_timeout – timeout, in seconds, after which the flush operation will fail if some message deliveries are still pending.
producer_class – override for the kafka producer class.
anonymize – if True, activate the anonymization feature.
auto_flush – if True (default), flush the kafka producer in
write_addition()
andwrite_additions()
. This should be set to False ONLY for testing purpuse. DO NOT USE ON PRODUCTION ENVIRONMENT.
- reliable_produce(topic: str, key: Dict[str, str] | Dict[str, bytes] | bytes, kafka_value: bytes | None)[source]#
- delivery_error(message) KafkaDeliveryError [source]#
Get all failed deliveries, and clear them
- write_addition(object_type: str, object_: ValueProtocol) None [source]#
Write a single object to the journal
- write_additions(object_type: str, objects: Iterable[ValueProtocol]) None [source]#
Write a set of objects to the journal
- delete(object_type: str, object_keys: Iterable[Dict[str, str] | Dict[str, bytes] | bytes]) None [source]#
Write a tombstone for the given keys.
For older data to be removed, the topic must be configured with
cleanup.policy=compact
. Please also consider setting:max.compaction.lag.ms
: delay between the appearance of a tombstone and the actual deletion of older values.delete.retention.ms
: how long must tombstones themselves be kept. This is important as they enable journal clients to learn that a given kep has been deleted and act accordingly.
Note that deletion won’t happen for keys located in the currently active log segment. It will only be possible once enough newer entries have be added, pushing older keys to “dirty” log segments that can be compacted.