swh.export.luigi module

Luigi tasks

This module contains Luigi tasks that provide an alternative to the CLI and can be composed with other tasks, such as swh-graph’s.

File layout

Tasks in this module work on “export directories”, which have this layout:

swh_<date>[_<flavor>]/
    edges/
        origin/
        snapshot/
        ...
        stamps/
            origin
            snapshot
            ...
    orc/
        origin/
        snapshot/
        ...
        stamps/
            origin
            snapshot
            ...
    meta/
        export.json

Stamp files are written after the corresponding directories are written. Their presence indicates that the corresponding directory was fully generated or copied. This allows skipping work that was already done, while disregarding the partial output of interrupted jobs. They are omitted after the initial export (i.e. when uploading to or downloading from other machines).
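The stamp convention can be sketched as follows. This is a minimal illustration, not the module's implementation: `export_table` and the `write_table` callback are hypothetical, and the sketch only assumes the `<format>/<object_type>/` and `<format>/stamps/<object_type>` layout shown above.

```python
import shutil
from pathlib import Path
from typing import Callable


def export_table(
    export_dir: Path,
    fmt: str,
    object_type: str,
    write_table: Callable[[Path], None],
) -> bool:
    """Hypothetical helper: (re)export one table unless its stamp file exists.

    Returns True if the table was written, False if it was skipped.
    """
    table_dir = export_dir / fmt / object_type
    stamp = export_dir / fmt / "stamps" / object_type
    if stamp.exists():
        # The stamp is only created after a complete write, so skipping is safe.
        return False
    # No stamp: the table was never exported, or a previous run was
    # interrupted. Discard any partial output and start over.
    shutil.rmtree(table_dir, ignore_errors=True)
    table_dir.mkdir(parents=True)
    write_table(table_dir)
    # Mark the table as complete only once writing has succeeded.
    stamp.parent.mkdir(parents=True, exist_ok=True)
    stamp.touch()
    return True
```

Writing the stamp last is what makes the scheme safe: a crash at any point before `stamp.touch()` leaves no stamp, so the next run regenerates the table from scratch.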

meta/export.json contains information about the dataset, for provenance tracking. For example:

{
    "flavor": "full",
    "export_start": "2022-11-08T11:00:54.998799+00:00",
    "export_end": "2022-11-08T11:05:53.105519+00:00",
    "brokers": [
        "broker1.journal.staging.swh.network:9093"
    ],
    "prefix": "swh.journal.objects",
    "formats": [
        "edges",
        "orc"
    ],
    "object_types": [
        "revision",
        "release",
        "snapshot",
        "origin_visit_status",
        "origin_visit",
        "origin"
    ],
    "privileged": false,
    "hostname": "desktop5",
    "tool": {
        "name": "swh.export",
        "version": "0.3.2"
    }
}

object_types lists the “main tables” that were exported; it excludes relational tables such as directory_entry.
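Since meta/export.json is plain JSON, provenance can be inspected with the standard library alone. The sketch below is illustrative: `load_export_meta` and `export_duration` are hypothetical helpers, and only the field names shown in the example above are assumed.

```python
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict


def load_export_meta(export_dir: Path) -> Dict[str, Any]:
    """Reads the provenance metadata of an export directory."""
    with open(export_dir / "meta" / "export.json", encoding="utf-8") as f:
        return json.load(f)


def export_duration(meta: Dict[str, Any]) -> timedelta:
    """Wall-clock time between export_start and export_end."""
    # The timestamps are ISO 8601 strings with a UTC offset.
    start = datetime.fromisoformat(meta["export_start"])
    end = datetime.fromisoformat(meta["export_end"])
    return end - start
```

For the example file above, export_duration() returns just under five minutes (about 298 seconds).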

Running all on staging

An easy way to run it (e.g. on the staging database) is to have these config files:

And run this command, for example:

luigi --log-level INFO --local-scheduler --module swh.export.luigi RunExportAll \
    --UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \
    --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \
    --athena-db-name=vlorentz_20221109_staging

Note that this example arbitrarily splits configuration options between luigi.cfg and the CLI for readability; they can be used interchangeably.
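To make the interchangeability concrete, the sketch below turns task-parameter flags into an equivalent luigi.cfg. It relies on luigi's usual convention: a parameter param_name of task TaskFamily is read from the [TaskFamily] section under key param_name, set on the CLI with --TaskFamily-param-name, and unprefixed flags apply to the task named on the command line. `flags_to_luigi_cfg` is a hypothetical helper; luigi's own flags (--module, --local-scheduler, --log-level) are deliberately not handled.

```python
import configparser
import io
from typing import Iterable


def flags_to_luigi_cfg(
    flags: Iterable[str], root_task: str, task_families: Iterable[str]
) -> configparser.ConfigParser:
    """Hypothetical helper: map luigi task-parameter flags to luigi.cfg sections."""
    families = list(task_families)
    # Disable interpolation so values containing '%' are stored verbatim.
    cfg = configparser.ConfigParser(interpolation=None)
    for flag in flags:
        name, _, value = flag.removeprefix("--").partition("=")
        # "--Task-param-name" targets Task; unprefixed flags target the root task.
        section = root_task
        for family in families:
            if name.startswith(family + "-"):
                section, name = family, name[len(family) + 1 :]
                break
        if not cfg.has_section(section):
            cfg.add_section(section)
        # luigi.cfg keys use the Python parameter name (underscores).
        cfg[section][name.replace("-", "_")] = value
    return cfg


if __name__ == "__main__":
    cfg = flags_to_luigi_cfg(
        [
            "--UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/",
            "--s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/",
            "--athena-db-name=vlorentz_20221109_staging",
        ],
        root_task="RunExportAll",
        task_families=["UploadExportToS3"],
    )
    buf = io.StringIO()
    cfg.write(buf)
    print(buf.getvalue())
```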

class swh.export.luigi.ObjectType(value)

Bases: Enum

origin = 1
origin_visit = 2
origin_visit_status = 3
snapshot = 4
release = 5
revision = 6
directory = 7
content = 8
skipped_content = 9

class swh.export.luigi.Format(value)

Bases: Enum

edges = 1
orc = 2

swh.export.luigi.merge_lists(lists: Iterable[List[T]]) → List[T]

Returns a list made of all items of the arguments, with no duplicates.
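A behaviour-equivalent sketch, assuming the items are hashable (ObjectType and Format members are) and that the order of first appearance should be kept; the docstring above does not specify ordering.

```python
from itertools import chain
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def merge_lists(lists: Iterable[List[T]]) -> List[T]:
    """Concatenates the lists, keeping only the first occurrence of each item.

    Assumes items are hashable; dict keys preserve insertion order (Python 3.7+).
    """
    return list(dict.fromkeys(chain.from_iterable(lists)))
```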

swh.export.luigi.parse_offsets(paths: Dict[Any, LocalTarget], object_types: Tuple[ObjectType, ...]) → Dict[ObjectType, Dict[int, Tuple[int, int]]]

Returns {obj_type: {partition: (low, high)}}.
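The returned value maps each object type to per-partition offset ranges. As an illustration of that shape, the hypothetical helper below counts how many journal messages each object type spans. It assumes `high` is exclusive (Kafka-style end offset), which the docstring does not state, and redefines only a subset of ObjectType to stay self-contained.

```python
from enum import Enum
from typing import Dict, Tuple


class ObjectType(Enum):  # subset of swh.export.luigi.ObjectType, for illustration
    origin = 1
    snapshot = 4


Offsets = Dict[ObjectType, Dict[int, Tuple[int, int]]]


def messages_per_type(offsets: Offsets) -> Dict[ObjectType, int]:
    """Counts messages per object type, assuming `high` is exclusive."""
    return {
        obj_type: sum(high - low for (low, high) in partitions.values())
        for obj_type, partitions in offsets.items()
    }
```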

class swh.export.luigi.PathParameter(is_dir: bool = False, is_file: bool = False, exists: bool = False, create: bool = False, **kwargs)

Bases: PathParameter

A parameter that is a local filesystem path.

If is_dir, is_file, or exists is True, then existence of the path (and optionally type) is checked.

If create is set, then is_dir must be True, and the directory is created if it does not already exist.

Parameters:
  • is_dir – whether the path should point to a directory

  • is_file – whether the path should point to a file

  • exists – whether the path should already exist

  • create – whether the path should be created if it does not exist

is_dir and is_file are mutually exclusive. exists and create are mutually exclusive.
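The rules above can be expressed as a standalone check. This is a hypothetical re-implementation for illustration, not the class's code: it does not subclass luigi, and raising ValueError is an assumption chosen for the sketch.

```python
from pathlib import Path


def check_path(
    path: Path,
    is_dir: bool = False,
    is_file: bool = False,
    exists: bool = False,
    create: bool = False,
) -> Path:
    """Hypothetical re-implementation of the checks PathParameter documents."""
    if is_dir and is_file:
        raise ValueError("is_dir and is_file are mutually exclusive")
    if exists and create:
        raise ValueError("exists and create are mutually exclusive")
    if create and not is_dir:
        raise ValueError("create requires is_dir")
    if create:
        # Create the directory (and parents) if it does not exist yet.
        path.mkdir(parents=True, exist_ok=True)
    if is_dir or is_file or exists:
        # Existence is checked, then the type if one was requested.
        if not path.exists():
            raise ValueError(f"{path} does not exist")
        if is_dir and not path.is_dir():
            raise ValueError(f"{path} is not a directory")
        if is_file and not path.is_file():
            raise ValueError(f"{path} is not a file")
    return path
```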

parse(s: str) → Path

Parses an individual value from the input into a Path.

Parameters:

s (str) – the value to parse.

Returns:

the parsed value.

class swh.export.luigi.S3PathParameter(*args, **kwargs)

Bases: StrParameter

A parameter that strips trailing slashes.

normalize(s)

Strips trailing slashes from a parsed parameter value.

The value can be the result of parse(), the default value, or an argument passed to the task’s constructor at instantiation.
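A minimal sketch of the normalization, plus a hypothetical `s3_join` helper showing why it helps: joined paths never contain `//`. A likely further motivation (an assumption) is that luigi identifies task instances by their parameter values, so s3://bucket/prefix and s3://bucket/prefix/ would otherwise yield two distinct tasks.

```python
def normalize_s3_path(s: str) -> str:
    """Strips trailing slashes, as S3PathParameter.normalize() is documented to do."""
    return s.rstrip("/")


def s3_join(base: str, *parts: str) -> str:
    """Hypothetical helper: appends path components to a normalized S3 prefix."""
    return "/".join([normalize_s3_path(base), *parts])
```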

class swh.export.luigi.FractionalFloatParameter(*args, **kwargs)

Bases: FloatParameter

A float parameter that must be between 0 and 1.

parse(s)

Parses a float from the string using float().
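A sketch of parsing plus the range check the class describes. Whether the bounds are inclusive is not stated above; this hypothetical `parse_fraction` assumes they are, and rejects NaN because every comparison with it is false.

```python
def parse_fraction(s: str) -> float:
    """Parses a float and checks it lies in [0, 1] (bounds assumed inclusive)."""
    value = float(s)
    if not 0.0 <= value <= 1.0:
        raise ValueError(f"{value} is not between 0 and 1")
    return value
```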

swh.export.luigi.stamps_paths(formats: List[Format], object_types: List[ObjectType]) → List[str]

Returns a list of (local FS or S3) paths used to mark tables as successfully exported.
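Following the file layout described at the top of this page, the stamp paths can be derived from each (format, object type) pair. The hypothetical `stamp_paths_sketch` below returns them relative to the export root; the real function may differ (for example, it may prepend the local or S3 root).

```python
from enum import Enum
from typing import List


class Format(Enum):
    edges = 1
    orc = 2


class ObjectType(Enum):  # subset of swh.export.luigi.ObjectType, for illustration
    origin = 1
    snapshot = 4


def stamp_paths_sketch(formats: List[Format], object_types: List[ObjectType]) -> List[str]:
    """One relative stamp path per (format, object type), as in the layout above."""
    return [
        f"{fmt.name}/stamps/{obj_type.name}"
        for fmt in formats
        for obj_type in object_types
    ]
```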

swh.export.luigi.get_masked_swhids(logger, config: Dict[str, Any]) → Set[ExtendedSWHID]

Fetches the masking database and returns the set of all non-visible SWHIDs.

class swh.export.luigi.StartExport(*args, **kwargs)

Bases: Task

Pseudo-task that computes the journal offsets from which, and up to which, objects should be exported.

config_file

A PathParameter: the local path to the configuration file.

local_export_path

A PathParameter: the local export directory.

local_sensitive_export_path

An optional path parameter.

export_id

An optional parameter.

margin

A float parameter that must be between 0 and 1.

object_types

A parameter whose value is a comma-separated list of Enum. Values should come from the same enum.

Values are taken to be a list, i.e. order is preserved, duplicates may occur, and an empty list is possible.

In the task definition, use:

class Model(enum.Enum):
  Honda = 1
  Volvo = 2

class MyTask(luigi.Task):
  my_param = luigi.EnumListParameter(enum=Model)

At the command line, use:

$ luigi --module my_tasks MyTask --my-param Honda,Volvo
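Applied to this module, a value such as --object-types=origin,origin_visit (as in the CreateAthena example below) names ObjectType members. The hypothetical `parse_enum_list` below mimics that parsing to show the resulting values; luigi's own implementation may differ in details such as the container type and error messages.

```python
from enum import Enum
from typing import List


class ObjectType(Enum):
    # Members copied from swh.export.luigi.ObjectType above.
    origin = 1
    origin_visit = 2
    origin_visit_status = 3
    snapshot = 4
    release = 5
    revision = 6
    directory = 7
    content = 8
    skipped_content = 9


def parse_enum_list(s: str) -> List[ObjectType]:
    """Mimics EnumListParameter parsing: comma-separated member names, order kept."""
    if not s:
        return []
    # ObjectType[name] raises KeyError for unknown names.
    return [ObjectType[name] for name in s.split(",")]
```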
output() → Dict[str | ObjectType, LocalTarget]

Returns a stamp file for each step, in self.local_export_path/tmp/stamps/.

complete() → bool

Returns True if the task has outputs and all of them exist; otherwise returns False.

run() → None

Writes the offsets file.

class swh.export.luigi.ExportTopic(*args, **kwargs)

Bases: Task

Exports a single topic, given already computed offsets in the journal.

config_file

A PathParameter: the local path to the configuration file.

local_export_path

A PathParameter: the local export directory.

local_sensitive_export_path

An optional path parameter.

export_id

An optional parameter.

formats

A comma-separated list of Format values (EnumListParameter), e.g. edges,orc.
processes

Parameter whose value is an int.

margin

A float parameter that must be between 0 and 1.

object_types

A comma-separated list of ObjectType values (EnumListParameter), e.g. origin,origin_visit.
requires() → Dict[str, Task]

Returns an instance of StartExport.

output() → List[LocalTarget]

Returns a luigi.LocalTarget instance for each stamp file.

run() → None

Consumes all of the self.OBJECT_TYPE topic into self.export_path / self.OBJECT_TYPE.

class swh.export.luigi.ExportPersonsTable(*args, **kwargs)

Bases: Task

Aggregates lists of persons exported by ExportTopic into a single table with no duplicates.

config_file

A PathParameter: the local path to the configuration file.

local_export_path

A PathParameter: the local export directory.

local_sensitive_export_path

An optional path parameter.

export_id

An optional parameter.

formats

A comma-separated list of Format values (EnumListParameter), e.g. edges,orc.
processes

Parameter whose value is an int.

margin

A float parameter that must be between 0 and 1.

object_types

A comma-separated list of ObjectType values (EnumListParameter), e.g. origin,origin_visit.
requires() → Dict[str, Task]

Returns an instance of StartExport, and an instance of ExportTopic for each value in self.object_types.

output() → Dict[str, LocalTarget]

Returns {self.local_export_path}/tmp/stamps/person.json.

run()

Aggregates lists of persons exported by ExportTopic into a single table with no duplicates.

class swh.export.luigi.ExportGraph(*args, **kwargs)

Bases: Task

Exports the entire graph to the local filesystem.

Example invocation:

luigi --local-scheduler --module swh.export.luigi ExportGraph \
    --config=graph.prod.yml \
    --local-export-path=export/ \
    --formats=edges

which is equivalent to this CLI call:

swh export --config-file graph.prod.yml graph export export/ --formats=edges

config_file

A PathParameter: the local path to the configuration file.

local_export_path

A PathParameter: the local export directory.

local_sensitive_export_path

An optional path parameter.

export_id

An optional parameter.

formats

A comma-separated list of Format values (EnumListParameter), e.g. edges,orc.
processes

Parameter whose value is an int.

margin

A float parameter that must be between 0 and 1.

object_types

A comma-separated list of ObjectType values (EnumListParameter), e.g. origin,origin_visit.
export_name

Parameter whose value is a str.

output() → List[LocalTarget]

Returns the path of meta/export.json on the local filesystem.

complete() → bool

Returns True if the task has outputs and all of them exist; otherwise returns False.

get_export_id() → str

requires() → Dict[str, Task]

Returns an instance of StartExport, and an instance of ExportTopic for each value in self.object_types.

run() → None

Runs the full export, then writes stamps, then meta/export.json.

class swh.export.luigi.UploadExportToS3(*args, **kwargs)

Bases: Task

Uploads a local dataset export to S3, generating the local export first if it does not exist.

Example invocation:

luigi --local-scheduler --module swh.export.luigi UploadExportToS3 \
    --local-export-path=export/ \
    --formats=edges \
    --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08

local_export_path

A PathParameter: the local export directory.

formats

A comma-separated list of Format values (EnumListParameter), e.g. edges,orc.
object_types

A comma-separated list of ObjectType values (EnumListParameter), e.g. origin,origin_visit.
s3_export_path

A parameter that strips trailing slashes.

requires() → List[Task]

Returns an ExportGraph task that writes local files at the expected location.

output() → List[Target]

Returns stamp and meta paths on S3.

complete() → bool

Returns whether the graph dataset was exported with a superset of object_types.
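A sketch of the superset check described above, assuming (this is not stated in the docstring) that the exported types are read from the object_types field of meta/export.json. `covers_object_types` is a hypothetical helper.

```python
import json
from typing import Iterable


def covers_object_types(export_json: str, wanted: Iterable[str]) -> bool:
    """True if an export's meta/export.json lists every wanted object type."""
    exported = set(json.loads(export_json)["object_types"])
    return exported.issuperset(wanted)
```

With the example export.json at the top of this page, a request for origin,origin_visit is covered, whereas one that includes content is not, so the upload would not count as complete.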

run() → None

Copies all files: first the export itself, then meta/export.json.

class swh.export.luigi.DownloadExportFromS3(*args, **kwargs)

Bases: Task

Downloads a dataset export from S3 to the local filesystem.

This performs the inverse operation of UploadExportToS3.

Example invocation:

luigi --local-scheduler --module swh.export.luigi DownloadExportFromS3 \
    --local-export-path=export/ \
    --formats=edges \
    --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08

local_export_path

A PathParameter: the local export directory.

formats

A comma-separated list of Format values (EnumListParameter), e.g. edges,orc.
object_types

A comma-separated list of ObjectType values (EnumListParameter), e.g. origin,origin_visit.
s3_export_path

A parameter that strips trailing slashes.

parallelism

Parameter whose value is an int.

requires() → List[Task]

Returns an ExportGraph task that writes local files at the expected location.

output() → List[Target]

Returns stamp and meta paths on the local filesystem.

complete() → bool

Returns True if the task has outputs and all of them exist; otherwise returns False.

run() → None

Copies all files: first the export itself, then meta/export.json.

class swh.export.luigi.LocalExport(*args, **kwargs)

Bases: Task

Task that depends on a local dataset being present, obtained either directly from ExportGraph or via DownloadExportFromS3.

local_export_path

A PathParameter: the local export directory.

local_sensitive_export_path

An optional path parameter.

formats

A comma-separated list of Format values (EnumListParameter), e.g. edges,orc.
object_types

A comma-separated list of ObjectType values (EnumListParameter), e.g. origin,origin_visit.
export_task_type

A parameter that takes another luigi task class; here, either ExportGraph or DownloadExportFromS3.

When used programmatically, the parameter should be set directly to the luigi.task.Task (sub)class, e.g. MyMetaTask(my_task_param=my_tasks.MyTask). On the command line, specify the value of luigi.task.Task.get_task_family(), e.g.

$ luigi --module my_tasks MyMetaTask --my_task_param my_namespace.MyTask

where my_namespace.MyTask is defined in the my_tasks Python module.

Once the task is instantiated, the value is always a task class (never a string).

requires() → List[Task]

Returns an instance of either ExportGraph or DownloadExportFromS3 depending on the value of export_task_type.

output() → List[Target]

Returns stamp and meta paths on the local filesystem.

complete() → bool

Returns True if the task has outputs and all of them exist; otherwise returns False.

class swh.export.luigi.AthenaDatabaseTarget(name: str, table_names: Set[str])

Bases: Target

Target for the existence of a database on Athena.

exists() → bool

Returns True if the Target exists and False otherwise.

class swh.export.luigi.CreateAthena(*args, **kwargs)

Bases: Task

Creates tables on AWS Athena pointing to a given graph dataset on S3.

Example invocation:

luigi --local-scheduler --module swh.export.luigi CreateAthena \
    --ExportGraph-config=graph.staging.yml \
    --athena-db-name=swh_20221108 \
    --object-types=origin,origin_visit \
    --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
    --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena

which is equivalent to this CLI call:

swh export athena create --database-name swh_20221108 --location-prefix s3://softwareheritage/graph/swh_2022-11-08 --output-location s3://softwareheritage/graph/tmp/athena --replace-tables

object_types

A comma-separated list of ObjectType values (EnumListParameter), e.g. origin,origin_visit.
s3_export_path

A parameter that strips trailing slashes.

s3_athena_output_location

A parameter that strips trailing slashes.

athena_db_name

Parameter whose value is a str.

requires() → List[Task]

Returns the corresponding UploadExportToS3 instance, with ORC as the only format.

output() → List[Target]

Returns an instance of AthenaDatabaseTarget.

run() → None

Creates tables from the ORC dataset.

class swh.export.luigi.RunExportAll(*args, **kwargs)

Bases: WrapperTask

Runs both the S3 and Athena exports.

Example invocation:

luigi --local-scheduler --module swh.export.luigi RunExportAll \
    --ExportGraph-config=graph.staging.yml \
    --ExportGraph-processes=12 \
    --UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ \
    --formats=edges \
    --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
    --athena-db-name=swh_20221108 \
    --object-types=origin,origin_visit \
    --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena

formats

A comma-separated list of Format values (EnumListParameter), e.g. edges,orc.
object_types

A comma-separated list of ObjectType values (EnumListParameter), e.g. origin,origin_visit.
s3_export_path

A parameter that strips trailing slashes.

s3_athena_output_location

A parameter that strips trailing slashes.

athena_db_name

Parameter whose value is a str.

requires() → List[Task]

Returns instances of CreateAthena and UploadExportToS3.