# Copyright (C) 2022-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Luigi tasks
===========
This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks,
as an alternative to the CLI that can be composed with other tasks,
such as swh-graph's.

File layout
-----------

Tasks in this module work on "export directories", which have this layout::

    swh_<date>[_<flavor>]/
        edges/
            origin/
            snapshot/
            ...
            stamps/
                origin
                snapshot
                ...
        orc/
            origin/
            snapshot/
            ...
            stamps/
                origin
                snapshot
                ...
        meta/
            export.json
``stamps`` files are written after the corresponding directories are fully
written. Their presence indicates the corresponding directory was fully
generated/copied, which allows skipping work that was already done while
ignoring interrupted jobs. They are omitted after the initial export
(i.e. when copying to/from other machines).
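
For illustration, such a completeness check boils down to testing stamp
existence; a minimal sketch (hypothetical paths, not the tasks' actual
implementation):

.. code-block:: python

    from pathlib import Path

    def table_was_exported(export_dir: Path, format_: str, table: str) -> bool:
        # The stamp is only written once the table directory is complete, so
        # an interrupted job leaves no stamp and the table gets re-exported.
        return (export_dir / format_ / "stamps" / table).exists()
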
``meta/export.json`` contains information about the dataset, for provenance tracking.
For example:

.. code-block:: json

    {
        "flavor": "full",
        "export_start": "2022-11-08T11:00:54.998799+00:00",
        "export_end": "2022-11-08T11:05:53.105519+00:00",
        "brokers": [
            "broker1.journal.staging.swh.network:9093"
        ],
        "prefix": "swh.journal.objects",
        "formats": [
            "edges",
            "orc"
        ],
        "object_types": [
            "revision",
            "release",
            "snapshot",
            "origin_visit_status",
            "origin_visit",
            "origin"
        ],
        "privileged": false,
        "hostname": "desktop5",
        "tool": {
            "name": "swh.export",
            "version": "0.3.2"
        }
    }
``object_types`` contains a list of "main tables" exported; this excludes relational
tables like ``directory_entry``.
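
For example, a consumer of an export could check this metadata for provenance
before reading tables; a minimal sketch (the directory name is hypothetical):

.. code-block:: python

    import json

    with open("swh_2022-11-08/meta/export.json") as f:
        meta = json.load(f)

    # ensure the tables we need were part of this export
    assert "orc" in meta["formats"]
    assert {"origin", "snapshot"} <= set(meta["object_types"])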

Running all on staging
----------------------

An easy way to run it (e.g. on the staging database) is to have these config
files:

.. code-block:: yaml
   :caption: graph.staging.yml

   journal:
     brokers:
     - broker1.journal.staging.swh.network:9093
     prefix: swh.journal.objects
     sasl.mechanism: "SCRAM-SHA-512"
     security.protocol: "sasl_ssl"
     sasl.username: "<username>"
     sasl.password: "<password>"
     privileged: false
     group_id: "<username>-test-dataset-export"

.. code-block:: ini
   :caption: luigi.cfg

   [ExportGraph]
   config_file=graph.staging.yml
   processes=16

   [RunExportAll]
   formats=edges,orc
   s3_athena_output_location=s3://vlorentz-test2/tmp/athena-output/

And run this command, for example::

    luigi --log-level INFO --local-scheduler --module swh.export.luigi RunExportAll \
        --UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \
        --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \
        --athena-db-name=vlorentz_20221109_staging

Note that this arbitrarily divides config options between :file:`luigi.cfg` and the
CLI for readability, but `they can be used interchangeably
<https://luigi.readthedocs.io/en/stable/configuration.html#parameters-from-config-ingestion>`__.
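
For example, ``processes=16`` in the ``[ExportGraph]`` section of
:file:`luigi.cfg` could equivalently be passed on the CLI as
``--ExportGraph-processes=16``.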
""" # noqa
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import enum
import logging
from pathlib import Path
import shutil
from typing import (
TYPE_CHECKING,
Any,
Dict,
Hashable,
Iterator,
List,
Optional,
Set,
Tuple,
TypeVar,
Union,
)
import luigi
from swh.export import cli
from swh.export.relational import MAIN_TABLES
from swh.export.utils import subdirectories_for_object_type
if TYPE_CHECKING:
from swh.model.swhids import ExtendedSWHID
ObjectType = enum.Enum("ObjectType", list(MAIN_TABLES))  # type: ignore[misc]
Format = enum.Enum("Format", list(cli.AVAILABLE_EXPORTERS)) # type: ignore[misc]
T = TypeVar("T", bound=Hashable)
def merge_lists(lists: Iterator[List[T]]) -> List[T]:
    """Returns a list made of all items of the arguments, with no duplicates."""
    res: Set[T] = set()
    for list_ in lists:
        res.update(list_)
    return list(res)
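
# For example, merge_lists(iter([["a", "b"], ["b", "c"]])) returns a list
# containing exactly "a", "b" and "c"; the order is unspecified, as the items
# go through a set.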
class PathParameter(luigi.PathParameter):
"""
A parameter that is a local filesystem path.
If ``is_dir``, ``is_file``, or ``exists`` is :const:`True`, then existence of
the path (and optionally type) is checked.
If ``create`` is set, then ``is_dir`` must be :const:`True`, and the directory
is created if it does not already exist.
"""
def __init__(
self,
is_dir: bool = False,
is_file: bool = False,
exists: bool = False,
create: bool = False,
**kwargs,
):
"""
:param is_dir: whether the path should be to a directory
        :param is_file: whether the path should be to a file
:param exists: whether the path should already exist
:param create: whether the path should be created if it does not exist
``is_dir`` and ``is_file`` are mutually exclusive.
``exists`` and ``create`` are mutually exclusive.
"""
if create and not is_dir:
raise ValueError("`is_dir` must be True if `create` is True")
if is_dir and is_file:
raise ValueError("`is_dir` and `is_file` are mutually exclusive")
super().__init__(**kwargs)
self.is_dir = is_dir
self.is_file = is_file
self.exists = exists
self.create = create
def parse(self, s: str) -> Path:
path = Path(s)
if self.create:
path.mkdir(parents=True, exist_ok=True)
if (self.exists or self.is_dir or self.is_file) and not path.exists():
raise ValueError(f"{s} does not exist")
if self.is_dir and not path.is_dir():
raise ValueError(f"{s} is not a directory")
if self.is_file and not path.is_file():
raise ValueError(f"{s} is not a file")
return path
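
    # For example (hypothetical path), PathParameter(is_dir=True, create=True)
    # parses "/tmp/swh-export" by creating the directory if needed and
    # returning Path("/tmp/swh-export").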
class S3PathParameter(luigi.Parameter):
"""A parameter that strip trailing slashes"""
def __init__(self, *args, **kwargs):
""""""
# Override luigi.Parameter.__init__'s docstring, which contains a broken ref
super().__init__(*args, **kwargs)
def normalize(self, s):
return s.rstrip("/")
class FractionalFloatParameter(luigi.FloatParameter):
"""A float parameter that must be between 0 and 1"""
def __init__(self, *args, **kwargs):
""""""
# Override luigi.Parameter.__init__'s docstring, which contains a broken ref
super().__init__(*args, **kwargs)
def parse(self, s):
v = super().parse(s)
if not 0.0 <= v <= 1.0:
raise ValueError(f"{s} is not a float between 0 and 1")
return v
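
    # For example, parse("0.95") returns 0.95, while parse("1.5") raises
    # ValueError.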
def stamps_paths(formats: List[Format], object_types: List[ObjectType]) -> List[str]:
"""Returns a list of (local FS or S3) paths used to mark tables as successfully
exported.
"""
return [
f"tmp/stamps/{object_type.name.lower()}"
for format_ in formats
for object_type in object_types
]
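
# For example, stamps_paths([Format.orc], [ObjectType.origin, ObjectType.snapshot])
# returns ["tmp/stamps/origin", "tmp/stamps/snapshot"].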
def _export_metadata_has_object_types(
export_metadata: Union[luigi.LocalTarget, "luigi.contrib.s3.S3Target"],
object_types: List[ObjectType],
) -> bool:
import json
with export_metadata.open() as fd:
meta = json.load(fd)
return set(meta["object_types"]) >= {
object_type.name for object_type in object_types
}
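
# For example, with meta["object_types"] == ["origin", "snapshot", "revision"],
# the check above passes for object_types=[ObjectType.origin], but fails as
# soon as ObjectType.release is requested.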
def get_masked_swhids(logger, config: Dict[str, Any]) -> Set["ExtendedSWHID"]:
"""Fetches the masking database and returns the list of all non-visible SWHIDs"""
import tqdm
from swh.storage.proxies.masking.db import MaskingQuery
if config["masking_db"] is None:
logger.warning("Exporting dataset without masking.")
return set()
masking_query = MaskingQuery.connect(config["masking_db"])
return {
swhid
for (swhid, statuses) in tqdm.tqdm(
masking_query.iter_masked_swhids(),
desc="Listing masked SWHIDs",
unit_scale=True,
)
}
class StartExport(luigi.Task):
"""Pseudo-task that computes the journal offsets from and to which objects should
be exported"""
config_file: Path = PathParameter(is_file=True) # type: ignore[assignment]
local_export_path: Path = PathParameter(is_dir=True, create=True) # type: ignore[assignment]
local_sensitive_export_path: Optional[Path] = luigi.OptionalPathParameter(
default=None
)
export_id = luigi.OptionalParameter(
description="""
Unique ID of the export run. This is appended to the kafka
group_id config file option. If group_id is not set in the
'journal' section of the config file, defaults to 'swh-export-export-'.
""",
)
margin: float = FractionalFloatParameter( # type: ignore[assignment]
default=1.0,
description="""
            Offset margin to start consuming from. E.g. if set to '0.95',
consumers will start at 95%% of the last committed offset;
in other words, start earlier than last committed position.
""",
)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
def output(self) -> Dict[Union[str, ObjectType], luigi.LocalTarget]:
"""Returns a stamp file for each step, in `self.local_export_path/tmp/stamps/`"""
results: Dict[Union[str, ObjectType], luigi.LocalTarget] = {
"stamp": luigi.LocalTarget(
self.local_export_path / "tmp" / "stamps" / "START.json"
)
}
offsets_dir = self.local_export_path / "tmp" / "offsets"
offsets_dir.mkdir(exist_ok=True, parents=True)
results.update(
{
obj_type: luigi.LocalTarget(offsets_dir / f"{obj_type.name}.json")
for obj_type in self.object_types
}
)
return results
def complete(self) -> bool:
import json
if not super().complete():
return False
with self.output()["stamp"].open() as f:
export_id = json.load(f)["export_id"]
assert export_id == self.export_id, (
f"Export was started with export_id {export_id} "
f"but the current task is for {self.export_id}"
)
return True
def run(self) -> None:
"""Writes the offsets file"""
import datetime
import json
import yaml
from .journalprocessor import ParallelJournalProcessor
with open(self.config_file) as f:
config = yaml.safe_load(f)
logger = logging.getLogger(__name__)
masked_swhids = get_masked_swhids(logger, config)
        # {obj_type: {partition: (low, high)}}
        offsets: Dict[ObjectType, Dict[int, Tuple[int, int]]] = {}
        for obj_type in [  # order matters, in order to avoid holes
ObjectType.origin_visit_status, # type: ignore[attr-defined]
ObjectType.origin_visit, # type: ignore[attr-defined]
ObjectType.origin, # type: ignore[attr-defined]
ObjectType.snapshot, # type: ignore[attr-defined]
ObjectType.release, # type: ignore[attr-defined]
ObjectType.revision, # type: ignore[attr-defined]
ObjectType.directory, # type: ignore[attr-defined]
ObjectType.skipped_content, # type: ignore[attr-defined]
ObjectType.content, # type: ignore[attr-defined]
]:
if obj_type not in self.object_types:
continue
journal_processor = ParallelJournalProcessor(
config,
masked_swhids,
[], # exporters, not needed yet
self.export_id,
obj_type.name,
node_sets_path=self.local_export_path / ".node_sets",
persons_dir=self.local_export_path / "unused", # placeholder
processes=4, # very quick, no need for more
)
journal_processor.get_offsets()
assert journal_processor.offsets is not None
offsets[obj_type] = journal_processor.offsets
(self.local_export_path / "tmp" / "dup_persons").mkdir(
parents=True, exist_ok=True
)
for obj_type in self.object_types:
with self.output()[obj_type].open("w") as f:
json.dump({"offsets": offsets[obj_type]}, f)
with self.output()["stamp"].open("w") as f:
json.dump(
{
"export_id": self.export_id,
"start_date": datetime.datetime.now(
tz=datetime.timezone.utc
).isoformat(),
"margin": self.margin,
"object_types": [obj_type.name for obj_type in self.object_types],
},
f,
)
class ExportTopic(luigi.Task):
"""Exports a single topic, given already computed offsets in the journal."""
config_file: Path = PathParameter(is_file=True) # type: ignore[assignment]
local_export_path: Path = PathParameter(is_dir=True, create=True) # type: ignore[assignment]
local_sensitive_export_path: Optional[Path] = luigi.OptionalPathParameter(
default=None
)
export_id = luigi.OptionalParameter(
description="""
Unique ID of the export run. This is appended to the kafka
group_id config file option. If group_id is not set in the
'journal' section of the config file, defaults to 'swh-export-export-'.
""",
)
formats = luigi.EnumListParameter(
enum=Format,
batch_method=merge_lists,
default=[Format.orc], # type: ignore[attr-defined]
)
processes = luigi.IntParameter(default=1, significant=False)
margin: float = FractionalFloatParameter( # type: ignore[assignment]
default=1.0,
description="""
            Offset margin to start consuming from. E.g. if set to '0.95',
consumers will start at 95%% of the last committed offset;
in other words, start earlier than last committed position.
""",
)
object_types = luigi.EnumListParameter(enum=ObjectType, default=list(ObjectType))
def _stamp_files(self) -> List[Path]:
stamp_dir = Path(self.local_export_path) / "tmp" / "stamps"
return [stamp_dir / f"{obj_type.name}.json" for obj_type in self.object_types]
def requires(self) -> Dict[str, luigi.Task]:
"""Returns an instance of :class:`StartExport`"""
return {
"start": StartExport(
config_file=self.config_file,
local_export_path=self.local_export_path,
local_sensitive_export_path=self.local_sensitive_export_path,
export_id=self.export_id,
margin=self.margin,
object_types=self.object_types,
)
}
def output(self) -> List[luigi.LocalTarget]:
"""Returns a :class:`luigi.LocalTarget` instance for each stamp file"""
return list(map(luigi.LocalTarget, self._stamp_files()))
def _setrlimit(self, nb_shards):
import resource
logger = logging.getLogger(__name__)
# ParallelJournalProcessor opens 256 LevelDBs in total. Depending on the number of
# processes, this can exceed the maximum number of file descriptors (soft limit
# defaults to 1024 on Debian), so let's increase it.
(soft, hard) = resource.getrlimit(resource.RLIMIT_NOFILE)
open_fds_per_shard = 61 # estimated with plyvel==1.3.0 and libleveldb1d==1.22-3
spare = 1024 # for everything other than LevelDB
want_fd = nb_shards * open_fds_per_shard + spare
if hard < want_fd:
logger.warning(
"Hard limit of open file descriptors (%d) is lower than ideal (%d)",
hard,
want_fd,
)
if soft < want_fd:
want_fd = min(want_fd, hard)
logger.info(
"Soft limit of open file descriptors (%d) is too low, increasing to %d",
soft,
want_fd,
)
resource.setrlimit(resource.RLIMIT_NOFILE, (want_fd, hard))
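        # For example, consuming 64 partitions with the estimates above asks
        # for 64 * 61 + 1024 = 4928 file descriptors.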
def run(self) -> None:
"""Consumes all of the ``self.OBJECT_TYPE`` topic into
``self.export_path / self.OBJECT_TYPE``."""
import functools
from importlib import import_module
import json
import shutil
import yaml
from .journalprocessor import ParallelJournalProcessor
with open(self.config_file) as f:
config = yaml.safe_load(f)
logger = logging.getLogger(__name__)
masked_swhids = get_masked_swhids(logger, config)
offsets: Dict[ObjectType, Dict[int, Tuple[int, int]]] = {}
for obj_type in self.object_types:
with self.input()["start"][obj_type].open("r") as f:
                # {partition: (low, high)}
offsets[obj_type] = {
int(partition): (low, high)
for (partition, (low, high)) in json.load(f)["offsets"].items()
}
        logger.debug("Loaded offsets for object types: %s", list(offsets))
self._setrlimit(
sum(
len(topic_offsets)
for (obj_type, topic_offsets) in offsets.items()
if obj_type in self.object_types
)
)
def importcls(clspath):
mod, cls = clspath.split(":")
m = import_module(mod)
return getattr(m, cls)
        exporter_cls = {
            fmt: importcls(clspath)
            for (fmt, clspath) in cli.AVAILABLE_EXPORTERS.items()
            if Format[fmt] in self.formats
        }
parallel_exporters = {}
for obj_type in self.object_types:
subdirectories = subdirectories_for_object_type(obj_type.name.lower())
for f in self.formats:
for subdirectory in subdirectories:
export_directory = self.local_export_path / f.name / subdirectory
try:
# remove any leftover from a failed previous run
shutil.rmtree(export_directory)
except FileNotFoundError:
pass
                    # Ensure the export directory exists: the graph compression
                    # tool expects it, but it would not be created if the
                    # journal topic to export is empty.
export_directory.mkdir(parents=True)
if self.local_sensitive_export_path is not None:
try:
shutil.rmtree(
self.local_sensitive_export_path / f.name / obj_type.name
)
except FileNotFoundError:
pass
exporters = [
functools.partial(
exporter_cls[f.name],
config=config,
object_types=[obj_type.name],
export_path=self.local_export_path / f.name,
sensitive_export_path=(
self.local_sensitive_export_path / f.name
if self.local_sensitive_export_path is not None
else None
),
)
for f in self.formats
]
journal_processor = ParallelJournalProcessor(
config,
masked_swhids,
exporters,
self.export_id,
obj_type.name,
node_sets_path=self.local_export_path / ".node_sets",
persons_dir=self.local_export_path / "tmp" / "dup_persons",
processes=self.processes,
)
journal_processor.offsets = offsets[obj_type]
parallel_exporters[obj_type] = journal_processor
for obj_type, parallel_exporter in parallel_exporters.items():
parallel_exporter.run()
for obj_type in self.object_types:
try:
shutil.rmtree(self.local_export_path / ".node_sets" / obj_type.name)
except FileNotFoundError:
pass
for path in self._stamp_files():
path.write_text(json.dumps({}))
class ExportPersonsTable(luigi.Task):
"""Aggregates lists of persons exported by :class:`ExportTopic` into a single table
with no duplicates."""
config_file: Path = PathParameter(is_file=True) # type: ignore[assignment]
local_export_path: Path = PathParameter(is_dir=True, create=True) # type: ignore[assignment]
local_sensitive_export_path: Optional[Path] = luigi.OptionalPathParameter(
default=None
)
export_id = luigi.OptionalParameter(
description="""
Unique ID of the export run. This is appended to the kafka
group_id config file option. If group_id is not set in the
'journal' section of the config file, defaults to 'swh-export-export-'.
""",
)
formats = luigi.EnumListParameter(
enum=Format,
batch_method=merge_lists,
default=[Format.orc], # type: ignore[attr-defined]
)
processes = luigi.IntParameter(default=1, significant=False)
margin: float = FractionalFloatParameter( # type: ignore[assignment]
default=1.0,
description="""
            Offset margin to start consuming from. E.g. if set to '0.95',
consumers will start at 95%% of the last committed offset;
in other words, start earlier than last committed position.
""",
)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
def requires(self) -> Dict[str, luigi.Task]:
"""Returns an instance of :class:`StartExport`, and an instance of :class:`ExportTopic`
for each value in ``self.object_types``"""
requirements: Dict[str, luigi.Task] = {
"start": StartExport(
config_file=self.config_file,
local_export_path=self.local_export_path,
local_sensitive_export_path=self.local_sensitive_export_path,
export_id=self.export_id,
margin=self.margin,
object_types=self.object_types,
)
}
requirements.update(
{
obj_type: ExportTopic(
config_file=self.config_file,
local_export_path=self.local_export_path,
local_sensitive_export_path=self.local_sensitive_export_path,
export_id=self.export_id,
formats=self.formats,
processes=self.processes,
margin=self.margin,
object_types=[obj_type],
)
for obj_type in self.object_types
if obj_type in (ObjectType.revision, ObjectType.release) # type: ignore[attr-defined]
}
)
return requirements
def output(self) -> Dict[str, luigi.LocalTarget]:
"""Returns ``{self.local_export_path}/tmp/stamps/person.json``"""
return {
"stamp": luigi.LocalTarget(
Path(self.local_export_path) / "tmp" / "stamps" / "person.json"
)
}
def run(self):
"""Aggregates lists of persons exported by :class:`ExportTopic` into a single
table with no duplicates."""
import json
import uuid
from .fullnames import process_fullnames
if self.local_sensitive_export_path is not None:
fullnames_export_path = self.local_sensitive_export_path / "orc" / "person"
fullnames_export_path.mkdir(parents=True, exist_ok=True)
fullnames_orc = fullnames_export_path / f"{uuid.uuid4()}.orc"
process_fullnames(
fullnames_orc, self.local_export_path / "tmp" / "dup_persons"
)
with self.output()["stamp"].open("w") as f:
json.dump({}, f)
class ExportGraph(luigi.Task):
"""Exports the entire graph to the local filesystem.
    Example invocation::

        luigi --local-scheduler --module swh.export.luigi ExportGraph \
            --config-file=graph.prod.yml \
            --local-export-path=export/ \
            --formats=edges

    which is equivalent to this CLI call::

        swh export --config-file graph.prod.yml graph export export/ --formats=edges
"""
config_file: Path = PathParameter(is_file=True) # type: ignore[assignment]
local_export_path: Path = PathParameter(is_dir=True, create=True) # type: ignore[assignment]
local_sensitive_export_path: Optional[Path] = luigi.OptionalPathParameter(
default=None
)
export_id = luigi.OptionalParameter(
default=None,
description="""
Unique ID of the export run. This is appended to the kafka
group_id config file option. If group_id is not set in the
'journal' section of the config file, defaults to 'swh-export-export-'.
""",
)
formats = luigi.EnumListParameter(
enum=Format,
batch_method=merge_lists,
default=[Format.orc], # type: ignore[attr-defined]
)
processes = luigi.IntParameter(default=1, significant=False)
margin: float = FractionalFloatParameter( # type: ignore[assignment]
default=1.0,
description="""
            Offset margin to start consuming from. E.g. if set to '0.95',
consumers will start at 95%% of the last committed offset;
in other words, start earlier than last committed position.
""",
)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
export_name = luigi.Parameter()
def output(self) -> List[luigi.LocalTarget]:
"""Returns path of `meta/export.json` on the local FS."""
return [self._meta()]
def complete(self) -> bool:
return super().complete() and _export_metadata_has_object_types(
self._meta(), self.object_types
)
def _stamps(self):
return [
luigi.LocalTarget(self.local_export_path / path)
for path in stamps_paths(self.formats, self.object_types)
]
def _meta(self):
return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")
def get_export_id(self) -> str:
import json
import logging
import uuid
logger = logging.getLogger(__name__)
if self.export_id:
logger.info("Using configured export id %s", self.export_id)
return self.export_id
else:
start_stamp_path = self.local_export_path / "tmp" / "stamps" / "START.json"
if start_stamp_path.exists():
export_id = json.loads(start_stamp_path.read_text())["export_id"]
logger.info("Reusing export id %s", export_id)
else:
export_id = f"{self.export_name}-{uuid.uuid4()}"
logger.info("Creating new export with id %s", export_id)
return export_id
def requires(self) -> Dict[str, luigi.Task]:
"""Returns an instance of :class:`StartExport`, and an instance of :class:`ExportTopic`
for each value in ``self.object_types``"""
export_id = self.get_export_id()
kwargs = dict(
config_file=self.config_file,
local_export_path=self.local_export_path,
local_sensitive_export_path=self.local_sensitive_export_path,
export_id=export_id,
margin=self.margin,
)
dependencies: Dict[str, luigi.Task] = {
obj_type: ExportTopic(
**kwargs,
processes=self.processes,
formats=self.formats,
object_types=[obj_type],
)
for obj_type in self.object_types
}
dependencies["START"] = StartExport(
**kwargs,
object_types=self.object_types,
)
dependencies["PERSONS"] = ExportPersonsTable(
**kwargs,
formats=self.formats,
object_types=self.object_types,
)
return dependencies
def run(self) -> None:
"""Runs the full export, then writes stamps, then :file:`meta.json`."""
import datetime
from importlib.metadata import version
import json
import socket
from swh.core import config
conf = config.read(str(self.config_file))
with self.input()["START"]["stamp"].open() as f:
start_date = datetime.datetime.fromisoformat(json.load(f)["start_date"])
end_date = datetime.datetime.now(tz=datetime.timezone.utc)
# Create stamps
for output in self._stamps():
output.makedirs()
with output.open("w") as fd:
pass
# Write export metadata
meta = {
"flavor": "full",
"export_start": start_date.isoformat(),
"export_end": end_date.isoformat(),
"brokers": conf["journal"]["brokers"],
"prefix": conf["journal"]["prefix"],
"formats": [format_.name for format_ in self.formats],
"object_types": [object_type.name for object_type in self.object_types],
"privileged": conf["journal"].get("privileged"),
"hostname": socket.getfqdn(),
"tool": {
"name": "swh.export",
"version": version("swh.export"),
},
}
with self._meta().open("w") as fd:
json.dump(meta, fd, indent=4)
shutil.rmtree(self.local_export_path / "tmp")
class UploadExportToS3(luigi.Task):
"""Uploads a local dataset export to S3; creating automatically if it does
not exist.
    Example invocation::

        luigi --local-scheduler --module swh.export.luigi UploadExportToS3 \
            --local-export-path=export/ \
            --formats=edges \
            --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
    """
local_export_path: Path = PathParameter(is_dir=True, create=True, significant=False) # type: ignore[assignment]
formats = luigi.EnumListParameter(
enum=Format,
batch_method=merge_lists,
default=[Format.orc], # type: ignore[attr-defined]
)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
s3_export_path: str = S3PathParameter() # type: ignore[assignment]
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`ExportGraph` task that writes local files at the
expected location."""
return [
ExportGraph(
local_export_path=self.local_export_path,
formats=self.formats,
object_types=self.object_types,
)
]
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on S3."""
return [self._meta()]
def _meta(self):
import luigi.contrib.s3
return luigi.contrib.s3.S3Target(f"{self.s3_export_path}/meta/export.json")
def complete(self) -> bool:
"""Returns whether the graph dataset was exported with a superset of
``object_types``"""
if (
"s3://softwareheritage/graph/"
< self.s3_export_path
< "s3://softwareheritage/graph/2022-12-07"
):
# exports before 2022-12-07 did not have the metadata needed; skip check
# for old exports.
return True
return super().complete() and _export_metadata_has_object_types(
self._meta(), self.object_types
)
def run(self) -> None:
"""Copies all files: first the export itself, then :file:`meta.json`."""
import os
import luigi.contrib.s3
import tqdm
client = luigi.contrib.s3.S3Client()
# recursively copy local files to S3, and end with stamps and export metadata
for format_ in self.formats:
for dirname in os.listdir(self.local_export_path / format_.name):
if dirname in ("stamps", "tmp"):
# used as stamps while exporting, pointless to copy them
continue
if dirname == "meta":
# used as final stamp; copy it at the end
continue
local_dir = self.local_export_path / format_.name / dirname
s3_dir = f"{self.s3_export_path}/{format_.name}/{dirname}"
status_message = f"Uploading {format_.name}/{dirname}/"
self.set_status_message(status_message)
for file_ in tqdm.tqdm(
list(os.listdir(local_dir)),
desc=status_message,
):
local_path = local_dir / file_
s3_path = f"{s3_dir}/{file_}"
obj_summary = client.get_key(s3_path)
if (
obj_summary is not None
and obj_summary.size == local_path.stat().st_size
):
# already uploaded (probably by a previous interrupted run)
continue
client.put_multipart(local_path, s3_path, ACL="public-read")
client.put(
self.local_export_path / "meta" / "export.json",
self._meta().path,
ACL="public-read",
)
class DownloadExportFromS3(luigi.Task):
"""Downloads a local dataset export from S3.
    This performs the inverse operation of :class:`UploadExportToS3`.
    Example invocation::

        luigi --local-scheduler --module swh.export.luigi DownloadExportFromS3 \
            --local-export-path=export/ \
            --formats=edges \
            --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
    """
local_export_path: Path = PathParameter(is_dir=True, create=True) # type: ignore[assignment]
formats = luigi.EnumListParameter(
enum=Format,
batch_method=merge_lists,
default=[Format.orc], # type: ignore[attr-defined]
)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
s3_export_path: str = S3PathParameter(significant=False) # type: ignore[assignment]
parallelism = luigi.IntParameter(default=10, significant=False)
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`ExportGraph` task that writes local files at the
expected location."""
return [
UploadExportToS3(
local_export_path=self.local_export_path,
formats=self.formats,
object_types=self.object_types,
s3_export_path=self.s3_export_path,
)
]
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on the local filesystem."""
return [self._meta()]
def complete(self) -> bool:
return super().complete() and _export_metadata_has_object_types(
self._meta(), self.object_types
)
def _meta(self):
return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")
def run(self) -> None:
"""Copies all files: first the export itself, then :file:`meta.json`."""
import collections
import multiprocessing.dummy
import luigi.contrib.s3
import tqdm
client = luigi.contrib.s3.S3Client()
        # recursively copy files from S3 to the local filesystem, and end with
        # the export metadata
paths = []
for format_ in self.formats:
local_dir = self.local_export_path / format_.name
s3_dir = f"{self.s3_export_path}/{format_.name}"
files = list(client.list(s3_dir))
assert files, "No files found"
files_by_type = collections.defaultdict(list)
for file in files:
files_by_type[file.split("/")[0]].append(file)
for object_type, files in files_by_type.items():
(local_dir / object_type).mkdir(parents=True, exist_ok=True)
for file_ in files:
paths.append(
(
f"{s3_dir}/{file_}",
str(local_dir / file_),
)
)
with multiprocessing.dummy.Pool(self.parallelism) as p:
for i, _ in tqdm.tqdm(
enumerate(p.imap_unordered(lambda args: client.get(*args), paths)),
total=len(paths),
desc="Downloading graph export",
):
self.set_progress_percentage(int(i * 100 / len(paths)))
export_json_path = self.local_export_path / "meta" / "export.json"
export_json_path.parent.mkdir(exist_ok=True)
client.get(
f"{self.s3_export_path}/meta/export.json",
self._meta().path,
)
class LocalExport(luigi.Task):
"""Task that depends on a local dataset being present -- either directly from
:class:`ExportGraph` or via :class:`DownloadExportFromS3`.
"""
local_export_path: Path = PathParameter(is_dir=True) # type: ignore[assignment]
local_sensitive_export_path: Optional[Path] = luigi.OptionalPathParameter(
default=None
)
formats = luigi.EnumListParameter(
enum=Format,
batch_method=merge_lists,
default=[Format.orc], # type: ignore[attr-defined]
)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
export_task_type = luigi.TaskParameter(
default=DownloadExportFromS3,
significant=False,
description="""The task used to get the dataset if it is not present.
Should be either ``ExportGraph`` or ``DownloadExportFromS3``.""",
)
def requires(self) -> List[luigi.Task]:
"""Returns an instance of either :class:`ExportGraph` or
:class:`DownloadExportFromS3` depending on the value of
:attr:`export_task_type`."""
if issubclass(self.export_task_type, ExportGraph):
return [
ExportGraph(
local_export_path=self.local_export_path,
local_sensitive_export_path=self.local_sensitive_export_path,
formats=self.formats,
object_types=self.object_types,
)
]
elif issubclass(self.export_task_type, DownloadExportFromS3):
return [
DownloadExportFromS3(
local_export_path=self.local_export_path,
formats=self.formats,
object_types=self.object_types,
)
]
else:
raise ValueError(
f"Unexpected export_task_type: {self.export_task_type.__name__}"
)
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on the local filesystem."""
return [self._meta()]
def _meta(self):
return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")
def complete(self) -> bool:
if "2020-" < self.local_export_path.name < "2022-12-07":
# exports before 2022-12-07 did not have the metadata needed; skip check
# for old exports.
return True
return super().complete() and _export_metadata_has_object_types(
self._meta(), self.object_types
)
class AthenaDatabaseTarget(luigi.Target):
"""Target for the existence of a database on Athena."""
def __init__(self, name: str, table_names: Set[str]):
self.name = name
self.table_names = table_names
def exists(self) -> bool:
import boto3
client = boto3.client("athena")
database_list = client.list_databases(CatalogName="AwsDataCatalog")
for database in database_list["DatabaseList"]:
if database["Name"] == self.name:
break
else:
# the database doesn't exist at all
return False
table_metadata = client.list_table_metadata(
CatalogName="AwsDataCatalog", DatabaseName=self.name
)
missing_tables = self.table_names - {
table["Name"] for table in table_metadata["TableMetadataList"]
}
return not missing_tables
class CreateAthena(luigi.Task):
"""Creates tables on AWS Athena pointing to a given graph dataset on S3.
    Example invocation::

        luigi --local-scheduler --module swh.export.luigi CreateAthena \
            --ExportGraph-config-file=graph.staging.yml \
            --athena-db-name=swh_20221108 \
            --object-types=origin,origin_visit \
            --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
            --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena

    which is equivalent to this CLI call::

        swh export athena create \
            --database-name swh_20221108 \
            --location-prefix s3://softwareheritage/graph/swh_2022-11-08 \
            --output-location s3://softwareheritage/graph/tmp/athena \
            --replace-tables
"""
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
s3_export_path = S3PathParameter()
s3_athena_output_location = S3PathParameter()
athena_db_name = luigi.Parameter()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not self.s3_export_path.replace("-", "").endswith(
"/" + self.athena_db_name.split("_", 1)[1]
):
            raise ValueError(
                f"S3 export path ({self.s3_export_path}) does not match "
                f"Athena database name ({self.athena_db_name}). "
                f"They should use these formats: "
                f"'s3://<whatever>/YYYY-MM-DD[_<flavor>]/' "
                f"and '<prefix>_YYYYMMDD[_<flavor>]'"
            )
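
        # For example (hypothetical values), s3_export_path
        # "s3://softwareheritage/graph/2022-11-08_staging" matches
        # athena_db_name "swh_20221108_staging": dashes are stripped from the
        # S3 path, and everything before the first "_" is dropped from the
        # database name, before comparing.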
def requires(self) -> List[luigi.Task]:
"""Returns the corresponding :class:`UploadExportToS3` instance,
with ORC as only format."""
return [
UploadExportToS3(
formats=[Format.orc], # type: ignore[attr-defined]
object_types=self.object_types,
s3_export_path=self.s3_export_path,
)
]
def output(self) -> List[luigi.Target]:
"""Returns an instance of :class:`AthenaDatabaseTarget`."""
from .athena import TABLES
return [AthenaDatabaseTarget(self.athena_db_name, set(TABLES))]
def run(self) -> None:
"""Creates tables from the ORC dataset."""
from .athena import create_tables
create_tables(
self.athena_db_name,
self.s3_export_path,
output_location=self.s3_athena_output_location,
replace=True,
)
class RunExportAll(luigi.WrapperTask):
"""Runs both the S3 and Athena export.
    Example invocation::

        luigi --local-scheduler --module swh.export.luigi RunExportAll \
            --ExportGraph-config-file=graph.staging.yml \
            --ExportGraph-processes=12 \
            --UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ \
            --formats=edges \
            --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
            --athena-db-name=swh_20221108 \
            --object-types=origin,origin_visit \
            --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
"""
formats = luigi.EnumListParameter(
enum=Format,
batch_method=merge_lists,
default=[Format.orc], # type: ignore[attr-defined]
)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
s3_export_path = S3PathParameter()
s3_athena_output_location = S3PathParameter()
athena_db_name = luigi.Parameter()
def requires(self) -> List[luigi.Task]:
"""Returns instances of :class:`CreateAthena` and :class:`UploadExportToS3`."""
        # CreateAthena depends on UploadExportToS3(formats=[orc]), so we need to
        # explicitly depend on UploadExportToS3(formats=self.formats) here, to also
        # export the formats requested by the user.
return [
CreateAthena(
object_types=self.object_types,
s3_export_path=self.s3_export_path,
s3_athena_output_location=self.s3_athena_output_location,
athena_db_name=self.athena_db_name,
),
UploadExportToS3(
formats=self.formats,
object_types=self.object_types,
s3_export_path=self.s3_export_path,
),
]