# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Luigi tasks
===========

This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks,
as an alternative to the CLI that can be composed with other tasks,
such as swh-graph's.

File layout
-----------

Tasks in this module work on "export directories", which have this layout::

    swh_<date>[_<flavor>]/
        edges/
            origin/
            snapshot/
            ...
            stamps/
                origin
                snapshot
                ...
        orc/
            origin/
            snapshot/
            ...
            stamps/
                origin
                snapshot
                ...
        meta/
            export.json

``stamps`` files are written after the corresponding directories are fully
written; their presence indicates that the corresponding directory was fully
generated/copied. This allows skipping work that was already done, while
ignoring interrupted jobs. For example, ``orc/stamps/origin`` is created only
once ``orc/origin/`` is complete. Stamps are omitted after the initial export
(i.e. when copying to/from other machines).

``meta/export.json`` contains information about the dataset, for provenance
tracking. For example:

.. code-block:: json

    {
        "flavor": "full",
        "export_start": "2022-11-08T11:00:54.998799+00:00",
        "export_end": "2022-11-08T11:05:53.105519+00:00",
        "brokers": [
            "broker1.journal.staging.swh.network:9093"
        ],
        "prefix": "swh.journal.objects",
        "formats": [
            "edges",
            "orc"
        ],
        "object_types": [
            "revision",
            "release",
            "snapshot",
            "origin_visit_status",
            "origin_visit",
            "origin"
        ],
        "privileged": false,
        "hostname": "desktop5",
        "tool": {
            "name": "swh.dataset",
            "version": "0.3.2"
        }
    }

``object_types`` contains the list of "main tables" exported; this excludes
relational tables like ``directory_entry``.
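
To check a dataset's provenance programmatically, this file can be read like
any other JSON document. A minimal sketch (the export path is hypothetical):

.. code-block:: python

    import json
    from pathlib import Path

    # Load the metadata written at the end of a successful export.
    meta = json.loads(Path("swh_2022-11-08/meta/export.json").read_text())
    assert "orc" in meta["formats"]
    print(meta["export_start"], meta["tool"]["version"])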

Running all on staging
----------------------

An easy way to run it (e.g. on the staging database) is to have these config
files:

.. code-block:: yaml
    :caption: graph.staging.yml

    journal:
      brokers:
      - broker1.journal.staging.swh.network:9093
      prefix: swh.journal.objects
      sasl.mechanism: "SCRAM-SHA-512"
      security.protocol: "sasl_ssl"
      sasl.username: "<username>"
      sasl.password: "<password>"
      privileged: false
      group_id: "<username>-test-dataset-export"

.. code-block:: ini
    :caption: luigi.cfg

    [ExportGraph]
    config=graph.staging.yml
    processes=16

    [RunExportAll]
    formats=edges,orc
    s3_athena_output_location=s3://vlorentz-test2/tmp/athena-output/

And run this command, for example::

    luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunExportAll \
        --UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \
        --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \
        --athena-db-name=vlorentz_20221109_staging

Note that this arbitrarily divides config options between :file:`luigi.cfg` and
the CLI for readability; but `they can be used interchangeably
<https://luigi.readthedocs.io/en/stable/configuration.html#parameters-from-config-ingestion>`__.
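
For example (illustrative), setting ``processes`` in :file:`luigi.cfg`:

.. code-block:: ini

    [ExportGraph]
    processes=16

is equivalent to passing ``--ExportGraph-processes=16`` on the command line.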
""" # noqa
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import enum
from pathlib import Path
import shutil
from typing import Hashable, Iterator, List, Set, TypeVar, Union
import luigi
from swh.dataset import cli
from swh.dataset.relational import MAIN_TABLES
ObjectType = enum.Enum(  # type: ignore[misc]
    "ObjectType", list(MAIN_TABLES)
)
Format = enum.Enum("Format", list(cli.AVAILABLE_EXPORTERS)) # type: ignore[misc]
T = TypeVar("T", bound=Hashable)
def merge_lists(lists: Iterator[List[T]]) -> List[T]:
"""Returns a list made of all items of the arguments, with no duplicate."""
res = set()
for list_ in lists:
res.update(set(list_))
return list(res)
class PathParameter(luigi.PathParameter):
"""
A parameter that is a local filesystem path.
If ``is_dir``, ``is_file``, or ``exists`` is :const:`True`, then existence of
the path (and optionally type) is checked.
If ``create`` is set, then ``is_dir`` must be :const:`True`, and the directory
is created if it does not already exist.
"""
def __init__(
self,
is_dir: bool = False,
is_file: bool = False,
exists: bool = False,
create: bool = False,
**kwargs,
):
"""
:param is_dir: whether the path should be to a directory
:param is_file: whether the path should be to a directory
:param exists: whether the path should already exist
:param create: whether the path should be created if it does not exist
``is_dir`` and ``is_file`` are mutually exclusive.
``exists`` and ``create`` are mutually exclusive.
"""
if create and not is_dir:
raise ValueError("`is_dir` must be True if `create` is True")
if is_dir and is_file:
raise ValueError("`is_dir` and `is_file` are mutually exclusive")
super().__init__(**kwargs)
self.is_dir = is_dir
self.is_file = is_file
self.exists = exists
self.create = create
def parse(self, s: str) -> Path:
path = Path(s)
if self.create:
path.mkdir(parents=True, exist_ok=True)
if (self.exists or self.is_dir or self.is_file) and not path.exists():
raise ValueError(f"{s} does not exist")
if self.is_dir and not path.is_dir():
raise ValueError(f"{s} is not a directory")
if self.is_file and not path.is_file():
raise ValueError(f"{s} is not a file")
return path
class S3PathParameter(luigi.Parameter):
"""A parameter that strip trailing slashes"""
def __init__(self, *args, **kwargs):
""""""
# Override luigi.Parameter.__init__'s docstring, which contains a broken ref
super().__init__(*args, **kwargs)
def normalize(self, s):
return s.rstrip("/")
class FractionalFloatParameter(luigi.FloatParameter):
"""A float parameter that must be between 0 and 1"""
def __init__(self, *args, **kwargs):
""""""
# Override luigi.Parameter.__init__'s docstring, which contains a broken ref
super().__init__(*args, **kwargs)
def parse(self, s):
v = super().parse(s)
if not 0.0 <= v <= 1.0:
raise ValueError(f"{s} is not a float between 0 and 1")
return v
def stamps_paths(formats: List[Format], object_types: List[ObjectType]) -> List[str]:
"""Returns a list of (local FS or S3) paths used to mark tables as successfully
exported.
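
    For example, assuming ``orc`` is among the available exporters and
    ``origin`` among the main tables::

        >>> stamps_paths([Format.orc], [ObjectType.origin])
        ['orc/stamps/origin']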
"""
return [
f"{format_.name}/stamps/{object_type.name.lower()}"
for format_ in formats
for object_type in object_types
]
def _export_metadata_has_object_types(
export_metadata: Union[luigi.LocalTarget, "luigi.contrib.s3.S3Target"],
object_types: List[ObjectType],
) -> bool:
import json
with export_metadata.open() as fd:
meta = json.load(fd)
return set(meta["object_types"]) >= {
object_type.name for object_type in object_types
}
class ExportGraph(luigi.Task):
"""Exports the entire graph to the local filesystem.
Example invocation::
luigi --local-scheduler --module swh.dataset.luigi ExportGraph \
--config=graph.prod.yml \
--local-export-path=export/ \
--formats=edges
    which is equivalent to this CLI call::

        swh dataset --config-file graph.prod.yml graph export export/ --formats=edges
"""
config_file = PathParameter(is_file=True)
local_export_path = PathParameter(is_dir=True, create=True)
export_id = luigi.OptionalParameter(
default=None,
description="""
Unique ID of the export run. This is appended to the kafka
group_id config file option. If group_id is not set in the
'journal' section of the config file, defaults to 'swh-dataset-export-'.
""",
)
formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists)
processes = luigi.IntParameter(default=1, significant=False)
margin = FractionalFloatParameter(
default=1.0,
description="""
        Offset margin to start consuming from. E.g. if set to '0.95',
        consumers will start at 95%% of the last committed offset;
        in other words, slightly earlier than the last committed position.
""",
)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
def output(self) -> List[luigi.Target]:
"""Returns path of `meta/export.json` on the local FS."""
return [self._meta()]
def complete(self) -> bool:
return super().complete() and _export_metadata_has_object_types(
self._meta(), self.object_types
)
def _stamps(self):
return [
luigi.LocalTarget(self.local_export_path / path)
for path in stamps_paths(self.formats, self.object_types)
]
def _meta(self):
return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")
def run(self) -> None:
"""Runs the full export, then writes stamps, then :file:`meta.json`."""
import datetime
import json
import socket
import pkg_resources
from swh.core import config
# we are about to overwrite files, so remove any existing stamp
for output in self.output():
if output.exists():
output.remove()
if self.local_export_path.exists(): # type: ignore[operator]
# don't delete self.local_export_path itself, it may be pre-created by
# the root user in a directory we cannot write to.
for path in self.local_export_path.iterdir():
shutil.rmtree(path)
conf = config.read(self.config_file)
start_date = datetime.datetime.now(tz=datetime.timezone.utc)
cli.run_export_graph(
config=conf,
export_path=self.local_export_path,
export_formats=[format_.name for format_ in self.formats],
object_types=[obj_type.name.lower() for obj_type in self.object_types],
exclude_obj_types=set(),
export_id=self.export_id,
processes=self.processes,
margin=self.margin,
)
end_date = datetime.datetime.now(tz=datetime.timezone.utc)
# Create stamps
for output in self._stamps():
output.makedirs()
with output.open("w") as fd:
pass
# Write export metadata
meta = {
"flavor": "full",
"export_start": start_date.isoformat(),
"export_end": end_date.isoformat(),
"brokers": conf["journal"]["brokers"],
"prefix": conf["journal"]["prefix"],
"formats": [format_.name for format_ in self.formats],
"object_types": [object_type.name for object_type in self.object_types],
"privileged": conf["journal"].get("privileged"),
"hostname": socket.getfqdn(),
"tool": {
"name": "swh.dataset",
"version": pkg_resources.get_distribution("swh.dataset").version,
},
}
with self._meta().open("w") as fd:
json.dump(meta, fd, indent=4)
class UploadExportToS3(luigi.Task):
"""Uploads a local dataset export to S3; creating automatically if it does
not exist.
Example invocation::
luigi --local-scheduler --module swh.dataset.luigi UploadExportToS3 \
--local-export-path=export/ \
--formats=edges \
--s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
"""
local_export_path = PathParameter(is_dir=True, create=True, significant=False)
formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
s3_export_path = S3PathParameter()
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`ExportGraph` task that writes local files at the
expected location."""
return [
ExportGraph(
local_export_path=self.local_export_path,
formats=self.formats,
object_types=self.object_types,
)
]
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on S3."""
return [self._meta()]
def _meta(self):
import luigi.contrib.s3
return luigi.contrib.s3.S3Target(f"{self.s3_export_path}/meta/export.json")
def complete(self) -> bool:
"""Returns whether the graph dataset was exported with a superset of
``object_types``"""
if (
"s3://softwareheritage/graph/"
< self.s3_export_path
< "s3://softwareheritage/graph/2022-12-07"
):
            # Exports before 2022-12-07 did not have the needed metadata; skip
            # the check for old exports. (The comparison above is a plain
            # lexicographic string comparison on the dated path.)
return True
return super().complete() and _export_metadata_has_object_types(
self._meta(), self.object_types
)
def run(self) -> None:
"""Copies all files: first the export itself, then :file:`meta.json`."""
import os
import luigi.contrib.s3
import tqdm
client = luigi.contrib.s3.S3Client()
# recursively copy local files to S3, and end with stamps and export metadata
for format_ in self.formats:
for dirname in os.listdir(self.local_export_path / format_.name):
if dirname == "stamps":
# used as stamps while exporting, pointless to copy them
continue
if dirname == "meta":
# used as final stamp; copy it at the end
continue
local_dir = self.local_export_path / format_.name / dirname
s3_dir = f"{self.s3_export_path}/{format_.name}/{dirname}"
status_message = f"Uploading {format_.name}/{dirname}/"
self.set_status_message(status_message)
for file_ in tqdm.tqdm(
list(os.listdir(local_dir)),
desc=status_message,
):
local_path = local_dir / file_
s3_path = f"{s3_dir}/{file_}"
obj_summary = client.get_key(s3_path)
if (
obj_summary is not None
and obj_summary.size == local_path.stat().st_size
):
# already uploaded (probably by a previous interrupted run)
continue
client.put_multipart(local_path, s3_path, ACL="public-read")
client.put(
self.local_export_path / "meta" / "export.json",
self._meta().path,
ACL="public-read",
)
class DownloadExportFromS3(luigi.Task):
"""Downloads a local dataset export from S3.
This performs the inverse operation of :class:`UploadExportToS3`
Example invocation::
luigi --local-scheduler --module swh.dataset.luigi DownloadExportFromS3 \
--local-export-path=export/ \
--formats=edges \
--s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
"""
local_export_path = PathParameter(is_dir=True, create=True)
formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
s3_export_path = S3PathParameter(significant=False)
parallelism = luigi.IntParameter(default=10, significant=False)
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`ExportGraph` task that writes local files at the
expected location."""
return [
UploadExportToS3(
local_export_path=self.local_export_path,
formats=self.formats,
object_types=self.object_types,
s3_export_path=self.s3_export_path,
)
]
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on the local filesystem."""
return [self._meta()]
def complete(self) -> bool:
return super().complete() and _export_metadata_has_object_types(
self._meta(), self.object_types
)
def _meta(self):
return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")
def run(self) -> None:
"""Copies all files: first the export itself, then :file:`meta.json`."""
import collections
import multiprocessing.dummy
import luigi.contrib.s3
import tqdm
client = luigi.contrib.s3.S3Client()
        # recursively copy files from S3 to the local filesystem, and end with
        # the export metadata
paths = []
for format_ in self.formats:
local_dir = self.local_export_path / format_.name
s3_dir = f"{self.s3_export_path}/{format_.name}"
files = list(client.list(s3_dir))
assert files, "No files found"
files_by_type = collections.defaultdict(list)
for file in files:
files_by_type[file.split("/")[0]].append(file)
for object_type, files in files_by_type.items():
(local_dir / object_type).mkdir(parents=True, exist_ok=True)
for file_ in files:
paths.append(
(
f"{s3_dir}/{file_}",
str(local_dir / file_),
)
)
with multiprocessing.dummy.Pool(self.parallelism) as p:
for i, _ in tqdm.tqdm(
enumerate(p.imap_unordered(lambda args: client.get(*args), paths)),
total=len(paths),
desc="Downloading graph export",
):
self.set_progress_percentage(int(i * 100 / len(paths)))
export_json_path = self.local_export_path / "meta" / "export.json"
export_json_path.parent.mkdir(exist_ok=True)
client.get(
f"{self.s3_export_path}/meta/export.json",
self._meta().path,
)
class LocalExport(luigi.Task):
"""Task that depends on a local dataset being present -- either directly from
:class:`ExportGraph` or via :class:`DownloadExportFromS3`.
"""
local_export_path = PathParameter(is_dir=True)
formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
export_task_type = luigi.TaskParameter(
default=DownloadExportFromS3,
significant=False,
description="""The task used to get the dataset if it is not present.
Should be either ``ExportGraph`` or ``DownloadExportFromS3``.""",
)
def requires(self) -> List[luigi.Task]:
"""Returns an instance of either :class:`ExportGraph` or
:class:`DownloadExportFromS3` depending on the value of
:attr:`export_task_type`."""
if issubclass(self.export_task_type, ExportGraph):
return [
ExportGraph(
local_export_path=self.local_export_path,
formats=self.formats,
object_types=self.object_types,
)
]
elif issubclass(self.export_task_type, DownloadExportFromS3):
return [
DownloadExportFromS3(
local_export_path=self.local_export_path,
formats=self.formats,
object_types=self.object_types,
)
]
else:
raise ValueError(
f"Unexpected export_task_type: {self.export_task_type.__name__}"
)
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on the local filesystem."""
return [self._meta()]
def _meta(self):
return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")
def complete(self) -> bool:
if "2020-" < self.local_export_path.name < "2022-12-07":
            # Exports before 2022-12-07 did not have the needed metadata; skip
            # the check for old exports (lexicographic comparison on the
            # date-named directory).
return True
return super().complete() and _export_metadata_has_object_types(
self._meta(), self.object_types
)
class AthenaDatabaseTarget(luigi.Target):
"""Target for the existence of a database on Athena."""
def __init__(self, name: str, table_names: Set[str]):
self.name = name
self.table_names = table_names
def exists(self) -> bool:
import boto3
client = boto3.client("athena")
database_list = client.list_databases(CatalogName="AwsDataCatalog")
for database in database_list["DatabaseList"]:
if database["Name"] == self.name:
break
else:
# the database doesn't exist at all
return False
table_metadata = client.list_table_metadata(
CatalogName="AwsDataCatalog", DatabaseName=self.name
)
missing_tables = self.table_names - {
table["Name"] for table in table_metadata["TableMetadataList"]
}
return not missing_tables
class CreateAthena(luigi.Task):
"""Creates tables on AWS Athena pointing to a given graph dataset on S3.
Example invocation::
luigi --local-scheduler --module swh.dataset.luigi CreateAthena \
--ExportGraph-config=graph.staging.yml \
--athena-db-name=swh_20221108 \
--object-types=origin,origin_visit \
--s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
--s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
    which is equivalent to this CLI call::

        swh dataset athena create \
            --database-name swh_20221108 \
            --location-prefix s3://softwareheritage/graph/swh_2022-11-08 \
            --output-location s3://softwareheritage/graph/tmp/athena \
            --replace-tables
"""
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
s3_export_path = S3PathParameter()
s3_athena_output_location = S3PathParameter()
athena_db_name = luigi.Parameter()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
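        # Heuristic sanity check: the S3 path is expected to end with the
        # date (dashes removed, i.e. YYYY-MM-DD -> YYYYMMDD) that is embedded
        # in the Athena database name; this catches mismatched exports early.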
if not self.s3_export_path.replace("-", "").endswith(
"/" + self.athena_db_name.split("_", 1)[1]
):
raise ValueError(
f"S3 export path ({self.s3_export_path}) does not match "
f"Athena database name ({self.athena_db_name})."
f"They should use these formats: "
f"'s3://<whatever>/YYYY-MM-DD[_<flavor>]/' "
f"and '<prefix>_YYYYMMDD[_<flavor>]"
)
def requires(self) -> List[luigi.Task]:
"""Returns the corresponding :class:`UploadExportToS3` instance,
with ORC as only format."""
return [
UploadExportToS3(
formats=[Format.orc], # type: ignore[attr-defined]
object_types=self.object_types,
s3_export_path=self.s3_export_path,
)
]
def output(self) -> List[luigi.Target]:
"""Returns an instance of :class:`AthenaDatabaseTarget`."""
from swh.dataset.athena import TABLES
return [AthenaDatabaseTarget(self.athena_db_name, set(TABLES))]
def run(self) -> None:
"""Creates tables from the ORC dataset."""
from swh.dataset.athena import create_tables
create_tables(
self.athena_db_name,
self.s3_export_path,
output_location=self.s3_athena_output_location,
replace=True,
)
class RunExportAll(luigi.WrapperTask):
"""Runs both the S3 and Athena export.
Example invocation::
luigi --local-scheduler --module swh.dataset.luigi RunExportAll \
--ExportGraph-config=graph.staging.yml \
--ExportGraph-processes=12 \
--UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ \
--formats=edges \
--s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
--athena-db-name=swh_20221108 \
--object-types=origin,origin_visit \
--s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
"""
formats = luigi.EnumListParameter(
enum=Format, batch_method=merge_lists, default=list(Format)
)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
s3_export_path = S3PathParameter()
s3_athena_output_location = S3PathParameter()
athena_db_name = luigi.Parameter()
def requires(self) -> List[luigi.Task]:
"""Returns instances of :class:`CreateAthena` and :class:`UploadExportToS3`."""
        # CreateAthena depends on UploadExportToS3(formats=[orc]), so we need to
        # explicitly depend on UploadExportToS3(formats=self.formats) here, to
        # also export the other formats requested by the user.
return [
CreateAthena(
object_types=self.object_types,
s3_export_path=self.s3_export_path,
s3_athena_output_location=self.s3_athena_output_location,
athena_db_name=self.athena_db_name,
),
UploadExportToS3(
formats=self.formats,
object_types=self.object_types,
s3_export_path=self.s3_export_path,
),
]