# Copyright (C) 2022-2026 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Luigi tasks to analyze, and produce datasets related to, graph topology
=======================================================================
This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
driving the computation of a topological order, and count the number
of paths to every node.
File layout
-----------
This assumes a local compressed graph (from :mod:`swh.graph.luigi.compressed_graph`)
is present, and generates/manipulates the following files::
base_dir/
<date>[_<flavor>]/
topology/
topological_order_dfs.csv.zst
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import operator
from pathlib import Path
from typing import Dict, List, Tuple
import luigi
from swh.export.luigi import S3PathParameter
from swh.graph.luigi.compressed_graph import LocalGraph
from swh.graph.luigi.utils import _ParquetToS3ToAthenaTask, count_nodes
# Node types that may appear in a compressed Software Heritage graph.
OBJECT_TYPES = {"ori", "snp", "rel", "rev", "dir", "cnt"}


def inverse_direction(direction: str) -> str:
    """Return the opposite traversal direction.

    Args:
        direction: either ``"forward"`` or ``"backward"``

    Raises:
        ValueError: if ``direction`` is neither of those values
    """
    if direction == "forward":
        return "backward"
    elif direction == "backward":
        return "forward"
    else:
        # Raise instead of `assert False` so the check survives `python -O`
        # (asserts are stripped with optimizations on), and for consistency
        # with the ValueError raised on invalid object types by the tasks
        # in this module.
        raise ValueError(f"unknown direction: {direction!r}")
class TopoSort(luigi.Task):
    """Creates a file that contains all SWHIDs in topological order from a compressed
    graph."""

    local_graph_path = luigi.PathParameter()
    topology_dir = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    object_types = luigi.Parameter()
    direction = luigi.ChoiceParameter(choices=["forward", "backward"])
    algorithm = luigi.ChoiceParameter(choices=["dfs", "bfs"], default="dfs")

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of :class:`LocalGraph`."""
        return [LocalGraph(local_graph_path=self.local_graph_path)]

    def output(self) -> luigi.LocalTarget:
        """.csv.zst file that contains the topological order."""
        return luigi.LocalTarget(
            self.topology_dir
            / f"topological_order_{self.algorithm}_{self.direction}_{self.object_types}.csv.zst"
        )

    def run(self) -> None:
        """Runs 'toposort' command from tools/topology and compresses"""
        # Import from swh.graph.shell consistently (the original mixed
        # `from swh.graph.shell import ...` and `from ..shell import Rust`,
        # which refer to the same module).
        from swh.graph.shell import AtomicFileSink, Command, Rust

        invalid_object_types = set(self.object_types.split(",")) - OBJECT_TYPES
        if invalid_object_types:
            raise ValueError(f"Invalid object types: {invalid_object_types}")

        # Used only to give pv(1) a total for its progress display.
        nb_nodes = count_nodes(
            self.local_graph_path, self.graph_name, self.object_types
        )
        nb_lines = nb_nodes + 1  # CSV header

        # fmt: off
        (
            Rust(
                "toposort",
                self.local_graph_path / self.graph_name,
                "--algorithm", self.algorithm,
                "--direction", self.direction,
                "--node-types", self.object_types,
            )
            | Command.pv("--line-mode", "--wait", "--size", str(nb_lines))
            | Command.zstdmt("-9")  # not -19 because of CPU usage + little gain
            > AtomicFileSink(self.output())
        ).run()
        # fmt: on
class ComputeGenerations(luigi.Task):
    """Computes a topological order of a compressed graph, stored as a
    bitstream (plus offsets and properties sidecar files), along with the
    depth ("generation") of every node as a binary array."""

    local_graph_path = luigi.PathParameter()
    topology_dir = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    object_types = luigi.Parameter()
    direction = luigi.ChoiceParameter(choices=["forward", "backward"])

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of :class:`LocalGraph`."""
        return [LocalGraph(local_graph_path=self.local_graph_path)]

    def _topo_order_path(self) -> Path:
        return (
            self.topology_dir
            / f"topological_order_{self.direction}_{self.object_types}.bitstream"
        )

    def _topo_order_offsets_path(self) -> Path:
        return (
            self.topology_dir
            / f"topological_order_{self.direction}_{self.object_types}.bitstream.offsets"
        )

    def _topo_order_properties_path(self) -> Path:
        return (
            self.topology_dir
            / f"topological_order_{self.direction}_{self.object_types}.bitstream.properties.json"  # noqa
        )

    def _depths_path(self) -> Path:
        return self.topology_dir / f"depths_{self.direction}_{self.object_types}.bin"

    def output(self) -> dict[str, luigi.Target]:
        """Targets for the topological order bitstream (with its offsets and
        properties sidecar files) and the binary depths array."""
        return {
            "topo_order": luigi.LocalTarget(self._topo_order_path()),
            "topo_order_offsets": luigi.LocalTarget(self._topo_order_offsets_path()),
            "topo_order_properties": luigi.LocalTarget(
                self._topo_order_properties_path()
            ),
            "generations": luigi.LocalTarget(self._depths_path()),
        }

    def run(self) -> None:
        """Runs the 'generations' command from tools/topology."""
        from ..shell import Rust

        invalid_object_types = set(self.object_types.split(",")) - OBJECT_TYPES
        if invalid_object_types:
            raise ValueError(f"Invalid object types: {invalid_object_types}")

        # fmt: off
        (
            Rust(
                "generations",
                self.local_graph_path / self.graph_name,
                "--direction", self.direction,
                "--node-types", self.object_types,
                "--output-order", self._topo_order_path(),
                "--output-depths", self._depths_path(),
            )
        ).run()
        # fmt: on
