swh.journal.writer.kafka module

class swh.journal.writer.kafka.DeliveryTag(topic: str, kafka_key: bytes)

Bases: tuple

Unique tag used to check whether a message has been delivered

property topic

Alias for field number 0

property kafka_key

Alias for field number 1
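
Because DeliveryTag is a named tuple, each field can be read by name or by position (hence the "Alias for field number N" properties). The sketch below uses a local stand-in with the same two fields, not the swh class itself, and an illustrative topic name. It shows why a tuple suits a delivery tag: tags compare by value and are hashable, so a tag rebuilt from a delivery report matches the one recorded when the message was sent.

```python
from typing import NamedTuple


class DeliveryTag(NamedTuple):
    """Local stand-in with the same fields as swh.journal.writer.kafka.DeliveryTag."""

    topic: str
    kafka_key: bytes


tag = DeliveryTag(topic="swh.journal.objects.revision", kafka_key=b"\x01\x02")

# Each field is also reachable by position ("Alias for field number N").
assert tag[0] == tag.topic
assert tag[1] == tag.kafka_key

# Tuples compare by value and are hashable, so a tag rebuilt from a delivery
# report matches the one stored when the message was sent.
pending = {tag}
pending.discard(DeliveryTag("swh.journal.objects.revision", b"\x01\x02"))
print(len(pending))  # 0
```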

class swh.journal.writer.kafka.DeliveryFailureInfo(object_type: str, key: Union[Dict[str, str], Dict[str, bytes], bytes], message: str, code: str)

Bases: tuple

Verbose information for failed deliveries

property object_type

Alias for field number 0

property key

Alias for field number 1

property message

Alias for field number 2

property code

Alias for field number 3

swh.journal.writer.kafka.get_object_type(topic: str) → str

Get the object type from a topic string
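
A minimal sketch of what get_object_type could look like, assuming the object type is the last dot-separated component of the topic name, as in the {prefix}.{object_type} layout described for KafkaJournalWriter. This is an illustration, not the library's code; the topic names are made up.

```python
def get_object_type(topic: str) -> str:
    """Return the object type encoded at the end of a journal topic name.

    Assumption: the type is the last dot-separated component, matching the
    "{prefix}.{object_type}" and "{prefix}_privileged.{object_type}" layouts.
    """
    return topic.rsplit(".", 1)[-1]


print(get_object_type("swh.journal.objects.revision"))  # revision
print(get_object_type("swh.journal.objects_privileged.release"))  # release
```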

exception swh.journal.writer.kafka.KafkaDeliveryError(message: str, delivery_failures: Iterable[swh.journal.writer.kafka.DeliveryFailureInfo])

Bases: Exception

Delivery failed on some Kafka messages.

pretty_failures() → str

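
The sketch below shows one plausible shape for KafkaDeliveryError: it keeps the DeliveryFailureInfo entries it is given and renders them with pretty_failures(). The output format of pretty_failures(), the __str__ override, and the failure values in the usage lines are assumptions made for illustration, not the library's actual behavior.

```python
from typing import Dict, Iterable, List, NamedTuple, Union


class DeliveryFailureInfo(NamedTuple):
    object_type: str
    key: Union[Dict[str, str], Dict[str, bytes], bytes]
    message: str
    code: str


class KafkaDeliveryError(Exception):
    """Sketch of an exception carrying per-message delivery failures."""

    def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]):
        super().__init__(message)
        self.message = message
        # Materialize the iterable so the failures can be read more than once.
        self.delivery_failures: List[DeliveryFailureInfo] = list(delivery_failures)

    def pretty_failures(self) -> str:
        # Hypothetical format: "object_type key (message)", comma-separated.
        return ", ".join(
            f"{f.object_type} {f.key!r} ({f.message})" for f in self.delivery_failures
        )

    def __str__(self) -> str:
        return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])"


err = KafkaDeliveryError(
    "some deliveries failed",
    [DeliveryFailureInfo("revision", b"\x01", "Message timed out", "_MSG_TIMED_OUT")],
)
print(err.pretty_failures())  # revision b'\x01' (Message timed out)
```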
class swh.journal.writer.kafka.KafkaJournalWriter(brokers: Iterable[str], prefix: str, client_id: str, producer_config: Optional[Dict] = None, flush_timeout: float = 120, producer_class: Type[cimpl.Producer] = <class 'cimpl.Producer'>, anonymize: bool = False)

Bases: object

This class is used to write serialized versions of swh.model.model objects to a series of Kafka topics.

Topics used to send object representations are named after a prefix plus the type of the object:

{prefix}.{object_type}

Objects can be sent as is, or anonymized. When the anonymization feature is activated, anonymized versions of model objects are written to the main topics, and the original (non-anonymized) objects are sent to a dedicated (privileged) set of topics:

{prefix}_privileged.{object_type}

The anonymization of a swh.model object is the result of calling its BaseModel.anonymize() method. An object is considered anonymizable if this method returns a (non-None) value.
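
The topic layout and the anonymization rule above can be summarized as a routing function. This is a hedged sketch: route, FakeRelease and FakeSnapshot are hypothetical names, and the fake objects only mimic the contract described above (anonymize() returns an anonymized copy, or None when there is nothing to anonymize). The prefix used in the usage lines is illustrative.

```python
from dataclasses import dataclass, replace
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class FakeRelease:
    """Hypothetical anonymizable object (stands in for a swh.model object)."""

    name: str
    author: str

    def anonymize(self) -> Optional["FakeRelease"]:
        # Hypothetical anonymization: hide the author.
        return replace(self, author="<anonymized>")


@dataclass(frozen=True)
class FakeSnapshot:
    """Hypothetical object with nothing to anonymize."""

    branches: Tuple[str, ...]

    def anonymize(self) -> None:
        return None


def route(prefix: str, object_type: str, obj, anonymize: bool) -> List[Tuple[str, object]]:
    """Return the (topic, object) pairs an object would be written as."""
    if anonymize:
        anonymized = obj.anonymize()
        if anonymized is not None:
            # Anonymizable: original object to the privileged topic,
            # anonymized copy to the main topic.
            return [
                (f"{prefix}_privileged.{object_type}", obj),
                (f"{prefix}.{object_type}", anonymized),
            ]
    # Anonymization off, or object not anonymizable: main topic only.
    return [(f"{prefix}.{object_type}", obj)]


release = FakeRelease(name="v1.0", author="Jane Doe <jane@example.org>")
for topic, obj in route("swh.journal.objects", "release", release, anonymize=True):
    print(topic, obj.author)
# swh.journal.objects_privileged.release Jane Doe <jane@example.org>
# swh.journal.objects.release <anonymized>
```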

Parameters
  • brokers – list of broker addresses and ports.

  • prefix – the prefix used to build the topic names for objects.

  • client_id – the client id this writer reports to Kafka.

  • producer_config – extra configuration keys passed to the Producer.

  • flush_timeout – timeout, in seconds, after which the flush operation will fail if some message deliveries are still pending.

  • producer_class – override for the kafka producer class.

  • anonymize – if True, activate the anonymization feature.
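
The constructor arguments presumably end up in the configuration passed to the producer_class instance. The sketch assumes a mapping onto the standard librdkafka keys bootstrap.servers and client.id, with producer_config merged on top. build_producer_config is a hypothetical helper, and the real writer may set further keys or merge them in a different order; the broker names and client id are illustrative.

```python
from typing import Dict, Iterable, Optional


def build_producer_config(
    brokers: Iterable[str], client_id: str, producer_config: Optional[Dict] = None
) -> Dict:
    """Hypothetical mapping of the constructor arguments to a librdkafka config."""
    config = {
        # Standard librdkafka keys accepted by confluent_kafka.Producer.
        "bootstrap.servers": ",".join(brokers),
        "client.id": client_id,
    }
    # Assumption: extra keys from producer_config take precedence.
    config.update(producer_config or {})
    return config


cfg = build_producer_config(
    ["kafka1:9092", "kafka2:9092"],
    "swh.example-writer",
    {"message.max.bytes": 100 * 1024 * 1024},
)
print(cfg["bootstrap.servers"])  # kafka1:9092,kafka2:9092
```

Since producer_class can be overridden, a test could presumably pass a stub class that accepts such a config dict instead of connecting to real brokers.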

send(topic: str, key: Union[Dict[str, str], Dict[str, bytes], bytes], value)

delivery_error(message) → swh.journal.writer.kafka.KafkaDeliveryError

Get all failed deliveries and clear them
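
DeliveryTag, DeliveryFailureInfo and delivery_error() suggest a bookkeeping scheme along these lines: send() records a tag for each produced message, the producer's delivery callback removes the tag and records any error, and delivery_error() gathers the failures and clears them. DeliveryTracker and FakeMessage are hypothetical stand-ins, errors are plain strings instead of confluent_kafka.KafkaError objects, and a generic Exception replaces KafkaDeliveryError so that the block stays self-contained.

```python
from typing import Dict, List, NamedTuple, Optional, Tuple


class DeliveryTag(NamedTuple):
    topic: str
    kafka_key: bytes


class FakeMessage:
    """Stand-in exposing the topic()/key() accessors of confluent_kafka.Message."""

    def __init__(self, topic: str, key: bytes) -> None:
        self._topic, self._key = topic, key

    def topic(self) -> str:
        return self._topic

    def key(self) -> bytes:
        return self._key


class DeliveryTracker:
    """Hypothetical bookkeeping behind send() and delivery_error()."""

    def __init__(self) -> None:
        self.deliveries_pending: Dict[DeliveryTag, bytes] = {}
        self.delivery_failures: List[Tuple[DeliveryTag, str]] = []

    def send(self, topic: str, key: bytes, value: bytes) -> None:
        # A real writer would also hand the message to the producer here.
        self.deliveries_pending[DeliveryTag(topic, key)] = value

    def on_delivery(self, error: Optional[str], message: FakeMessage) -> None:
        # Same (err, msg) argument order as a confluent_kafka delivery callback.
        tag = DeliveryTag(message.topic(), message.key())
        self.deliveries_pending.pop(tag, None)
        if error is not None:
            self.delivery_failures.append((tag, error))

    def delivery_error(self, message: str) -> Exception:
        # Collect all failures, then clear them so each is reported only once.
        failures, self.delivery_failures = self.delivery_failures, []
        return Exception(message, failures)


tracker = DeliveryTracker()
tracker.send("prefix.revision", b"k1", b"v1")
tracker.send("prefix.revision", b"k2", b"v2")
tracker.on_delivery(None, FakeMessage("prefix.revision", b"k1"))
tracker.on_delivery("Message timed out", FakeMessage("prefix.revision", b"k2"))

err = tracker.delivery_error("some deliveries failed")
print(err.args[1])  # [(DeliveryTag(topic='prefix.revision', kafka_key=b'k2'), 'Message timed out')]
print(tracker.delivery_failures)  # []
```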

flush()

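
flush() is documented only through the flush_timeout parameter: it fails if message deliveries are still pending once the timeout expires. A minimal sketch, assuming the writer relies on confluent_kafka.Producer.flush(timeout), which returns the number of messages still queued. FakeProducer and FlushTimeoutError are hypothetical; the real method may also wait on pending delivery callbacks, or raise KafkaDeliveryError instead.

```python
from typing import List


class FakeProducer:
    """Stand-in for confluent_kafka.Producer: flush(timeout) returns the number
    of messages still queued when it returns."""

    def __init__(self, stuck: int = 0) -> None:
        self.stuck = stuck
        self.timeouts: List[float] = []

    def flush(self, timeout: float) -> int:
        self.timeouts.append(timeout)
        return self.stuck


class FlushTimeoutError(Exception):
    """Hypothetical error type for this sketch."""


def flush(producer: FakeProducer, flush_timeout: float = 120) -> None:
    # Wait at most flush_timeout seconds for outstanding deliveries.
    remaining = producer.flush(flush_timeout)
    if remaining > 0:
        raise FlushTimeoutError(
            f"{remaining} message(s) still pending after {flush_timeout}s"
        )


flush(FakeProducer(stuck=0), flush_timeout=5)  # returns normally
try:
    flush(FakeProducer(stuck=2), flush_timeout=5)
except FlushTimeoutError as exc:
    print(exc)  # 2 message(s) still pending after 5s
```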
write_addition(object_type: str, object_: Union[swh.model.model.Content, swh.model.model.Directory, swh.model.model.Origin, swh.model.model.OriginVisit, swh.model.model.Release, swh.model.model.Revision, swh.model.model.SkippedContent, swh.model.model.Snapshot]) → None

Write a single object to the journal

write_update(object_type: str, object_: Union[swh.model.model.Content, swh.model.model.Directory, swh.model.model.Origin, swh.model.model.OriginVisit, swh.model.model.Release, swh.model.model.Revision, swh.model.model.SkippedContent, swh.model.model.Snapshot]) → None

Write a single object to the journal

write_additions(object_type: str, objects: Iterable[Union[swh.model.model.Content, swh.model.model.Directory, swh.model.model.Origin, swh.model.model.OriginVisit, swh.model.model.Release, swh.model.model.Revision, swh.model.model.SkippedContent, swh.model.model.Snapshot]]) → None

Write a set of objects to the journal
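
write_addition() handles a single object and write_additions() a batch. The sketch assumes that the single-object call flushes after its object while the batch call queues every object and flushes once, and that write_update is an alias of write_addition (suggested by its identical description). RecordingWriter is hypothetical and records calls instead of producing to Kafka; the prefix and URLs are illustrative.

```python
from typing import Iterable, List, Tuple


class RecordingWriter:
    """Hypothetical writer that records calls instead of talking to Kafka."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix
        self.sent: List[Tuple[str, str]] = []
        self.flushes = 0

    def _send(self, object_type: str, object_: str) -> None:
        self.sent.append((f"{self.prefix}.{object_type}", object_))

    def flush(self) -> None:
        self.flushes += 1

    def write_addition(self, object_type: str, object_: str) -> None:
        # One object: send it, then wait for its delivery.
        self._send(object_type, object_)
        self.flush()

    # Assumption: updates are written exactly like additions.
    write_update = write_addition

    def write_additions(self, object_type: str, objects: Iterable[str]) -> None:
        # Many objects: queue them all, then wait once for the whole batch.
        for object_ in objects:
            self._send(object_type, object_)
        self.flush()


writer = RecordingWriter("swh.journal.objects")
writer.write_additions("origin", ["https://example.org/a", "https://example.org/b"])
writer.write_addition("origin", "https://example.org/c")
print(len(writer.sent), writer.flushes)  # 3 2
```

Flushing once per batch lets the producer pipeline many messages before blocking on delivery reports, which is usually much cheaper than waiting after every message.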