swh.dataset.luigi module#
Luigi tasks#
This module contains Luigi tasks, as an alternative to the CLI that can be composed with other tasks, such as swh-graph’s.
File layout#
Tasks in this module work on “export directories”, which have this layout:
swh_<date>[_<flavor>]/
edges/
origin/
snapshot/
...
stamps/
origin
snapshot
...
orc/
origin/
snapshot/
...
stamps/
origin
snapshot
...
meta/
export.json
stamps files are written after the corresponding directories are written.
Their presence indicates that the corresponding directory was fully generated/copied.
This allows skipping work that was already done, while ignoring interrupted jobs.
They are omitted after the initial export (i.e. when downloading to/from other machines).
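The stamp convention can be checked with a few lines of Python. This is a minimal sketch rather than code from swh.dataset: the helper names is_exported and pending_tables are hypothetical, and it assumes stamps live at <export>/<format>/stamps/<object_type>, as in the layout shown earlier.

```python
from pathlib import Path
from typing import Iterable, List


def is_exported(export_dir: Path, fmt: str, object_type: str) -> bool:
    """Return True if the stamp file for (fmt, object_type) exists.

    Hypothetical helper: assumes the <format>/stamps/<object_type> layout.
    """
    return (export_dir / fmt / "stamps" / object_type).is_file()


def pending_tables(
    export_dir: Path, fmt: str, object_types: Iterable[str]
) -> List[str]:
    """List the object types whose stamp is missing, in input order."""
    return [t for t in object_types if not is_exported(export_dir, fmt, t)]
```

A table directory left behind by an interrupted job has no stamp, so it shows up in pending_tables and would be regenerated.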
meta/export.json contains information about the dataset, for provenance tracking.
For example:
{
"flavor": "full",
"export_start": "2022-11-08T11:00:54.998799+00:00",
"export_end": "2022-11-08T11:05:53.105519+00:00",
"brokers": [
"broker1.journal.staging.swh.network:9093"
],
"prefix": "swh.journal.objects",
"formats": [
"edges",
"orc"
],
"object_types": [
"revision",
"release",
"snapshot",
"origin_visit_status",
"origin_visit",
"origin"
],
"privileged": false,
"hostname": "desktop5",
"tool": {
"name": "swh.dataset",
"version": "0.3.2"
}
}
object_types contains a list of “main tables” exported; this excludes relational tables like directory_entry.
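Reading this metadata back takes only the standard library. The sketch below is illustrative: load_export_metadata and has_main_table are hypothetical names, not part of swh.dataset, and the only keys it relies on are formats and object_types from the example above.

```python
import json
from pathlib import Path
from typing import Any, Dict


def load_export_metadata(export_dir: Path) -> Dict[str, Any]:
    """Parse meta/export.json from an export directory."""
    with (export_dir / "meta" / "export.json").open(encoding="utf-8") as f:
        return json.load(f)


def has_main_table(meta: Dict[str, Any], fmt: str, object_type: str) -> bool:
    """Check that the metadata lists both the format and the main table.

    Relational tables such as directory_entry are not listed in
    object_types, so this returns False for them.
    """
    return fmt in meta.get("formats", []) and object_type in meta.get(
        "object_types", []
    )
```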
Running all on staging#
An easy way to run it (e.g. on the staging database) is to have these config files:
And run this command, for example:
luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunExportAll --UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ --athena-db-name=vlorentz_20221109_staging
Note that this arbitrarily divides config options between luigi.cfg and the CLI for readability, but they can be used interchangeably.
- class swh.dataset.luigi.ObjectType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
Bases: Enum
- origin = 1#
- origin_visit = 2#
- origin_visit_status = 3#
- snapshot = 4#
- release = 5#
- revision = 6#
- directory = 7#
- content = 8#
- skipped_content = 9#
- class swh.dataset.luigi.Format(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
Bases: Enum
- edges = 1#
- orc = 2#
- swh.dataset.luigi.merge_lists(lists: Iterator[List[T]]) List[T] [source]#
Returns a list made of all items of the arguments, with no duplicates.
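One plausible way to get this behavior is sketched below. It is not the module's implementation: merge_lists_sketch is a hypothetical name, and keeping the first occurrence of each item in encounter order is an assumption, since the description above does not say whether order is preserved.

```python
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def merge_lists_sketch(lists: Iterable[List[T]]) -> List[T]:
    """Concatenate lists, keeping only the first occurrence of each item."""
    merged: List[T] = []
    for list_ in lists:
        for item in list_:
            # Linear membership test: quadratic overall, but it also works
            # for unhashable items and keeps encounter order (an assumption).
            if item not in merged:
                merged.append(item)
    return merged
```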
- class swh.dataset.luigi.PathParameter(is_dir: bool = False, is_file: bool = False, exists: bool = False, create: bool = False, **kwargs)[source]#
Bases: PathParameter
A parameter that is a local filesystem path.
If is_dir, is_file, or exists is True, then existence of the path (and optionally type) is checked.
If create is set, then is_dir must be True, and the directory is created if it does not already exist.
- Parameters:
is_dir – whether the path should be to a directory
is_file – whether the path should be to a file
exists – whether the path should already exist
create – whether the path should be created if it does not exist
is_dir and is_file are mutually exclusive. exists and create are mutually exclusive.
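The rules above can be written out as a plain function. This is a sketch under stated assumptions, not the class's actual code: check_path is a hypothetical name, raising ValueError is an assumption, and so is creating missing parent directories.

```python
from pathlib import Path


def check_path(
    path: Path,
    is_dir: bool = False,
    is_file: bool = False,
    exists: bool = False,
    create: bool = False,
) -> Path:
    """Apply the documented flag rules to a local path (hypothetical helper)."""
    if is_dir and is_file:
        raise ValueError("is_dir and is_file are mutually exclusive")
    if exists and create:
        raise ValueError("exists and create are mutually exclusive")
    if create and not is_dir:
        raise ValueError("create requires is_dir=True")
    if create:
        # Assumption: missing parent directories are created as well.
        path.mkdir(parents=True, exist_ok=True)
    if (is_dir or is_file or exists) and not path.exists():
        raise ValueError(f"{path} does not exist")
    if is_dir and not path.is_dir():
        raise ValueError(f"{path} is not a directory")
    if is_file and not path.is_file():
        raise ValueError(f"{path} is not a file")
    return path
```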
- class swh.dataset.luigi.S3PathParameter(*args, **kwargs)[source]#
Bases: Parameter
A parameter that strips trailing slashes.
- normalize(s)[source]#
Given a parsed parameter value, normalizes it.
The value can either be the result of parse(), the default value or arguments passed into the task’s constructor by instantiation.
This is very implementation-defined, but can be used to validate/clamp valid values. For example, if you wanted to only accept even integers, and “correct” odd values to the even integer just below, you can implement normalize as x // 2 * 2.
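Given the class description, normalize presumably strips trailing slashes from the S3 path. The sketch below shows that normalization as a standalone function; strip_trailing_slashes is a hypothetical name, and the real method may handle edge cases differently.

```python
def strip_trailing_slashes(s: str) -> str:
    """Remove every trailing '/' so equivalent prefixes compare equal."""
    return s.rstrip("/")
```

With this normalization, a prefix given with or without a trailing slash yields the same value, so sub-paths can be appended with a single "/" separator.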
- class swh.dataset.luigi.FractionalFloatParameter(*args, **kwargs)[source]#
Bases: FloatParameter
A float parameter that must be between 0 and 1.
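The range check can be sketched as a small parsing function. The name parse_fraction is hypothetical, and treating both bounds as inclusive is an assumption, since the description does not say whether 0 and 1 themselves are accepted.

```python
def parse_fraction(s: str) -> float:
    """Parse a float and reject values outside [0, 1].

    Assumption: both bounds are inclusive.
    """
    value = float(s)
    # The chained comparison is False for NaN, so NaN is rejected too.
    if not 0.0 <= value <= 1.0:
        raise ValueError(f"expected a value between 0 and 1, got {value}")
    return value
```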
- swh.dataset.luigi.stamps_paths(formats: List[Format], object_types: List[ObjectType]) List[str] [source]#
Returns a list of (local FS or S3) paths used to mark tables as successfully exported.
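Based on the file layout described earlier, the returned paths plausibly have the form <format>/stamps/<object_type>. The sketch below illustrates that idea only: stamps_paths_sketch is a hypothetical name, the two enums are local stand-ins that mirror some of the members listed above, and the real function may produce additional or differently shaped paths.

```python
from enum import Enum
from typing import List


class Format(Enum):
    # Local stand-in mirroring swh.dataset.luigi.Format
    edges = 1
    orc = 2


class ObjectType(Enum):
    # Local stand-in with a subset of swh.dataset.luigi.ObjectType members
    origin = 1
    snapshot = 4


def stamps_paths_sketch(
    formats: List[Format], object_types: List[ObjectType]
) -> List[str]:
    """Build relative stamp paths, one per (format, object type) pair."""
    return [
        f"{fmt.name}/stamps/{obj.name}"
        for fmt in formats
        for obj in object_types
    ]
```

Because the paths are relative, the same list can be resolved against either a local export directory or an S3 prefix.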
- class swh.dataset.luigi.ExportGraph(*args, **kwargs)[source]#
Bases: Task
Exports the entire graph to the local filesystem.
Example invocation:
luigi --local-scheduler --module swh.dataset.luigi ExportGraph --config=graph.prod.yml --local-export-path=export/ --formats=edges
which is equivalent to this CLI call:
swh dataset --config-file graph.prod.yml graph export export/ --formats=edges
- config_file = <swh.dataset.luigi.PathParameter object>#
- local_export_path = <swh.dataset.luigi.PathParameter object>#
- export_id = <luigi.parameter.OptionalParameter object>#
- formats = <luigi.parameter.EnumListParameter object>#
- processes = <luigi.parameter.IntParameter object>#
- margin = <swh.dataset.luigi.FractionalFloatParameter object>#
- object_types = <luigi.parameter.EnumListParameter object>#
- class swh.dataset.luigi.UploadExportToS3(*args, **kwargs)[source]#
Bases: Task
Uploads a local dataset export to S3, creating the local export first if it does not exist.
Example invocation:
luigi --local-scheduler --module swh.dataset.luigi UploadExportToS3 --local-export-path=export/ --formats=edges --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
- local_export_path = <swh.dataset.luigi.PathParameter object>#
- formats = <luigi.parameter.EnumListParameter object>#
- object_types = <luigi.parameter.EnumListParameter object>#
- s3_export_path = <swh.dataset.luigi.S3PathParameter object>#
- requires() List[Task] [source]#
Returns an ExportGraph task that writes local files at the expected location.
- class swh.dataset.luigi.DownloadExportFromS3(*args, **kwargs)[source]#
Bases: Task
Downloads a dataset export from S3 to the local filesystem.
This performs the inverse operation of UploadExportToS3.
Example invocation:
luigi --local-scheduler --module swh.dataset.luigi DownloadExportFromS3 --local-export-path=export/ --formats=edges --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
- local_export_path = <swh.dataset.luigi.PathParameter object>#
- formats = <luigi.parameter.EnumListParameter object>#
- object_types = <luigi.parameter.EnumListParameter object>#
- s3_export_path = <swh.dataset.luigi.S3PathParameter object>#
- parallelism = <luigi.parameter.IntParameter object>#
- requires() List[Task] [source]#
Returns an ExportGraph task that writes local files at the expected location.
- class swh.dataset.luigi.LocalExport(*args, **kwargs)[source]#
Bases: Task
Task that depends on a local dataset being present, either directly from ExportGraph or via DownloadExportFromS3.
- local_export_path = <swh.dataset.luigi.PathParameter object>#
- formats = <luigi.parameter.EnumListParameter object>#
- object_types = <luigi.parameter.EnumListParameter object>#
- export_task_type = <luigi.parameter.TaskParameter object>#
- requires() List[Task] [source]#
Returns an instance of either ExportGraph or DownloadExportFromS3, depending on the value of export_task_type.
- class swh.dataset.luigi.AthenaDatabaseTarget(name: str, table_names: Set[str])[source]#
Bases: Target
Target for the existence of a database on Athena.
- class swh.dataset.luigi.CreateAthena(*args, **kwargs)[source]#
Bases: Task
Creates tables on AWS Athena pointing to a given graph dataset on S3.
Example invocation:
luigi --local-scheduler --module swh.dataset.luigi CreateAthena --ExportGraph-config=graph.staging.yml --athena-db-name=swh_20221108 --object-types=origin,origin_visit --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
which is equivalent to this CLI call:
swh dataset athena create --database-name swh_20221108 --location-prefix s3://softwareheritage/graph/swh_2022-11-08 --output-location s3://softwareheritage/graph/tmp/athena --replace-tables
- object_types = <luigi.parameter.EnumListParameter object>#
- s3_export_path = <swh.dataset.luigi.S3PathParameter object>#
- s3_athena_output_location = <swh.dataset.luigi.S3PathParameter object>#
- athena_db_name = <luigi.parameter.Parameter object>#
- requires() List[Task] [source]#
Returns the corresponding UploadExportToS3 instance, with ORC as the only format.
- output() List[Target] [source]#
Returns an instance of AthenaDatabaseTarget.
- class swh.dataset.luigi.RunExportAll(*args, **kwargs)[source]#
Bases: WrapperTask
Runs both the S3 and Athena export.
Example invocation:
luigi --local-scheduler --module swh.dataset.luigi RunExportAll --ExportGraph-config=graph.staging.yml --ExportGraph-processes=12 --UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ --formats=edges --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 --athena-db-name=swh_20221108 --object-types=origin,origin_visit --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
- formats = <luigi.parameter.EnumListParameter object>#
- object_types = <luigi.parameter.EnumListParameter object>#
- s3_export_path = <swh.dataset.luigi.S3PathParameter object>#
- s3_athena_output_location = <swh.dataset.luigi.S3PathParameter object>#
- athena_db_name = <luigi.parameter.Parameter object>#
- requires() List[Task] [source]#
Returns instances of CreateAthena and UploadExportToS3.