# Copyright (C) 2019-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from pathlib import Path
import shlex
import sys
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click

from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.core.cli import swh as swh_cli_group

if TYPE_CHECKING:
    from swh.graph.webgraph import CompressionStep  # noqa

logger = logging.getLogger(__name__)


class StepOption(click.ParamType):
    """click type for specifying a compression step on the CLI

    parse either individual steps, specified as step names or integers,
    or step ranges
    """

    name = "compression step"

    def convert(self, value, param, ctx):
        # type: (...) -> Set[CompressionStep]
        from swh.graph.webgraph import COMP_SEQ, CompressionStep  # noqa

        steps: Set[CompressionStep] = set()

        specs = value.split(",")
        for spec in specs:
            if "-" in spec:  # step range
                (raw_l, raw_r) = spec.split("-", maxsplit=1)
                if raw_l == "":  # no left endpoint
                    raw_l = COMP_SEQ[0].name
                if raw_r == "":  # no right endpoint
                    raw_r = COMP_SEQ[-1].name
                l_step = self.convert(raw_l, param, ctx)
                r_step = self.convert(raw_r, param, ctx)
                if len(l_step) != 1 or len(r_step) != 1:
                    self.fail(f"invalid step specification: {value}, see --help")
                l_idx = l_step.pop()
                r_idx = r_step.pop()
                steps = steps.union(
                    set(
                        CompressionStep(i)
                        for i in range(l_idx.value, r_idx.value + 1)
                    )
                )
            else:  # singleton step
                try:
                    steps.add(CompressionStep(int(spec)))  # integer step
                except ValueError:
                    try:
                        steps.add(CompressionStep[spec.upper()])  # step name
                    except KeyError:
                        self.fail(
                            f"invalid step specification: {value}, see --help"
                        )

        return steps
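

# A minimal sketch of how StepOption values parse on the command line (step
# names and numbers follow the list in the `compress` command's docstring
# below; the exact sets shown here are illustrative):
#
#   --steps mph    -> {CompressionStep.MPH}
#   --steps 3-5    -> steps 3, 4 and 5 (bv, bfs, permute_bfs)
#   --steps llp-   -> llp through the last step (clean_tmp)
#   --steps 1,bfs  -> union of the two singleton steps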


class PathlibPath(click.Path):
    """A Click path argument that returns a pathlib Path, not a string"""

    def convert(self, value, param, ctx):
        return Path(super().convert(value, param, ctx))
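

# Illustrative sketch (hypothetical option name): because PathlibPath.convert
# returns a pathlib.Path, command callbacks can use Path methods directly:
#
#   @click.option("--output-directory", "-o", type=PathlibPath())
#   def some_command(output_directory):
#       output_directory.mkdir(parents=True, exist_ok=True)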


DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = {
    "graph": ("dict", {"cls": "local", "grpc_server": {}})
}


@swh_cli_group.group(name="graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup)
@click.option(
    "--config-file",
    "-C",
    default=None,
    type=click.Path(
        exists=True,
        dir_okay=False,
    ),
    help="YAML configuration file",
)
@click.pass_context
def graph_cli_group(ctx, config_file):
    """Software Heritage graph tools."""
    from swh.core import config

    ctx.ensure_object(dict)
    conf = config.read(config_file, DEFAULT_CONFIG)
    if "graph" not in conf:
        raise ValueError(
            'no "graph" stanza found in configuration file %s' % config_file
        )
    ctx.obj["config"] = conf


@graph_cli_group.command(name="rpc-serve")
@click.option(
    "--host",
    "-h",
    default="0.0.0.0",
    metavar="IP",
    show_default=True,
    help="host IP address to bind the server on",
)
@click.option(
    "--port",
    "-p",
    default=5009,
    type=click.INT,
    metavar="PORT",
    show_default=True,
    help="port to bind the server on",
)
@click.option(
    "--graph",
    "-g",
    "graph_path",
    default=None,
    metavar="GRAPH",
    help="compressed graph basename",
)
@click.pass_context
def serve(ctx, host, port, graph_path):
    """Run the graph RPC service."""
    import aiohttp.web

    from swh.graph.http_rpc_server import make_app

    config = ctx.obj["config"]
    config_graph = config["graph"]
    cls = config_graph["cls"]
    if cls in ("local", "local_rust"):
        if graph_path is None:
            raise ValueError(
                "Please specify the graph path (-g <path>) for a 'local' rpc instance"
            )
        # Only required when spawning a local rpc instance
        config["graph"]["grpc_server"]["path"] = graph_path
    elif cls == "remote":
        if "grpc_server" not in config_graph:
            raise ValueError(
                'Please specify the "grpc_server" key in the "graph" configuration section'
            )
    else:
        raise ValueError(
            f'Value for "cls" must be "local"/"local_rust" or "remote", not {cls!r}'
        )

    app = make_app(config=config)
    aiohttp.web.run_app(app, host=host, port=port)


@graph_cli_group.command(name="download")
@click.option(
    "--s3-url",
    default=None,
    help="S3 directory containing the graph to download. "
    "Defaults to '{s3_prefix}/{name}/compressed/'",
)
@click.option(
    "--s3-prefix",
    default="s3://softwareheritage/graph/",
    help="Base directory of Software Heritage's graphs on S3",
)
@click.option(
    "--name",
    default=None,
    help="Name of the dataset to download. This is an ISO8601 date, optionally with a "
    "suffix. See https://docs.softwareheritage.org/devel/swh-dataset/graph/dataset.html",
)
@click.option(
    "--parallelism",
    "-j",
    default=5,
    help="Number of threads used to download/decompress files.",
)
@click.argument(
    "target",
    type=click.Path(
        file_okay=False,
        writable=True,
        path_type=Path,  # type: ignore[type-var]
    ),
)
@click.pass_context
def download(
    ctx,
    s3_url: Optional[str],
    s3_prefix: str,
    name: Optional[str],
    parallelism: int,
    target: Path,
):
    """Downloads a compressed SWH graph to the given target directory"""
    from swh.graph.download import GraphDownloader

    if s3_url and name:
        raise click.ClickException("--s3-url and --name are mutually exclusive")
    elif not s3_url and not name:
        raise click.ClickException("Either --s3-url or --name must be provided")
    elif not s3_url:
        s3_url = f"{s3_prefix.rstrip('/')}/{name}/compressed/"

    target.mkdir(parents=True, exist_ok=True)

    GraphDownloader(
        local_graph_path=target,
        s3_graph_path=s3_url,
        parallelism=parallelism,
    ).download_graph(
        progress_percent_cb=lambda _: None,
        progress_status_cb=lambda _: None,
    )


@graph_cli_group.command(name="reindex")
@click.option(
    "--force",
    is_flag=True,
    help="Regenerate files even if they already exist. Implies --ef",
)
@click.option(
    "--ef", is_flag=True, help="Regenerate .ef files even if they already exist"
)
@click.option(
    "--debug", is_flag=True, help="Use debug executables instead of release executables"
)
@click.argument(
    "graph",
    type=click.Path(
        writable=True,
    ),
)
@click.pass_context
def reindex(
    ctx,
    force: bool,
    ef: bool,
    graph: str,
    debug: bool,
):
    """Reindex a SWH GRAPH to the latest graph format.

    GRAPH should be composed of the graph folder followed by the graph prefix
    (by default "graph"), e.g. "graph_folder/graph".
    """
    import os.path

    from swh.graph.shell import Rust

    ef = ef or force

    conf = ctx.obj["config"]
    if "debug" not in conf and debug:
        conf["debug"] = debug

    if (
        ef
        or not os.path.exists(f"{graph}.ef")
        or not os.path.exists(f"{graph}-transposed.ef")
    ):
        logger.info("Recreating Elias-Fano indexes on adjacency lists")
        Rust("swh-graph-index", "ef", f"{graph}", conf=conf).run()
        Rust("swh-graph-index", "ef", f"{graph}-transposed", conf=conf).run()

    if (
        ef
        or not os.path.exists(f"{graph}-labelled.ef")
        or not os.path.exists(f"{graph}-labelled-transposed.ef")
    ):
        with open(f"{graph}.nodes.count.txt", "rt") as f:
            node_count = f.read().strip()
        label_offsets_count = str(int(node_count) + 1)  # ditto
        logger.info("Recreating Elias-Fano indexes on arc labels")
        Rust(
            "swh-graph-index",
            "labels-ef",
            f"{graph}-labelled",
            label_offsets_count,
            conf=conf,
        ).run()
        Rust(
            "swh-graph-index",
            "labels-ef",
            f"{graph}-transposed-labelled",
            label_offsets_count,
            conf=conf,
        ).run()

    node2type_fname = f"{graph}.node2type.bin"
    if force or not os.path.exists(node2type_fname):
        logger.info("Creating node2type.bin")
        if os.path.exists(node2type_fname):
            os.unlink(node2type_fname)
        Rust("swh-graph-node2type", graph, conf=conf).run()


@graph_cli_group.command(name="grpc-serve")
@click.option(
    "--port",
    "-p",
    default=None,
    type=click.INT,
    metavar="PORT",
    show_default=True,
    help=(
        "port to bind the server on (note: host is not configurable "
        "for now and will be 0.0.0.0). Defaults to 50091"
    ),
)
@click.option(
    "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename"
)
@click.pass_context
def grpc_serve(ctx, port, graph):
    """start the graph gRPC service

    This command uses execve to execute the Rust gRPC service.
""" import os from swh.graph.grpc_server import build_rust_grpc_server_cmdline config = ctx.obj["config"] config["graph"]["path"] = graph if port is None and "port" not in config["graph"]: port = 50091 if port is not None: config["graph"]["port"] = port logger.debug("Building gPRC server command line") cmd, port = build_rust_grpc_server_cmdline(**config["graph"]) rust_bin = cmd[0] # XXX: shlex.join() is in 3.8 # logger.info("Starting gRPC server: %s", shlex.join(cmd)) logger.info("Starting gRPC server: %s", " ".join(shlex.quote(x) for x in cmd)) os.execvp(rust_bin, cmd) @graph_cli_group.command() @click.option( "--input-dataset", "-i", required=True, type=PathlibPath(), help="graph dataset directory, in ORC format", ) @click.option( "--output-directory", "-o", required=True, type=PathlibPath(), help="directory where to store compressed graph", ) @click.option( "--graph-name", "-g", default="graph", metavar="NAME", help="name of the output graph (default: 'graph')", ) @click.option( "--steps", "-s", metavar="STEPS", type=StepOption(), help="run only these compression steps (default: all steps)", ) @click.pass_context def compress(ctx, input_dataset, output_directory, graph_name, steps): """Compress a graph using WebGraph Input: a directory containing a graph dataset in ORC format Output: a directory containing a WebGraph compressed graph Compression steps are: (1) extract_nodes, (2) mph, (3) bv, (4) bfs, (5) permute_bfs, (6) transpose_bfs, (7) simplify, (8) llp, (9) permute_llp, (10) obl, (11) compose_orders, (12) stats, (13) transpose, (14) transpose_obl, (15) maps, (16) extract_persons, (17) mph_persons, (18) node_properties, (19) mph_labels, (20) fcl_labels, (21) edge_labels, (22) edge_labels_obl, (23) edge_labels_transpose_obl, (24) clean_tmp. Compression steps can be selected by name or number using --steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are also supported. """ from swh.graph import webgraph try: conf = ctx.obj["config"]["graph"]["compress"] except KeyError: conf = {} # use defaults try: webgraph.compress(graph_name, input_dataset, output_directory, steps, conf) except webgraph.CompressionSubprocessError as e: try: if e.log_path.is_file(): with e.log_path.open("rb") as f: if e.log_path.stat().st_size > 1000: f.seek(-1000, 2) # read only the last 1kB f.readline() # skip first line, might be partial sys.stderr.write("[...]\n") sys.stderr.write("\n") sys.stderr.flush() sys.stderr.buffer.write(f.read()) sys.stderr.flush() except Exception: raise pass raise click.ClickException(e.message)


def get_all_subclasses(cls):
    all_subclasses = []

    for subclass in cls.__subclasses__():
        all_subclasses.append(subclass)
        all_subclasses.extend(get_all_subclasses(subclass))

    return all_subclasses
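

# Unlike cls.__subclasses__(), which only lists *direct* subclasses,
# get_all_subclasses() walks the whole inheritance tree. A small illustration
# with throwaway classes:
#
#   class A: pass
#   class B(A): pass
#   class C(B): pass
#
#   A.__subclasses__()     # [B]
#   get_all_subclasses(A)  # [B, C]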


@graph_cli_group.command()
@click.option(
    "--base-directory",
    required=True,
    type=PathlibPath(),
    help="""The base directory where all datasets and compressed graphs are.
    Its subdirectories should be named after a date (and optional flavor).
    For example: ``/poolswh/softwareheritage/``.""",
)
@click.option(
    "--base-sensitive-directory",
    required=False,
    type=PathlibPath(),
    help="""The base directory for any data that should not be publicly
    available (eg. because it contains people's names).
    For example: ``/poolswh/softwareheritage/``.""",
)
@click.option(
    "--athena-prefix",
    required=False,
    type=str,
    help="""A prefix for the Athena Database that will be created and/or used.
    For example: ``swh``.""",
)
@click.option(
    "--s3-prefix",
    required=False,
    type=str,
    help="""The base S3 "directory" where all datasets and compressed graphs are.
    Its subdirectories should be named after a date (and optional flavor).
    For example: ``s3://softwareheritage/graph/``.""",
)
@click.option(
    "--max-ram",
    help="""Maximum RAM that some scripts will try not to exceed""",
)
@click.option(
    "--batch-size",
    type=int,
    help="""Default value for compression tasks handling objects in batch""",
)
@click.option(
    "--grpc-api",
    help="""Default value for the <hostname>:<port> of the gRPC server""",
)
@click.option(
    "--s3-athena-output-location",
    required=False,
    type=str,
    help="""The S3 "directory" where Athena query results will be written.
    For example: ``s3://some-bucket/tmp/athena``.""",
)
@click.option(
    "--graph-base-directory",
    required=False,
    type=PathlibPath(),
    help="""Overrides the path of the graph to use. Defaults to the value of
    ``{base_directory}/{dataset_name}/{compressed}/``.
    For example: ``/dev/shm/swh-graph/default/``.""",
)
@click.option(
    "--export-base-directory",
    required=False,
    type=PathlibPath(),
    help="""Overrides the path of the export to use. Defaults to the value of
    ``--base-directory``.""",
)
@click.option(
    "--dataset-name",
    required=True,
    type=str,
    help="""Should be a date and optionally a flavor, which will be used
    as directory name. For example: ``2022-04-25`` or ``2022-11-12_staging``.""",
)
@click.option(
    "--export-name",
    required=False,
    type=str,
    help="""Should be a date and optionally a flavor, which will be used as
    directory name for the export (not the compressed graph).
    For example: ``2022-04-25`` or ``2022-11-12_staging``.
    Defaults to the value of --dataset-name""",
)
@click.option(
    "--previous-dataset-name",
    required=False,
    type=str,
    help="""When regenerating a derived dataset, this can be set to the name of
    a previous dataset the derived dataset was generated for. Some results from
    the previous generated dataset will be reused to speed-up regeneration.""",
)
@click.option(
    "--luigi-config",
    type=PathlibPath(),
    help="""Extra options to add to ``luigi.cfg``, following the same format.
    This overrides any option that would be otherwise set automatically.""",
)
@click.option(
    "--retry-luigi-delay",
    type=int,
    default=10,
    help="""Time to wait before re-running Luigi, if some tasks are pending
    but stuck.""",
)
@click.argument("luigi_param", nargs=-1)
@click.pass_context
def luigi(
    ctx,
    base_directory: Path,
    graph_base_directory: Optional[Path],
    export_base_directory: Optional[Path],
    base_sensitive_directory: Optional[Path],
    s3_prefix: Optional[str],
    athena_prefix: Optional[str],
    max_ram: Optional[str],
    batch_size: Optional[int],
    grpc_api: Optional[str],
    s3_athena_output_location: Optional[str],
    dataset_name: str,
    export_name: Optional[str],
    previous_dataset_name: Optional[str],
    retry_luigi_delay: int,
    luigi_config: Optional[Path],
    luigi_param: List[str],
):
    r"""
    Calls Luigi with the given task and params, and automatically
    configures paths based on --base-directory and --dataset-name.

    The list of Luigi params should be prefixed with ``--`` so they are not
    interpreted by the ``swh`` CLI. For example::

        swh graph luigi \
            --base-directory ~/tmp/ \
            --dataset-name 2022-12-05_test ListOriginContributors \
            -- \
            RunAll \
            --local-scheduler

    to pass ``RunAll --local-scheduler`` as Luigi params

    Or, to compute a derived dataset::

        swh graph luigi \
            --graph-base-directory /dev/shm/swh-graph/default/ \
            --base-directory /poolswh/softwareheritage/vlorentz/ \
            --athena-prefix swh \
            --dataset-name 2022-04-25 \
            --s3-athena-output-location s3://some-bucket/tmp/athena \
            -- \
            --log-level INFO \
            FindEarliestRevisions \
            --scheduler-url http://localhost:50092/ \
            --blob-filter citation
    """
    import configparser
    import os
    import secrets
    import socket
    import subprocess
    import tempfile
    import time

    import luigi
    import psutil

    # Populate the list of subclasses of luigi.Task
    import swh.dataset.luigi  # noqa
    import swh.graph.luigi  # noqa

    config = configparser.ConfigParser()

    # By default, Luigi always returns code 0.
    # See https://luigi.readthedocs.io/en/stable/configuration.html#retcode
    config["retcode"] = {
        "already_running": "128",
        "missing_data": "129",
        "not_run": "130",
        "task_failed": "131",
        "scheduling_error": "132",
        "unhandled_exception": "133",
    }

    if max_ram:
        if max_ram.endswith("G"):
            max_ram_mb = int(max_ram[:-1]) * 1024
        elif max_ram.endswith("M"):
            max_ram_mb = int(max_ram[:-1])
        else:
            raise click.ClickException(
                "--max-ram must be an integer followed by M or G"
            )
    else:
        max_ram_mb = psutil.virtual_memory().total // (1024 * 1024)

    # Only used by the local scheduler; otherwise it needs to be configured
    # in luigid.cfg
    hostname = socket.getfqdn()
    config["resources"] = {
        f"{hostname}_ram_mb": str(max_ram_mb),
    }

    export_name = export_name or dataset_name

    export_path = (export_base_directory or base_directory) / export_name
    dataset_path = base_directory / dataset_name

    default_values = dict(
        local_export_path=export_path,
        local_graph_path=dataset_path / "compressed",
        derived_datasets_path=dataset_path,
        topological_order_dir=dataset_path / "topology/",
        origin_contributors_path=dataset_path / "contribution_graph.csv.zst",
        origin_urls_path=dataset_path / "origin_urls.csv.zst",
        export_id=f"{export_name}-{secrets.token_hex(10)}",
        export_name=export_name,
        dataset_name=dataset_name,
        previous_dataset_name=previous_dataset_name,
    )

    if graph_base_directory:
        default_values["local_graph_path"] = graph_base_directory

    if s3_prefix:
        default_values["s3_export_path"] = f"{s3_prefix.rstrip('/')}/{export_name}"
        default_values["s3_graph_path"] = (
            f"{s3_prefix.rstrip('/')}/{dataset_name}/compressed"
        )

    if s3_athena_output_location:
        default_values["s3_athena_output_location"] = s3_athena_output_location

    if max_ram:
        default_values["max_ram_mb"] = max_ram_mb

    if batch_size:
        default_values["batch_size"] = batch_size

    if grpc_api:
        default_values["grpc_api"] = grpc_api

    if base_sensitive_directory:
        sensitive_path = base_sensitive_directory / dataset_name
        default_values["deanonymized_origin_contributors_path"] = (
            sensitive_path / "contributors_deanonymized.csv.zst"
        )
        default_values["deanonymization_table_path"] = (
            sensitive_path / "persons_sha256_to_name.csv.zst"
        )

    if previous_dataset_name:
        default_values["previous_derived_datasets_path"] = (
            base_directory / previous_dataset_name
        )

    if athena_prefix:
        default_values["athena_db_name"] = (
            f"{athena_prefix}_{dataset_name.replace('-', '')}"
        )

    for task_cls in get_all_subclasses(luigi.Task):
        task_name = task_cls.__name__

        # If the task has an argument with one of the known names, add the
        # default value to its config.
        task_config = {
            arg_name: str(arg_value)
            for arg_name, arg_value in default_values.items()
            if hasattr(task_cls, arg_name)
        }
        if task_config:
            config[task_name] = task_config

    # If any config is provided, add it.
    # This may override default arguments configured above.
    if luigi_config is not None:
        config.read(luigi_config)

    with tempfile.NamedTemporaryFile(
        mode="w+t", prefix="luigi_", suffix=".cfg"
    ) as fd:
        config.write(fd)
        fd.flush()

        while True:
            start_time = time.time()
            proc = subprocess.run(
                [
                    "luigi",
                    "--module",
                    "swh.dataset.luigi",
                    "--module",
                    "swh.graph.luigi",
                    *luigi_param,
                ],
                env={
                    "LUIGI_CONFIG_PATH": fd.name,
                    **os.environ,
                },
            )

            # If all tasks are stuck, run another loop to wait for them to be
            # unstuck. The normal way to do this would be to set
            # config["worker"]["keep_alive"] to True, but we do it this way to
            # force Luigi to recompute resources, because most tasks in
            # compressed_graph.py can only know their resource usage after
            # ExtractNodes and ExtractPersons ran.
            if proc.returncode != int(config["retcode"]["not_run"]):
                break

            # wait a few seconds between loops to avoid wasting CPU
            time.sleep(max(0, retry_luigi_delay - (time.time() - start_time)))

    exit(proc.returncode)
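

# Sketch of the temporary luigi.cfg generated above (values and the task name
# are illustrative; a [TaskName] section is only emitted for tasks that
# declare a matching attribute):
#
#   [retcode]
#   already_running = 128
#   not_run = 130
#   ...
#
#   [resources]
#   host.example.org_ram_mb = 65536
#
#   [ListOriginContributors]
#   local_export_path = /poolswh/softwareheritage/2022-04-25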


def main():
    return graph_cli_group(auto_envvar_prefix="SWH_GRAPH")


if __name__ == "__main__":
    main()