swh.journal package

Submodules

swh.journal.backfill module

Module defining journal backfiller classes.

The goal of these backfillers is to push part or all of the objects from the storage back to the journal topics.

At the moment, a first naive implementation is the JournalBackfiller. It simply reads the objects from the storage and sends every object identifier back to the journal.

swh.journal.backfill.directory_converter(db, directory)[source]

Convert directory from the flat representation to swh model compatible objects.

swh.journal.backfill.revision_converter(db, revision)[source]

Convert revision from the flat representation to swh model compatible objects.

swh.journal.backfill.release_converter(db, release)[source]

Convert release from the flat representation to swh model compatible objects.

swh.journal.backfill.snapshot_converter(db, snapshot)[source]

Convert snapshot from the flat representation to swh model compatible objects.

swh.journal.backfill.origin_visit_converter(db, origin_visit)[source]
swh.journal.backfill.object_to_offset(object_id, numbits)[source]
Compute the index of the range containing object id, when dividing
space into 2^numbits.
Parameters:
  • object_id (str) – The hex representation of object_id
  • numbits (int) – Number of bits in which we divide input space
Returns:

The index of the range containing object id
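
Conceptually, the index is derived from the leading numbits bits of the hex identifier. A hedged sketch of that idea (not the actual implementation; the helper name and the exact bit handling are assumptions):

def object_to_offset_sketch(object_id: str, numbits: int) -> int:
    # Keep enough leading hex digits to cover numbits bits, then drop the
    # extra low-order bits so only the top numbits remain.
    num_hex_digits = -(-numbits // 4)  # ceil(numbits / 4)
    value = int(object_id[:num_hex_digits], 16)
    return value >> (num_hex_digits * 4 - numbits)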

swh.journal.backfill.byte_ranges(numbits, start_object=None, end_object=None)[source]
Generate start/end pairs of bytes spanning numbits bits and
constrained by optional start_object and end_object.
Parameters:
  • numbits (int) – Number of bits in which we divide input space
  • start_object (str) – Hex object id contained in the first range returned
  • end_object (str) – Hex object id contained in the last range returned
Yields:

2^numbits pairs of bytes
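
A hedged sketch of the unconstrained case (the helper name, the 20-byte bounds and the open-ended first/last ranges are assumptions; the optional start_object/end_object constraints are omitted):

def byte_ranges_sketch(numbits: int):
    # Divide the 160-bit SHA1 space into 2^numbits equal contiguous ranges
    # and yield (start, end) byte bounds; None marks an open-ended bound.
    for i in range(2 ** numbits):
        start = None if i == 0 else (i << (160 - numbits)).to_bytes(20, 'big')
        end = (None if i + 1 == 2 ** numbits
               else ((i + 1) << (160 - numbits)).to_bytes(20, 'big'))
        yield start, end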

swh.journal.backfill.integer_ranges(start, end, block_size=1000)[source]
swh.journal.backfill.compute_query(obj_type, start, end)[source]
swh.journal.backfill.fetch(db, obj_type, start, end)[source]

Fetch all obj_type’s identifiers from db.

This opens one connection, streams the objects and, when done, closes the connection.

Parameters:
  • db (BaseDb) – Db connection object
  • obj_type (str) – Object type
  • start (Union[bytes, Tuple]) – Range start identifier
  • end (Union[bytes, Tuple]) – Range end identifier
Raises:

ValueError if obj_type is not supported

Yields:

Objects in the given range

swh.journal.backfill._format_range_bound(bound)[source]
class swh.journal.backfill.JournalBackfiller(config=None)[source]

Bases: object

Class in charge of reading the storage’s objects and sending them back to the journal’s topics.

This is designed to be run periodically.

__init__(config=None)[source]

Initialize self. See help(type(self)) for accurate signature.

check_config(config)[source]
parse_arguments(object_type, start_object, end_object)[source]

Parse arguments

Raises:
  • ValueError for unsupported object type
  • ValueError if object ids are not parseable
Returns:

Parsed start and end object ids

run(object_type, start_object, end_object, dry_run=False)[source]

Reads the storage’s subscribed object types and sends them to the journal’s topics.

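A hedged usage sketch (the configuration keys shown are illustrative assumptions; check_config() defines the keys actually required):

from swh.journal.backfill import JournalBackfiller

config = {
    'brokers': ['kafka01.example.org:9092'],   # assumed key name and value
    'storage_dbconn': 'service=swh-storage',   # assumed key name and value
    'final_prefix': 'swh.journal.objects',     # assumed key name and value
    'client_id': 'swh.journal.backfiller',     # assumed key name and value
}

backfiller = JournalBackfiller(config)
# Send all revision identifiers in the given id range back to the journal;
# dry_run=True is expected to skip actually producing messages.
backfiller.run('revision',
               start_object='00' * 20,
               end_object='ff' * 20,
               dry_run=True)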

swh.journal.cli module

swh.journal.cli.get_journal_client(ctx, **kwargs)[source]
swh.journal.cli.main()[source]

swh.journal.client module

class swh.journal.client.JournalClient(brokers, group_id, prefix=None, object_types=None, max_messages=0, process_timeout=0, auto_offset_reset='earliest', **kwargs)[source]

Bases: object

A base client for the Software Heritage journal.

The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the ‘prefix’ argument is None (the default), the prefix ‘swh.journal.objects’ is used.

Clients subscribe to events specific to each object type as listed in the object_types argument (if unset, defaults to all accepted object types).

Clients can be sharded by setting the group_id to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id.

Messages are processed by the worker_fn callback passed to the process method, in batches of maximum max_messages.

Any other named argument is passed directly to KafkaConsumer().

__init__(brokers, group_id, prefix=None, object_types=None, max_messages=0, process_timeout=0, auto_offset_reset='earliest', **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

process(worker_fn)[source]

Polls Kafka for a batch of messages, and calls the worker_fn with these messages.

Parameters:
  • worker_fn (Callable[Dict[str, List[dict]]]) – Function called with the messages as argument.
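
A hedged usage sketch (broker address, group id and object type selection are illustrative):

from swh.journal.client import JournalClient

def worker_fn(all_objects):
    # all_objects maps each object type to the list of messages received
    # in this batch, e.g. {'origin_visit': [{...}, ...]}.
    for object_type, objects in all_objects.items():
        print(object_type, len(objects))

client = JournalClient(
    brokers=['localhost:9092'],       # illustrative broker address
    group_id='my-journal-consumer',   # instances sharing this id share the load
    object_types=['origin_visit'],    # subscribe to a subset of topics
    max_messages=100,                 # cap batches at 100 messages
)
client.process(worker_fn)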

swh.journal.direct_writer module

swh.journal.replay module

swh.journal.replay.process_replay_objects(all_objects, *, storage)[source]
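
This is typically bound to a destination storage and used as the worker function of a JournalClient; a hedged sketch (broker address and storage arguments are illustrative):

from functools import partial

from swh.journal.client import JournalClient
from swh.journal.replay import process_replay_objects
from swh.storage import get_storage

storage = get_storage('memory', {})   # illustrative destination storage
client = JournalClient(brokers=['localhost:9092'], group_id='replayer')

# Bind the destination storage; the client drives the consumption loop and
# hands each batch of journal objects to process_replay_objects.
client.process(partial(process_replay_objects, storage=storage))
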
swh.journal.replay._fix_revision_pypi_empty_string(rev)[source]

PyPI loader failed to encode empty strings as bytes, see: swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 or https://forge.softwareheritage.org/D1772

swh.journal.replay._fix_revision_transplant_source(rev)[source]
swh.journal.replay._check_date(date)[source]

Returns whether the date can be represented in backends with sane limits on timestamps and timezones (resp. signed 64-bit and signed 16-bit), and whether the microseconds value is valid (i.e. between 0 and 10^6).

swh.journal.replay._check_revision_date(rev)[source]

Exclude revisions with invalid dates. See https://forge.softwareheritage.org/T1339

swh.journal.replay._fix_revisions(revisions)[source]
swh.journal.replay._fix_origin_visits(visits)[source]
swh.journal.replay.fix_objects(object_type, objects)[source]

Converts a possibly old object from the journal to its current expected format.

List of conversions:

Empty author name/email in PyPI releases:

>>> from pprint import pprint
>>> date = {
...     'timestamp': {
...         'seconds': 1565096932,
...         'microseconds': 0,
...     },
...     'offset': 0,
... }
>>> pprint(fix_objects('revision', [{
...     'author': {'email': '', 'fullname': b'', 'name': ''},
...     'committer': {'email': '', 'fullname': b'', 'name': ''},
...     'date': date,
...     'committer_date': date,
... }]))
[{'author': {'email': b'', 'fullname': b'', 'name': b''},
  'committer': {'email': b'', 'fullname': b'', 'name': b''},
  'committer_date': {'offset': 0,
                     'timestamp': {'microseconds': 0, 'seconds': 1565096932}},
  'date': {'offset': 0,
           'timestamp': {'microseconds': 0, 'seconds': 1565096932}}}]

Fix type of ‘transplant_source’ extra headers:

>>> revs = fix_objects('revision', [{
...     'author': {'email': '', 'fullname': b'', 'name': ''},
...     'committer': {'email': '', 'fullname': b'', 'name': ''},
...     'date': date,
...     'committer_date': date,
...     'metadata': {
...         'extra_headers': [
...             ['time_offset_seconds', b'-3600'],
...             ['transplant_source', '29c154a012a70f49df983625090434587622b39e']
...     ]}
... }])
>>> pprint(revs[0]['metadata']['extra_headers'])
[['time_offset_seconds', b'-3600'],
 ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']]

Filter out revisions with invalid dates:

>>> from copy import deepcopy
>>> invalid_date1 = deepcopy(date)
>>> invalid_date1['timestamp']['microseconds'] = 1000000000  # > 10^6
>>> fix_objects('revision', [{
...     'author': {'email': '', 'fullname': b'', 'name': b''},
...     'committer': {'email': '', 'fullname': b'', 'name': b''},
...     'date': invalid_date1,
...     'committer_date': date,
... }])
[]
>>> invalid_date2 = deepcopy(date)
>>> invalid_date2['timestamp']['seconds'] = 2**70  # > 2^63
>>> fix_objects('revision', [{
...     'author': {'email': '', 'fullname': b'', 'name': b''},
...     'committer': {'email': '', 'fullname': b'', 'name': b''},
...     'date': invalid_date2,
...     'committer_date': date,
... }])
[]
>>> invalid_date3 = deepcopy(date)
>>> invalid_date3['offset'] = 2**20  # > 2^15
>>> fix_objects('revision', [{
...     'author': {'email': '', 'fullname': b'', 'name': b''},
...     'committer': {'email': '', 'fullname': b'', 'name': b''},
...     'date': date,
...     'committer_date': invalid_date3,
... }])
[]

visit[‘origin’] is a URL instead of a dict:

>>> fix_objects('origin_visit', [{'origin': 'http://foo'}])
[{'origin': {'url': 'http://foo'}}]

visit[‘type’] is missing, but visit[‘origin’][‘type’] exists:

>>> pprint(fix_objects(
...     'origin_visit',
...     [{'origin': {'type': 'hg', 'url': 'http://foo'}}]))
[{'origin': {'type': 'hg', 'url': 'http://foo'}, 'type': 'hg'}]

swh.journal.replay._insert_objects(object_type, objects, storage)[source]
swh.journal.replay.is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=20)[source]

Checks if the given hash is in the provided array. The array must be a sorted list of sha1 hashes, and contain nb_hashes hashes (so its size must be nb_hashes*hash_size bytes).

Parameters:
  • hash (bytes) – the hash to look for
  • array (bytes) – a sorted concatenated array of hashes (may be of any type supporting slice indexing, e.g. mmap.mmap)
  • nb_hashes (int) – number of hashes in the array
  • hash_size (int) – size of a hash (defaults to 20, for SHA1)

Example:

>>> import os
>>> hash1 = os.urandom(20)
>>> hash2 = os.urandom(20)
>>> hash3 = os.urandom(20)
>>> array = b''.join(sorted([hash1, hash2]))
>>> is_hash_in_bytearray(hash1, array, 2)
True
>>> is_hash_in_bytearray(hash2, array, 2)
True
>>> is_hash_in_bytearray(hash3, array, 2)
False

swh.journal.replay.retry(max_retries)[source]
swh.journal.replay.copy_object(obj_id, src, dst, max_retries=3)[source]
swh.journal.replay.process_replay_objects_content(all_objects, *, src, dst, exclude_fn=None)[source]

Takes a list of records from Kafka (see swh.journal.client.JournalClient.process()) and copies them from the src objstorage to the dst objstorage, if:

  • obj[‘status’] is ‘visible’
  • exclude_fn(obj) is False (if exclude_fn is provided)
Parameters:
  • all_objects (Dict[str, List[dict]]) – Objects passed by the Kafka client. Most importantly, all_objects[‘content’][*][‘sha1’] is the sha1 hash of each content
  • src – An object storage (see swh.objstorage.get_objstorage())
  • dst – An object storage (see swh.objstorage.get_objstorage())
  • exclude_fn (Optional[Callable[dict, bool]]) – Determines whether an object should be copied.

Example:

>>> from swh.objstorage import get_objstorage
>>> src = get_objstorage('memory', {})
>>> dst = get_objstorage('memory', {})
>>> id1 = src.add(b'foo bar')
>>> id2 = src.add(b'baz qux')
>>> kafka_partitions = {
...     'content': [
...         {
...             'sha1': id1,
...             'status': 'visible',
...         },
...         {
...             'sha1': id2,
...             'status': 'visible',
...         },
...     ]
... }
>>> process_replay_objects_content(
...     kafka_partitions, src=src, dst=dst,
...     exclude_fn=lambda obj: obj['sha1'] == id1)
>>> id1 in dst
False
>>> id2 in dst
True

swh.journal.serializers module

swh.journal.serializers.key_to_kafka(key)[source]

Serialize a key, possibly a dict, in a predictable way

swh.journal.serializers.kafka_to_key(kafka_key)[source]

Deserialize a key

swh.journal.serializers.value_to_kafka(value)[source]

Serialize some data for storage in Kafka

swh.journal.serializers.kafka_to_value(kafka_value)[source]

Deserialize some data stored in Kafka
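
A minimal round-trip sketch (the sample key and value are illustrative):

from swh.journal.serializers import (
    kafka_to_key, kafka_to_value, key_to_kafka, value_to_kafka)

key = {'origin': 'https://example.org/repo', 'visit': 1}
value = {'status': 'full', 'snapshot': b'\x00' * 20}

# Both helpers produce bytes suitable for Kafka message keys and values;
# deserializing gives back an equal Python object.
assert kafka_to_key(key_to_kafka(key)) == key
assert kafka_to_value(value_to_kafka(value)) == value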

Module contents