swh.journal.writer.kafka module
-
class swh.journal.writer.kafka.DeliveryTag(topic: str, kafka_key: bytes)
Bases: tuple
Unique tag allowing us to check for a message delivery
-
property topic
Alias for field number 0
-
property kafka_key
Alias for field number 1
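Since DeliveryTag is a tuple with two named fields, it is hashable and can serve as a set member or dictionary key when tracking messages that still await acknowledgement. The following is a minimal sketch using a local stand-in class rather than the library's own; the `pending` set and the sample topic name are hypothetical.

```python
from typing import NamedTuple, Set


class DeliveryTag(NamedTuple):
    """Local stand-in mirroring the documented fields (not the swh class)."""

    topic: str
    kafka_key: bytes


# Hypothetical bookkeeping: messages sent but not yet acknowledged.
pending: Set[DeliveryTag] = set()

tag = DeliveryTag(topic="swh.journal.objects.revision", kafka_key=b"\x01\x02")
pending.add(tag)

# Fields are reachable by name or by position (field numbers 0 and 1).
assert tag.topic == tag[0]
assert tag.kafka_key == tag[1]

# An equal tag built later (for example in a delivery callback) matches
# the stored one, so the pending entry can be cleared.
pending.discard(DeliveryTag("swh.journal.objects.revision", b"\x01\x02"))
```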
-
class swh.journal.writer.kafka.DeliveryFailureInfo(object_type: str, key: Union[Dict[str, str], Dict[str, bytes], bytes], message: str, code: str)
Bases: tuple
Verbose information for failed deliveries
-
property object_type
Alias for field number 0
-
property key
Alias for field number 1
-
property message
Alias for field number 2
-
property code
Alias for field number 3
-
swh.journal.writer.kafka.get_object_type(topic: str) → str
Get the object type from a topic string
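This page does not show how the object type is derived. Given the `{prefix}.{object_type}` naming pattern documented for KafkaJournalWriter, one plausible sketch takes the last dot-separated component of the topic. The helper name `object_type_from_topic` is hypothetical, and the real function may behave differently.

```python
def object_type_from_topic(topic: str) -> str:
    """Return the text after the last "." in a topic name.

    Assumption: topics follow the ``{prefix}.{object_type}`` pattern and
    the object type itself contains no dot.
    """
    return topic.rsplit(".", 1)[-1]


print(object_type_from_topic("swh.journal.objects.revision"))
print(object_type_from_topic("swh.journal.objects_privileged.release"))
```

Splitting from the right means a prefix that itself contains dots does not affect the result.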
-
exception swh.journal.writer.kafka.KafkaDeliveryError(message: str, delivery_failures: Iterable[swh.journal.writer.kafka.DeliveryFailureInfo])
Bases: Exception
Delivery failed on some kafka messages.
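A caller would typically catch this exception and report each failed delivery. The sketch below uses local stand-ins for both classes so that it runs without swh.journal installed. The `message` and `delivery_failures` attribute names are assumed from the constructor parameters, and the sample values are made up.

```python
from typing import Iterable, List, NamedTuple


class DeliveryFailureInfo(NamedTuple):
    """Stand-in with the documented fields."""

    object_type: str
    key: bytes  # narrowed from the documented Union for brevity
    message: str
    code: str


class KafkaDeliveryError(Exception):
    """Stand-in; attribute names are assumed from the constructor parameters."""

    def __init__(
        self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]
    ) -> None:
        super().__init__(message)
        self.message = message
        self.delivery_failures: List[DeliveryFailureInfo] = list(delivery_failures)


def summarize(error: KafkaDeliveryError) -> List[str]:
    """Render one human-readable line per failed delivery."""
    return [
        f"{f.object_type} {f.key.hex()}: {f.message} ({f.code})"
        for f in error.delivery_failures
    ]


try:
    # Sample failure data, invented for illustration only.
    raise KafkaDeliveryError(
        "flush failed",
        [DeliveryFailureInfo("revision", b"\xab\xcd", "sample failure", "SAMPLE_CODE")],
    )
except KafkaDeliveryError as exc:
    report = summarize(exc)
```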
-
class swh.journal.writer.kafka.KafkaJournalWriter(brokers: Iterable[str], prefix: str, client_id: str, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]], producer_config: Optional[Dict] = None, flush_timeout: float = 120, producer_class: Type[cimpl.Producer] = <class 'cimpl.Producer'>, anonymize: bool = False)
Bases: Generic[swh.journal.writer.kafka.TValue]
This class is used to write serialized versions of value objects to a series of Kafka topics. The type parameter TValue, which must implement the ValueProtocol, is the type of values this writer will write. Typically, TValue will be swh.model.model.BaseModel.
Topics used to send object representations are built from a prefix plus the type of the object:
{prefix}.{object_type}
Objects can be sent as is, or can be anonymized. The anonymization feature, when activated, will write anonymized versions of value objects in the main topic, and stock (non-anonymized) objects will be sent to a dedicated (privileged) set of topics:
{prefix}_privileged.{object_type}
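The two naming patterns can be sketched as a small helper. The function name `topic_names` and the sample prefix are hypothetical; only the two format strings come from the description above.

```python
from typing import Tuple


def topic_names(prefix: str, object_type: str) -> Tuple[str, str]:
    """Return (main_topic, privileged_topic) for an object type.

    The privileged topic is only relevant when anonymization is enabled.
    """
    main = f"{prefix}.{object_type}"
    privileged = f"{prefix}_privileged.{object_type}"
    return main, privileged


main_topic, privileged_topic = topic_names("swh.journal.objects", "release")
print(main_topic)
print(privileged_topic)
```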
The anonymization of a value object is the result of calling its anonymize() method. An object is considered anonymizable if this method returns a (non-None) value.
Parameters
brokers – list of broker addresses and ports.
prefix – the prefix used to build the topic names for objects.
client_id – the id of the writer sent to kafka.
value_sanitizer – a function that takes the object type and the dict representation of an object as arguments, and returns another dict that should actually be stored in the journal (e.g. with keys that do not belong there removed)
producer_config – extra configuration keys passed to the Producer.
flush_timeout – timeout, in seconds, after which the flush operation will fail if some message deliveries are still pending.
producer_class – override for the kafka producer class.
anonymize – if True, activate the anonymization feature.
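As an illustration of the parameters above, here is a hedged sketch of a `value_sanitizer` and of a constructor call. The sanitizer `drop_private_keys`, its underscore rule, the broker address, the prefix, and the client id are all hypothetical; only the parameter names and the sanitizer signature come from this page. The constructor call sits inside a function so that the snippet runs even where swh.journal is not installed.

```python
from typing import Any, Dict


def drop_private_keys(object_type: str, value: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical sanitizer: drop keys that start with an underscore.

    It matches the documented signature: it receives the object type and
    the dict representation, and returns the dict to store in the journal.
    """
    return {k: v for k, v in value.items() if not k.startswith("_")}


def make_writer():
    """Build a writer; requires swh.journal to be installed."""
    from swh.journal.writer.kafka import KafkaJournalWriter

    return KafkaJournalWriter(
        brokers=["localhost:9092"],      # hypothetical broker address
        prefix="swh.journal.objects",    # hypothetical topic prefix
        client_id="example-writer",      # hypothetical client id
        value_sanitizer=drop_private_keys,
        flush_timeout=120,
        anonymize=False,
    )


cleaned = drop_private_keys(
    "origin", {"url": "https://example.org/repo", "_cache": 1}
)
```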
-
delivery_error(message) → swh.journal.writer.kafka.KafkaDeliveryError
Get all failed deliveries, and clear them
-
write_addition(object_type: str, object_: TValue) → None
Write a single object to the journal
-
write_update(object_type: str, object_: TValue) → None
Write a single object to the journal
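To illustrate what writing a single object may involve when anonymization is enabled, the sketch below models the routing described in the class documentation: the anonymized object goes to the main topic and the stock object to the privileged topic. The `Person` and `Origin` classes and the `route` function are hypothetical stand-ins, and sending a non-anonymizable object to the main topic only is an assumption rather than something this page states.

```python
from dataclasses import dataclass, replace
from typing import Any, List, Optional, Tuple


@dataclass(frozen=True)
class Person:
    """Hypothetical value object whose anonymize() returns a redacted copy."""

    fullname: str

    def anonymize(self) -> Optional["Person"]:
        return replace(self, fullname="<redacted>")


@dataclass(frozen=True)
class Origin:
    """Hypothetical value object that is not anonymizable."""

    url: str

    def anonymize(self) -> None:
        return None


def route(
    prefix: str, object_type: str, obj: Any, anonymize: bool
) -> List[Tuple[str, Any]]:
    """Return (topic, object) pairs that one write would produce."""
    main = f"{prefix}.{object_type}"
    if anonymize:
        anonymized = obj.anonymize()
        if anonymized is not None:
            # Stock object to the privileged topic, anonymized one to main.
            return [(f"{prefix}_privileged.{object_type}", obj), (main, anonymized)]
    # Assumption: anything else goes to the main topic unchanged.
    return [(main, obj)]


person_routes = route("swh.journal.objects", "person", Person("Jane Doe"), True)
origin_routes = route("swh.journal.objects", "origin", Origin("https://example.org"), True)
```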