swh.graph.luigi.compressed_graph module#

Luigi tasks for compression#

This module contains Luigi tasks. They are an alternative to the CLI and can be composed with other tasks, such as swh-dataset’s.

It implements the task DAG described in Compression steps.

Unlike the CLI, this requires the graph to be named graph.

Filtering#

The object_types parameter (--object-types on the CLI) specifies the set of node types to read from the dataset export, and it defaults to all types: ori,snp,rel,rev,dir,cnt.

Because the dataset export is keyed by edge sources, some objects of types outside this set will still be present in the input dataset. For example, when exporting ori,snp,rel,rev, the root Directory of every release and revision will be present, though without its labels (as will a few Content objects pointed to by some Releases).

File layout#

In addition to files documented in Graph compression (eg. graph.graph, graph.mph, …), tasks in this module produce this directory structure:

base_dir/
    <date>[_<flavor>]/
        compressed/
            graph.graph
            graph.mph
            ...
            meta/
                export.json
                compression.json
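
As a rough illustration of the tree above, the paths can be composed with pathlib. This is only a sketch: the helper names compressed_graph_dir and meta_files are hypothetical and are not part of this module.

```python
from pathlib import Path
from typing import Dict, Optional


def compressed_graph_dir(base_dir: Path, date: str, flavor: Optional[str] = None) -> Path:
    """Return base_dir/<date>[_<flavor>]/compressed, following the tree above."""
    # The flavor suffix is optional, hence the [_<flavor>] notation in the tree.
    name = date if flavor is None else f"{date}_{flavor}"
    return base_dir / name / "compressed"


def meta_files(graph_dir: Path) -> Dict[str, Path]:
    """Paths of the two provenance files stored under meta/."""
    meta = graph_dir / "meta"
    return {
        "export": meta / "export.json",
        "compression": meta / "compression.json",
    }
```

For instance, compressed_graph_dir(Path("base_dir"), "2022-11-08") yields base_dir/2022-11-08/compressed.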

graph.meta/export.json is copied from the ORC dataset exported by swh.dataset.luigi.

graph.meta/compression.json contains information about the compression itself, for provenance tracking. For example:

[
    {
        "steps": null,
        "export_start": "2022-11-08T11:00:54.998799+00:00",
        "export_end": "2022-11-08T11:05:53.105519+00:00",
        "object_types": [
            "origin",
            "origin_visit"
        ],
        "hostname": "desktop5",
        "conf": {},
        "tool": {
            "name": "swh.graph",
            "version": "2.2.0"
        },
        "commands": [
            {
                "command": [
                    "bash",
                    "-c",
                    "java it.unimi.dsi.big.webgraph.labelling.BitStreamArcLabelledImmutableGraph --list ..."
                ],
                "cgroup": "/sys/fs/cgroup/user.slice/user-1002.slice/user@1002.service/app.slice/swh.graph@103038/bash@0",
                "cgroup_stats": {
                    "memory.events": "low 0\nhigh 0\nmax 0\noom 0\noom_kill 0\noom_group_kill 0",
                    "memory.events.local": "low 0\nhigh 0\nmax 0\noom 0\noom_kill 0\noom_group_kill 0",
                    "memory.swap.current": "0",
                    "memory.zswap.current": "0",
                    "memory.swap.events": "high 0\nmax 0\nfail 0",
                    "cpu.stat": "usage_usec 531350\nuser_usec 424286\nsystem_usec 107063\n...",
                    "memory.current": "614400",
                    "memory.stat": "anon 0\nfile 110592\nkernel 176128\nkernel_stack 0\n...",
                    "memory.numa_stat": "anon N0=0\nfile N0=110592\nkernel_stack N0=0\n...",
                    "memory.peak": "49258496"
                }
            }
        ]
    }
]

When the compression pipeline is run in separate steps, each of the steps is recorded as an object in the root list.
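
Since the file is plain JSON, it can be inspected with the standard library alone. The sketch below walks the root list and extracts a few figures per command. The function names are hypothetical, and it assumes the keys shown in the example above are present, falling back to None or 0 when they are not.

```python
import json
from typing import Dict, List


def parse_flat_keyed(text: str) -> Dict[str, int]:
    """Parse 'key value' lines such as 'low 0\\nhigh 0' into a dict."""
    stats = {}
    for line in text.splitlines():
        parts = line.split()
        # Skip elided or malformed lines (the example above truncates some with '...').
        if len(parts) == 2 and parts[1].isdigit():
            stats[parts[0]] = int(parts[1])
    return stats


def summarize(records: List[dict]) -> List[dict]:
    """One summary row per command, across every object of the root list."""
    rows = []
    for record in records:
        for command in record.get("commands", []):
            cgroup_stats = command.get("cgroup_stats") or {}
            cpu = parse_flat_keyed(cgroup_stats.get("cpu.stat", ""))
            events = parse_flat_keyed(cgroup_stats.get("memory.events", ""))
            rows.append({
                "hostname": record.get("hostname"),
                "argv0": command["command"][0],
                "peak_memory_bytes": int(cgroup_stats.get("memory.peak", 0)),
                "cpu_usage_usec": cpu.get("usage_usec"),
                "oom_kills": events.get("oom_kill"),
            })
    return rows


# Trimmed copy of the example above; a real file would be read with json.load().
example = json.loads("""
[{"hostname": "desktop5",
  "commands": [{"command": ["bash", "-c", "..."],
                "cgroup_stats": {"memory.events": "low 0\\nhigh 0\\nmax 0\\noom 0\\noom_kill 0",
                                 "cpu.stat": "usage_usec 531350\\nuser_usec 424286\\n...",
                                 "memory.peak": "49258496"}}]}]
""")
print(summarize(example))
```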

S3 layout#

As .bin files are meant to be accessed randomly, they are stored uncompressed on disk. However, this is undesirable for at-rest, long-term storage such as S3, because some of them are very sparse (eg. graph.property.committer_timestamp.bin can be quickly compressed from 300 to 1GB).

Therefore, these files are compressed to .bin.zst, and need to be decompressed when downloading.

The layout is otherwise the same as the file layout.
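
The renaming rule can be sketched as a pair of functions. This shows the naming convention only, not the upload code: the function names are hypothetical, and the sketch assumes that only files ending in .bin are renamed, which is all the text above states.

```python
def to_s3_name(local_name: str) -> str:
    """Name under which a local file would be stored on S3."""
    # Only uncompressed .bin files gain a .zst suffix; everything else is kept as-is.
    return local_name + ".zst" if local_name.endswith(".bin") else local_name


def to_local_name(s3_name: str) -> str:
    """Inverse mapping, applied when downloading."""
    # Files that are already .zst locally (eg. graph.labels.csv.zst) keep their name.
    return s3_name[: -len(".zst")] if s3_name.endswith(".bin.zst") else s3_name
```

The two functions are inverses of each other on the file names listed in this page, so a download followed by an upload gives back the same S3 names.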

class swh.graph.luigi.compressed_graph.ObjectTypesParameter(*args, **kwargs)[source]#

Bases: Parameter

A parameter type whose value is either * or a set of comma-separated object types (eg. ori,snp,rel,rev,dir,cnt).

parse(s: str) → List[str][source]#

Parse an individual value from the input.

The default implementation is the identity function, but subclasses should override this method for specialized parsing.

Parameters:

s (str) – the value to parse.

Returns:

the parsed value.

serialize(value: List[str]) → str[source]#

Opposite of parse().

Converts value to a string.

Parameters:

value – the value to serialize.
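
The round trip between the string form and the list form can be sketched without Luigi as two plain functions. This is an illustration, not the class's actual code: the function names, the whitespace handling, and the choice to expand * to the six types listed in the Filtering section are assumptions.

```python
from typing import List

# Node types listed in the Filtering section, in the order given there.
ALL_OBJECT_TYPES = ["ori", "snp", "rel", "rev", "dir", "cnt"]


def parse_object_types(s: str) -> List[str]:
    """Turn '*' or a comma-separated string into a list of object types."""
    if s.strip() == "*":
        # Assumption: '*' stands for every type in ALL_OBJECT_TYPES.
        return list(ALL_OBJECT_TYPES)
    # Drop surrounding whitespace and empty items such as a trailing comma.
    return [part.strip() for part in s.split(",") if part.strip()]


def serialize_object_types(value: List[str]) -> str:
    """Opposite of parse_object_types(): join the types with commas."""
    return ",".join(value)
```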

class swh.graph.luigi.compressed_graph.ExtractNodes(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = -20#
EXPORT_AS_INPUT: bool = True#

Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().

INPUT_FILES: Set[str] = {}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.nodes/'}#

List of files which this task produces, without the graph name as prefix.

USES_ALL_CPU_THREADS: bool = True#

True on tasks that use all available CPU for their entire runtime.

These tasks should be scheduled so that they do not run at the same time, because running them concurrently does not improve run time.

class swh.graph.luigi.compressed_graph.ExtractLabels(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = -10#
EXPORT_AS_INPUT: bool = True#

Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().

INPUT_FILES: Set[str] = {}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.labels.csv.zst'}#

List of files which this task produces, without the graph name as prefix.

USES_ALL_CPU_THREADS: bool = True#

True on tasks that use all available CPU for their entire runtime.

These tasks should be scheduled so that they do not run at the same time, because running them concurrently does not improve run time.

priority = -100#

low priority, because it is not on the critical path

class swh.graph.luigi.compressed_graph.NodeStats(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 0#
INPUT_FILES: Set[str] = {'.nodes/'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.nodes.count.txt', '.nodes.stats.txt'}#

List of files which this task produces, without the graph name as prefix.

priority = 100#

high priority, to help the scheduler allocate resources

class swh.graph.luigi.compressed_graph.EdgeStats(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 3#
EXPORT_AS_INPUT: bool = True#

Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().

INPUT_FILES: Set[str] = {}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.edges.count.txt', '.edges.stats.txt'}#

List of files which this task produces, without the graph name as prefix.

priority = 100#

high priority, to help the scheduler allocate resources

class swh.graph.luigi.compressed_graph.LabelStats(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 6#
INPUT_FILES: Set[str] = {'.labels.csv.zst'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.labels.count.txt'}#

List of files which this task produces, without the graph name as prefix.

USES_ALL_CPU_THREADS: bool = True#

True on tasks that use all available CPU for their entire runtime.

These tasks should be scheduled so that they do not run at the same time, because running them concurrently does not improve run time.

priority = 100#

high priority, to help the scheduler allocate resources

class swh.graph.luigi.compressed_graph.Mph(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 10#
INPUT_FILES: Set[str] = {'.nodes.count.txt', '.nodes/'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.pthash'}#

List of files which this task produces, without the graph name as prefix.

USES_ALL_CPU_THREADS: bool = True#

True on tasks that use all available CPU for their entire runtime.

These tasks should be scheduled so that they do not run at the same time, because running them concurrently does not improve run time.

class swh.graph.luigi.compressed_graph.Bv(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 30#
EXPORT_AS_INPUT: bool = True#

Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().

INPUT_FILES: Set[str] = {'.pthash'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-base.graph'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.BvEf(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 40#
INPUT_FILES: Set[str] = {'-base.graph'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-base.ef'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.BfsRoots(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 50#
EXPORT_AS_INPUT: bool = True#

Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().

INPUT_FILES: Set[str] = {}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-bfs.roots.txt'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.Bfs(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 60#
INPUT_FILES: Set[str] = {'-base.ef', '-base.graph', '-bfs.roots.txt', '.pthash'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-bfs.order'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.PermuteAndSimplifyBfs(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 70#
INPUT_FILES: Set[str] = {'-base.ef', '-base.graph', '-bfs.order'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-bfs-simplified.graph'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.BfsEf(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 80#
INPUT_FILES: Set[str] = {'-bfs-simplified.graph'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-bfs-simplified.ef'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.BfsDcf(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 90#
INPUT_FILES: Set[str] = {'-bfs-simplified.graph'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-bfs-simplified.dcf'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.Llp(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 100#
INPUT_FILES: Set[str] = {'-bfs-simplified.dcf', '-bfs-simplified.ef', '-bfs-simplified.graph'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-llp.order'}#

List of files which this task produces, without the graph name as prefix.

gammas = <luigi.parameter.Parameter object>#

class swh.graph.luigi.compressed_graph.PermuteLlp(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 120#
INPUT_FILES: Set[str] = {'-base.ef', '-base.graph', '.pthash.order'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.graph', '.properties'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.Offsets(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 130#
INPUT_FILES: Set[str] = {'.graph'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.offsets'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.Ef(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 140#
INPUT_FILES: Set[str] = {'.graph', '.offsets'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.ef'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.ComposeOrders(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 110#
INPUT_FILES: Set[str] = {'-bfs.order', '-llp.order'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.pthash.order'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.Transpose(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 160#
INPUT_FILES: Set[str] = {'.ef', '.graph'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-transposed.graph', '-transposed.properties'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.TransposeOffsets(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 170#
INPUT_FILES: Set[str] = {'-transposed.graph'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-transposed.offsets'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.TransposeEf(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 175#
INPUT_FILES: Set[str] = {'-transposed.graph', '-transposed.offsets'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-transposed.ef'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.Maps(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 180#
INPUT_FILES: Set[str] = {'.nodes/', '.pthash', '.pthash.order'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.node2swhid.bin'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.ExtractPersons(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 190#
INPUT_FILES: Set[str] = {}#

Dependencies of this step.

EXPORT_AS_INPUT: bool = True#

Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().

OUTPUT_FILES: Set[str] = {'.persons.csv.zst'}#

List of files which this task produces, without the graph name as prefix.

USES_ALL_CPU_THREADS: bool = True#

True on tasks that use all available CPU for their entire runtime.

These tasks should be scheduled so that they do not run at the same time, because running them concurrently does not improve run time.

class swh.graph.luigi.compressed_graph.PersonsStats(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 195#
INPUT_FILES: Set[str] = {'.persons.csv.zst'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.persons.count.txt'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.MphPersons(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 200#
INPUT_FILES: Set[str] = {'.persons.count.txt', '.persons.csv.zst'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.persons.pthash'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.NodeProperties(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 210#
INPUT_FILES: Set[str] = {'.node2swhid.bin', '.persons.pthash', '.pthash', '.pthash.order'}#

Dependencies of this step.

EXPORT_AS_INPUT: bool = True#

Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().

OUTPUT_FILES: Set[str] = {'.property.author_id.bin', '.property.author_timestamp.bin', '.property.author_timestamp_offset.bin', '.property.committer_id.bin', '.property.committer_timestamp.bin', '.property.committer_timestamp_offset.bin', '.property.content.is_skipped.bits', '.property.content.length.bin', '.property.message.bin', '.property.message.offset.bin', '.property.tag_name.bin', '.property.tag_name.offset.bin'}#

List of files which this task produces, without the graph name as prefix.

priority = 10#

semi-high priority because it takes a very long time to run

output() → List[LocalTarget][source]#

Returns a list of luigi targets matching OUTPUT_FILES.

class swh.graph.luigi.compressed_graph.PthashLabels(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 220#
INPUT_FILES: Set[str] = {'.labels.count.txt', '.labels.csv.zst'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.labels.pthash'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.LabelsOrder(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 225#
INPUT_FILES: Set[str] = {'.labels.count.txt', '.labels.csv.zst', '.labels.pthash'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.labels.pthash.order'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.FclLabels(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 230#
INPUT_FILES: Set[str] = {'.labels.count.txt', '.labels.csv.zst'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.labels.fcl.bytearray', '.labels.fcl.pointers', '.labels.fcl.properties'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.EdgeLabels(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 240#
INPUT_FILES: Set[str] = {'.labels.pthash', '.labels.pthash.order', '.pthash', '.pthash.order'}#

Dependencies of this step.

EXPORT_AS_INPUT: bool = True#

Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().

OUTPUT_FILES: Set[str] = {'-labelled.labeloffsets', '-labelled.labels', '-labelled.properties'}#

List of files which this task produces, without the graph name as prefix.

priority = 10#

semi-high priority because it takes a long time to run

class swh.graph.luigi.compressed_graph.EdgeLabelsTranspose(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 250#
INPUT_FILES: Set[str] = {'.labels.pthash', '.labels.pthash.order', '.pthash', '.pthash.order'}#

Dependencies of this step.

EXPORT_AS_INPUT: bool = True#

Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().

OUTPUT_FILES: Set[str] = {'-transposed-labelled.labeloffsets', '-transposed-labelled.labels', '-transposed-labelled.properties'}#

List of files which this task produces, without the graph name as prefix.

priority = 10#

semi-high priority because it takes a long time to run

class swh.graph.luigi.compressed_graph.EdgeLabelsEf(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 270#
INPUT_FILES: Set[str] = {'-labelled.labeloffsets', '-labelled.labels'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-labelled.ef'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.EdgeLabelsTransposeEf(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 280#
INPUT_FILES: Set[str] = {'-transposed-labelled.labeloffsets', '-transposed-labelled.labels'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'-transposed-labelled.ef'}#

List of files which this task produces, without the graph name as prefix.

class swh.graph.luigi.compressed_graph.Stats(*args, **kwargs)[source]#

Bases: _CompressionStepTask

STEP: CompressionStep = 290#
INPUT_FILES: Set[str] = {'-transposed.graph', '-transposed.graph.ef', '.graph', '.graph.ef'}#

Dependencies of this step.

OUTPUT_FILES: Set[str] = {'.stats'}#

List of files which this task produces, without the graph name as prefix.
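
The INPUT_FILES and OUTPUT_FILES sets listed for each step are enough to recover an execution order: a task depends on whichever task produces one of its inputs. The sketch below illustrates this on a handful of the steps above using the standard-library graphlib module (Python 3.9+). It is not how _CompressionStepTask computes requires(), and the TASKS table and dependencies helper are hypothetical names.

```python
from graphlib import TopologicalSorter
from typing import Dict, Set, Tuple

# (INPUT_FILES, OUTPUT_FILES) for a few tasks, copied from the listings above.
TASKS: Dict[str, Tuple[Set[str], Set[str]]] = {
    "ExtractNodes": (set(), {".nodes/"}),
    "NodeStats": ({".nodes/"}, {".nodes.count.txt", ".nodes.stats.txt"}),
    "Mph": ({".nodes.count.txt", ".nodes/"}, {".pthash"}),
    "Bv": ({".pthash"}, {"-base.graph"}),
    "BvEf": ({"-base.graph"}, {"-base.ef"}),
}


def dependencies(tasks: Dict[str, Tuple[Set[str], Set[str]]]) -> Dict[str, Set[str]]:
    """Map each task to the set of tasks producing at least one of its inputs."""
    producer = {f: name for name, (_, outputs) in tasks.items() for f in outputs}
    return {
        name: {producer[f] for f in inputs if f in producer}
        for name, (inputs, _) in tasks.items()
    }


deps = dependencies(TASKS)
# TopologicalSorter takes a mapping from each node to its predecessors.
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Inputs that come from the dataset export rather than from another step (the EXPORT_AS_INPUT case) have no producer in the table and are simply ignored by this sketch.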

class swh.graph.luigi.compressed_graph.CompressGraph(*args, **kwargs)[source]#

Bases: Task

local_export_path = <luigi.parameter.PathParameter object>#
graph_name = <luigi.parameter.Parameter object>#
local_graph_path: Path = <luigi.parameter.PathParameter object>#
batch_size = <luigi.parameter.IntParameter object>#
rust_executable_dir = <luigi.parameter.Parameter object>#
object_types: list[str] = <swh.graph.luigi.compressed_graph.ObjectTypesParameter object>#
requires() → List[Task][source]#

Returns a LocalExport task, and leaves of the compression dependency graph

output() → List[LocalTarget][source]#

Returns the meta/*.json targets

run()[source]#

Runs the full compression pipeline, then writes meta/compression.json

This does not support running individual steps yet.

class swh.graph.luigi.compressed_graph.UploadGraphToS3(*args, **kwargs)[source]#

Bases: Task

Uploads a local compressed graph to S3, creating it automatically if it does not exist.

Example invocation:

luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 \
    --local-graph-path=graph/ \
    --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/

local_graph_path = <luigi.parameter.PathParameter object>#
s3_graph_path = <swh.dataset.luigi.S3PathParameter object>#
parallelism = <luigi.parameter.IntParameter object>#
requires() → List[Task][source]#

Returns a CompressGraph task that writes local files at the expected location.

output() → List[Target][source]#

Returns stamp and meta paths on S3.

run() → None[source]#

Copies all files: first the graph itself, then meta/compression.json.

class swh.graph.luigi.compressed_graph.DownloadGraphFromS3(*args, **kwargs)[source]#

Bases: Task

Downloads a compressed graph from S3 to a local directory.

This performs the inverse operation of UploadGraphToS3.

Example invocation:

luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 \
    --local-graph-path=graph/ \
    --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/

local_graph_path: Path = <luigi.parameter.PathParameter object>#
s3_graph_path: str = <swh.dataset.luigi.S3PathParameter object>#
requires() → List[Task][source]#

Returns an UploadGraphToS3 task that writes local files to S3.

output() → List[Target][source]#

Returns stamp and meta paths on the local filesystem.

run() → None[source]#

Copies all files: first the graph itself, then meta/compression.json.

class swh.graph.luigi.compressed_graph.LocalGraph(*args, **kwargs)[source]#

Bases: Task

Task that depends on a local compressed graph being present, either directly from CompressGraph or via DownloadGraphFromS3.

local_graph_path: Path = <luigi.parameter.PathParameter object>#
compression_task_type = <luigi.parameter.TaskParameter object>#
requires() → List[Task][source]#

Returns an instance of either CompressGraph or DownloadGraphFromS3 depending on the value of compression_task_type.

output() → List[Target][source]#

Returns stamp and meta paths on the local filesystem.