Source code for swh.graph.luigi.provenance

# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
Luigi tasks to help compute the provenance of content blobs
===========================================================

This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
driving the computation of the :ref:`provenance-index`.
"""

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from pathlib import Path
from typing import Dict

import luigi
import psutil

from .compressed_graph import LocalGraph
from .utils import count_nodes


[docs] def default_max_ram_mb() -> int: return psutil.virtual_memory().total // 1_000_000
[docs] class ListProvenanceNodes(luigi.Task): """Lists all nodes reachable from releases and 'head revisions'.""" local_export_path = luigi.PathParameter() local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") provenance_dir = luigi.PathParameter() provenance_node_filter = luigi.Parameter(default="heads")
[docs] def requires(self) -> Dict[str, luigi.Task]: """Returns :class:`LocalGraph` and :class:`SortRevrelByDate` instances.""" return { "graph": LocalGraph(local_graph_path=self.local_graph_path), }
def _arrow_output_path(self) -> Path: return self.provenance_dir / "nodes"
[docs] def output(self) -> Dict[str, luigi.LocalTarget]: """Returns :file:`{provenance_dir}/nodes/`""" return luigi.LocalTarget(self._arrow_output_path())
[docs] def run(self) -> None: """Runs ``list-provenance-nodes`` from ``tools/provenance``""" from ..shell import Rust print("listing nodes to", self._arrow_output_path()) # fmt: off ( Rust( "list-provenance-nodes", self.local_graph_path / self.graph_name, "--node-filter", self.provenance_node_filter, "--nodes-out", str(self._arrow_output_path()), ) ).run()
# fmt: on
[docs] class ComputeEarliestTimestamps(luigi.Task): """Creates an array storing, for each directory/content SWHIDs, the author date of the first revision/release that contains it. """ local_export_path = luigi.PathParameter() local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") provenance_dir = luigi.PathParameter() provenance_node_filter = luigi.Parameter(default="heads") def _max_ram(self): # see java/src/main/java/org/softwareheritage/graph/utils/ListEarliestRevisions.java nb_nodes = count_nodes( self.local_graph_path, self.graph_name, "ori,snp,rel,rev,dir,cnt" ) visited_bitarray = nb_nodes // 8 timestamps_array = nb_nodes * 8 graph_size = nb_nodes * 8 spare_space = 1_000_000_000 return graph_size + visited_bitarray + timestamps_array + spare_space @property def resources(self): """Returns the value of ``self.max_ram_mb``""" import socket return {f"{socket.getfqdn()}_ram_mb": self._max_ram() // 1_000_000}
[docs] def requires(self) -> Dict[str, luigi.Task]: """Returns :class:`LocalGraph` and :class:`SortRevrelByDate` instances.""" return { "graph": LocalGraph(local_graph_path=self.local_graph_path), }
def _bin_timestamps_output_path(self) -> Path: return self.provenance_dir / "earliest_timestamps.bin"
[docs] def output(self) -> Dict[str, luigi.LocalTarget]: """Returns :file:`{provenance_dir}/revrel_by_author_date/` and `:file:`{provenance_dir}/earliest_timestamps.bin`.""" return { "bin_timestamps": luigi.LocalTarget(self._bin_timestamps_output_path()), }
[docs] def run(self) -> None: """Runs ``compute-earliest-timestamps`` from ``tools/provenance``""" from ..shell import Rust # fmt: off ( Rust( "compute-earliest-timestamps", "--node-filter", self.provenance_node_filter, self.local_graph_path / self.graph_name, "--timestamps-out", str(self._bin_timestamps_output_path()), ) ).run()
# fmt: on
[docs] class ListDirectoryMaxLeafTimestamp(luigi.Task): """Creates a file that contains all directory/content SWHIDs, along with the first revision/release author date and SWHIDs they occur in. """ local_export_path = luigi.PathParameter() local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") provenance_dir = luigi.PathParameter() provenance_node_filter = luigi.Parameter(default="heads") def _max_ram(self): # see # java/src/main/java/org/softwareheritage/graph/utils/ListDirectoryMaxLeafTimestamp.java nb_nodes = count_nodes( self.local_graph_path, self.graph_name, "ori,snp,rel,rev,dir,cnt" ) unvisitedchildren_array = maxtimestamps_array = nb_nodes * 8 graph_size = nb_nodes * 8 spare_space = 1_000_000_000 return graph_size + unvisitedchildren_array + maxtimestamps_array + spare_space @property def resources(self): """Returns the value of ``self.max_ram_mb``""" import socket return {f"{socket.getfqdn()}_ram_mb": self._max_ram() // 1_000_000}
[docs] def requires(self) -> Dict[str, luigi.Task]: """Returns :class:`LocalGraph` and :class:`ComputeEarliestTimestamps` instances.""" kwargs = dict( local_export_path=self.local_export_path, local_graph_path=self.local_graph_path, graph_name=self.graph_name, provenance_dir=self.provenance_dir, provenance_node_filter=self.provenance_node_filter, ) return { "graph": LocalGraph(local_graph_path=self.local_graph_path), "earliest_revisions": ComputeEarliestTimestamps(**kwargs), "reachable_nodes": ListProvenanceNodes(**kwargs), }
def _output_path(self) -> Path: return self.provenance_dir / "max_leaf_timestamps.bin"
[docs] def output(self) -> luigi.LocalTarget: """Returns {provenance_dir}/max_leaf_timestamps.bin""" return luigi.LocalTarget(self._output_path())
[docs] def run(self) -> None: """Runs ``list-directory-with-max-leaf-timestamp`` from ``tools/provenance``""" from ..shell import Rust # fmt: off ( Rust( "list-directory-with-max-leaf-timestamp", self.local_graph_path / self.graph_name, "--node-filter", self.provenance_node_filter, "--reachable-nodes", self.input()["reachable_nodes"], "--timestamps", self.input()["earliest_revisions"]["bin_timestamps"], "--max-timestamps-out", str(self._output_path()), ) ).run()
# fmt: on
[docs] class ComputeDirectoryFrontier(luigi.Task): """Creates a file that contains the "directory frontier" as defined by `swh-provenance <https://gitlab.softwareheritage.org/swh/devel/swh-provenance/>`_. In short, it is a directory which directly contains a file (not a directory), which is a non-root directory in a revision newer than the directory timestamp computed by ListDirectoryMaxLeafTimestamp. """ local_export_path = luigi.PathParameter() local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") provenance_dir = luigi.PathParameter() provenance_node_filter = luigi.Parameter(default="heads") max_ram_mb = luigi.IntParameter(default=default_max_ram_mb(), significant=False) @property def resources(self): """Returns the value of ``self.max_ram_mb``""" import socket return {f"{socket.getfqdn()}_ram_mb": self.max_ram_mb}
[docs] def requires(self) -> Dict[str, luigi.Task]: """Returns :class:`LocalGraph` and :class:`ListDirectoryMaxLeafTimestamp` instances.""" return { "graph": LocalGraph(local_graph_path=self.local_graph_path), "directory_max_leaf_timestamps": ListDirectoryMaxLeafTimestamp( local_export_path=self.local_export_path, local_graph_path=self.local_graph_path, graph_name=self.graph_name, provenance_dir=self.provenance_dir, provenance_node_filter=self.provenance_node_filter, ), }
def _output_path(self) -> Path: return self.provenance_dir / "directory_frontier"
[docs] def output(self) -> luigi.LocalTarget: """Returns {provenance_dir}/directory_frontier/""" return luigi.LocalTarget(self._output_path())
[docs] def run(self) -> None: """Runs ``compute-directory-frontier`` from ``tools/provenance``""" import multiprocessing from ..shell import Rust # fmt: off ( Rust( "compute-directory-frontier", self.local_graph_path / self.graph_name, "--thread-buffer-size", str(self.max_ram_mb * 1_000_000 // multiprocessing.cpu_count()), "--node-filter", self.provenance_node_filter, "--max-timestamps", self.input()["directory_max_leaf_timestamps"], "--directories-out", self.output(), ) ).run()
# fmt: on
[docs] class ListFrontierDirectoriesInRevisions(luigi.Task): """Creates a file that contains the list of revision any "frontier directory" (as defined by `swh-provenance <https://gitlab.softwareheritage.org/swh/devel/swh-provenance/>`_) is in. While a directory is considered frontier only relative to a revision, the produced file contains the list of **all** revisions a directory is in, for directories which are frontier for **any** revision. """ local_export_path = luigi.PathParameter() local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") provenance_dir = luigi.PathParameter() provenance_node_filter = luigi.Parameter(default="heads") max_ram_mb = luigi.IntParameter(default=default_max_ram_mb(), significant=False) @property def resources(self): """Returns the value of ``self.max_ram_mb``""" import socket return {f"{socket.getfqdn()}_ram_mb": self.max_ram_mb}
[docs] def requires(self) -> Dict[str, luigi.Task]: """Returns :class:`LocalGraph` and :class:`ComputeDirectoryFrontier` instances.""" kwargs = dict( local_export_path=self.local_export_path, local_graph_path=self.local_graph_path, graph_name=self.graph_name, provenance_dir=self.provenance_dir, provenance_node_filter=self.provenance_node_filter, ) return { "graph": LocalGraph(local_graph_path=self.local_graph_path), "reachable_nodes": ListProvenanceNodes(**kwargs), "directory_frontier": ComputeDirectoryFrontier(**kwargs), "directory_max_leaf_timestamps": ListDirectoryMaxLeafTimestamp(**kwargs), }
def _output_path(self) -> Path: return self.provenance_dir / "frontier_directories_in_revisions"
[docs] def output(self) -> luigi.LocalTarget: """Returns {provenance_dir}/frontier_directories_in_revisions/""" return luigi.LocalTarget(self._output_path())
[docs] def run(self) -> None: """Runs ``org.softwareheritage.graph.utils.ListFrontierDirectoriesInRevisions``""" import multiprocessing from ..shell import Rust # fmt: off ( Rust( "frontier-directories-in-revisions", self.local_graph_path / self.graph_name, "--thread-buffer-size", str(self.max_ram_mb * 1_000_000 // multiprocessing.cpu_count()), "--node-filter", self.provenance_node_filter, "--reachable-nodes", self.input()["reachable_nodes"], "--frontier-directories", self.input()["directory_frontier"], "--max-timestamps", self.input()["directory_max_leaf_timestamps"], "--directories-out", self.output(), ) ).run()
# fmt: on
[docs] class ListContentsInRevisionsWithoutFrontier(luigi.Task): """Creates a file that contains the list of (file, revision) where the file is reachable from the revision without going through any "directory frontier" as defined by `swh-provenance <https://gitlab.softwareheritage.org/swh/devel/swh-provenance/>`_. In short, it is a directory which directly contains a file (not a directory), which is a non-root directory in a revision newer than the directory timestamp computed by ListDirectoryMaxLeafTimestamp. """ local_export_path = luigi.PathParameter() local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") provenance_dir = luigi.PathParameter() provenance_node_filter = luigi.Parameter(default="heads") max_ram_mb = luigi.IntParameter(default=default_max_ram_mb(), significant=False) @property def resources(self): """Returns the value of ``self.max_ram_mb``""" import socket return {f"{socket.getfqdn()}_ram_mb": self.max_ram_mb}
[docs] def requires(self) -> Dict[str, luigi.Task]: """Returns :class:`LocalGraph` and :class:`ListDirectoryMaxLeafTimestamp` instances.""" kwargs = dict( local_export_path=self.local_export_path, local_graph_path=self.local_graph_path, graph_name=self.graph_name, provenance_dir=self.provenance_dir, provenance_node_filter=self.provenance_node_filter, ) return { "graph": LocalGraph(local_graph_path=self.local_graph_path), "reachable_nodes": ListProvenanceNodes(**kwargs), "directory_frontier": ComputeDirectoryFrontier( max_ram_mb=self.max_ram_mb, **kwargs ), }
def _output_path(self) -> Path: return self.provenance_dir / "contents_in_revisions_without_frontiers"
[docs] def output(self) -> luigi.LocalTarget: """Returns {provenance_dir}/contents_in_revisions_without_frontiers""" return luigi.LocalTarget(self._output_path())
[docs] def run(self) -> None: """Runs ``contents-in-revisions-without-frontier`` from ``tools/provenance``""" import multiprocessing from ..shell import Rust # fmt: off ( Rust( "contents-in-revisions-without-frontier", self.local_graph_path / self.graph_name, "--thread-buffer-size", str(self.max_ram_mb * 1_000_000 // multiprocessing.cpu_count()), "--node-filter", self.provenance_node_filter, "--reachable-nodes", self.input()["reachable_nodes"], "--frontier-directories", self.input()["directory_frontier"], "--contents-out", self.output(), ) ).run()
# fmt: on
[docs] class ListContentsInFrontierDirectories(luigi.Task): """Enumerates all contents in all directories returned by :class:`ComputeDirectoryFrontier`.""" local_export_path = luigi.PathParameter() local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") provenance_dir = luigi.PathParameter() provenance_node_filter = luigi.Parameter(default="heads") max_ram_mb = luigi.IntParameter(default=default_max_ram_mb(), significant=False) @property def resources(self): """Returns the value of ``self.max_ram_mb``""" import socket return {f"{socket.getfqdn()}_ram_mb": self.max_ram_mb}
[docs] def requires(self) -> Dict[str, luigi.Task]: """Returns :class:`LocalGraph` and :class:`ComputeDirectoryFrontier` instances.""" kwargs = dict( local_export_path=self.local_export_path, local_graph_path=self.local_graph_path, graph_name=self.graph_name, provenance_dir=self.provenance_dir, provenance_node_filter=self.provenance_node_filter, ) return { "graph": LocalGraph(local_graph_path=self.local_graph_path), "reachable_nodes": ListProvenanceNodes(**kwargs), "directory_frontier": ComputeDirectoryFrontier( max_ram_mb=self.max_ram_mb, **kwargs ), }
def _output_path(self) -> Path: return self.provenance_dir / "contents_in_frontier_directories"
[docs] def output(self) -> luigi.LocalTarget: """Returns {provenance_dir}/contents_in_frontier_directories/""" return luigi.LocalTarget(self._output_path())
[docs] def run(self) -> None: """Runs ``contents-in-directories`` from ``tools/provenance``""" import multiprocessing from ..shell import Rust # fmt: off ( Rust( "contents-in-directories", self.local_graph_path / self.graph_name, "--thread-buffer-size", str(self.max_ram_mb * 1_000_000 // multiprocessing.cpu_count()), "--node-filter", self.provenance_node_filter, "--frontier-directories", self.input()["directory_frontier"], "--contents-out", self.output(), ) ).run()
# fmt: on
[docs] class RunProvenance(luigi.WrapperTask): """(Transitively) depends on all provenance tasks""" local_export_path = luigi.PathParameter() local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") provenance_dir = luigi.PathParameter() provenance_node_filter = luigi.Parameter(default="heads") max_ram_mb = luigi.IntParameter(default=default_max_ram_mb(), significant=False)
[docs] def requires(self): """Returns :class:`ListContentsInFrontierDirectories` and :class:`ListContentsInRevisionsWithoutFrontier`""" kwargs = dict( local_export_path=self.local_export_path, local_graph_path=self.local_graph_path, graph_name=self.graph_name, provenance_dir=self.provenance_dir, provenance_node_filter=self.provenance_node_filter, ) return [ ListProvenanceNodes(**kwargs), ListContentsInFrontierDirectories(max_ram_mb=self.max_ram_mb, **kwargs), ListContentsInRevisionsWithoutFrontier( max_ram_mb=self.max_ram_mb, **kwargs ), ListFrontierDirectoriesInRevisions(max_ram_mb=self.max_ram_mb, **kwargs), ]