swh.dataset.luigi module#

Luigi tasks#

This module contains Luigi tasks, as an alternative to the CLI that can be composed with other tasks, such as swh-graph’s.

File layout#

Tasks in this module work on “export directories”, which have this layout:

swh_<date>[_<flavor>]/
    edges/
        origin/
        snapshot/
        ...
        stamps/
            origin
            snapshot
            ...
    orc/
        origin/
        snapshot/
        ...
        stamps/
            origin
            snapshot
            ...
    meta/
        export.json

Files under stamps/ are written after the corresponding directories are written. The presence of a stamp file indicates that the corresponding directory was fully generated or copied. This allows skipping work that was already done, while ignoring the output of interrupted jobs. Stamp files are omitted after the initial export (i.e. when downloading to/from other machines).
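The stamp convention can be sketched as follows. This is a minimal illustration and not the module's own code: the helper names is_table_complete, tables_to_export, and mark_table_complete are hypothetical, and the paths only assume the layout shown above.

```python
from pathlib import Path
from typing import List


def is_table_complete(export_dir: Path, fmt: str, object_type: str) -> bool:
    """Return True if the stamp file for this table exists (hypothetical helper)."""
    return (export_dir / fmt / "stamps" / object_type).is_file()


def tables_to_export(export_dir: Path, fmt: str, object_types: List[str]) -> List[str]:
    """List the tables that still lack a stamp, in the order they were given."""
    return [t for t in object_types if not is_table_complete(export_dir, fmt, t)]


def mark_table_complete(export_dir: Path, fmt: str, object_type: str) -> None:
    """Create the stamp; call this only once the table directory is fully written."""
    stamps_dir = export_dir / fmt / "stamps"
    stamps_dir.mkdir(parents=True, exist_ok=True)
    (stamps_dir / object_type).touch()
```

Because the stamp is created last, a job interrupted halfway leaves a table directory without a stamp, and that table is simply exported again on the next run.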

meta/export.json contains information about the dataset, for provenance tracking. For example:

{
    "flavor": "full",
    "export_start": "2022-11-08T11:00:54.998799+00:00",
    "export_end": "2022-11-08T11:05:53.105519+00:00",
    "brokers": [
        "broker1.journal.staging.swh.network:9093"
    ],
    "prefix": "swh.journal.objects",
    "formats": [
        "edges",
        "orc"
    ],
    "object_types": [
        "revision",
        "release",
        "snapshot",
        "origin_visit_status",
        "origin_visit",
        "origin"
    ],
    "privileged": false,
    "hostname": "desktop5",
    "tool": {
        "name": "swh.dataset",
        "version": "0.3.2"
    }
}

object_types contains a list of “main tables” exported; this excludes relational tables like directory_entry.
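As a hedged sketch of how this metadata might be consumed, the snippet below loads meta/export.json and computes how long the export took. The function names load_export_meta and export_duration are hypothetical; only the file location and the export_start/export_end keys come from the example above.

```python
import json
from datetime import datetime, timedelta
from pathlib import Path


def load_export_meta(export_dir: Path) -> dict:
    """Read meta/export.json from an export directory (hypothetical helper)."""
    return json.loads((export_dir / "meta" / "export.json").read_text())


def export_duration(meta: dict) -> timedelta:
    """Compute the export's duration from its ISO 8601 timestamps."""
    start = datetime.fromisoformat(meta["export_start"])
    end = datetime.fromisoformat(meta["export_end"])
    return end - start
```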

Running all on staging#

An easy way to run it (e.g. on the staging database) is to have these config files:

And run this command, for example:

luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunExportAll \
    --UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \
    --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \
    --athena-db-name=vlorentz_20221109_staging

Note that this arbitrarily divides config options between luigi.cfg and the CLI for readability; the two can be used interchangeably.

class swh.dataset.luigi.ObjectType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#

Bases: Enum

origin = 1#
origin_visit = 2#
origin_visit_status = 3#
snapshot = 4#
release = 5#
revision = 6#
directory = 7#
content = 8#
skipped_content = 9#
class swh.dataset.luigi.Format(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#

Bases: Enum

edges = 1#
orc = 2#
swh.dataset.luigi.merge_lists(lists: Iterator[List[T]]) List[T][source]#

Returns a list made of all items of the arguments, with no duplicates.
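A minimal sketch of such a merge is shown below. It is not the module's implementation: the name merge_lists_sketch is hypothetical, and keeping items in first-seen order and requiring them to be hashable are assumptions that the description above does not state.

```python
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def merge_lists_sketch(lists: Iterable[List[T]]) -> List[T]:
    """Concatenate the lists, keeping only the first occurrence of each item."""
    seen = set()
    merged: List[T] = []
    for lst in lists:
        for item in lst:
            if item not in seen:  # assumes items are hashable
                seen.add(item)
                merged.append(item)
    return merged
```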

class swh.dataset.luigi.PathParameter(is_dir: bool = False, is_file: bool = False, exists: bool = False, create: bool = False, **kwargs)[source]#

Bases: PathParameter

A parameter that is a local filesystem path.

If is_dir, is_file, or exists is True, then existence of the path (and optionally type) is checked.

If create is set, then is_dir must be True, and the directory is created if it does not already exist.

Parameters:
  • is_dir – whether the path should be to a directory

  • is_file – whether the path should be to a file

  • exists – whether the path should already exist

  • create – whether the path should be created if it does not exist

is_dir and is_file are mutually exclusive. exists and create are mutually exclusive.
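The rules above can be sketched in plain Python, independently of Luigi. The function check_path is hypothetical; the use of ValueError and the creation of missing parent directories are assumptions, not documented behavior of PathParameter.

```python
from pathlib import Path


def check_path(s: str, is_dir: bool = False, is_file: bool = False,
               exists: bool = False, create: bool = False) -> Path:
    """Validate a local path following the rules described above (sketch)."""
    if is_dir and is_file:
        raise ValueError("is_dir and is_file are mutually exclusive")
    if exists and create:
        raise ValueError("exists and create are mutually exclusive")
    if create and not is_dir:
        raise ValueError("create requires is_dir")

    path = Path(s)
    if create:
        # Assumption: missing parent directories are created as well.
        path.mkdir(parents=True, exist_ok=True)
    if (is_dir or is_file or exists) and not path.exists():
        raise ValueError(f"{path} does not exist")
    if is_dir and not path.is_dir():
        raise ValueError(f"{path} is not a directory")
    if is_file and not path.is_file():
        raise ValueError(f"{path} is not a file")
    return path
```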

parse(s: str) Path[source]#

Parse an individual value from the input.

The default implementation is the identity function, but subclasses should override this method for specialized parsing.

Parameters:

s (str) – the value to parse.

Returns:

the parsed value.

class swh.dataset.luigi.S3PathParameter(*args, **kwargs)[source]#

Bases: Parameter

A parameter that strips trailing slashes.

normalize(s)[source]#

Given a parsed parameter value, normalizes it.

The value can either be the result of parse(), the default value or arguments passed into the task’s constructor by instantiation.

This is very implementation defined, but can be used to validate/clamp valid values. For example, if you wanted to only accept even integers, and “correct” odd values by rounding them down to the nearest even integer, you can implement normalize as x // 2 * 2.
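For an S3 path, the normalization described for this class amounts to dropping trailing slashes. A minimal sketch follows; the function name normalize_s3_path is hypothetical, and whether the real method performs any further validation is not stated here.

```python
def normalize_s3_path(s: str) -> str:
    """Strip trailing slashes so equivalent S3 prefixes compare equal."""
    return s.rstrip("/")
```

With this, s3://softwareheritage/graph/swh_2022-11-08/ and s3://softwareheritage/graph/swh_2022-11-08 normalize to the same value, so a task parameterized with either spelling resolves to the same output paths.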

class swh.dataset.luigi.FractionalFloatParameter(*args, **kwargs)[source]#

Bases: FloatParameter

A float parameter that must be between 0 and 1

parse(s)[source]#

Parses a float from the string using float().
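A sketch of the range check, outside of Luigi: the name parse_fraction is hypothetical, and treating both bounds as inclusive and raising ValueError are assumptions.

```python
def parse_fraction(s: str) -> float:
    """Parse a float with float() and reject values outside [0, 1]."""
    value = float(s)
    # The chained comparison is False for NaN, so NaN is rejected too.
    if not 0.0 <= value <= 1.0:
        raise ValueError(f"{value!r} is not between 0 and 1")
    return value
```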

swh.dataset.luigi.stamps_paths(formats: List[Format], object_types: List[ObjectType]) List[str][source]#

Returns a list of (local FS or S3) paths used to mark tables as successfully exported.
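Based on the file layout described earlier, the returned paths presumably look like <format>/stamps/<object type>, relative to the export root. The sketch below illustrates that assumption with trimmed-down copies of the two enums; stamps_paths_sketch is a hypothetical name, and the exact strings produced by the real function are not confirmed here.

```python
from enum import Enum
from typing import List


class Format(Enum):
    # Same members as swh.dataset.luigi.Format
    edges = 1
    orc = 2


class ObjectType(Enum):
    # Subset of swh.dataset.luigi.ObjectType, for illustration only
    origin = 1
    snapshot = 4


def stamps_paths_sketch(formats: List[Format], object_types: List[ObjectType]) -> List[str]:
    """Build one relative stamp path per (format, object type) pair."""
    return [
        f"{fmt.name}/stamps/{object_type.name}"
        for fmt in formats
        for object_type in object_types
    ]
```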

class swh.dataset.luigi.ExportGraph(*args, **kwargs)[source]#

Bases: Task

Exports the entire graph to the local filesystem.

Example invocation:

luigi --local-scheduler --module swh.dataset.luigi ExportGraph \
    --config=graph.prod.yml \
    --local-export-path=export/ \
    --formats=edges

which is equivalent to this CLI call:

swh dataset --config-file graph.prod.yml graph export export/ --formats=edges

config_file = <swh.dataset.luigi.PathParameter object>#
local_export_path = <swh.dataset.luigi.PathParameter object>#
export_id = <luigi.parameter.OptionalParameter object>#
formats = <luigi.parameter.EnumListParameter object>#
processes = <luigi.parameter.IntParameter object>#
margin = <swh.dataset.luigi.FractionalFloatParameter object>#
object_types = <luigi.parameter.EnumListParameter object>#
output() List[Target][source]#

Returns path of meta/export.json on the local FS.

complete() bool[source]#

If the task has any outputs, return True if all outputs exist. Otherwise, return False.

However, you may freely override this method with custom logic.

run() None[source]#

Runs the full export, then writes stamps, then meta/export.json.

class swh.dataset.luigi.UploadExportToS3(*args, **kwargs)[source]#

Bases: Task

Uploads a local dataset export to S3, creating the local export automatically if it does not exist.

Example invocation:

luigi --local-scheduler --module swh.dataset.luigi UploadExportToS3 \
    --local-export-path=export/ \
    --formats=edges \
    --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
local_export_path = <swh.dataset.luigi.PathParameter object>#
formats = <luigi.parameter.EnumListParameter object>#
object_types = <luigi.parameter.EnumListParameter object>#
s3_export_path = <swh.dataset.luigi.S3PathParameter object>#
requires() List[Task][source]#

Returns an ExportGraph task that writes local files at the expected location.

output() List[Target][source]#

Returns stamp and meta paths on S3.

complete() bool[source]#

Returns whether the graph dataset was exported with a superset of object_types.
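The superset test can be sketched with plain sets. This is an illustration under assumptions: covers_object_types is a hypothetical helper, and it takes the already-parsed contents of meta/export.json rather than reading anything from S3.

```python
from typing import Iterable


def covers_object_types(meta: dict, requested: Iterable[str]) -> bool:
    """Return True if the export's object_types include every requested type."""
    exported = set(meta.get("object_types", []))
    return exported >= set(requested)
```

An export that contains more object types than requested therefore still counts as complete, which avoids redoing a full export just to obtain a subset of its tables.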

run() None[source]#

Copies all files: first the export itself, then meta/export.json.

class swh.dataset.luigi.DownloadExportFromS3(*args, **kwargs)[source]#

Bases: Task

Downloads a local dataset export from S3.

This performs the inverse operation of UploadExportToS3.

Example invocation:

luigi --local-scheduler --module swh.dataset.luigi DownloadExportFromS3 \
    --local-export-path=export/ \
    --formats=edges \
    --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
local_export_path = <swh.dataset.luigi.PathParameter object>#
formats = <luigi.parameter.EnumListParameter object>#
object_types = <luigi.parameter.EnumListParameter object>#
s3_export_path = <swh.dataset.luigi.S3PathParameter object>#
parallelism = <luigi.parameter.IntParameter object>#
requires() List[Task][source]#

Returns an ExportGraph task that writes local files at the expected location.

output() List[Target][source]#

Returns stamp and meta paths on the local filesystem.

complete() bool[source]#

If the task has any outputs, return True if all outputs exist. Otherwise, return False.

However, you may freely override this method with custom logic.

run() None[source]#

Copies all files: first the export itself, then meta/export.json.

class swh.dataset.luigi.LocalExport(*args, **kwargs)[source]#

Bases: Task

Task that depends on a local dataset being present – either directly from ExportGraph or via DownloadExportFromS3.

local_export_path = <swh.dataset.luigi.PathParameter object>#
formats = <luigi.parameter.EnumListParameter object>#
object_types = <luigi.parameter.EnumListParameter object>#
export_task_type = <luigi.parameter.TaskParameter object>#
requires() List[Task][source]#

Returns an instance of either ExportGraph or DownloadExportFromS3 depending on the value of export_task_type.

output() List[Target][source]#

Returns stamp and meta paths on the local filesystem.

complete() bool[source]#

If the task has any outputs, return True if all outputs exist. Otherwise, return False.

However, you may freely override this method with custom logic.

class swh.dataset.luigi.AthenaDatabaseTarget(name: str, table_names: Set[str])[source]#

Bases: Target

Target for the existence of a database on Athena.

exists() bool[source]#

Returns True if the Target exists and False otherwise.

class swh.dataset.luigi.CreateAthena(*args, **kwargs)[source]#

Bases: Task

Creates tables on AWS Athena pointing to a given graph dataset on S3.

Example invocation:

luigi --local-scheduler --module swh.dataset.luigi CreateAthena \
    --ExportGraph-config=graph.staging.yml \
    --athena-db-name=swh_20221108 \
    --object-types=origin,origin_visit \
    --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
    --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena

which is equivalent to this CLI call:

swh dataset athena create --database-name swh_20221108 --location-prefix s3://softwareheritage/graph/swh_2022-11-08 --output-location s3://softwareheritage/graph/tmp/athena --replace-tables

object_types = <luigi.parameter.EnumListParameter object>#
s3_export_path = <swh.dataset.luigi.S3PathParameter object>#
s3_athena_output_location = <swh.dataset.luigi.S3PathParameter object>#
athena_db_name = <luigi.parameter.Parameter object>#
requires() List[Task][source]#

Returns the corresponding UploadExportToS3 instance, with ORC as the only format.

output() List[Target][source]#

Returns an instance of AthenaDatabaseTarget.

run() None[source]#

Creates tables from the ORC dataset.

class swh.dataset.luigi.RunExportAll(*args, **kwargs)[source]#

Bases: WrapperTask

Runs both the S3 and Athena exports.

Example invocation:

luigi --local-scheduler --module swh.dataset.luigi RunExportAll \
    --ExportGraph-config=graph.staging.yml \
    --ExportGraph-processes=12 \
    --UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ \
    --formats=edges \
    --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
    --athena-db-name=swh_20221108 \
    --object-types=origin,origin_visit \
    --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
formats = <luigi.parameter.EnumListParameter object>#
object_types = <luigi.parameter.EnumListParameter object>#
s3_export_path = <swh.dataset.luigi.S3PathParameter object>#
s3_athena_output_location = <swh.dataset.luigi.S3PathParameter object>#
athena_db_name = <luigi.parameter.Parameter object>#
requires() List[Task][source]#

Returns instances of CreateAthena and UploadExportToS3.