class UploadGenerationsToS3(luigi.Task):
    """Uploads the output of :class:`ComputeGenerations` to S3"""

    local_graph_path = luigi.PathParameter()
    topology_dir = luigi.PathParameter()
    dataset_name = luigi.Parameter()
    graph_name = luigi.Parameter(default="graph")
    object_types = luigi.Parameter()
    direction = luigi.ChoiceParameter(choices=["forward", "backward"])

    def requires(self) -> luigi.Task:
        """Returns an instance of :class:`ComputeGenerations`."""
        return ComputeGenerations(
            local_graph_path=self.local_graph_path,
            topology_dir=self.topology_dir,
            graph_name=self.graph_name,
            object_types=self.object_types,
            direction=self.direction,
        )

    def output(self) -> List[luigi.Target]:
        """Returns .bitstream and .bin paths on S3."""
        import luigi.contrib.s3

        return [
            luigi.contrib.s3.S3Target(f"{self._s3_prefix()}/{filename}")
            for filename in self._filenames()
        ]

    def _s3_prefix(self) -> str:
        return f"s3://softwareheritage/derived_datasets/{self.dataset_name}/topology"

    def _filenames(self) -> List[str]:
        # Must match the basenames of ComputeGenerations' outputs
        # (checked by the assertion in run()).
        return [
            f"topological_order_{self.direction}_{self.object_types}.bitstream",
            f"topological_order_{self.direction}_{self.object_types}.bitstream.offsets",
            f"topological_order_{self.direction}_{self.object_types}.bitstream.properties.json",
            f"depths_{self.direction}_{self.object_types}.bin",
        ]

    def run(self) -> None:
        """Copies the files"""
        import multiprocessing.dummy

        import tqdm

        # One in-flight status line per file currently uploading.
        self.__status_messages: Dict[str, str] = {}

        actual_filenames = {Path(target.path).name for target in self.input().values()}
        filenames = set(self._filenames())
        assert (
            actual_filenames == filenames
        ), f"Expected ComputeGenerations to return {filenames}, got {actual_filenames}"

        # multiprocessing.dummy uses threads, so _upload_file may safely
        # touch self.__status_messages.
        with multiprocessing.dummy.Pool(len(filenames)) as p:
            for i, _ in tqdm.tqdm(
                enumerate(p.imap_unordered(self._upload_file, filenames)),
                total=len(filenames),
                desc=f"Uploading to {self._s3_prefix()}",
            ):
                self.set_progress_percentage(int(i * 100 / len(filenames)))
                self.set_status_message("\n".join(self.__status_messages.values()))

    def _upload_file(self, filename: str) -> str:
        import luigi.contrib.s3

        client = luigi.contrib.s3.S3Client()
        self.__status_messages[filename] = f"Uploading {filename}"
        client.put_multipart(
            f"{self.topology_dir}/{filename}",
            f"{self._s3_prefix()}/{filename}",
            ACL="public-read",
        )
        del self.__status_messages[filename]
        return filename
