# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
import pathlib
import sys
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set

import click

from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.dataset.relational import MAIN_TABLES

if TYPE_CHECKING:
    from swh.model.swhids import ExtendedSWHID


@swh_cli_group.group(name="dataset", context_settings=CONTEXT_SETTINGS)
@click.option(
    "--config-file",
    "-C",
    default=None,
    type=click.Path(exists=True, dir_okay=False),
    help="Configuration file.",
)
@click.pass_context
def dataset_cli_group(ctx, config_file):
    """Dataset Tools.

    A set of tools to export datasets from the Software Heritage Archive in
    various formats.
    """
    from swh.core import config

    ctx.ensure_object(dict)
    conf = config.read(config_file)
    ctx.obj["config"] = conf
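

# The file passed with --config-file is YAML read by swh.core.config. An
# illustrative (hypothetical) example for the graph export commands defined
# below; only the 'journal' section and 'masking_db' key are referenced here,
# the exact journal client settings depend on your deployment:
#
#   journal:
#     brokers:
#       - kafka1.example.org:9092
#     group_id: swh-dataset-export
#   masking_db: null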


@dataset_cli_group.group("graph")
@click.pass_context
def graph(ctx):
    """Manage graph export"""
    pass


AVAILABLE_EXPORTERS = {
    "edges": "swh.dataset.exporters.edges:GraphEdgesExporter",
    "orc": "swh.dataset.exporters.orc:ORCExporter",
}


@graph.command("export")
@click.argument("export-path", type=click.Path())
@click.option(
    "--export-id",
    "-e",
    help=(
        "Unique ID of the export run. This is appended to the kafka "
        "group_id config file option. If group_id is not set in the "
        "'journal' section of the config file, defaults to 'swh-dataset-export-'."
    ),
)
@click.option(
    "--formats",
    "-f",
    type=click.STRING,
    default=",".join(AVAILABLE_EXPORTERS.keys()),
    show_default=True,
    help="Formats to export.",
)
@click.option("--processes", "-p", default=1, help="Number of parallel processes")
@click.option(
    "--exclude",
    type=click.STRING,
    help="Comma-separated list of object types to exclude",
)
@click.option(
    "--types",
    "object_types",
    type=click.STRING,
    help="Comma-separated list of object types to export",
)
@click.option(
    "--margin",
    type=click.FloatRange(0, 1),
    help=(
        "Offset margin to start consuming from. E.g. if set to '0.95', "
        "consumers will start at 95% of the last committed offset; "
        "in other words, they will start earlier than the last committed position."
    ),
)
@click.pass_context
def export_graph(ctx, export_path, formats, exclude, object_types, **kwargs):
    """Export the Software Heritage graph as an edge dataset."""
    config = ctx.obj["config"]

    if object_types:
        object_types = {o.strip() for o in object_types.split(",")}
        invalid_object_types = object_types - set(MAIN_TABLES.keys())
        if invalid_object_types:
            raise click.BadOptionUsage(
                option_name="types",
                message=f"Invalid object types: {', '.join(invalid_object_types)}.",
            )
    else:
        object_types = set(MAIN_TABLES.keys())

    exclude_obj_types = {o.strip() for o in (exclude.split(",") if exclude else [])}
    export_formats = [c.strip() for c in formats.split(",")]
    for f in export_formats:
        if f not in AVAILABLE_EXPORTERS:
            raise click.BadOptionUsage(
                option_name="formats", message=f"{f} is not an available format."
            )

    run_export_graph(
        config,
        pathlib.Path(export_path),
        export_formats,
        list(object_types),
        exclude_obj_types=exclude_obj_types,
        **kwargs,
    )
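

# Example invocation (illustrative; paths and option values are hypothetical):
#
#   swh dataset --config-file config.yml graph export /srv/swh/export \
#       --formats orc --processes 16 --types origin,snapshot,release,revision \
#       --margin 0.95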


def get_masked_swhids(logger, config: Dict[str, Any]) -> Set["ExtendedSWHID"]:
    """Queries the masking database and returns the set of all non-visible SWHIDs."""
    import tqdm

    from swh.storage.proxies.masking.db import MaskingQuery

    if config["masking_db"] is None:
        logger.warning("Exporting dataset without masking.")
        return set()

    masking_query = MaskingQuery.connect(config["masking_db"])
    return {
        swhid
        for (swhid, statuses) in tqdm.tqdm(
            masking_query.iter_masked_swhids(),
            desc="Listing masked SWHIDs",
            unit_scale=True,
        )
    }
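

# get_masked_swhids expects config["masking_db"] to be either None (no masking)
# or a connection string for the masking proxy database; a hypothetical example
# value would be "dbname=swh-masking host=db.example.org".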


def run_export_graph(
    config: Dict[str, Any],
    export_path: pathlib.Path,
    export_formats: List[str],
    object_types: List[str],
    exclude_obj_types: Set[str],
    export_id: Optional[str],
    processes: int,
    margin: Optional[float],
):
    import functools
    from importlib import import_module
    import logging
    import resource
    import uuid

    from swh.dataset.journalprocessor import ParallelJournalProcessor

    logger = logging.getLogger(__name__)

    masked_swhids = get_masked_swhids(logger, config)

    if not export_id:
        export_id = str(uuid.uuid4())

    # Enforce order (from origin to contents) to reduce number of holes in the graph.
    object_types = [
        obj_type for obj_type in MAIN_TABLES.keys() if obj_type in object_types
    ]

    # ParallelJournalProcessor opens 256 LevelDBs in total. Depending on the number of
    # processes, this can exceed the maximum number of file descriptors (soft limit
    # defaults to 1024 on Debian), so let's increase it.
    (soft, hard) = resource.getrlimit(resource.RLIMIT_NOFILE)
    nb_shards = 256  # TODO: make this configurable or detect nb of kafka partitions
    open_fds_per_shard = 61  # estimated with plyvel==1.3.0 and libleveldb1d==1.22-3
    spare = 1024  # for everything other than LevelDB
    want_fd = nb_shards * open_fds_per_shard + spare
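    # With the values above, this requests 256 * 61 + 1024 = 16640 descriptors.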
    if hard < want_fd:
        logger.warning(
            "Hard limit of open file descriptors (%d) is lower than ideal (%d)",
            hard,
            want_fd,
        )
    if soft < want_fd:
        want_fd = min(want_fd, hard)
        logger.info(
            "Soft limit of open file descriptors (%d) is too low, increasing to %d",
            soft,
            want_fd,
        )
        resource.setrlimit(resource.RLIMIT_NOFILE, (want_fd, hard))

    def importcls(clspath):
        mod, cls = clspath.split(":")
        m = import_module(mod)
        return getattr(m, cls)

    exporter_cls = dict(
        (fmt, importcls(clspath))
        for (fmt, clspath) in AVAILABLE_EXPORTERS.items()
        if fmt in export_formats
    )
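
    # For instance, if export_formats == ["orc"], exporter_cls maps "orc" to the
    # ORCExporter class loaded from swh.dataset.exporters.orc.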

    # Run the exporter for each edge type.
    parallel_exporters = {}
    for obj_type in object_types:
        if obj_type in exclude_obj_types:
            continue
        exporters = [
            functools.partial(
                exporter_cls[f], config=config, export_path=export_path / f
            )
            for f in export_formats
        ]
        parallel_exporters[obj_type] = ParallelJournalProcessor(
            config,
            masked_swhids,
            exporters,
            export_id,
            obj_type,
            node_sets_path=export_path / ".node_sets" / obj_type,
            processes=processes,
            offset_margin=margin,
        )
        # Fetch all offsets before we start exporting to minimize the time interval
        # between the offsets of each topic
        parallel_exporters[obj_type].get_offsets()

    for obj_type, parallel_exporter in parallel_exporters.items():
        print("Exporting {}:".format(obj_type))
        parallel_exporter.run()
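

# Programmatic usage sketch (hypothetical values; this is normally reached
# through the `swh dataset graph export` command above):
#
#   run_export_graph(
#       config={"journal": {...}, "masking_db": None},
#       export_path=pathlib.Path("/srv/swh/export"),
#       export_formats=["orc"],
#       object_types=["origin", "snapshot"],
#       exclude_obj_types=set(),
#       export_id=None,
#       processes=4,
#       margin=None,
#   )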


@graph.command("sort")
@click.argument("export-path", type=click.Path())
@click.pass_context
def sort_graph(ctx, export_path):
    """Sort the node and edge files generated by the 'edges' graph exporter."""
    config = ctx.obj["config"]
    from swh.dataset.exporters.edges import sort_graph_nodes

    sort_graph_nodes(export_path, config)
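

# Example invocation (illustrative; the path is hypothetical):
#
#   swh dataset -C config.yml graph sort /srv/swh/export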


@dataset_cli_group.group("athena")
@click.pass_context
def athena(ctx):
    """Manage and query a remote AWS Athena database"""
    pass


@athena.command("create")
@click.option(
    "--database-name", "-d", default="swh", help="Name of the database to create"
)
@click.option(
    "--location-prefix",
    "-l",
    required=True,
    help="S3 prefix where the dataset can be found",
)
@click.option(
    "-o", "--output-location", help="S3 prefix where results should be stored"
)
@click.option(
    "-r",
    "--replace-tables",
    is_flag=True,
    help="Replace the tables that already exist",
)
def athena_create(
    database_name, location_prefix, output_location=None, replace_tables=False
):
    """Create tables on AWS Athena pointing to a given graph dataset on S3."""
    from swh.dataset.athena import create_tables

    create_tables(
        database_name,
        location_prefix,
        output_location=output_location,
        replace=replace_tables,
    )
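

# Example invocation (illustrative; the S3 prefix is hypothetical):
#
#   swh dataset athena create -d swh -l s3://example-bucket/graph/2022-04-25/orc/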


@athena.command("query")
@click.option(
    "--database-name", "-d", default="swh", help="Name of the database to query"
)
@click.option(
    "-o", "--output-location", help="S3 prefix where results should be stored"
)
@click.argument("query_file", type=click.File("r"), default=sys.stdin)
def athena_query(
    database_name,
    query_file,
    output_location=None,
):
    """Query the AWS Athena database with a given command"""
    from swh.dataset.athena import run_query_get_results

    print(
        run_query_get_results(
            database_name,
            query_file.read(),
            output_location=output_location,
        ),
        end="",
    )  # CSV already ends with \n
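

# Example invocation (illustrative), reading the query from stdin:
#
#   echo "SELECT COUNT(*) FROM origin;" | swh dataset athena query -d swh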


@athena.command("gensubdataset")
@click.option("--database", "-d", default="swh", help="Name of the base database")
@click.option(
    "--subdataset-database",
    required=True,
    help="Name of the subdataset database to create",
)
@click.option(
    "--subdataset-location",
    required=True,
    help="S3 prefix where the subdataset should be stored",
)
@click.option(
    "--swhids",
    required=True,
    help="File containing the list of SWHIDs to include in the subdataset",
)
def athena_gensubdataset(database, subdataset_database, subdataset_location, swhids):
    """
    Generate a subdataset with Athena, from an existing database and a list
    of SWHIDs. Athena will generate a new dataset with the same tables as in
    the base dataset, but only containing the objects present in the SWHID
    list.
    """
    from swh.dataset.athena import generate_subdataset

    generate_subdataset(
        database,
        subdataset_database,
        subdataset_location,
        swhids,
        os.path.join(subdataset_location, "queries"),
    )
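

# Example invocation (illustrative; names and paths are hypothetical):
#
#   swh dataset athena gensubdataset -d swh \
#       --subdataset-database swh_subset \
#       --subdataset-location s3://example-bucket/subdataset/ \
#       --swhids swhids.txt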