Source code for swh.graph.luigi.file_names

# Copyright (C) 2022-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
Luigi tasks for producing the most common name(s) of every content, and datasets based on file names
=====================================================================================================
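
The tasks below wrap Rust tools from :file:`tools/file_names`. As a minimal
sketch (the paths are hypothetical placeholders, and both the compressed graph
and the Rust tools must be available locally), a task can be scheduled
programmatically with :func:`luigi.build`::

    from pathlib import Path

    import luigi

    from swh.graph.luigi.file_names import PopularContentNames

    luigi.build(
        [
            PopularContentNames(
                # hypothetical paths, adjust to your local setup
                local_graph_path=Path("/srv/softwareheritage/graph"),
                popular_contents_path=Path("/tmp/popular_content_names.csv.zst"),
                max_results_per_content=1,
            )
        ],
        local_scheduler=True,
    )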

"""  # noqa

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from pathlib import Path
from typing import List, Tuple

import luigi

from swh.dataset.luigi import S3PathParameter

from .compressed_graph import LocalGraph
from .utils import _ParquetToS3ToAthenaTask, count_nodes


class PopularContentNames(luigi.Task):
    """Creates a CSV file that contains the most popular name(s) of each content"""

    local_graph_path = luigi.PathParameter()
    popular_contents_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    max_results_per_content = luigi.IntParameter(default=1)
    popularity_threshold = luigi.IntParameter(default=0)

    def _max_ram(self):
        nb_nodes = count_nodes(
            self.local_graph_path, self.graph_name, "ori,snp,rel,rev,dir,cnt"
        )
        graph_size = nb_nodes * 8

        # Does not keep any large array for all nodes, but uses temporary HashMaps
        # to store counts of names locally.
        # 1GB should be more than enough.
        spare_space = 1_000_000_000
        return graph_size + spare_space

    @property
    def resources(self):
        """Return the estimated RAM use of this task."""
        import socket

        return {f"{socket.getfqdn()}_ram_mb": self._max_ram() / 1_000_000}

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of :class:`LocalGraph`."""
        return [LocalGraph(local_graph_path=self.local_graph_path)]

    def output(self) -> luigi.LocalTarget:
        """.csv.zst file that contains the most popular name(s) of each content."""
        return luigi.LocalTarget(self.popular_contents_path)

    def run(self) -> None:
        """Runs ``popular-content-names`` from :file:`tools/file_names`"""
        from ..shell import Rust

        # fmt: off
        (
            Rust(
                "popular-content-names",
                self.local_graph_path / self.graph_name,
                "--max-results",
                str(self.max_results_per_content),
                "--min-occurrences",
                str(self.popularity_threshold),
                "--out",
                self.output(),
            )
        ).run()
        # fmt: on


class PopularContentPaths(luigi.Task):
    """Creates a CSV file that contains the most popular path of each content/directory
    given as input"""

    local_graph_path = luigi.PathParameter()
    popular_contents_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    input_swhids = luigi.PathParameter()
    max_depth = luigi.IntParameter(default=2)

    def _max_ram(self):
        nb_nodes = count_nodes(
            self.local_graph_path, self.graph_name, "ori,snp,rel,rev,dir,cnt"
        )
        graph_size = nb_nodes * 8

        # Does not keep any large array for all nodes, but uses temporary HashMaps
        # to store counts of names locally.
        # 10GB should be more than enough.
        spare_space = 10_000_000_000
        return graph_size + spare_space

    @property
    def resources(self):
        """Return the estimated RAM use of this task."""
        import socket

        return {f"{socket.getfqdn()}_ram_mb": self._max_ram() / 1_000_000}

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of :class:`LocalGraph`."""
        return [LocalGraph(local_graph_path=self.local_graph_path)]

    def output(self) -> luigi.LocalTarget:
        """.csv.zst file that contains the most popular path of each input
        content/directory."""
        return luigi.LocalTarget(self.popular_contents_path)

    def run(self) -> None:
        """Runs ``popular-content-paths`` from :file:`tools/file_names`"""
        import multiprocessing.dummy

        from ..shell import Command, Rust, wc

        input_swhid_files = list(self.input_swhids.iterdir())

        # Needed for Antonio's highly-compressed files
        zstd_opts = ["--memory=1024MB"]

        # Count input rows in parallel, excluding each file's header line
        with multiprocessing.dummy.Pool() as p:
            nb_contents = sum(
                p.imap_unordered(
                    lambda path: wc(Command.zstdcat(*zstd_opts, path), "-l") - 1,
                    input_swhid_files,
                )
            )

        # Keep the CSV header only from the first input; strip it from the others
        input_streams = [Command.zstdcat(*zstd_opts, input_swhid_files[0])]
        for input_swhid_file in input_swhid_files[1:]:
            input_streams.append(
                Command.zstdcat(*zstd_opts, input_swhid_file) | Command.tail("-n", "+2")
            )

        # fmt: off
        (
            Command.cat(*input_streams)
            | Rust(
                "popular-content-paths",
                self.local_graph_path / self.graph_name,
                "--expected-nodes",
                str(nb_contents),
                "--depth",
                str(self.max_depth),
                "--out",
                self.output(),
            )
        ).run()
        # fmt: on


class PopularContentNamesOrcToS3(_ParquetToS3ToAthenaTask):
    """Reads the CSV from :class:`PopularContentNames`, converts it to ORC,
    uploads the ORC to S3, and creates an Athena table for it."""

    popular_contents_path = luigi.PathParameter()
    dataset_name = luigi.Parameter()
    s3_athena_output_location = S3PathParameter()

    def requires(self) -> PopularContentNames:
        """Returns corresponding PopularContentNames instance"""
        if self.dataset_name not in str(self.popular_contents_path):
            raise Exception(
                f"Dataset name {self.dataset_name!r} is not part of the "
                f"popular_contents_path {self.popular_contents_path!r}"
            )
        return PopularContentNames(
            popular_contents_path=self.popular_contents_path,
        )

    def _input_parquet_path(self) -> Path:
        return self.popular_contents_path

    def _s3_bucket(self) -> str:
        # TODO: configurable
        return "softwareheritage"

    def _s3_prefix(self) -> str:
        # TODO: configurable
        return f"derived_datasets/{self.dataset_name}/popular_contents/"

    def _parquet_columns(self) -> List[Tuple[str, str]]:
        return [
            ("SWHID", "string"),
            ("length", "bigint"),
            ("filename", "binary"),
            ("occurrences", "bigint"),
        ]

    def _athena_db_name(self) -> str:
        return f"derived_{self.dataset_name.replace('-', '')}"

    def _athena_table_name(self) -> str:
        return "popular_contents"


class ListFilesByName(luigi.Task):
    """From every refs/heads/master, refs/heads/main, or HEAD branch in any snapshot,
    browses the whole directory tree looking for files named <filename>, and lists
    them to the output directory."""

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    output_path = luigi.PathParameter()
    file_name = luigi.Parameter()

    def _max_ram(self):
        nb_nodes = count_nodes(
            self.local_graph_path, self.graph_name, "ori,snp,rel,rev,dir,cnt"
        )
        graph_size = nb_nodes * 8

        num_threads = 96  # see listFilesInSnapshotChunk
        csv_buffers = self.batch_size * num_threads * 1000000

        # Does not keep any large array for all nodes, but uses temporary stacks
        # and hash sets.
        # 1GB per thread should be more than enough.
        bfs_buffers = 1_000_000_000 * num_threads

        spare_space = 1_000_000_000
        return graph_size + csv_buffers + bfs_buffers + spare_space

    @property
    def resources(self):
        """Return the estimated RAM use of this task."""
        import socket

        return {f"{socket.getfqdn()}_ram_mb": self._max_ram() / 1_000_000}

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of :class:`LocalGraph`."""
        return [LocalGraph(local_graph_path=self.local_graph_path)]

    def output(self) -> luigi.LocalTarget:
        """Directory of .csv.zst files containing the list of file occurrences
        with that name."""
        return luigi.LocalTarget(self.output_path)

    def run(self) -> None:
        """Runs ``list-files-by-name`` from :file:`tools/file_names`"""
        from ..shell import Rust

        # fmt: off
        (
            Rust(
                "list-files-by-name",
                self.local_graph_path / self.graph_name,
                self.file_name,
                "--out",
                self.output(),
            )
        ).run()
        # fmt: on
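
# A hypothetical command-line invocation of ListFilesByName (luigi exposes task
# parameters as ``--kebab-case`` flags); the graph path, output path, and file
# name below are placeholders:
#
#   luigi --module swh.graph.luigi.file_names ListFilesByName \
#       --local-graph-path /srv/softwareheritage/graph \
#       --output-path /tmp/files_named_codemeta/ \
#       --file-name codemeta.json \
#       --local-scheduler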