class CountPaths(luigi.Task):
    """Creates a file that lists:

    * the number of paths leading to each node, and starting from all leaves, and
    * the number of paths leading to each node, and starting from all other nodes

    Counts include the trivial self-path (zero-length path from a node to itself).
    """

    local_graph_path = luigi.PathParameter()
    topology_dir = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    write_parquet = luigi.BoolParameter(default=True, batch_method=operator.or_)
    object_types = luigi.Parameter()
    direction = luigi.ChoiceParameter(choices=["forward", "backward"])

    def _max_ram(self):
        # Rough upper bound on the memory the Rust command will need.
        node_count = count_nodes(
            self.local_graph_path, self.graph_name, self.object_types
        )
        graph_size = node_count * 8
        # Two arrays of floats (countsFromRoots and countsFromAll)
        count_doublearray = node_count * 8
        spare_space = 100_000_000
        return graph_size + count_doublearray * 2 + spare_space

    @property
    def resources(self):
        """Return the estimated RAM use of this task."""
        import socket

        hostname = socket.getfqdn()
        return {f"{hostname}_ram_mb": self._max_ram() / 1_000_000}

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of :class:`LocalGraph` and one of
        :class:`ComputeGenerations`."""
        types_without_cnt = ",".join(
            obj_type
            for obj_type in self.object_types.split(",")
            if obj_type != "cnt"  # don't need them, always at beginning/end
        )
        return {
            "graph": LocalGraph(local_graph_path=self.local_graph_path),
            "generations": ComputeGenerations(
                local_graph_path=self.local_graph_path,
                graph_name=self.graph_name,
                topology_dir=self.topology_dir,
                object_types=types_without_cnt,
                direction=inverse_direction(self.direction),
            ),
        }

    def output(self) -> dict[str, luigi.LocalTarget]:
        """Parquet directory (if enabled) and binary arrays with path counts."""
        base = self.topology_dir / f"path_counts_{self.direction}_{self.object_types}"
        targets = {"arrays": luigi.LocalTarget(base)}
        if self.write_parquet:
            targets["parquet"] = luigi.LocalTarget(base / "parquet")
        return targets

    def run(self) -> None:
        """Runs 'count_paths' command from tools/topology"""
        from ..shell import Rust

        unknown_types = set(self.object_types.split(",")) - OBJECT_TYPES
        if unknown_types:
            raise ValueError(f"Invalid object types: {unknown_types}")

        topological_order_path = self.input()["generations"]["topo_order"].path
        parquet_out = (
            ["--parquet-out", self.output()["parquet"]] if self.write_parquet else []
        )
        # fmt: off
        (
            Rust(
                "count_paths",
                self.local_graph_path / self.graph_name,
                "--direction", self.direction,
                "--node-types", self.object_types,
                "--topological-order", topological_order_path,
                *parquet_out,
                "--array-out", self.output()["arrays"],
            )
        ).run()
        # fmt: on
class CountDescendants(luigi.Task):
    """Counts the number of unique descendants (or ancestors) of each node
    using a probabilistic HyperLogLog counter.

    With ``--direction forward``, for each node counts unique nodes reachable
    from it. With ``--direction backward``, counts unique nodes that can reach it.
    """

    local_graph_path = luigi.PathParameter()
    topology_dir = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    object_types = luigi.Parameter()
    write_parquet = luigi.BoolParameter(default=True, batch_method=operator.or_)
    direction = luigi.ChoiceParameter(choices=["forward", "backward"])
    rsd = luigi.FloatParameter()
    seed = luigi.OptionalIntParameter(default=None)

    def _max_ram(self):
        # Rough upper bound on the memory the Rust command will need.
        node_count = count_nodes(
            self.local_graph_path, self.graph_name, self.object_types
        )
        graph_size = node_count * 8
        # HyperLogLog registers: ~(1.04/rsd)^2 bytes per node
        hll_memory = node_count * (1.04 / self.rsd) ** 2
        output_array = node_count * 8
        spare_space = 100_000_000
        return graph_size + hll_memory + output_array + spare_space

    @property
    def resources(self):
        """Return the estimated RAM use of this task."""
        import socket

        hostname = socket.getfqdn()
        return {f"{hostname}_ram_mb": self._max_ram() / 1_000_000}

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of :class:`LocalGraph` and one of
        :class:`ComputeGenerations`."""
        types_without_cnt = ",".join(
            obj_type
            for obj_type in self.object_types.split(",")
            if obj_type != "cnt"  # don't need them, always at beginning/end
        )
        return {
            "graph": LocalGraph(local_graph_path=self.local_graph_path),
            "generations": ComputeGenerations(
                local_graph_path=self.local_graph_path,
                graph_name=self.graph_name,
                topology_dir=self.topology_dir,
                object_types=types_without_cnt,
                direction=inverse_direction(self.direction),
            ),
        }

    def output(self) -> dict[str, luigi.LocalTarget]:
        """Parquet directory (if enabled) and binary array with descendant counts."""
        base = (
            self.topology_dir
            / f"descendant_counts_{self.direction}_{self.object_types}_rsd={self.rsd}"
        )
        targets = {
            "arrays": luigi.LocalTarget(base),
            "bitfieldvec": luigi.LocalTarget(base / "bitfieldvec"),
        }
        if self.write_parquet:
            targets["parquet"] = luigi.LocalTarget(base / "parquet")
        return targets

    def run(self) -> None:
        """Runs 'count_descendants' command from tools/topology"""
        from ..shell import Rust

        unknown_types = set(self.object_types.split(",")) - OBJECT_TYPES
        if unknown_types:
            raise ValueError(f"Invalid object types: {unknown_types}")

        topological_order_path = self.input()["generations"]["topo_order"].path
        seed: list[str] = [] if self.seed is None else ["--seed", str(self.seed)]
        parquet_out = (
            ["--parquet-out", self.output()["parquet"]] if self.write_parquet else []
        )
        # fmt: off
        (
            Rust(
                "count_descendants",
                self.local_graph_path / self.graph_name,
                "--direction", self.direction,
                "--node-types", self.object_types,
                "--topological-order", topological_order_path,
                "--rsd", str(self.rsd),
                *seed,
                *parquet_out,
                "--array-out", self.output()["arrays"],
                "--bitfieldvec-out", self.output()["bitfieldvec"],
            )
        ).run()
        # fmt: on
