swh.counters.journal_client module

swh.counters.journal_client.process_journal_messages(messages: Dict[str, Dict[bytes, bytes]], counters: CountersInterface) → None

Count the number of distinct values of an object's property. For example, this makes it possible to count the persons referenced by the Release (authors) and Revision (authors and committers) classes.
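This dispatch can be sketched as follows. The sketch counts the distinct message keys for every object type and derives the "person" collection from selected object types. It rests on several assumptions that are not part of this module. `InMemoryCounters` is a hypothetical stand-in for `CountersInterface` that records distinct keys per collection through an `add(collection, keys)` method. `PERSON_EXTRACTORS` is a hypothetical registry. Message values are assumed to be JSON-encoded, which is simpler than the journal's real serialization.

```python
import json
from typing import Callable, Dict, Iterable, Set


class InMemoryCounters:
    """Hypothetical stand-in for CountersInterface: one set of distinct keys per collection."""

    def __init__(self) -> None:
        self.collections: Dict[str, Set[bytes]] = {}

    def add(self, collection: str, keys: Iterable[bytes]) -> None:
        self.collections.setdefault(collection, set()).update(keys)

    def count(self, collection: str) -> int:
        return len(self.collections.get(collection, set()))


def revision_persons(values: Iterable[bytes]) -> Set[bytes]:
    # Assumption: JSON values of the form {"author": "...", "committer": "..."}.
    persons: Set[bytes] = set()
    for value in values:
        obj = json.loads(value)
        persons.update(obj[role].encode() for role in ("author", "committer"))
    return persons


# Hypothetical registry: object types whose values also feed the "person" collection.
PERSON_EXTRACTORS: Dict[str, Callable[[Iterable[bytes]], Set[bytes]]] = {
    "revision": revision_persons,
}


def process_journal_messages_sketch(
    messages: Dict[str, Dict[bytes, bytes]], counters: InMemoryCounters
) -> None:
    for object_type, objects in messages.items():
        # Distinct message keys are the distinct objects of that type.
        counters.add(object_type, objects.keys())
        extractor = PERSON_EXTRACTORS.get(object_type)
        if extractor is not None:
            # Derived property: distinct persons found inside the values.
            counters.add("person", extractor(objects.values()))
```

For a batch with two contents and two revisions written by Alice and committed by Bob and Alice, the sketch reports the following counts:

- 2 for "content"
- 2 for "revision"
- 2 for "person"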

swh.counters.journal_client.process_revisions(revisions: Dict[bytes, bytes], counters: CountersInterface)

Count the number of distinct authors and committers of the revisions (stored in the person collection).
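A minimal sketch of this step follows. It collects every author and committer full name into a set, which removes duplicates within the batch, and then makes a single `add` call on the "person" collection. The sketch makes the following assumptions:

- The `Counters` protocol is an assumed shape of the backend.
- `RecordingCounters` is a hypothetical test double.
- Each revision value is a JSON object with nested `{"fullname": ...}` fields, standing in for the journal's real encoding.

```python
import json
from typing import Dict, List, Protocol, Set, Tuple


class Counters(Protocol):
    # Assumed shape of the counters backend: add(collection, keys).
    def add(self, collection: str, keys: List[bytes]) -> None: ...


class RecordingCounters:
    """Hypothetical test double that records every add() call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, List[bytes]]] = []

    def add(self, collection: str, keys: List[bytes]) -> None:
        self.calls.append((collection, keys))


def process_revisions_sketch(revisions: Dict[bytes, bytes], counters: Counters) -> None:
    persons: Set[bytes] = set()
    for raw in revisions.values():
        # Assumption: JSON values stand in for the journal's real serialization.
        revision = json.loads(raw)
        for role in ("author", "committer"):
            persons.add(revision[role]["fullname"].encode())
    # One call per batch; sorting only makes the output deterministic.
    counters.add("person", sorted(persons))
```

Deduplicating within the batch keeps the backend call small. Deduplication across batches is still left to the counters backend.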

swh.counters.journal_client.process_releases(releases: Dict[bytes, bytes], counters: CountersInterface)

Count the number of distinct authors of the releases (stored in the person collection).
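Releases differ from revisions in that they carry only an author, and the author field may be empty. The sketch below shows the extraction half of this step under that assumption, with hypothetical JSON-encoded values. Releases without a usable author are skipped rather than counted. The hypothetical `release_persons` helper returns the list that would then be passed to `counters.add("person", ...)`.

```python
import json
from typing import Dict, List, Set


def release_persons(releases: Dict[bytes, bytes]) -> List[bytes]:
    persons: Set[bytes] = set()
    for raw in releases.values():
        release = json.loads(raw)  # assumption: JSON-encoded values
        author = release.get("author")
        # Assumption: a release may carry no author; skip it instead of failing.
        if author and "fullname" in author:
            persons.add(author["fullname"].encode())
    return sorted(persons)
```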

class swh.counters.journal_client.CountersJournalClient(*args, **kwargs)

Bases: JournalClientBase

Journal client implementation that only decodes the message keys. Unlike swh.journal.client.JournalClientBase, it does not need to deserialize the message values.
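The sketch below shows how raw messages could be grouped into the `Dict[str, Dict[bytes, bytes]]` shape that `process_journal_messages` takes, leaving every value as opaque bytes. It makes two assumptions:

- `RawMessage` is a hypothetical stand-in for a Kafka message.
- The object type is taken to be the last dotted component of the topic name.

This is an illustration of skipping value deserialization, not the class's actual code.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass
class RawMessage:
    """Hypothetical stand-in for a Kafka message."""

    topic: str
    key: bytes
    value: bytes


def group_raw_messages(messages: Iterable[RawMessage]) -> Dict[str, Dict[bytes, bytes]]:
    batch: Dict[str, Dict[bytes, bytes]] = {}
    for msg in messages:
        # Assumption: the object type is the last dotted component of the topic name.
        object_type = msg.topic.rsplit(".", 1)[-1]
        # The value stays as raw bytes: no deserialization at this stage.
        batch.setdefault(object_type, {})[msg.key] = msg.value
    return batch
```

Messages that share a key within one batch collapse into a single entry, which is harmless when only distinct keys are counted.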

deserialize_message(message, object_type=None)
process_one_object(decoded_object, decoded_object_type, raw_message)

Process a decoded (deserialized) object of type decoded_object_type.

Non-critical errors can be reported to self.error_reporter, passing the raw Kafka message (which identifies its topic, partition and offset) as argument.

This is currently done in this method so that each journal client can implement its own trap-and-continue policy. It may be implemented generically in the self.handle_message_batch method in the future.
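A trap-and-continue policy can be sketched as follows: each message is handled in its own `try` block, and a non-critical failure is reported with the message's topic, partition and offset instead of aborting the whole batch. This sketch is not the swh.journal implementation. It makes the following assumptions:

- `RawMessage` and `process_batch_trap_and_continue` are hypothetical names.
- The reporter signature is an assumption.
- "Non-critical" is modelled as `ValueError`.

```python
from dataclasses import dataclass
from typing import Callable, Iterable


@dataclass
class RawMessage:
    """Hypothetical Kafka message carrying only the fields used here."""

    topic: str
    partition: int
    offset: int
    value: bytes


# Assumed reporter signature: (topic, partition, offset, exception).
ErrorReporter = Callable[[str, int, int, Exception], None]


def process_batch_trap_and_continue(
    messages: Iterable[RawMessage],
    handle: Callable[[bytes], None],
    error_reporter: ErrorReporter,
) -> int:
    """Process every message, reporting non-critical failures instead of aborting.

    Returns the number of messages processed successfully.
    """
    ok = 0
    for msg in messages:
        try:
            handle(msg.value)
            ok += 1
        except ValueError as exc:
            # Non-critical error: record where the message lives and keep going.
            error_reporter(msg.topic, msg.partition, msg.offset, exc)
    return ok
```

Only the trapped exception type is swallowed. Any other exception still propagates and stops the batch, which keeps unexpected failures visible.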

commit_batch()

Commit the batch of objects that were read. This should also take care of committing the offsets.
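A common ordering for such a commit step is to process the batch first and commit offsets only afterwards. If processing fails, nothing is committed and the batch is read again, which gives at-least-once processing. The sketch follows the Kafka convention of committing, per (topic, partition), the offset of the next message to read. `FakeConsumer` and `process_then_commit` are hypothetical names, not part of this module.

```python
from typing import Callable, Dict, List, Tuple

Record = Tuple[str, int, int]  # (topic, partition, offset)


class FakeConsumer:
    """Hypothetical consumer that records committed offsets per (topic, partition)."""

    def __init__(self) -> None:
        self.committed: Dict[Tuple[str, int], int] = {}

    def commit(self, offsets: Dict[Tuple[str, int], int]) -> None:
        self.committed.update(offsets)


def process_then_commit(
    batch: List[Record],
    process: Callable[[List[Record]], None],
    consumer: FakeConsumer,
) -> None:
    # If process() raises, commit() is never reached and the batch will be re-read.
    process(batch)
    next_offsets: Dict[Tuple[str, int], int] = {}
    for topic, partition, offset in batch:
        key = (topic, partition)
        # Commit the offset of the next message to read: last processed + 1.
        next_offsets[key] = max(next_offsets.get(key, -1), offset + 1)
    consumer.commit(next_offsets)
```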