Source code for swh.graph.cli

# Copyright (C) 2019-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from pathlib import Path
import shlex
import sys
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click

from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.core.cli import swh as swh_cli_group

    from swh.graph.webgraph import CompressionStep  # noqa

logger = logging.getLogger(__name__)

[docs] class StepOption(click.ParamType): """click type for specifying a compression step on the CLI parse either individual steps, specified as step names or integers, or step ranges """ name = "compression step"
[docs] def convert(self, value, param, ctx): # type: (...) -> Set[CompressionStep] from swh.graph.webgraph import COMP_SEQ, CompressionStep # noqa steps: Set[CompressionStep] = set() specs = value.split(",") for spec in specs: if "-" in spec: # step range (raw_l, raw_r) = spec.split("-", maxsplit=1) if raw_l == "": # no left endpoint raw_l = COMP_SEQ[0].name if raw_r == "": # no right endpoint raw_r = COMP_SEQ[-1].name l_step = self.convert(raw_l, param, ctx) r_step = self.convert(raw_r, param, ctx) if len(l_step) != 1 or len(r_step) != 1:"invalid step specification: {value}, " f"see --help") l_idx = l_step.pop() r_idx = r_step.pop() steps = steps.union( set(CompressionStep(i) for i in range(l_idx.value, r_idx.value + 1)) ) else: # singleton step try: steps.add(CompressionStep(int(spec))) # integer step except ValueError: try: steps.add(CompressionStep[spec.upper()]) # step name except KeyError: f"invalid step specification: {value}, " f"see --help" ) return steps
[docs] class PathlibPath(click.Path): """A Click path argument that returns a pathlib Path, not a string"""
[docs] def convert(self, value, param, ctx): return Path(super().convert(value, param, ctx))
DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = { "graph": ("dict", {"cls": "local", "grpc_server": {}}) }"graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.option( "--config-file", "-C", default=None, type=click.Path( exists=True, dir_okay=False, ), help="YAML configuration file", ) @click.option( "--target", type=str, help="Which Rust target to use executables from, usually 'release' " "(the default) or 'debug'.", ) @click.pass_context def graph_cli_group(ctx, config_file, target): """Software Heritage graph tools.""" from swh.core import config ctx.ensure_object(dict) conf =, DEFAULT_CONFIG) if "graph" not in conf: raise ValueError( 'no "graph" stanza found in configuration file %s' % config_file ) ctx.obj["config"] = conf if target is None: target = conf.get("target", "release") conf["target"] = target @graph_cli_group.command(name="rpc-serve") @click.option( "--host", "-h", default="", metavar="IP", show_default=True, help="host IP address to bind the server on", ) @click.option( "--port", "-p", default=5009, type=click.INT, metavar="PORT", show_default=True, help="port to bind the server on", ) @click.option( "--graph", "-g", "graph_path", default=None, metavar="GRAPH", help="compressed graph basename", ) @click.pass_context def serve(ctx, host, port, graph_path): """Run the graph RPC service.""" import aiohttp.web from swh.graph.http_rpc_server import make_app config = ctx.obj["config"] config_graph = config["graph"] cls = config_graph["cls"] if cls in ("local", "local_rust"): if graph_path is None: raise ValueError( "Please, specify the graph path (-g <path>) for a 'local' rpc instance" ) # Only required when spawning a local rpc instance config["graph"]["grpc_server"]["path"] = graph_path elif cls == "remote": if "grpc_server" not in config_graph: raise ValueError( 'Please, specify the "grpc_server" key configuration in the "graph" section' ) else: raise ValueError( f'Value for "cls" must be "local"/"local_rust" or "remote", not {cls!r}' ) app = make_app(config=config) aiohttp.web.run_app(app, host=host, port=port) @graph_cli_group.command(name="download") @click.option( "--s3-url", default=None, help="S3 directory containing the graph to download. " "Defaults to '{s3_prefix}/{name}/compressed/'", ) @click.option( "--s3-prefix", default="s3://softwareheritage/graph/", help="Base directory of Software Heritage's graphs on S3", ) @click.option( "--name", default=None, help="Name of the dataset to download. This is an ISO8601 date, optionally with a " "suffix. See", ) @click.option( "--parallelism", "-j", default=5, help="Number of threads used to download/decompress files.", ) @click.argument( "target-dir", type=click.Path( file_okay=False, writable=True, path_type=Path, # type: ignore[type-var] ), ) @click.pass_context def download( ctx, s3_url: Optional[str], s3_prefix: str, name: Optional[str], parallelism: int, target_dir: Path, ): """Downloads a compressed SWH graph to the given target directory""" from import GraphDownloader if s3_url and name: raise click.ClickException("--s3-url and --name are mutually exclusive") elif not s3_url and not name: raise click.ClickException("Either --s3-url or --name must be provided") elif not s3_url: s3_url = f"{s3_prefix.rstrip('/')}/{name}/compressed/" target_dir.mkdir(parents=True, exist_ok=True) GraphDownloader( local_graph_path=target_dir, s3_graph_path=s3_url, parallelism=parallelism, ).download_graph( progress_percent_cb=lambda _: None, progress_status_cb=lambda _: None, ) @graph_cli_group.command(name="list-datasets") @click.option( "--s3-bucket", default="softwareheritage", help="S3 bucket name containing Software Heritage graph datasets. " "Defaults to 'sotwareheritage'", ) @click.pass_context def list_datasets( ctx, s3_bucket: Optional[str], ): """List graph datasets available for download. Print the names of the Software Heritage graph datasets that can be downloaded with the following command: $ swh graph download --name <dataset_name> <target_directory> The list may contain datasets that are not suitable for production, or not yet fully available. See for the official list of datasets, along with release notes. """ import boto3 from botocore import UNSIGNED from botocore.client import Config click.echo( "The following list is automatically generated from Software Heritage's " "S3 bucket.", err=True, ) click.echo( "It may contain datasets that are not suitable for production, or not yet " "fully available.", err=True, ) click.echo( "See " "for the official list of datasets, along with release notes.", err=True, ) s3_client = boto3.client("s3", config=Config(signature_version=UNSIGNED)) paginator = s3_client.get_paginator("list_objects_v2") for dataset_prefix in [ prefix["Prefix"] for page in paginator.paginate(Bucket=s3_bucket, Prefix="graph/", Delimiter="/") for prefix in page["CommonPrefixes"] ]: if "Contents" in s3_client.list_objects_v2( Bucket=s3_bucket, Prefix=dataset_prefix + "compressed/", Delimiter="/", ): click.echo(dataset_prefix.split("/")[1]) @graph_cli_group.command(name="reindex") @click.option( "--force", is_flag=True, help="Regenerate files even if they already exist. Implies --ef", ) @click.option( "--ef", is_flag=True, help="Regenerate .ef files even if they already exist" ) @click.argument( "graph", type=click.Path( writable=True, ), ) @click.pass_context def reindex(ctx, force: bool, ef: bool, graph: str): """Reindex a SWH GRAPH to the latest graph format. GRAPH should be composed of the graph folder followed by the graph prefix (by default "graph") eg. "graph_folder/graph". """ import os.path from import Rust ef = ef or force conf = ctx.obj["config"] if "target" not in conf: conf["target"] = "release" if ( ef or not os.path.exists(f"{graph}.ef") or not os.path.exists(f"{graph}-transposed.ef") ):"Recreating Elias-Fano indexes on adjacency lists") Rust("swh-graph-index", "ef", f"{graph}", conf=conf).run() Rust("swh-graph-index", "ef", f"{graph}-transposed", conf=conf).run() if ( ef or not os.path.exists(f"{graph}-labelled.ef") or not os.path.exists(f"{graph}-labelled-transposed.ef") ): with open(f"{graph}.nodes.count.txt", "rt") as f: node_count = label_offsets_count = str(int(node_count) + 1) # ditto"Recreating Elias-Fano indexes on arc labels") Rust( "swh-graph-index", "labels-ef", f"{graph}-labelled", label_offsets_count, conf=conf, ).run() Rust( "swh-graph-index", "labels-ef", f"{graph}-transposed-labelled", label_offsets_count, conf=conf, ).run() node2type_fname = f"{graph}.node2type.bin" if force or not os.path.exists(node2type_fname):"Creating node2type.bin") if os.path.exists(node2type_fname): os.unlink(node2type_fname) Rust("swh-graph-node2type", graph, conf=conf).run() @graph_cli_group.command(name="grpc-serve") @click.option( "--port", "-p", default=None, type=click.INT, metavar="PORT", show_default=True, help=( "port to bind the server on (note: host is not configurable " "for now and will be Defaults to 50091" ), ) @click.option("--graph", "-g", metavar="GRAPH", help="compressed graph basename") @click.pass_context def grpc_serve(ctx, port, graph): """start the graph GRPC service This command uses execve to execute the Rust GRPC service. """ import os from swh.graph.grpc_server import build_rust_grpc_server_cmdline config = ctx.obj["config"] target = ctx.obj["config"].get("target", "release") config["graph"]["target"] = target if graph is not None: config["graph"]["path"] = graph else: if "path" not in config["graph"]: raise ValueError("No graph base name provided") if port is None: port = config["graph"].get("port", 50091) config["graph"]["port"] = port logger.debug("Building gPRC server command line") cmd, port = build_rust_grpc_server_cmdline(**config["graph"]) rust_bin = cmd[0] # XXX: shlex.join() is in 3.8 #"Starting gRPC server: %s", shlex.join(cmd))"Starting gRPC server: %s", " ".join(shlex.quote(x) for x in cmd)) os.execvp(rust_bin, cmd) @graph_cli_group.command() @click.option( "--input-dataset", "-i", type=PathlibPath(), help="graph dataset directory, in ORC format", ) @click.option( "--output-directory", "-o", type=PathlibPath(), help="directory where to store compressed graph", ) @click.option( "--graph-name", "-g", default="graph", metavar="NAME", help="name of the output graph (default: 'graph')", ) @click.option( "--steps", "-s", metavar="STEPS", type=StepOption(), help="run only these compression steps (default: all steps)", ) @click.option("--test-flavor", "--test-flavour", type=str, help="Test flavo[u]r") @click.pass_context def compress(ctx, input_dataset, output_directory, graph_name, steps, test_flavor): """Compress a graph using WebGraph Input: a directory containing a graph dataset in ORC format Output: a directory containing a WebGraph compressed graph Compression steps are: (1) extract_nodes, (2) mph, (3) bv, (4) bfs, (5) permute_bfs, (6) transpose_bfs, (7) simplify, (8) llp, (9) permute_llp, (10) obl, (11) compose_orders, (12) stats, (13) transpose, (14) transpose_obl, (15) maps, (16) extract_persons, (17) mph_persons, (18) node_properties, (19) mph_labels, (20) fcl_labels, (21) edge_labels, (22) edge_labels_obl, (23) edge_labels_transpose_obl, (24) clean_tmp. Compression steps can be selected by name or number using --steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are also supported. """ from swh.graph import webgraph try: conf = ctx.obj["config"]["graph"]["compress"] except KeyError: conf = {} # use defaults try: conf["target"] = ctx.obj["config"]["target"] except KeyError: conf["target"] = "release" # use release builds by default if test_flavor is None: # TODO: see is this can be None test_flavor = conf.get("test_flavor", "full") conf["test_flavor"] = test_flavor try: webgraph.compress( graph_name, input_dataset, output_directory, test_flavor, steps, conf ) except webgraph.CompressionSubprocessError as e: try: if e.log_path.is_file(): with"rb") as f: if e.log_path.stat().st_size > 1000:, 2) # read only the last 1kB f.readline() # skip first line, might be partial sys.stderr.write("[...]\n") sys.stderr.write("\n") sys.stderr.flush() sys.stderr.buffer.write( sys.stderr.flush() except Exception: raise pass raise click.ClickException(e.message)
[docs] def get_all_subclasses(cls): all_subclasses = [] for subclass in cls.__subclasses__(): all_subclasses.append(subclass) all_subclasses.extend(get_all_subclasses(subclass)) return all_subclasses
@graph_cli_group.command() @click.option( "--base-directory", required=True, type=PathlibPath(), help="""The base directory where all datasets and compressed graphs are. Its subdirectories should be named after a date (and optional flavor). For example: ``/poolswh/softwareheritage/``.""", ) @click.option( "--athena-prefix", required=False, type=str, help="""A prefix for the Athena Database that will be created and/or used. For example: ``swh``.""", ) @click.option( "--s3-prefix", required=False, type=str, help="""The base S3 "directory" where all datasets and compressed graphs are. Its subdirectories should be named after a date (and optional flavor). For example: ``s3://softwareheritage/graph/``.""", ) @click.option( "--max-ram", help="""Maximum RAM that some scripts will try not to exceed""", ) @click.option( "--batch-size", type=int, help="""Default value for compression tasks handling objects in batch""", ) @click.option( "--grpc-api", help="""Default value for the <hostname>:<port> of the gRPC server""" ) @click.option( "--s3-athena-output-location", required=False, type=str, help="""The base S3 "directory" where all datasets and compressed graphs are. Its subdirectories should be named after a date (and optional flavor). For example: ``s3://softwareheritage/graph/``.""", ) @click.option( "--graph-base-directory", required=False, type=PathlibPath(), help="""Overrides the path of the graph to use. Defaults to the value of ``{base_directory}/{dataset_name}/{compressed}/``. For example: ``/dev/shm/swh-graph/default/``.""", ) @click.option( "--export-base-directory", required=False, type=PathlibPath(), help="""Overrides the path of the export to use. Defaults to the value of ``--base-directory``.""", ) @click.option( "--dataset-name", required=True, type=str, help="""Should be a date and optionally a flavor, which will be used as directory name. For example: ``2022-04-25`` or ``2022-11-12_staging``.""", ) @click.option( "--parent-dataset-name", required=False, type=str, help="""When generating a subdataset (eg. ``2024-08-23-python3k``), this is the name of a full export (eg. ``2024-08-23``) the subdataset should be built from.""", ) @click.option( "--export-name", required=False, type=str, help="""Should be a date and optionally a flavor, which will be used as directory name for the export (not the compressed graph). For example: ``2022-04-25`` or ``2022-11-12_staging``. Defaults to the value of --dataset-name""", ) @click.option( "--parent-export-name", required=False, type=str, help="""When generating a subdataset (eg. ``2024-08-23-python3k``), this is the name of a full export (eg. ``2024-08-23``) the subdataset should be built from. Defaults to the value of --parent-dataset-name""", ) @click.option( "--luigi-config", type=PathlibPath(), help="""Extra options to add to ``luigi.cfg``, following the same format. This overrides any option that would be other set automatically.""", ) @click.option( "--retry-luigi-delay", type=int, default=10, help="""Time to wait before re-running Luigi, if some tasks are pending but stuck.""", ) @click.argument("luigi_param", nargs=-1) @click.pass_context def luigi( ctx, base_directory: Path, graph_base_directory: Optional[Path], export_base_directory: Optional[Path], s3_prefix: Optional[str], athena_prefix: Optional[str], max_ram: Optional[str], batch_size: Optional[int], grpc_api: Optional[str], s3_athena_output_location: Optional[str], dataset_name: str, parent_dataset_name: Optional[str], parent_export_name: Optional[str], export_name: Optional[str], retry_luigi_delay: int, luigi_config: Optional[Path], luigi_param: List[str], ): r""" Internal command of swh-graph. Use 'swh export luigi' instead. Calls Luigi with the given task and params, and automatically configures paths based on --base-directory and --dataset-name. The list of Luigi params should be prefixed with ``--`` so they are not interpreted by the ``swh`` CLI. For example:: swh datasets luigi \ --base-directory ~/tmp/ \ --dataset-name 2022-12-05_test \ -- \ RunAll \ --local-scheduler to pass ``RunAll --local-scheduler`` as Luigi params Or, to compute a derived dataset:: swh graph luigi \ --graph-base-directory /dev/shm/swh-graph/default/ \ --base-directory /poolswh/softwareheritage/vlorentz/ \ --athena-prefix swh \ --dataset-name 2022-04-25 \ --s3-athena-output-location s3://some-bucket/tmp/athena \ -- \ --log-level INFO \ FindEarliestRevisions \ --scheduler-url http://localhost:50092/ \ --blob-filter citation """ import configparser import os import secrets import socket import subprocess import tempfile import time import luigi import psutil # Popular the list of subclasses of luigi.Task import swh.export.luigi # noqa import swh.graph.luigi # noqa config = configparser.ConfigParser() # By default, Luigi always returns code 0. # See config["retcode"] = { "already_running": "128", "missing_data": "129", "not_run": "130", "task_failed": "131", "scheduling_error": "132", "unhandled_exception": "133", } if max_ram: if max_ram.endswith("G"): max_ram_mb = int(max_ram[:-1]) * 1024 elif max_ram.endswith("M"): max_ram_mb = int(max_ram[:-1]) else: raise click.ClickException( "--max-ram must be an integer followed by M or G" ) else: max_ram_mb = psutil.virtual_memory().total // (1024 * 1024) # Only used by the local scheduler; otherwise it needs to be configured # in luigid.cfg hostname = socket.getfqdn() config["resources"] = { f"{hostname}_ram_mb": str(max_ram_mb), } export_name = export_name or dataset_name parent_export_name = parent_export_name or parent_dataset_name export_path = (export_base_directory or base_directory) / export_name dataset_path = base_directory / dataset_name default_values = dict( local_export_path=export_path, local_graph_path=dataset_path / "compressed", derived_datasets_path=dataset_path, topological_order_dir=dataset_path / "topology/", origin_urls_path=dataset_path / "origin_urls.csv.zst", export_id=f"{export_name}-{secrets.token_hex(10)}", export_name=export_name, dataset_name=dataset_name, ) if graph_base_directory: default_values["local_graph_path"] = graph_base_directory if s3_prefix: default_values["s3_export_path"] = f"{s3_prefix.rstrip('/')}/{export_name}" default_values["s3_graph_path"] = ( f"{s3_prefix.rstrip('/')}/{dataset_name}/compressed" ) if parent_dataset_name: default_values["s3_parent_export_path"] = ( f"{s3_prefix.rstrip('/')}/{parent_export_name}" ) default_values["s3_parent_graph_path"] = ( f"{s3_prefix.rstrip('/')}/{parent_dataset_name}/compressed" ) if s3_athena_output_location: default_values["s3_athena_output_location"] = s3_athena_output_location if max_ram: default_values["max_ram_mb"] = max_ram_mb if batch_size: default_values["batch_size"] = batch_size if grpc_api: default_values["grpc_api"] = grpc_api if athena_prefix: default_values["athena_db_name"] = ( f"{athena_prefix}_{dataset_name.replace('-', '')}" ) if parent_dataset_name: default_values["athena_parent_db_name"] = ( f"{athena_prefix}_{parent_dataset_name.replace('-', '')}" ) for task_cls in get_all_subclasses(luigi.Task): task_name = task_cls.__name__ # If the task has an argument with one of the known name, add the default value # to its config. task_config = { arg_name: str(arg_value) for arg_name, arg_value in default_values.items() if hasattr(task_cls, arg_name) } if task_config: config[task_name] = task_config # If any config is provided, add it. # This may override default arguments configured above. if luigi_config is not None: with tempfile.NamedTemporaryFile(mode="w+t", prefix="luigi_", suffix=".cfg") as fd: config.write(fd) fd.flush() while True: start_time = time.time() proc = [ "luigi", "--module", "swh.export.luigi", "--module", "swh.graph.luigi", *luigi_param, ], env={ "LUIGI_CONFIG_PATH":, **os.environ, }, ) # If all tasks are stuck, run another loop to wait for them to be unstuck. # The normal way to do this would be to set config["worker"]["keep_alive"] # to True, but we do it this way to force Luigi to recompute resources # because most tasks in can only know their resource # usage after ExtractNodes and ExtractPersons ran. if proc.returncode != int(config["retcode"]["not_run"]): break # wait a few seconds between loops to avoid wasting CPU time.sleep(max(0, retry_luigi_delay - (time.time() - start_time))) exit(proc.returncode) @graph_cli_group.command(name="find-context") @click.option( "-g", "--graph-grpc-server", default="localhost:50091", metavar="GRAPH_GRPC_SERVER", show_default=True, help="Graph RPC server address: as host:port", ) @click.option( "-c", "--content-swhid", default="swh:1:cnt:3b997e8ef2e38d5b31fb353214a54686e72f0870", metavar="CNTSWHID", show_default=True, help="SWHID of the content", ) @click.option( "-f", "--filename", default="", metavar="FILENAME", show_default=True, help="Name of file to search for", ) @click.option( "-o", "--origin-url", default="", metavar="ORIGINURL", show_default=True, help="URL of the origin where we look for a content", ) @click.option( "--all-origins/--no-all-origins", default=False, help="Compute fqswhid for all origins", ) @click.option( "--fqswhid/--no-fqswhid", default=True, help="Compute fqswhid. If disabled, print only the origins.", ) @click.option( "--trace/--no-trace", default=False, help="Print nodes examined while building fully qualified SWHID.", ) @click.option( "--random-origin/--no-random-origin", default=True, help="Compute fqswhid for a random origin", ) @click.pass_context def find_context(ctx, **kwargs): """Utility to get the fully qualified SWHID for a given core SWHID. Uses the graph traversal to find the shortest path to an origin, and retains the first seen revision or release as anchor for cnt and dir types.""" from swh.graph.find_context import main as lookup return lookup(**kwargs)
[docs] def main(): return graph_cli_group(auto_envvar_prefix="SWH_GRAPH")
if __name__ == "__main__": main()