# Copyright (C) 2022-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
r"""
Luigi tasks for compression
===========================
This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks,
as an alternative to the CLI that can be composed with other tasks,
such as swh-dataset's.
It implements the task DAG described in :ref:`swh-graph-compression-steps`.
Unlike the CLI, this requires the graph to be named ``graph``.
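
For example, the whole pipeline can be triggered with an invocation along these
lines (paths are illustrative)::

    luigi --local-scheduler --module swh.graph.luigi CompressGraph \
        --local-export-path=export/ \
        --local-graph-path=graph/compressed/
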
Filtering
---------
The ``object_types`` parameter (``--object-types`` on the CLI) specifies
the set of node types to read from the dataset export, and it defaults to
all types: ``ori,snp,rel,rev,dir,cnt``.
Because the dataset export is keyed by edge sources, some objects whose type is
not in this list will still be present in the input dataset. For example, when
exporting ``ori,snp,rel,rev``, the root Directory of every release and revision
will be present, though without its labels (as well as a few Content objects
pointed to by some Releases).
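
For instance, to compress only origins, snapshots, releases, and revisions
(no directories and no contents), one would pass::

    --object-types ori,snp,rel,rev
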
File layout
-----------
In addition to files documented in :ref:`graph-compression` (eg. :file:`graph.graph`,
:file:`graph.mph`, ...), tasks in this module produce this directory structure::
base_dir/
<date>[_<flavor>]/
compressed/
graph.graph
graph.mph
...
meta/
export.json
compression.json
``meta/export.json`` is copied from the ORC dataset exported by
:mod:`swh.dataset.luigi`.
``meta/compression.json`` contains information about the compression itself,
for provenance tracking.
For example:
.. code-block:: json
[
{
"steps": null,
"export_start": "2022-11-08T11:00:54.998799+00:00",
"export_end": "2022-11-08T11:05:53.105519+00:00",
"object_types": [
"origin",
"origin_visit"
],
"hostname": "desktop5",
"conf": {},
"tool": {
"name": "swh.graph",
"version": "2.2.0"
},
"commands": [
{
"command": [
"bash",
"-c",
"java it.unimi.dsi.big.webgraph.labelling.BitStreamArcLabelledImmutableGraph --list ..."
],
"cgroup": "/sys/fs/cgroup/user.slice/user-1002.slice/user@1002.service/app.slice/swh.graph@103038/bash@0",
"cgroup_stats": {
"memory.events": "low 0\nhigh 0\nmax 0\noom 0\noom_kill 0\noom_group_kill 0",
"memory.events.local": "low 0\nhigh 0\nmax 0\noom 0\noom_kill 0\noom_group_kill 0",
"memory.swap.current": "0",
"memory.zswap.current": "0",
"memory.swap.events": "high 0\nmax 0\nfail 0",
"cpu.stat": "usage_usec 531350\nuser_usec 424286\nsystem_usec 107063\n...",
"memory.current": "614400",
"memory.stat": "anon 0\nfile 110592\nkernel 176128\nkernel_stack 0\n...",
"memory.numa_stat": "anon N0=0\nfile N0=110592\nkernel_stack N0=0\n...",
"memory.peak": "49258496"
}
}
]
}
]
When the compression pipeline is run in separate steps, each of the steps is recorded
as an object in the root list.
S3 layout
---------
As ``.bin`` files are meant to be accessed randomly, they are uncompressed on disk.
However, this is undesirable on at-rest/long-term storage like on S3, because
some are very sparse (eg. :file:`graph.property.committer_timestamp.bin` can be
quickly compressed from 300GB to 1GB).
Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed
when downloading.
The layout is otherwise the same as the file layout.
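
In practice, downloading (and decompressing) is handled by
:class:`swh.graph.download.GraphDownloader`, used by :class:`DownloadGraphFromS3`
below. Purely as an illustration, decompressing a single downloaded file could
look like this minimal sketch, which assumes the third-party ``zstandard``
bindings (not a dependency stated by this module):

.. code-block:: python

    import zstandard

    # Rebuild graph.property.committer_timestamp.bin from its compressed form
    dctx = zstandard.ZstdDecompressor()
    with open("graph.property.committer_timestamp.bin.zst", "rb") as ifh:
        with open("graph.property.committer_timestamp.bin", "wb") as ofh:
            dctx.copy_stream(ifh, ofh)
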
""" # noqa
import collections
import itertools
import math
from pathlib import Path
from typing import Any, Dict, List, MutableSequence, Sequence, Set
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import luigi
from swh.dataset.luigi import Format, LocalExport
from swh.dataset.luigi import ObjectType as Table
from swh.dataset.luigi import S3PathParameter
from swh.graph.webgraph import CompressionStep, do_step
_LOW_XMX = 128_000_000
"""Arbitrary value that should work as -Xmx for Java processes which don't need
much memory and as spare for processes which do"""
def _govmph_bitarray_size(nb_nodes: int) -> int:
"""Returns the size of the bitarray used by
it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction depending on the number of
nodes
"""
# https://github.com/vigna/Sux4J/blob/e9fd7412204272a2796e3038e95beb1d8cbc244a/src/it/unimi/dsi/sux4j/mph/GOVMinimalPerfectHashFunction.java#L157-L160
c_times_256 = math.floor((1.09 + 0.01) * 256)
# https://github.com/vigna/Sux4J/blob/e9fd7412204272a2796e3038e95beb1d8cbc244a/src/it/unimi/dsi/sux4j/mph/GOVMinimalPerfectHashFunction.java#L355
return int(2 * (1 + (nb_nodes * c_times_256 >> 8)) / 8)
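# Illustrative order of magnitude (assumed key count, not a measurement):
# for ~27 billion keys, c_times_256 = floor((1.09 + 0.01) * 256) = 281, so the
# bit array takes about 2 * (1 + 27e9 * 281 // 256) / 8 ≈ 7.4e9 bytes (~7.4 GB).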
# This mirrors the long switch statement in
# java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java
# but maps only to the main tables (see swh/dataset/relational.py)
#
# Note that swh-dataset's "object type" (which we refer to as "table" in this module
# to avoid confusion) corresponds to a "main table" of a relational DB, eg.
# "directory" or "origin_visit", but not to a relational table like "directory_entries".
#
# In swh-graph, "object type" is one of cnt,dir,rev,rel,snp,ori; these roughly match
# the main tables, but with some merged together.
_TABLES_PER_OBJECT_TYPE = {
"cnt": [Table.skipped_content, Table.content], # type: ignore[attr-defined]
"dir": [Table.directory], # type: ignore[attr-defined]
"rev": [Table.revision], # type: ignore[attr-defined]
"rel": [Table.release], # type: ignore[attr-defined]
"snp": [Table.snapshot], # type: ignore[attr-defined]
"ori": [
Table.origin, # type: ignore[attr-defined]
Table.origin_visit, # type: ignore[attr-defined]
Table.origin_visit_status, # type: ignore[attr-defined]
],
}
assert set(itertools.chain.from_iterable(_TABLES_PER_OBJECT_TYPE.values())) == set(
Table
)
def _tables_for_object_types(object_types: List[str]) -> List[Table]:
"""Returns the list of ORC tables required to produce a compressed graph with
the given object types."""
tables = []
for object_type in object_types:
tables.extend(_TABLES_PER_OBJECT_TYPE[object_type])
return tables
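# For example (illustrative call):
#
#   _tables_for_object_types(["ori", "snp"])
#   # -> [Table.origin, Table.origin_visit, Table.origin_visit_status, Table.snapshot]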
class ObjectTypesParameter(luigi.Parameter):
"""A parameter type whose value is either ``*`` or a set of comma-separated
object types (eg. ``ori,snp,rel,rev,dir,cnt``).
"""
def __init__(self, *args, **kwargs):
""""""
# Only present to override the docstring defined by Luigi, as that docstring
# references batch_method, which doesn't exist in SWH's documentation
super().__init__(*args, **kwargs)
def parse(self, s: str) -> List[str]:
if s == "*":
return self.parse(",".join(_TABLES_PER_OBJECT_TYPE))
else:
types = s.split(",")
invalid_types = set(types) - set(_TABLES_PER_OBJECT_TYPE)
if invalid_types:
raise ValueError(f"Invalid object types: {invalid_types!r}")
return types
def serialize(self, value: List[str]) -> str:
return ",".join(value)
class _CompressionStepTask(luigi.Task):
STEP: CompressionStep
INPUT_FILES: Set[str]
"""Dependencies of this step."""
OUTPUT_FILES: Set[str]
"""List of files which this task produces, without the graph name as prefix.
"""
EXPORT_AS_INPUT: bool = False
"""Whether this task should depend directly on :class:`LocalExport`.
If not, it is assumed it depends on it transitively via one of the tasks returned
by :meth:`requires`.
"""
USES_ALL_CPU_THREADS: bool = False
""":const:`True` on tasks that use all available CPU for their entire runtime.
These tasks should be scheduled in such a way that they do not run at the same time,
because running them concurrently does not improve run time."""
local_export_path = luigi.PathParameter(significant=False)
graph_name = luigi.Parameter(default="graph")
local_graph_path: Path = luigi.PathParameter()
# TODO: Only add this parameter to tasks that use it
batch_size = luigi.IntParameter(
default=0,
significant=False,
description="""
Size of work batches to use while compressing.
Larger is faster, but consumes more resources.
""",
)
rust_executable_dir = luigi.Parameter(
default="./target/release/",
significant=False,
description="Path to the Rust executable used to manipulate the graph.",
)
object_types: list[str] = ObjectTypesParameter() # type: ignore[assignment]
def _get_count(self, count_name: str, task_name: str) -> int:
count_path = self.local_graph_path / f"{self.graph_name}.{count_name}.count.txt"
if not count_path.exists():
# The task producing this count (``task_name``) did not run yet, so we do not
# know how many there are. As this also means the current task cannot run
# right now anyway, pretend there are so many that it would not fit in memory.
print(
f"warning: {self.__class__.__name__}._nb_{count_name} called but "
f"{task_name} did not run yet"
)
return 10**100
return int(count_path.read_text())
def _nb_nodes(self) -> int:
return self._get_count("nodes", "NodeStats")
def _nb_edges(self) -> int:
return self._get_count("edges", "EdgeStats")
def _nb_labels(self) -> int:
return self._get_count("labels", "LabelStats")
def _nb_persons(self) -> int:
return self._get_count("persons", "PersonsStats")
def _bvgraph_allocation(self):
"""Returns the memory needed to load the input .graph of this task."""
# In reverse order of their generation (and therefore size)
suffixes = [
"-transposed.graph",
".graph",
"-bfs-simplified.graph",
"-bfs.graph",
"-base.graph",
]
suffixes_in_input = set(suffixes) & self.INPUT_FILES
try:
(suffix_in_input,) = suffixes_in_input
except ValueError:
raise ValueError(
f"Expected {self.__class__.__name__} to have exactly one graph as "
f"input, got: {suffixes_in_input}"
) from None
graph_path = self.local_graph_path / f"{self.graph_name}{suffix_in_input}"
if graph_path.exists():
graph_size = graph_path.stat().st_size
else:
# This graph wasn't generated yet.
# As it means this task cannot run right now anyway, assume it is the
# worst case: it's the same size as the previous graph.
for suffix in suffixes[suffixes.index(suffix_in_input) :]:
graph_path = self.local_graph_path / f"{self.graph_name}{suffix}"
if graph_path.exists():
graph_size = graph_path.stat().st_size
break
else:
# No graph was generated yet, so we have to assume the worst
# possible compression ratio (a whole long int for each edge in
# the adjacency lists)
graph_size = 8 * self._nb_edges()
bitvector_size = self._nb_nodes() / 8
# https://github.com/vigna/fastutil/blob/master/src/it/unimi/dsi/fastutil/io/FastMultiByteArrayInputStream.java
fis_size = 1 << 30
fis_size += graph_size
# https://github.com/vigna/webgraph-big/blob/3.7.0/src/it/unimi/dsi/big/webgraph/BVGraph.java#L1443
# https://github.com/vigna/webgraph-big/blob/3.7.0/src/it/unimi/dsi/big/webgraph/BVGraph.java#L1503
offsets_size = self._nb_nodes() * 8
return int(bitvector_size + fis_size + offsets_size)
def _mph_size(self):
# The 2022-12-07 export had 27 billion nodes and its .mph weighed 7.2GB.
# (so, about 0.25 bytes per node)
# We can reasonably assume it won't ever take more than 2 bytes per node.
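# Illustrative check (simple arithmetic on the figures above): with that bound,
# the 2022-12-07 export would be budgeted 27e9 * 2 = 54GB, a comfortable margin
# over the 7.2GB actually observed.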
return self._nb_nodes() * 2
def _persons_mph_size(self):
return self._nb_persons() * 8
def _labels_mph_size(self):
# ditto, but there were 3 billion labels and the .mph was 8GB
# (about 2.6 bytes per label)
return self._nb_labels() * 8
def _large_java_allocations(self) -> int:
"""Returns the value to set as the JVM's -Xmx parameter, in bytes"""
raise NotImplementedError(f"{self.__class__.__name__}._large_java_allocations")
def _expected_memory(self) -> int:
"""Returns the expected total memory usage of this task, in bytes."""
extra_memory_coef = 1.1 # accounts for the JVM using more than -Xmx
return int((self._large_java_allocations() + _LOW_XMX) * extra_memory_coef)
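# Illustrative (assumed figures): a step whose _large_java_allocations() returns
# 10 GiB is expected to use about (10 GiB + _LOW_XMX) * 1.1 ≈ 12e9 bytes, i.e.
# roughly 11400 MB of the per-host RAM resource declared just below.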
@property
def resources(self) -> Dict[str, int]:
import socket
hostname = socket.getfqdn()
d = {
f"{hostname}_ram_mb": self._expected_memory() // (1024 * 1024),
f"{hostname}_max_cpu": int(self.USES_ALL_CPU_THREADS),
}
return d
@resources.setter
def resources(self, value):
raise NotImplementedError("setting 'resources' attribute")
def _stamp(self) -> Path:
"""Returns the path of this tasks's stamp file"""
return self.local_graph_path / "meta" / "stamps" / f"{self.STEP}.json"
def complete(self) -> bool:
"""Returns whether the files are written AND this task's stamp is present in
:file:`meta/compression.json`"""
import json
if not super().complete():
return False
for output in self.output():
path = Path(output.path)
if not path.exists():
raise Exception(f"expected output {path} does not exist")
if path.is_file():
if path.stat().st_size == 0:
if path.name.endswith(
(".labels.fcl.bytearray", ".labels.fcl.pointers")
) and {"dir", "snp", "ori"}.isdisjoint(set(self.object_types)):
# It's expected that .labels.fcl.bytearray is empty when dir, snp,
# and ori are all excluded, because these are the only objects
# with labels on their outgoing edges.
continue
raise Exception(f"expected output file {path} is empty")
elif path.is_dir():
if next(path.iterdir(), None) is None:
raise Exception(f"expected input directory {path} is empty")
else:
raise Exception(f"expected output {path} is not a file or directory")
if not self._stamp().is_file():
    return False
with self._stamp().open() as fd:
    json.load(fd)  # Check it was fully written
return True
def requires(self) -> Sequence[luigi.Task]:
"""Returns a list of luigi tasks matching :attr:`PREVIOUS_STEPS`."""
requirements_d = {}
for input_file in self.INPUT_FILES:
for cls in _CompressionStepTask.__subclasses__():
if input_file in cls.OUTPUT_FILES:
kwargs = dict(
local_export_path=self.local_export_path,
graph_name=self.graph_name,
local_graph_path=self.local_graph_path,
object_types=self.object_types,
rust_executable_dir=self.rust_executable_dir,
)
if self.batch_size:
kwargs["batch_size"] = self.batch_size
requirements_d[cls.STEP] = cls(**kwargs)
break
else:
assert False, f"Found no task outputting file '{input_file}'."
requirements: MutableSequence[luigi.Task] = list(requirements_d.values())
if self.EXPORT_AS_INPUT:
requirements.append(
LocalExport(
local_export_path=self.local_export_path,
formats=[Format.orc], # type: ignore[attr-defined]
object_types=_tables_for_object_types(self.object_types),
)
)
return requirements
def output(self) -> List[luigi.LocalTarget]:
"""Returns a list of luigi targets matching :attr:`OUTPUT_FILES`."""
return [luigi.LocalTarget(self._stamp())] + [
luigi.LocalTarget(f"{self.local_graph_path / self.graph_name}{name}")
for name in self.OUTPUT_FILES
]
def run(self) -> None:
"""Runs the step, by shelling out to the relevant Java program"""
import datetime
import json
import socket
import time
import pkg_resources
from swh.graph.config import check_config_compress
for input_file in self.INPUT_FILES:
path = self.local_graph_path / f"{self.graph_name}{input_file}"
if not path.exists():
raise Exception(f"expected input {path} does not exist")
if path.is_file():
if path.stat().st_size == 0:
raise Exception(f"expected input file {path} is empty")
elif path.is_dir():
if next(path.iterdir(), None) is None:
raise Exception(f"expected input directory {path} is empty")
else:
raise Exception(f"expected output {path} is not a file or directory")
if self.EXPORT_AS_INPUT:
export_meta = json.loads(
(self.local_export_path / "meta/export.json").read_text()
)
expected_tables = {
table.name for table in _tables_for_object_types(self.object_types)
}
missing_tables = expected_tables - set(export_meta["object_types"])
if missing_tables:
raise Exception(
f"Want to compress graph with object types {self.object_types!r} "
f"(meaning tables {expected_tables!r} are needed) "
f"but export has only tables {export_meta['object_types']!r}. "
f"Missing tables: {missing_tables!r}"
)
conf: dict[str, Any] = {
"object_types": ",".join(self.object_types),
"max_ram": f"{(self._large_java_allocations() + _LOW_XMX)//(1024*1024)}M",
# TODO: make this more configurable
}
if self.batch_size:
conf["batch_size"] = self.batch_size
if self.STEP == CompressionStep.LLP and self.gammas: # type: ignore[attr-defined]
conf["llp_gammas"] = self.gammas # type: ignore[attr-defined]
conf["rust_executable_dir"] = self.rust_executable_dir
conf = check_config_compress(
conf,
graph_name=self.graph_name,
in_dir=self.local_export_path / "orc",
out_dir=self.local_graph_path,
)
start_date = datetime.datetime.now(tz=datetime.timezone.utc)
start_time = time.monotonic()
run_results = do_step(step=self.STEP, conf=conf)
end_time = time.monotonic()
end_date = datetime.datetime.now(tz=datetime.timezone.utc)
stamp_content = {
"start": start_date.isoformat(),
"end": end_date.isoformat(),
"runtime": end_time - start_time,
"hostname": socket.getfqdn(),
"object_types": self.object_types,
"conf": conf,
"tool": {
"name": "swh.graph",
"version": pkg_resources.get_distribution("swh.graph").version,
},
"commands": [
{
"command": res.command,
"cgroup": str(res.cgroup) if res.cgroup else None,
"cgroup_stats": res.cgroup_stats,
}
for res in run_results
],
}
self._stamp().parent.mkdir(parents=True, exist_ok=True)
with self._stamp().open("w") as fd:
json.dump(stamp_content, fd, indent=4)
class NodeStats(_CompressionStepTask):
STEP = CompressionStep.NODE_STATS
INPUT_FILES = {".nodes/"}
OUTPUT_FILES = {
".nodes.count.txt",
".nodes.stats.txt",
}
priority = 100
"""high priority, to help the scheduler allocate resources"""
def _large_java_allocations(self) -> int:
return 0
class EdgeStats(_CompressionStepTask):
STEP = CompressionStep.EDGE_STATS
EXPORT_AS_INPUT = True
INPUT_FILES: Set[str] = set()
OUTPUT_FILES = {
".edges.count.txt",
".edges.stats.txt",
}
priority = 100
"""high priority, to help the scheduler allocate resources"""
def _large_java_allocations(self) -> int:
return 0
class LabelStats(_CompressionStepTask):
STEP = CompressionStep.LABEL_STATS
INPUT_FILES = {".labels.csv.zst"}
OUTPUT_FILES = {
".labels.count.txt",
}
USES_ALL_CPU_THREADS = True
priority = 100
"""high priority, to help the scheduler allocate resources"""
def _large_java_allocations(self) -> int:
return 0
class Mph(_CompressionStepTask):
STEP = CompressionStep.MPH
INPUT_FILES = {".nodes/", ".nodes.count.txt"}
OUTPUT_FILES = {".pthash"}
USES_ALL_CPU_THREADS = True
def _large_java_allocations(self) -> int:
return 0
class Bv(_CompressionStepTask):
STEP = CompressionStep.BV
EXPORT_AS_INPUT = True
INPUT_FILES = {".pthash"}
OUTPUT_FILES = {"-base.graph"}
def _large_java_allocations(self) -> int:
import psutil
# TODO: deduplicate this formula with the one for DEFAULT_BATCH_SIZE in
# ScatteredArcsORCGraph.java
batch_size = psutil.virtual_memory().total * 0.4 / (8 * 2)
return int(self._mph_size() + batch_size)
class BvEf(_CompressionStepTask):
STEP = CompressionStep.BV_EF
INPUT_FILES = {"-base.graph"}
OUTPUT_FILES = {"-base.ef"}
def _large_java_allocations(self) -> int:
return 0
class BfsRoots(_CompressionStepTask):
STEP = CompressionStep.BFS_ROOTS
EXPORT_AS_INPUT = True
INPUT_FILES = set()
OUTPUT_FILES = {"-bfs.roots.txt"}
def _large_java_allocations(self) -> int:
return 0
class Bfs(_CompressionStepTask):
STEP = CompressionStep.BFS
INPUT_FILES = {"-base.graph", "-base.ef", "-bfs.roots.txt", ".pthash"}
OUTPUT_FILES = {"-bfs.order"}
def _large_java_allocations(self) -> int:
bvgraph_size = self._bvgraph_allocation()
visitorder_size = self._nb_nodes() * 8 # longarray in BFS.java
extra_size = 500_000_000 # TODO: why is this needed?
return bvgraph_size + visitorder_size + extra_size
class PermuteAndSimplifyBfs(_CompressionStepTask):
STEP = CompressionStep.PERMUTE_AND_SIMPLIFY_BFS
INPUT_FILES = {"-base.graph", "-base.ef", "-bfs.order"}
OUTPUT_FILES = {"-bfs-simplified.graph"}
def _large_java_allocations(self) -> int:
return 0
class BfsEf(_CompressionStepTask):
STEP = CompressionStep.BFS_EF
INPUT_FILES = {"-bfs-simplified.graph"}
OUTPUT_FILES = {"-bfs-simplified.ef"}
def _large_java_allocations(self) -> int:
return 0
class BfsDcf(_CompressionStepTask):
STEP = CompressionStep.BFS_DCF
INPUT_FILES = {"-bfs-simplified.graph"}
OUTPUT_FILES = {"-bfs-simplified.dcf"}
def _large_java_allocations(self) -> int:
return 0
class Llp(_CompressionStepTask):
STEP = CompressionStep.LLP
INPUT_FILES = {"-bfs-simplified.graph", "-bfs-simplified.ef", "-bfs-simplified.dcf"}
OUTPUT_FILES = {"-llp.order"}
gammas = luigi.Parameter(significant=False, default=None)
def _large_java_allocations(self) -> int:
# actually it loads the simplified graph, but we reuse the size of the
# base BVGraph, to simplify this code
bvgraph_size = self._bvgraph_allocation()
label_array_size = volume_array_size = permutation_size = major_array_size = (
self._nb_nodes() * 8
) # longarrays
canchange_array_size = (
self._nb_nodes() * 4
) # bitarray, not optimized like a bitvector
return (
label_array_size
+ bvgraph_size
+ volume_array_size
+ permutation_size * 3 # 'intLabel', 'major', and next value of 'major'
+ major_array_size
+ canchange_array_size
)
class PermuteLlp(_CompressionStepTask):
STEP = CompressionStep.PERMUTE_LLP
INPUT_FILES = {".pthash.order", "-base.graph", "-base.ef"}
OUTPUT_FILES = {".graph", ".properties"}
def _large_java_allocations(self) -> int:
from swh.graph.config import check_config
# TODO: deduplicate this with PermuteBfs; it runs the same code except for the
# batch size
if self.batch_size:
batch_size = self.batch_size
else:
batch_size = check_config({})["batch_size"]
# https://github.com/vigna/webgraph-big/blob/3.7.0/src/it/unimi/dsi/big/webgraph/Transform.java#L2196
permutation_size = self._nb_nodes() * 8
# https://github.com/vigna/webgraph-big/blob/3.7.0/src/it/unimi/dsi/big/webgraph/Transform.java#L2196
source_batch_size = target_batch_size = batch_size * 8 # longarrays
extra_size = self._nb_nodes() * 16 # FIXME: why is this needed?
return permutation_size + source_batch_size + target_batch_size + extra_size
class Offsets(_CompressionStepTask):
STEP = CompressionStep.OFFSETS
INPUT_FILES = {".graph"}
OUTPUT_FILES = {".offsets"}
def _large_java_allocations(self) -> int:
return 0
class Ef(_CompressionStepTask):
STEP = CompressionStep.EF
INPUT_FILES = {".graph", ".offsets"}
OUTPUT_FILES = {".ef"}
def _large_java_allocations(self) -> int:
return 0
class ComposeOrders(_CompressionStepTask):
STEP = CompressionStep.COMPOSE_ORDERS
INPUT_FILES = {"-llp.order", "-bfs.order"}
OUTPUT_FILES = {".pthash.order"}
def _large_java_allocations(self) -> int:
permutation_size = self._nb_nodes() * 8 # longarray
return permutation_size * 3
class Transpose(_CompressionStepTask):
STEP = CompressionStep.TRANSPOSE
# .obl is an optional input; but we need to make sure it's not being written
# while Transpose is starting, or Transpose would error with EOF while reading it
INPUT_FILES = {".graph", ".ef"}
OUTPUT_FILES = {"-transposed.graph", "-transposed.properties"}
def _large_java_allocations(self) -> int:
from swh.graph.config import check_config
if self.batch_size:
batch_size = self.batch_size
else:
batch_size = check_config({})["batch_size"]
permutation_size = self._nb_nodes() * 8 # longarray
source_batch_size = target_batch_size = start_batch_size = (
batch_size * 8
) # longarrays
return (
permutation_size + source_batch_size + target_batch_size + start_batch_size
)
class TransposeOffsets(_CompressionStepTask):
STEP = CompressionStep.TRANSPOSE_OFFSETS
INPUT_FILES = {"-transposed.graph"}
OUTPUT_FILES = {"-transposed.offsets"}
def _large_java_allocations(self) -> int:
bvgraph_size = self._bvgraph_allocation()
return bvgraph_size
class TransposeEf(_CompressionStepTask):
STEP = CompressionStep.TRANSPOSE_EF
INPUT_FILES = {"-transposed.graph", "-transposed.offsets"}
OUTPUT_FILES = {"-transposed.ef"}
def _large_java_allocations(self) -> int:
return 0
class Maps(_CompressionStepTask):
STEP = CompressionStep.MAPS
INPUT_FILES = {".pthash", ".pthash.order", ".nodes/"}
OUTPUT_FILES = {".node2swhid.bin"}
def _large_java_allocations(self) -> int:
mph_size = self._mph_size()
bfsmap_size = self._nb_nodes() * 8 # longarray
return mph_size + bfsmap_size
class PersonsStats(_CompressionStepTask):
STEP = CompressionStep.PERSONS_STATS
INPUT_FILES = {".persons.csv.zst"}
OUTPUT_FILES = {".persons.count.txt"}
def _large_java_allocations(self) -> int:
return 0
class MphPersons(_CompressionStepTask):
STEP = CompressionStep.MPH_PERSONS
INPUT_FILES = {".persons.csv.zst", ".persons.count.txt"}
OUTPUT_FILES = {".persons.pthash"}
def _large_java_allocations(self) -> int:
bitvector_size = _govmph_bitarray_size(self._nb_persons())
return bitvector_size
class NodeProperties(_CompressionStepTask):
STEP = CompressionStep.NODE_PROPERTIES
INPUT_FILES = {".pthash.order", ".pthash", ".persons.pthash", ".node2swhid.bin"}
EXPORT_AS_INPUT = True
OUTPUT_FILES = {
".property.content.is_skipped.bits",
} | {
f".property.{name}.bin"
for name in (
"author_id",
"author_timestamp",
"author_timestamp_offset",
"committer_id",
"committer_timestamp",
"committer_timestamp_offset",
"content.length",
"message",
"message.offset",
"tag_name",
"tag_name.offset",
)
}
priority = 10
"""semi-high priority because it takes a very long time to run"""
def output(self) -> List[luigi.LocalTarget]:
"""Returns a list of luigi targets matching :attr:`OUTPUT_FILES`."""
excluded_files = set()
if "cnt" not in self.object_types:
excluded_files |= {
"content.is_skipped.bits",
"content.length.bin",
}
if "rev" not in self.object_types and "rel" not in self.object_types:
excluded_files |= {
"author_id.bin",
"author_timestamp.bin",
"author_timestamp_offset.bin",
"message.bin",
"message.offset.bin",
}
if "rel" not in self.object_types:
excluded_files |= {
"tag_name.bin",
"tag_name.offset.bin",
}
if "rev" not in self.object_types:
excluded_files |= {
"committer_id.bin",
"committer_timestamp.bin",
"committer_timestamp_offset.bin",
}
excluded_files = {f".property.{name}" for name in excluded_files}
return [luigi.LocalTarget(self._stamp())] + [
luigi.LocalTarget(f"{self.local_graph_path / self.graph_name}{name}")
for name in self.OUTPUT_FILES - excluded_files
]
def _large_java_allocations(self) -> int:
# each property has its own arrays, but they don't run at the same time.
# The biggest are:
# * content length/writeMessages/writeTagNames (long array)
# * writeTimestamps (long array + short array)
# * writePersonIds (int array, but also loads the person MPH)
subtask_size = max(
self._nb_nodes() * (8 + 2), # writeTimestamps
self._nb_nodes() * 4 + self._persons_mph_size(),
)
return self._mph_size() + self._persons_mph_size() + subtask_size
class PthashLabels(_CompressionStepTask):
STEP = CompressionStep.MPH_LABELS
INPUT_FILES = {".labels.csv.zst", ".labels.count.txt"}
OUTPUT_FILES = {".labels.pthash"}
def _large_java_allocations(self) -> int:
return 0
class LabelsOrder(_CompressionStepTask):
STEP = CompressionStep.LABELS_ORDER
INPUT_FILES = {".labels.csv.zst", ".labels.pthash", ".labels.count.txt"}
OUTPUT_FILES = {".labels.pthash.order"}
def _large_java_allocations(self) -> int:
return 0
class FclLabels(_CompressionStepTask):
STEP = CompressionStep.FCL_LABELS
INPUT_FILES = {".labels.csv.zst", ".labels.count.txt"}
OUTPUT_FILES = {
".labels.fcl.bytearray",
".labels.fcl.pointers",
".labels.fcl.properties",
}
def _large_java_allocations(self) -> int:
return self._labels_mph_size()
class EdgeLabels(_CompressionStepTask):
STEP = CompressionStep.EDGE_LABELS
INPUT_FILES = {
".labels.pthash",
".labels.pthash.order",
".pthash",
".pthash.order",
}
EXPORT_AS_INPUT = True
OUTPUT_FILES = {
"-labelled.labeloffsets",
"-labelled.labels",
"-labelled.properties",
}
priority = 10
"""semi-high priority because it takes a long time to run"""
def _large_java_allocations(self) -> int:
import multiprocessing
# See ExtractNodes._large_java_allocations for this constant
orc_buffers_size = 256_000_000
nb_orc_readers = multiprocessing.cpu_count()
return (
orc_buffers_size * nb_orc_readers
+ self._mph_size()
+ self._labels_mph_size()
)
class EdgeLabelsTranspose(_CompressionStepTask):
STEP = CompressionStep.EDGE_LABELS_TRANSPOSE
INPUT_FILES = {
".labels.pthash",
".labels.pthash.order",
".pthash",
".pthash.order",
}
EXPORT_AS_INPUT = True
OUTPUT_FILES = {
"-transposed-labelled.labeloffsets",
"-transposed-labelled.labels",
"-transposed-labelled.properties",
}
priority = 10
"""semi-high priority because it takes a long time to run"""
def _large_java_allocations(self) -> int:
import multiprocessing
# See ExtractNodes._large_java_allocations for this constant
orc_buffers_size = 256_000_000
nb_orc_readers = multiprocessing.cpu_count()
return (
orc_buffers_size * nb_orc_readers
+ self._mph_size()
+ self._labels_mph_size()
)
class EdgeLabelsEf(_CompressionStepTask):
STEP = CompressionStep.EDGE_LABELS_EF
INPUT_FILES = {"-labelled.labels", "-labelled.labeloffsets"}
OUTPUT_FILES = {"-labelled.ef"}
def _large_java_allocations(self) -> int:
return 0
class EdgeLabelsTransposeEf(_CompressionStepTask):
STEP = CompressionStep.EDGE_LABELS_TRANSPOSE_EF
INPUT_FILES = {"-transposed-labelled.labels", "-transposed-labelled.labeloffsets"}
OUTPUT_FILES = {"-transposed-labelled.ef"}
def _large_java_allocations(self) -> int:
return 0
class Stats(_CompressionStepTask):
STEP = CompressionStep.STATS
INPUT_FILES = {".graph", ".graph.ef", "-transposed.graph", "-transposed.graph.ef"}
OUTPUT_FILES = {".stats"}
def _large_java_allocations(self) -> int:
return 0
_duplicate_steps = [
step
for (step, count) in collections.Counter(
cls.STEP for cls in _CompressionStepTask.__subclasses__()
).items()
if count != 1
]
assert not _duplicate_steps, f"Duplicate steps: {_duplicate_steps}"
_duplicate_outputs = [
filename
for (filename, count) in collections.Counter(
filename
for cls in _CompressionStepTask.__subclasses__()
for filename in cls.OUTPUT_FILES
).items()
if count != 1
]
assert not _duplicate_outputs, f"Duplicate outputs: {_duplicate_outputs}"
class CompressGraph(luigi.Task):
local_export_path = luigi.PathParameter(significant=False)
graph_name = luigi.Parameter(default="graph")
local_graph_path: Path = luigi.PathParameter()
batch_size = luigi.IntParameter(
default=0,
significant=False,
description="""
Size of work batches to use while compressing.
Larger is faster, but consumes more resources.
""",
)
rust_executable_dir = luigi.Parameter(
default="./target/release/",
significant=False,
description="Path to the Rust executable used to manipulate the graph.",
)
object_types: list[str] = ObjectTypesParameter( # type: ignore[assignment]
default=list(_TABLES_PER_OBJECT_TYPE)
)
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`LocalExport` task, and leaves of the compression dependency
graph"""
kwargs = dict(
local_export_path=self.local_export_path,
graph_name=self.graph_name,
local_graph_path=self.local_graph_path,
object_types=self.object_types,
rust_executable_dir=self.rust_executable_dir,
)
if set(self.object_types).isdisjoint({"dir", "snp", "ori"}):
# Only nodes of these three types have outgoing arcs with labels
label_tasks = []
else:
label_tasks = [
EdgeStats(**kwargs),
LabelStats(**kwargs),
FclLabels(**kwargs),
EdgeLabelsEf(**kwargs),
EdgeLabelsTransposeEf(**kwargs),
]
return [
LocalExport(
local_export_path=self.local_export_path,
formats=[Format.orc], # type: ignore[attr-defined]
object_types=_tables_for_object_types(self.object_types),
),
NodeStats(**kwargs),
TransposeEf(**kwargs),
Maps(**kwargs),
NodeProperties(**kwargs),
*label_tasks,
]
def output(self) -> List[luigi.LocalTarget]:
"""Returns the ``meta/*.json`` targets"""
return [self._export_meta(), self._compression_meta()]
def _export_meta(self) -> luigi.LocalTarget:
"""Returns the metadata on the dataset export"""
return luigi.LocalTarget(self.local_graph_path / "meta/export.json")
def _compression_meta(self) -> luigi.LocalTarget:
"""Returns the metadata on the compression pipeline"""
return luigi.LocalTarget(self.local_graph_path / "meta/compression.json")
def run(self):
"""Runs the full compression pipeline, then writes :file:`meta/compression.json`
This does not support running individual steps yet."""
import json
import socket
import pkg_resources
from swh.graph.config import check_config_compress
conf = {
"object_types": ",".join(self.object_types),
}
if self.batch_size:
conf["batch_size"] = self.batch_size
conf = check_config_compress(
conf,
graph_name=self.graph_name,
in_dir=self.local_export_path,
out_dir=self.local_graph_path,
)
step_stamp_paths = []
for step in CompressionStep:
if step == CompressionStep.CLEAN_TMP:
# This step is not run as its own Luigi task
continue
path = self.local_graph_path / "meta" / "stamps" / f"{step}.json"
if path.exists():
step_stamp_paths.append(path)
steps = [json.loads(path.read_text()) for path in step_stamp_paths]
do_step(CompressionStep.CLEAN_TMP, conf=conf)
# Copy export metadata
with open(self._export_meta().path, "wb") as fd:
fd.write((self.local_export_path / "meta" / "export.json").read_bytes())
if self._compression_meta().exists():
with self._compression_meta().open("r") as fd:
meta = json.load(fd)
else:
meta = []
meta.append(
{
"steps": steps,
"start": min(step["start"] for step in steps),
"end": max(step["end"] for step in steps),
"total_runtime": sum(step["runtime"] for step in steps),
"object_types": ",".join(self.object_types),
"hostname": socket.getfqdn(),
"conf": conf,
"tool": {
"name": "swh.graph",
"version": pkg_resources.get_distribution("swh.graph").version,
},
}
)
with self._compression_meta().open("w") as fd:
json.dump(meta, fd, indent=4)
for path in step_stamp_paths:
path.unlink()
class UploadGraphToS3(luigi.Task):
"""Uploads a local compressed graphto S3; creating automatically if it does
not exist.
Example invocation::
luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 \
--local-graph-path=graph/ \
--s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
"""
local_graph_path = luigi.PathParameter(significant=False)
s3_graph_path = S3PathParameter()
parallelism = luigi.IntParameter(default=10, significant=False)
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`CompressGraph` task that writes local files at the
expected location."""
return [
CompressGraph(
local_graph_path=self.local_graph_path,
)
]
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on S3."""
return [self._meta()]
def _meta(self):
import luigi.contrib.s3
return luigi.contrib.s3.S3Target(f"{self.s3_graph_path}/meta/compression.json")
def run(self) -> None:
"""Copies all files: first the graph itself, then :file:`meta/compression.json`."""
import multiprocessing.dummy
import luigi.contrib.s3
import tqdm
# worker threads import it; we need to make sure it is already imported so
# they don't race on the import
from ..shell import Command # noqa
compression_metadata_path = self.local_graph_path / "meta" / "compression.json"
seen_compression_metadata = False
# recursively copy local files to S3, and end with compression metadata
paths = []
for path in self.local_graph_path.glob("**/*"):
if path == compression_metadata_path:
# Write it last
seen_compression_metadata = True
continue
if path.is_dir():
continue
paths.append(path)
assert (
seen_compression_metadata
), "did not see meta/compression.json in directory listing"
self.__status_messages: Dict[Path, str] = {}
client = luigi.contrib.s3.S3Client()
with multiprocessing.dummy.Pool(self.parallelism) as p:
for i, relative_path in tqdm.tqdm(
enumerate(p.imap_unordered(self._upload_file, paths)),
total=len(paths),
desc="Uploading compressed graph",
):
self.set_progress_percentage(int(i * 100 / len(paths)))
self.set_status_message("\n".join(self.__status_messages.values()))
# Write it last, to act as a stamp
client.put(
compression_metadata_path,
self._meta().path,
ACL="public-read",
)
def _upload_file(self, path):
import tempfile
import luigi.contrib.s3
from ..shell import Command
client = luigi.contrib.s3.S3Client()
relative_path = path.relative_to(self.local_graph_path)
if path.suffix == ".bin":
# Large sparse file; store it compressed on S3.
with tempfile.NamedTemporaryFile(prefix=path.stem, suffix=".bin.zst") as fd:
self.__status_messages[path] = f"Compressing {relative_path}"
# --force is needed because NamedTemporaryFile already created fd.name;
# --keep leaves the original .bin in place
Command.zstdmt(
    "--force", "--keep", path, "-o", fd.name
).run()
self.__status_messages[path] = f"Uploading {relative_path} (compressed)"
client.put_multipart(
fd.name,
f"{self.s3_graph_path}/{relative_path}.zst",
ACL="public-read",
)
else:
self.__status_messages[path] = f"Uploading {relative_path}"
client.put_multipart(
path, f"{self.s3_graph_path}/{relative_path}", ACL="public-read"
)
del self.__status_messages[path]
return relative_path
class DownloadGraphFromS3(luigi.Task):
"""Downloads a local dataset graph from S3.
This performs the inverse operation of :class:`UploadGraphToS3`
Example invocation::
luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 \
--local-graph-path=graph/ \
--s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
"""
local_graph_path: Path = luigi.PathParameter()
s3_graph_path: str = S3PathParameter(significant=False) # type: ignore[assignment]
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`UploadGraphToS3` task that writes local files to S3."""
return [
UploadGraphToS3(
local_graph_path=self.local_graph_path,
s3_graph_path=self.s3_graph_path,
)
]
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on the local filesystem."""
return [self._meta()]
def _meta(self):
return luigi.LocalTarget(self.local_graph_path / "meta" / "export.json")
def run(self) -> None:
"""Copies all files: first the graph itself, then :file:`meta/compression.json`."""
from swh.graph.download import GraphDownloader
GraphDownloader(
local_graph_path=self.local_graph_path,
s3_graph_path=self.s3_graph_path,
parallelism=10,
).download_graph(
progress_percent_cb=self.set_progress_percentage,
progress_status_cb=self.set_status_message,
)
class LocalGraph(luigi.Task):
"""Task that depends on a local dataset being present -- either directly from
:class:`ExportGraph` or via :class:`DownloadGraphFromS3`.
"""
local_graph_path: Path = luigi.PathParameter()
compression_task_type = luigi.TaskParameter(
default=DownloadGraphFromS3,
significant=False,
description="""The task used to get the compressed graph if it is not present.
Should be either ``swh.graph.luigi.CompressGraph`` or
``swh.graph.luigi.DownloadGraphFromS3``.""",
)
def requires(self) -> List[luigi.Task]:
"""Returns an instance of either :class:`CompressGraph` or
:class:`DownloadGraphFromS3` depending on the value of
:attr:`compression_task_type`."""
if issubclass(self.compression_task_type, CompressGraph):
return [
CompressGraph(
local_graph_path=self.local_graph_path,
)
]
elif issubclass(self.compression_task_type, DownloadGraphFromS3):
return [
DownloadGraphFromS3(
local_graph_path=self.local_graph_path,
)
]
else:
raise ValueError(
f"Unexpected compression_task_type: "
f"{self.compression_task_type.__name__}"
)
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on the local filesystem."""
return [self._meta()]
def _meta(self):
return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")