swh.dataset.journalprocessor module#

class swh.dataset.journalprocessor.JournalClientOffsetRanges(*args, offset_ranges: Mapping[int, Tuple[int, int]], assignment: Sequence[int], progress_queue: Queue, refresh_every: int = 200, **kwargs)[source]#

Bases: JournalClient

A subclass of JournalClient that only reads within specific offset ranges. Partition assignments have to be given to the class manually.

This client can only read a single topic at a time.

Parameters:
  • offset_ranges – A mapping of partition_id -> (low, high) offsets that define the boundaries of the messages to consume.

  • assignment – The list of partitions to assign to this client.

  • progress_queue – a multiprocessing.Queue where the current progress will be reported.

  • refresh_every – the refresh rate of the progress reporting.
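
For illustration, a minimal sketch of how such a client could be instantiated. The keyword arguments forwarded to JournalClient (brokers, group_id, prefix, object_types) are assumptions based on a typical swh.journal configuration and may not match your deployment:

    from multiprocessing import Queue

    from swh.dataset.journalprocessor import JournalClientOffsetRanges

    progress_queue = Queue()

    client = JournalClientOffsetRanges(
        # Keyword arguments forwarded to JournalClient (assumed names):
        brokers=["kafka1.example.org:9092"],
        group_id="swh-dataset-export-example",
        prefix="swh.journal.objects",
        object_types=["revision"],  # this client reads a single topic at a time
        # Arguments specific to JournalClientOffsetRanges:
        offset_ranges={0: (0, 1000), 1: (0, 1500)},  # partition -> (low, high)
        assignment=[0, 1],  # partitions handled by this client
        progress_queue=progress_queue,
        refresh_every=200,
    )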

subscribe()[source]#

Subscribe to topics listed in self.subscription

This can be overridden if you need, for instance, to manually assign partitions.
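
As an illustration of what manual partition assignment looks like with confluent_kafka, a sketch of such an override, not necessarily the exact code used by this class; self.consumer, self.assignment and self.offset_ranges are assumed attribute names:

    from confluent_kafka import TopicPartition

    def subscribe(self):
        # Instead of subscribing to the topic, pin the consumer to the
        # partitions assigned to this worker, starting each one at the low
        # boundary of its offset range.
        topic = self.subscription[0]  # this client reads a single topic
        self.consumer.assign(
            [
                TopicPartition(topic, partition, self.offset_ranges[partition][0])
                for partition in self.assignment
            ]
        )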

unsubscribe(partitions: Container[int])[source]#
process(worker_fn)[source]#

Polls Kafka for a batch of messages, and calls the worker_fn with these messages.

Parameters:

worker_fn – Function called with the messages as argument.
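
A hypothetical example of such a callable; the mapping-of-object-type-to-messages shape is inferred from process_messages() further down in this module:

    def worker_fn(messages):
        # `messages` is assumed to be a mapping of object type -> list of
        # messages, matching the Dict[str, List] shape used elsewhere here.
        for object_type, batch in messages.items():
            print(f"{object_type}: got a batch of {len(batch)} messages")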

handle_offset(message)[source]#

Check whether the client has reached the end of the current partition, and trigger a reassignment if that is the case.
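
Conceptually, the check compares the message offset with the high boundary of its partition; a simplified sketch (self.offset_ranges is an assumed attribute name):

    def handle_offset(self, message):
        partition = message.partition()
        _, high = self.offset_ranges[partition]
        if message.offset() + 1 >= high:
            # The end of this partition's range has been reached: drop it
            # from the assignment and keep consuming the remaining ones.
            self.unsubscribe([partition])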

deserialize_message(message, object_type=None)[source]#

Override of the message deserialization to hook in the handling of the message offset. We also return the raw objects instead of deserializing them, because we will need the partition ID later.

handle_messages(messages, worker_fn)[source]#

Override of the handle_messages() method to get a chance to commit messages.

Make sure that messages properly handled by worker_fn (executed in super()) do get committed in Kafka, even if the partition they originate from has since been unsubscribed.

This helps keep a consistent view of the consumption of each partition at the end of the export process (EOF).
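
A hedged sketch of the commit pattern this describes, written as a standalone helper rather than the actual override (the `consumed` mapping is hypothetical):

    from confluent_kafka import TopicPartition

    def commit_consumed(consumer, topic, consumed):
        # `consumed` maps partition id -> last processed offset. Committing
        # offset + 1 marks everything up to and including that message as
        # done, even for partitions that have already been unsubscribed.
        consumer.commit(
            offsets=[
                TopicPartition(topic, partition, offset + 1)
                for partition, offset in consumed.items()
            ],
            asynchronous=False,
        )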

class swh.dataset.journalprocessor.ParallelJournalProcessor(config, masked_swhids: Set[ExtendedSWHID], exporter_factories: Sequence[Callable[[], Exporter]], export_id: str, obj_type: str, node_sets_path: Path, processes: int = 1, offset_margin: float | None = None)[source]#

Bases: object

Reads the given object type from the journal in parallel. It creates one JournalProcessorWorker per process.

Parameters:
  • config – the exporter config, which should also include the JournalClient configuration.

  • exporter_factories – a list of functions returning Exporter instances to process the objects

  • export_id – a unique identifier for the export that will be used as part of a Kafka consumer group ID.

  • obj_type – The type of SWH object to export.

  • node_sets_path – A directory in which to store the node sets.

  • processes – The number of processes to run.
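
A hedged usage sketch; the configuration layout and the exporter factory below are placeholders, not the canonical configuration format:

    from pathlib import Path

    from swh.dataset.journalprocessor import ParallelJournalProcessor

    def make_exporter():
        # Hypothetical factory; a real one would return a configured
        # Exporter instance (e.g. an ORC or graph edges exporter).
        raise NotImplementedError

    config = {
        # Assumed layout: the JournalClient configuration is expected to be
        # part of the exporter config; check your deployment for exact keys.
        "journal": {
            "brokers": ["kafka1.example.org:9092"],
            "prefix": "swh.journal.objects",
        },
    }

    processor = ParallelJournalProcessor(
        config,
        masked_swhids=set(),
        exporter_factories=[make_exporter],
        export_id="2024-01-01-example-export",
        obj_type="revision",
        node_sets_path=Path("/srv/export/.node_sets"),
        processes=4,
    )
    processor.run()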

get_offsets() → Dict[int, Tuple[int, int]][source]#

Compute (low, high) offset boundaries for all partitions.

A first pass fetches the current low and high watermark offsets of each partition to define the consumption boundaries.

If available, committed offsets are used as the low offset boundaries.
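
The boundary computation can be approximated with plain confluent_kafka calls; a simplified sketch, under the assumption that a committed offset simply replaces the low watermark:

    from confluent_kafka import Consumer, TopicPartition

    def compute_offsets(consumer: Consumer, topic: str) -> dict:
        """Return {partition: (low, high)} consumption boundaries."""
        metadata = consumer.list_topics(topic)
        partitions = list(metadata.topics[topic].partitions)
        committed = consumer.committed(
            [TopicPartition(topic, p) for p in partitions], timeout=10
        )
        committed_offsets = {tp.partition: tp.offset for tp in committed}
        offsets = {}
        for partition in partitions:
            low, high = consumer.get_watermark_offsets(
                TopicPartition(topic, partition), timeout=10
            )
            if committed_offsets.get(partition, -1) >= 0:
                # Resume from the committed offset instead of the low
                # watermark, skipping messages already exported.
                low = committed_offsets[partition]
            offsets[partition] = (low, high)
        return offsets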

run() → None[source]#

Run the parallel export.

progress_worker(queue=None) → None[source]#

An additional worker process that reports the current progress of the export, across all the parallel consumers and all the partitions, by consuming the shared progress reporting Queue.
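
A hypothetical sketch of such a progress loop; the exact shape of the items placed on the queue is an assumption (here, per-partition message counts, with None as a stop sentinel):

    from multiprocessing import Queue

    def progress_loop(queue: Queue) -> None:
        counts = {}
        while True:
            item = queue.get()
            if item is None:
                # Sentinel: every consumer is done, stop reporting.
                break
            counts.update(item)  # assumed {partition_id: consumed_count} items
            total = sum(counts.values())
            print(f"consumed {total} messages across {len(counts)} partitions",
                  end="\r")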

export_worker(assignment, progress_queue) → None[source]#
class swh.dataset.journalprocessor.JournalProcessorWorker(config, masked_swhids: Set[ExtendedSWHID], exporter_factories: Sequence[Callable[[], Exporter]], group_id: str, obj_type: str, offsets: Dict[int, Tuple[int, int]], assignment: Sequence[int], progress_queue: Queue, node_sets_path: Path)[source]#

Bases: object

Worker process that processes all the messages and calls the given exporters for each object read from the journal.

get_node_set_for_object(partition_id: int, object_id: bytes)[source]#

Return an on-disk set object, which stores the nodes that have already been processed.

Node sets are sharded by partition ID (as each object is guaranteed to be assigned to a deterministic Kafka partition) then by object ID suffix. The sharding path of each file looks like:

.node_sets/{origin..content}/part-{0..256}/nodes-{0..f}.db
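
A hypothetical reconstruction of that sharding scheme as a plain path computation; the way the object ID suffix is reduced to a single hex digit is an assumption:

    from pathlib import Path

    def node_set_path(node_sets_path: Path, obj_type: str,
                      partition_id: int, object_id: bytes) -> Path:
        # One directory per object type and partition, then one set file per
        # hex digit derived from the object ID suffix.
        suffix = "{:x}".format(object_id[-1] % 16)
        return (
            node_sets_path
            / obj_type
            / f"part-{partition_id}"
            / f"nodes-{suffix}.db"
        )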

run() → None[source]#

Start a Journal client on the given assignment and process all the incoming messages.

process_messages(messages: Dict[str, List]) → None[source]#

Process the incoming Kafka messages.

process_message(object_type: str, partition: int, obj_key, obj) → None[source]#

Process a single incoming Kafka message if the object it refers to has not been processed yet.

It uses an on-disk set to make sure that each object is only ever processed once.
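
The deduplication described here amounts to an add-if-absent check against the per-partition node set; a hypothetical sketch using an in-memory set in place of the on-disk one:

    def process_once(node_set: set, obj_key: bytes, export) -> None:
        # `node_set` stands in for the on-disk set returned by
        # get_node_set_for_object(); `export` for the exporter callbacks.
        if obj_key in node_set:
            return  # already processed, skip
        node_set.add(obj_key)
        export(obj_key)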