class PathCountsParquetToS3(_ParquetToS3ToAthenaTask):
    """Reads the Parquet dataset written by :class:`CountPaths`, converts it
    to ORC, uploads the ORC to S3, and creates an Athena table for it."""

    topology_dir = luigi.PathParameter()
    object_types = luigi.Parameter()
    direction = luigi.ChoiceParameter(choices=["forward", "backward"])
    dataset_name = luigi.Parameter()
    s3_athena_output_location = S3PathParameter()

    def requires(self) -> CountPaths:
        """Returns corresponding CountPaths instance"""
        # Guard against uploading under a dataset name that does not match
        # the directory the counts were computed in.
        if self.dataset_name not in str(self.topology_dir):
            raise Exception(
                f"Dataset name {self.dataset_name!r} is not part of the "
                f"topology_dir {self.topology_dir!r}"
            )
        return CountPaths(
            topology_dir=self.topology_dir,
            object_types=self.object_types,
            direction=self.direction,
            write_parquet=True,
        )

    def _base_filename(self) -> str:
        # e.g. "path_counts_forward_rev,rel,snp,ori"
        return f"path_counts_{self.direction}_{self.object_types}"

    def _input_parquet_path(self) -> Path:
        # NOTE(review): this is a legacy .csv.zst name, while CountPaths
        # writes its Parquet output under <topology_dir>/<base>/parquet —
        # confirm which path the base class actually consumes.
        return self.topology_dir / f"{self._base_filename()}.csv.zst"

    def _s3_bucket(self) -> str:
        # TODO: configurable
        return "softwareheritage"

    def _s3_prefix(self) -> str:
        # TODO: configurable
        return f"derived_datasets/{self.dataset_name}/{self._base_filename()}"

    def _orc_columns(self) -> List[Tuple[str, str]]:
        # Counts are doubles, presumably because they can exceed the 64-bit
        # integer range — confirm before changing the column types.
        return [
            ("SWHID", "string"),
            ("paths_from_roots", "double"),
            ("all_paths", "double"),
        ]

    def _athena_db_name(self) -> str:
        return f"derived_{self.dataset_name.replace('-', '')}"

    def _athena_table_name(self) -> str:
        # Commas (from the object-types list) are not valid in table names.
        return self._base_filename().replace(",", "")