swh.graph.luigi.compressed_graph module#
Luigi tasks for compression#
This module contains Luigi tasks that serve as an alternative to the CLI and can be composed with other tasks, such as swh-dataset’s.
It implements the task DAG described in Compression steps.
Unlike the CLI, this requires the graph to be named graph.
Filtering#
The object_types parameter (--object-types on the CLI) specifies the set of node types to read from the dataset export, and it defaults to all types: ori,snp,rel,rev,dir,cnt.
Because the dataset export is keyed by edge sources, some objects whose type is not in this set will still be present in the input dataset. For example, when exporting ori,snp,rel,rev, the root Directory of every release and revision will be present, though without its labels (as well as a few Content objects pointed to by some Releases).
File layout#
In addition to files documented in Graph compression (e.g. graph.graph, graph.mph, …), tasks in this module produce this directory structure:
base_dir/
    <date>[_<flavor>]/
        compressed/
            graph.graph
            graph.mph
            ...
            meta/
                export.json
                compression.json
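The layout above can be sketched as a small path helper. This is only an illustration: `graph_layout` and its argument names are hypothetical, and it assumes that `meta/` sits inside `compressed/` and that the graph is named `graph`.

```python
from pathlib import Path
from typing import Dict, Optional


def graph_layout(
    base_dir: str, date: str, flavor: Optional[str] = None
) -> Dict[str, Path]:
    """Hypothetical helper returning the expected paths for one compressed graph."""
    # The per-graph directory is <date> or <date>_<flavor>.
    dirname = date if flavor is None else f"{date}_{flavor}"
    compressed = Path(base_dir) / dirname / "compressed"
    # Assumption: meta/ is a subdirectory of compressed/.
    meta = compressed / "meta"
    return {
        "graph": compressed / "graph.graph",
        "mph": compressed / "graph.mph",
        "export_meta": meta / "export.json",
        "compression_meta": meta / "compression.json",
    }
```

Calling `graph_layout("base_dir", "2022-11-08")` yields paths under `base_dir/2022-11-08/compressed/`; passing a flavor appends it to the date with an underscore.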
graph.meta/export.json is copied from the ORC dataset exported by swh.dataset.luigi.
graph.meta/compression.json contains information about the compression itself, for provenance tracking.
For example:
[
    {
        "steps": null,
        "export_start": "2022-11-08T11:00:54.998799+00:00",
        "export_end": "2022-11-08T11:05:53.105519+00:00",
        "object_types": [
            "origin",
            "origin_visit"
        ],
        "hostname": "desktop5",
        "conf": {},
        "tool": {
            "name": "swh.graph",
            "version": "2.2.0"
        },
        "commands": [
            {
                "command": [
                    "bash",
                    "-c",
                    "java it.unimi.dsi.big.webgraph.labelling.BitStreamArcLabelledImmutableGraph --list ..."
                ],
                "cgroup": "/sys/fs/cgroup/user.slice/user-1002.slice/user@1002.service/app.slice/swh.graph@103038/bash@0",
                "cgroup_stats": {
                    "memory.events": "low 0\nhigh 0\nmax 0\noom 0\noom_kill 0\noom_group_kill 0",
                    "memory.events.local": "low 0\nhigh 0\nmax 0\noom 0\noom_kill 0\noom_group_kill 0",
                    "memory.swap.current": "0",
                    "memory.zswap.current": "0",
                    "memory.swap.events": "high 0\nmax 0\nfail 0",
                    "cpu.stat": "usage_usec 531350\nuser_usec 424286\nsystem_usec 107063\n...",
                    "memory.current": "614400",
                    "memory.stat": "anon 0\nfile 110592\nkernel 176128\nkernel_stack 0\n...",
                    "memory.numa_stat": "anon N0=0\nfile N0=110592\nkernel_stack N0=0\n...",
                    "memory.peak": "49258496"
                }
            }
        ]
    }
]
When the compression pipeline is run in separate steps, each of the steps is recorded as an object in the root list.
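Since the root of compression.json is a list with one object per recorded run, it can be summarized with a few lines of Python. The sketch below is an illustration only: `summarize` is a hypothetical helper, `EXAMPLE` is an abridged copy of the example above, and real files may carry fields that this code ignores.

```python
import json
from typing import Any, Dict, List

# Abridged copy of the example shown above.
EXAMPLE = """
[
  {
    "steps": null,
    "hostname": "desktop5",
    "tool": {"name": "swh.graph", "version": "2.2.0"},
    "commands": [
      {
        "command": ["bash", "-c", "java it.unimi.dsi.big.webgraph.labelling.BitStreamArcLabelledImmutableGraph --list ..."],
        "cgroup_stats": {"memory.peak": "49258496"}
      }
    ]
  }
]
"""


def summarize(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Hypothetical helper: one summary dict per object in the root list."""
    summary = []
    for entry in entries:
        commands = entry.get("commands", [])
        # cgroup statistics are stored as strings; memory.peak is in bytes.
        peaks = [
            int(cmd["cgroup_stats"]["memory.peak"])
            for cmd in commands
            if "memory.peak" in cmd.get("cgroup_stats", {})
        ]
        summary.append(
            {
                "tool": f'{entry["tool"]["name"]} {entry["tool"]["version"]}',
                "hostname": entry["hostname"],
                "commands": len(commands),
                "max_memory_peak": max(peaks, default=None),
            }
        )
    return summary
```

For instance, `summarize(json.loads(EXAMPLE))` returns a single summary whose `max_memory_peak` is the integer value of the `memory.peak` string.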
S3 layout#
As .bin files are meant to be accessed randomly, they are stored uncompressed on disk.
However, this is undesirable for at-rest/long-term storage such as S3, because some of them are very sparse (e.g. graph.property.committer_timestamp.bin can be quickly compressed from 300 GB to 1 GB).
Therefore, these files are compressed to .bin.zst on S3, and need to be decompressed when downloading.
The layout is otherwise the same as the file layout.
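The renaming rule described above can be sketched as a pair of pure functions. These helpers are hypothetical (they are not part of swh.graph) and only handle file names; the actual compression and decompression with zstd is left out.

```python
def to_s3_name(local_name: str) -> str:
    """Name of a local file once stored on S3 (hypothetical helper)."""
    # Only .bin files are compressed at rest; other files keep their name.
    if local_name.endswith(".bin"):
        return local_name + ".zst"
    return local_name


def to_local_name(s3_name: str) -> str:
    """Inverse of to_s3_name(): name of an S3 object once downloaded."""
    suffix = ".zst"
    # Only strip .zst when it was added on top of .bin, so that files
    # which are compressed locally too (e.g. .csv.zst) are left alone.
    if s3_name.endswith(".bin" + suffix):
        return s3_name[: -len(suffix)]
    return s3_name
```

With these definitions, `graph.property.committer_timestamp.bin` maps to `graph.property.committer_timestamp.bin.zst` and back, while `graph.graph` is unchanged in both directions.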
- class swh.graph.luigi.compressed_graph.ObjectTypesParameter(*args, **kwargs)[source]#
Bases:
Parameter
A parameter type whose value is either * or a set of comma-separated object types (e.g. ori,snp,rel,rev,dir,cnt).
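To illustrate the accepted values, here is a minimal sketch of how such a string could be parsed. `parse_object_types` and `ALL_OBJECT_TYPES` are hypothetical names, and the validation and ordering choices are assumptions; this is not the class's actual implementation.

```python
from typing import List

# The six node types listed in the Filtering section.
ALL_OBJECT_TYPES = ["ori", "snp", "rel", "rev", "dir", "cnt"]


def parse_object_types(value: str) -> List[str]:
    """Hypothetical parser: "*" selects every type, otherwise split on commas."""
    if value.strip() == "*":
        return list(ALL_OBJECT_TYPES)
    types = [t.strip() for t in value.split(",") if t.strip()]
    unknown = [t for t in types if t not in ALL_OBJECT_TYPES]
    if unknown:
        raise ValueError(f"unknown object types: {unknown}")
    # Deduplicate and return the types in the canonical order above.
    return [t for t in ALL_OBJECT_TYPES if t in types]
```

For example, `parse_object_types("rev,ori")` returns `["ori", "rev"]`, and an unrecognized type raises `ValueError`.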
- class swh.graph.luigi.compressed_graph.ExtractNodes(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = -20#
- EXPORT_AS_INPUT: bool = True#
Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
- class swh.graph.luigi.compressed_graph.ExtractLabels(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = -10#
- EXPORT_AS_INPUT: bool = True#
Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
- OUTPUT_FILES: Set[str] = {'.labels.csv.zst'}#
List of files which this task produces, without the graph name as prefix.
- USES_ALL_CPU_THREADS: bool = True#
True on tasks that use all available CPU for their entire runtime. These tasks should be scheduled so that they do not run at the same time, because running them concurrently does not improve run time.
- priority = -100#
low priority, because it is not on the critical path
- class swh.graph.luigi.compressed_graph.NodeStats(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 0#
- OUTPUT_FILES: Set[str] = {'.nodes.count.txt', '.nodes.stats.txt'}#
List of files which this task produces, without the graph name as prefix.
- priority = 100#
high priority, to help the scheduler allocate resources
- class swh.graph.luigi.compressed_graph.EdgeStats(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 3#
- EXPORT_AS_INPUT: bool = True#
Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
- OUTPUT_FILES: Set[str] = {'.edges.count.txt', '.edges.stats.txt'}#
List of files which this task produces, without the graph name as prefix.
- priority = 100#
high priority, to help the scheduler allocate resources
- class swh.graph.luigi.compressed_graph.LabelStats(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 6#
- OUTPUT_FILES: Set[str] = {'.labels.count.txt'}#
List of files which this task produces, without the graph name as prefix.
- USES_ALL_CPU_THREADS: bool = True#
True on tasks that use all available CPU for their entire runtime. These tasks should be scheduled so that they do not run at the same time, because running them concurrently does not improve run time.
- priority = 100#
high priority, to help the scheduler allocate resources
- class swh.graph.luigi.compressed_graph.Mph(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 10#
- class swh.graph.luigi.compressed_graph.Bv(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 30#
- class swh.graph.luigi.compressed_graph.BvEf(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 40#
- class swh.graph.luigi.compressed_graph.BfsRoots(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 50#
- class swh.graph.luigi.compressed_graph.Bfs(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 60#
- class swh.graph.luigi.compressed_graph.PermuteAndSimplifyBfs(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 70#
- class swh.graph.luigi.compressed_graph.BfsEf(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 80#
- class swh.graph.luigi.compressed_graph.BfsDcf(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 90#
- class swh.graph.luigi.compressed_graph.Llp(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 100#
- INPUT_FILES: Set[str] = {'-bfs-simplified.dcf', '-bfs-simplified.ef', '-bfs-simplified.graph'}#
Dependencies of this step.
- OUTPUT_FILES: Set[str] = {'-llp.order'}#
List of files which this task produces, without the graph name as prefix.
- gammas = <luigi.parameter.Parameter object>#
- class swh.graph.luigi.compressed_graph.PermuteLlp(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 120#
- class swh.graph.luigi.compressed_graph.Offsets(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 130#
- class swh.graph.luigi.compressed_graph.Ef(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 140#
- class swh.graph.luigi.compressed_graph.ComposeOrders(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 110#
- class swh.graph.luigi.compressed_graph.Transpose(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 160#
- class swh.graph.luigi.compressed_graph.TransposeOffsets(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 170#
- class swh.graph.luigi.compressed_graph.TransposeEf(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 175#
- class swh.graph.luigi.compressed_graph.Maps(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 180#
- class swh.graph.luigi.compressed_graph.ExtractPersons(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 190#
- EXPORT_AS_INPUT: bool = True#
Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
- class swh.graph.luigi.compressed_graph.PersonsStats(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 195#
- class swh.graph.luigi.compressed_graph.MphPersons(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 200#
- class swh.graph.luigi.compressed_graph.NodeProperties(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 210#
- INPUT_FILES: Set[str] = {'.node2swhid.bin', '.persons.pthash', '.pthash', '.pthash.order'}#
Dependencies of this step.
- EXPORT_AS_INPUT: bool = True#
Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
- OUTPUT_FILES: Set[str] = {'.property.author_id.bin', '.property.author_timestamp.bin', '.property.author_timestamp_offset.bin', '.property.committer_id.bin', '.property.committer_timestamp.bin', '.property.committer_timestamp_offset.bin', '.property.content.is_skipped.bits', '.property.content.length.bin', '.property.message.bin', '.property.message.offset.bin', '.property.tag_name.bin', '.property.tag_name.offset.bin'}#
List of files which this task produces, without the graph name as prefix.
- priority = 10#
semi-high priority because it takes a very long time to run
- output() List[LocalTarget] [source]#
Returns a list of Luigi targets matching OUTPUT_FILES.
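Because OUTPUT_FILES entries omit the graph name, a full file name is obtained by prepending it to each suffix. The sketch below shows that composition with plain paths; `output_paths` is a hypothetical helper and returns `pathlib.Path` objects rather than Luigi targets.

```python
from pathlib import Path
from typing import Iterable, List


def output_paths(
    local_graph_path: str, graph_name: str, output_files: Iterable[str]
) -> List[Path]:
    """Hypothetical helper: expand OUTPUT_FILES-style suffixes to full paths."""
    # Suffixes start with "." or "-", so they are appended to the graph
    # name directly, without any extra separator.
    return sorted(
        Path(local_graph_path) / f"{graph_name}{suffix}" for suffix in output_files
    )
```

For example, with the graph named `graph`, the suffix `-llp.order` expands to `graph-llp.order` and `.nodes.count.txt` to `graph.nodes.count.txt`.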
- class swh.graph.luigi.compressed_graph.PthashLabels(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 220#
- class swh.graph.luigi.compressed_graph.LabelsOrder(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 225#
- class swh.graph.luigi.compressed_graph.FclLabels(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 230#
- class swh.graph.luigi.compressed_graph.EdgeLabels(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 240#
- INPUT_FILES: Set[str] = {'.labels.pthash', '.labels.pthash.order', '.pthash', '.pthash.order'}#
Dependencies of this step.
- EXPORT_AS_INPUT: bool = True#
Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
- OUTPUT_FILES: Set[str] = {'-labelled.labeloffsets', '-labelled.labels', '-labelled.properties'}#
List of files which this task produces, without the graph name as prefix.
- priority = 10#
semi-high priority because it takes a long time to run
- class swh.graph.luigi.compressed_graph.EdgeLabelsTranspose(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 250#
- INPUT_FILES: Set[str] = {'.labels.pthash', '.labels.pthash.order', '.pthash', '.pthash.order'}#
Dependencies of this step.
- EXPORT_AS_INPUT: bool = True#
Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
- OUTPUT_FILES: Set[str] = {'-transposed-labelled.labeloffsets', '-transposed-labelled.labels', '-transposed-labelled.properties'}#
List of files which this task produces, without the graph name as prefix.
- priority = 10#
semi-high priority because it takes a long time to run
- class swh.graph.luigi.compressed_graph.EdgeLabelsEf(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 270#
- class swh.graph.luigi.compressed_graph.EdgeLabelsTransposeEf(*args, **kwargs)[source]#
Bases:
_CompressionStepTask
- STEP: CompressionStep = 280#
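The STEP values listed for the task classes above can be collected and sorted to get a rough overview of the pipeline. This is only a sketch: the `STEPS` mapping is transcribed by hand from this page, and it assumes that increasing STEP values reflect the approximate order of the steps. The actual execution order is determined by the task dependencies, not by this number.

```python
from typing import List

# STEP values transcribed from the class listings on this page.
STEPS = {
    "ExtractNodes": -20, "ExtractLabels": -10, "NodeStats": 0,
    "EdgeStats": 3, "LabelStats": 6, "Mph": 10, "Bv": 30, "BvEf": 40,
    "BfsRoots": 50, "Bfs": 60, "PermuteAndSimplifyBfs": 70, "BfsEf": 80,
    "BfsDcf": 90, "Llp": 100, "PermuteLlp": 120, "Offsets": 130,
    "Ef": 140, "ComposeOrders": 110, "Transpose": 160,
    "TransposeOffsets": 170, "TransposeEf": 175, "Maps": 180,
    "ExtractPersons": 190, "PersonsStats": 195, "MphPersons": 200,
    "NodeProperties": 210, "PthashLabels": 220, "LabelsOrder": 225,
    "FclLabels": 230, "EdgeLabels": 240, "EdgeLabelsTranspose": 250,
    "EdgeLabelsEf": 270, "EdgeLabelsTransposeEf": 280,
}


def steps_in_order() -> List[str]:
    """Task class names sorted by their STEP value."""
    return sorted(STEPS, key=lambda name: STEPS[name])
```

Note that ComposeOrders is documented after Ef on this page, but its STEP value (110) places it between Llp (100) and PermuteLlp (120).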
- class swh.graph.luigi.compressed_graph.CompressGraph(*args, **kwargs)[source]#
Bases:
Task
- local_export_path = <luigi.parameter.PathParameter object>#
- graph_name = <luigi.parameter.Parameter object>#
- local_graph_path = <luigi.parameter.PathParameter object>#
- batch_size = <luigi.parameter.IntParameter object>#
- rust_executable_dir = <luigi.parameter.Parameter object>#
- object_types = <swh.graph.luigi.compressed_graph.ObjectTypesParameter object>#
- class swh.graph.luigi.compressed_graph.UploadGraphToS3(*args, **kwargs)[source]#
Bases:
Task
Uploads a local compressed graph to S3, creating it automatically if it does not exist.
Example invocation:
luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 --local-graph-path=graph/ --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
- local_graph_path = <luigi.parameter.PathParameter object>#
- s3_graph_path = <swh.dataset.luigi.S3PathParameter object>#
- parallelism = <luigi.parameter.IntParameter object>#
- requires() List[Task] [source]#
Returns a CompressGraph task that writes local files at the expected location.
- class swh.graph.luigi.compressed_graph.DownloadGraphFromS3(*args, **kwargs)[source]#
Bases:
Task
Downloads a compressed graph from S3 to a local directory.
This performs the inverse operation of UploadGraphToS3.
luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 --local-graph-path=graph/ --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
- local_graph_path = <luigi.parameter.PathParameter object>#
- s3_graph_path = <swh.dataset.luigi.S3PathParameter object>#
- requires() List[Task] [source]#
Returns an UploadGraphToS3 task that writes local files to S3.
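The example invocations of UploadGraphToS3 and DownloadGraphFromS3 differ only in the task name, so they can be generated from one template when scripting. The helper below is a hypothetical sketch that only builds the argument list; it uses nothing beyond the flags shown in the examples above and does not run Luigi itself.

```python
import shlex
from typing import List


def luigi_command(task: str, local_graph_path: str, s3_graph_path: str) -> List[str]:
    """Hypothetical helper: argv for one of the two S3 transfer tasks."""
    if task not in ("UploadGraphToS3", "DownloadGraphFromS3"):
        raise ValueError(f"unexpected task: {task}")
    return [
        "luigi",
        "--local-scheduler",
        "--module",
        "swh.graph.luigi",
        task,
        f"--local-graph-path={local_graph_path}",
        f"--s3-graph-path={s3_graph_path}",
    ]
```

The resulting list can be passed to `subprocess.run`, or joined with `shlex.join` to reproduce the command lines shown in the example invocations.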
- class swh.graph.luigi.compressed_graph.LocalGraph(*args, **kwargs)[source]#
Bases:
Task
Task that depends on a local dataset being present, either directly from ExportGraph or via DownloadGraphFromS3.
- local_graph_path = <luigi.parameter.PathParameter object>#
- compression_task_type = <luigi.parameter.TaskParameter object>#
- requires() List[Task] [source]#
Returns an instance of either CompressGraph or DownloadGraphFromS3 depending on the value of compression_task_type.