swh.journal package

Submodules

swh.journal.backfill module

Module defining journal backfiller classes.

The goal of these backfillers is to send back part or all of the objects from the storage to the journal topics.

At the moment, the first, naive implementation is the JournalBackfiller: it simply reads the objects from the storage and sends every object identifier back to the journal.

swh.journal.backfill.directory_converter(db, directory)[source]

Convert a directory from the flat representation to swh.model-compatible objects.

swh.journal.backfill.revision_converter(db, revision)[source]

Convert a revision from the flat representation to swh.model-compatible objects.

swh.journal.backfill.release_converter(db, release)[source]

Convert a release from the flat representation to swh.model-compatible objects.

swh.journal.backfill.snapshot_converter(db, snapshot)[source]

Convert a snapshot from the flat representation to swh.model-compatible objects.

swh.journal.backfill.origin_visit_converter(db, origin_visit)[source]
swh.journal.backfill.object_to_offset(object_id, numbits)[source]

Compute the index of the range containing object id, when dividing the space into 2^numbits ranges.
Parameters:
  • object_id (str) – The hex representation of object_id
  • numbits (int) – Number of bits in which we divide input space
Returns:

The index of the range containing object id
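
For illustration, a minimal sketch of this computation, assuming the index is taken from the most significant numbits bits of the hex object id (the helper name is hypothetical):

    def object_to_offset_sketch(object_id: str, numbits: int) -> int:
        # Keep just enough leading bytes to cover `numbits` bits...
        needed_bytes = (numbits + 7) // 8
        value = int.from_bytes(bytes.fromhex(object_id)[:needed_bytes], "big")
        # ...then drop the excess low-order bits.
        return value >> (8 * needed_bytes - numbits)

    # With numbits=8 the index is simply the first byte of the id:
    assert object_to_offset_sketch("c0" + "00" * 19, 8) == 0xc0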

swh.journal.backfill.byte_ranges(numbits, start_object=None, end_object=None)[source]

Generate start/end pairs of bytes spanning numbits bits, constrained by the optional start_object and end_object.
Parameters:
  • numbits (int) – Number of bits in which we divide input space
  • start_object (str) – Hex object id contained in the first range returned
  • end_object (str) – Hex object id contained in the last range returned
Yields:

Up to 2^numbits pairs of bytes (fewer when constrained by start_object or end_object)
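
For example, an unconstrained call with numbits=2 should yield four ranges covering the whole byte space; the exact boundary encoding shown is an assumption:

    from swh.journal.backfill import byte_ranges

    for start, end in byte_ranges(2):
        print(start, end)
    # Expected shape of the output (illustrative):
    #   None       b'\x40'
    #   b'\x40'    b'\x80'
    #   b'\x80'    b'\xc0'
    #   b'\xc0'    None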

swh.journal.backfill.integer_ranges(start, end, block_size=1000)[source]
swh.journal.backfill.compute_query(obj_type, start, end)[source]
swh.journal.backfill.fetch(db, obj_type, start, end)[source]

Fetch all obj_type’s identifiers from db.

This opens one connection, streams objects, and closes the connection when done.

Parameters:
  • db (BaseDb) – Db connection object
  • obj_type (str) – Object type
  • start (Union[bytes, Tuple]) – Range start identifier
  • end (Union[bytes, Tuple]) – Range end identifier
Raises:

ValueError if obj_type is not supported

Yields:

Objects in the given range
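
A hedged usage sketch pairing fetch() with byte_ranges(); the BaseDb import path and the connection string are assumptions:

    from swh.core.db import BaseDb  # assumed location of the db helper
    from swh.journal.backfill import byte_ranges, fetch

    db = BaseDb.connect("service=swh")  # assumed libpq-style DSN
    for start, end in byte_ranges(8):
        for obj in fetch(db, "revision", start=start, end=end):
            print(obj)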

class swh.journal.backfill.JournalBackfiller(config=None)[source]

Bases: object

Class in charge of reading the storage's objects and sending them back to the journal's topics.

This is designed to be run periodically.

__init__(config=None)[source]

Initialize self. See help(type(self)) for accurate signature.

check_config(config)[source]
parse_arguments(object_type, start_object, end_object)[source]

Parse arguments

Raises:
  • ValueError for unsupported object type
  • ValueError if object ids are not parseable
Returns:

Parsed start and end object ids

run(object_type, start_object, end_object, dry_run=False)[source]

Reads the storage's objects of the given object type and sends them to the journal's corresponding topic.
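
A hedged usage sketch of the backfiller; the configuration keys shown are assumptions about what check_config() expects:

    from swh.journal.backfill import JournalBackfiller

    config = {
        "brokers": ["kafka1:9092"],               # assumed key
        "storage_dbconn": "service=swh",          # assumed key
        "final_prefix": "swh.journal.objects",    # assumed key
        "client_id": "swh.journal.backfiller",    # assumed key
    }
    backfiller = JournalBackfiller(config)
    # Walk all revisions between two hex ids without producing messages:
    backfiller.run("revision", start_object="00" * 20,
                   end_object="ff" * 20, dry_run=True)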

swh.journal.cli module

swh.journal.cli.get_journal_client(ctx, **kwargs)[source]
swh.journal.cli.main()[source]

swh.journal.client module

class swh.journal.client.JournalClient(brokers, group_id, prefix=None, object_types=None, max_messages=0, auto_offset_reset='earliest', **kwargs)[source]

Bases: object

A base client for the Software Heritage journal.

The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the prefix argument is None (the default), the value 'swh.journal.objects' is used.

Clients subscribe to events specific to each object type as listed in the object_types argument (if unset, defaults to all accepted object types).

Clients can be sharded by setting the group_id to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id.

Messages are processed by the process_objects method in batches of at most max_messages.

Any other named argument is passed directly to KafkaConsumer().

__init__(brokers, group_id, prefix=None, object_types=None, max_messages=0, auto_offset_reset='earliest', **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

process(worker_fn)[source]

Polls Kafka for a batch of messages, and calls the worker_fn with these messages.

Parameters:
  • worker_fn (Callable[Dict[str, List[dict]]]) – Function called with the messages as argument.
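
A hedged usage sketch; worker_fn receives a dict mapping each object type to the list of deserialized messages in the batch:

    from swh.journal.client import JournalClient

    client = JournalClient(
        brokers=["kafka1:9092"],
        group_id="my-mirror",                  # shared by sharded instances
        object_types=["revision", "release"],  # subset of accepted types
        max_messages=500,
    )

    def worker_fn(all_objects):
        for object_type, objects in all_objects.items():
            print(f"got {len(objects)} {object_type} objects")

    client.process(worker_fn)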

swh.journal.direct_writer module

class swh.journal.direct_writer.DirectKafkaWriter(brokers, prefix, client_id)[source]

Bases: object

This class is instantiated and used by swh-storage to write incoming new objects to Kafka before adding them to the storage backend (e.g. PostgreSQL) itself. A usage sketch follows the method entries below.

__init__(brokers, prefix, client_id)[source]

Initialize self. See help(type(self)) for accurate signature.

send(topic, key, value)[source]
_get_key(object_type, object_)[source]
_sanitize_object(object_type, object_)[source]
write_addition(object_type, object_)[source]
write_update(object_type, object_)
write_additions(object_type, objects)[source]
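
A hedged usage sketch; the minimal revision dict is a stand-in (real objects are full swh.model-compatible mappings):

    from swh.journal.direct_writer import DirectKafkaWriter

    writer = DirectKafkaWriter(
        brokers=["kafka1:9092"],
        prefix="swh.journal.objects",
        client_id="swh.storage.journal_writer",
    )
    revision = {"id": b"\x00" * 20}  # stand-in; real objects carry more keys
    writer.write_addition("revision", revision)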

swh.journal.replay module

swh.journal.replay.process_replay_objects(all_objects, *, storage)[source]
swh.journal.replay._insert_objects(object_type, objects, storage)[source]
swh.journal.replay.copy_object(obj_id, src, dst)[source]
swh.journal.replay.process_replay_objects_content(all_objects, *, src, dst, concurrency=8)[source]
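
A hedged sketch of wiring a JournalClient to process_replay_objects() through functools.partial; the get_storage factory call is an assumption:

    from functools import partial

    from swh.journal.client import JournalClient
    from swh.journal.replay import process_replay_objects
    from swh.storage import get_storage  # assumed storage factory

    storage = get_storage("memory", {})  # assumed factory signature
    client = JournalClient(brokers=["kafka1:9092"], group_id="replayer")
    worker_fn = partial(process_replay_objects, storage=storage)
    client.process(worker_fn)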

swh.journal.serializers module

swh.journal.serializers.key_to_kafka(key)[source]

Serialize a key, possibly a dict, in a predictable way

swh.journal.serializers.kafka_to_key(kafka_key)[source]

Deserialize a key

swh.journal.serializers.value_to_kafka(value)[source]

Serialize some data for storage in Kafka

swh.journal.serializers.kafka_to_value(kafka_value)[source]

Deserialize some data stored in Kafka
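
A round-trip sketch of the serializer pairs; the payloads are illustrative:

    from swh.journal.serializers import (
        kafka_to_key, kafka_to_value, key_to_kafka, value_to_kafka,
    )

    key = {"id": b"\x00" * 20}
    assert kafka_to_key(key_to_kafka(key)) == key

    value = {"type": "revision", "parents": []}
    assert kafka_to_value(value_to_kafka(value)) == value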

Module contents