# Copyright (C) 2019-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""WebGraph driver"""
from datetime import datetime
from enum import Enum
import logging
import os
from pathlib import Path
import subprocess
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Set,
TypeVar,
Union,
)
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from swh.graph.config import check_config_compress
from .shell import AtomicFileSink, Command, CommandException, Rust
if TYPE_CHECKING:
from .shell import RunResult
logger = logging.getLogger(__name__)
class CompressionSubprocessError(Exception):
    """Raised when a compression step fails.

    The full output of the failing step is available in the log file at
    ``log_path``.
    """

    def __init__(self, message: str, log_path: Path):
        self.message = message
        self.log_path = log_path
        super().__init__(f"{message}; full logs at {log_path}")
[docs]
class CompressionStep(Enum):
    """Steps of the graph compression pipeline.

    ``COMP_SEQ`` (the full pipeline) is built from the definition order of
    these members; the integer values mirror that order and leave gaps so new
    steps can be inserted between existing ones.
    """

    # pre-processing: extraction from the input dataset
    EXTRACT_NODES = -20
    EXTRACT_LABELS = -10
    # counts/statistics used by later steps
    NODE_STATS = 0
    EDGE_STATS = 3
    LABEL_STATS = 6
    # node ordering: MPH, BFS + LLP permutations
    MPH = 10
    INITIAL_ORDER = 20
    BV = 30
    BV_EF = 40
    BFS_ROOTS = 50
    BFS = 60
    PERMUTE_AND_SIMPLIFY_BFS = 70
    BFS_EF = 80
    BFS_DCF = 90
    LLP = 100
    COMPOSE_ORDERS = 110
    PERMUTE_LLP = 120
    EF = 140
    TRANSPOSE = 160
    TRANSPOSE_EF = 170
    MAPS = 180
    # person-related artifacts
    EXTRACT_PERSONS = 190
    PERSONS_STATS = 195
    MPH_PERSONS = 200
    EXTRACT_FULLNAMES = 203
    FULLNAMES_EF = 207
    NODE_PROPERTIES = 210
    # edge-label-related artifacts
    MPH_LABELS = 220
    LABELS_ORDER = 225
    FCL_LABELS = 230
    EDGE_LABELS = 240
    EDGE_LABELS_TRANSPOSE = 250
    EDGE_LABELS_EF = 270
    EDGE_LABELS_TRANSPOSE_EF = 280
    # final statistics, checks and cleanup
    STATS = 290
    E2E_CHECK = 295
    CLEAN_TMP = 300

    def __str__(self):
        """Return just the step name (e.g. ``"MPH"``), used in log file names."""
        return self.name
# full compression pipeline, in execution order (Enum definition order)
COMP_SEQ = list(CompressionStep)

# Type of the callables registered for each step: they take (conf, env) and
# return either a shell pipeline to run (Command / AtomicFileSink), a Python
# callable taking a logger, or None when the step should be skipped.
CompressionStepCommand = Union[
    Callable[[dict, dict], Optional[Command]],
    Callable[[dict, dict], Optional[AtomicFileSink]],
    Callable[[dict, dict], Union[Command, AtomicFileSink]],
    Callable[[dict, dict], Callable[[logging.Logger], None]],
]

# Registry mapping each step to its implementation; populated by the
# @_compression_step decorator below.
COMP_CMD: Dict[CompressionStep, CompressionStepCommand] = {}

T = TypeVar("T", bound=CompressionStepCommand)
def _compression_step(f: T) -> T:
    """Register *f* in :data:`COMP_CMD` as the implementation of the
    :class:`CompressionStep` matching its name.

    The matching step is found by stripping leading underscores from the
    function name and upper-casing it (``_extract_nodes`` ->
    ``EXTRACT_NODES``).

    Raises:
        ValueError: if no :class:`CompressionStep` member matches the
            function name (a programming error in the step definitions).
    """
    step_name = f.__name__.upper().lstrip("_")
    try:
        step = getattr(CompressionStep, step_name)
    except AttributeError as e:
        # ValueError is more precise than a bare Exception and remains
        # catchable by any existing `except Exception` handler.
        raise ValueError(f"Unknown compression step name: {step_name}") from e
    COMP_CMD[step] = f
    return f
@_compression_step
def _extract_nodes(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Extract the set of node SWHIDs from the ORC dataset into
    ``<graph_name>.nodes/``."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    args = (
        "extract-nodes",
        "--format",
        "orc",
        "--allowed-node-types",
        conf.get("object_types", "*"),
        f"{conf['in_dir']}",
        f"{out_prefix}.nodes/",
    )
    return Rust("swh-graph-extract", *args, conf=conf, env=env)
@_compression_step
def _extract_labels(conf: Dict[str, Any], env: Dict[str, str]) -> Optional[Command]:
    """Extract edge label names to a zstd-compressed CSV; skipped when no
    labelled node type (dir/snp) is selected."""
    requested_types = set(conf.get("object_types", "*").split(","))
    if not requested_types & {"dir", "snp", "*"}:
        return None
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    extract = Rust(
        "swh-graph-extract",
        "extract-labels",
        "--format",
        "orc",
        "--allowed-node-types",
        conf.get("object_types", "*"),
        conf["in_dir"],
        conf=conf,
        env=env,
    )
    return extract | Command.zstdmt() > AtomicFileSink(
        Path(f"{out_prefix}.labels.csv.zst")
    )
@_compression_step
def _node_stats(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Compute per-type node statistics and the total node count."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    return Rust(
        "swh-graph-extract",
        "node-stats",
        "--format",
        "orc",
        "--swhids-dir",
        f"{out_prefix}.nodes/",
        "--target-stats",
        f"{out_prefix}.nodes.stats.txt",
        "--target-count",
        f"{out_prefix}.nodes.count.txt",
        conf=conf,
        env=env,
    )
@_compression_step
def _edge_stats(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Compute per-type edge statistics and the total edge count."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    return Rust(
        "swh-graph-extract",
        "edge-stats",
        "--format",
        "orc",
        "--allowed-node-types",
        conf.get("object_types", "*"),
        "--dataset-dir",
        f"{conf['in_dir']}",
        "--target-stats",
        f"{out_prefix}.edges.stats.txt",
        "--target-count",
        f"{out_prefix}.edges.count.txt",
        conf=conf,
        env=env,
    )
@_compression_step
def _label_stats(conf: Dict[str, Any], env: Dict[str, str]) -> AtomicFileSink:
    """Count extracted labels into ``<graph>.labels.count.txt``; writes a
    literal 0 when no labelled node type is selected."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    count_sink = AtomicFileSink(Path(f"{out_prefix}.labels.count.txt"))
    requested_types = set(conf.get("object_types", "*").split(","))
    if not requested_types & {"dir", "snp", "*"}:
        return Command.echo("0") > count_sink
    pipeline = Command.zstdcat(f"{out_prefix}.labels.csv.zst") | Command.wc("-l")
    return pipeline > count_sink
@_compression_step
def _mph(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Build the pthash minimal perfect hash mapping SWHIDs to node ids."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.nodes.count.txt", "r") as nodes_count:
        # Tuple-unpack (as in _initial_order/_bv) instead of an assert:
        # asserts are stripped under ``python -O``.
        (num_nodes,) = nodes_count.readline().splitlines()
    return Rust(
        "swh-graph-compress",
        "pthash-swhids",
        "--num-nodes",
        num_nodes,
        f"{out_prefix}.nodes/",
        f"{out_prefix}.pthash",
        conf=conf,
        env=env,
    )
@_compression_step
def _initial_order(conf: Dict[str, Any], env: Dict[str, str]) -> Optional[Command]:
    """Compute an initial node order from the previous graph's node2swhid
    map (incremental compression); skipped when no previous graph is
    configured."""
    if not conf.get("previous_graph_path"):
        return None
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.nodes.count.txt") as nodes_count:
        (num_nodes,) = nodes_count.readline().splitlines()
    return Rust(
        "swh-graph-compress",
        "initial-order",
        "--mph-algo",
        "pthash",
        "--function",
        f"{out_prefix}.pthash",
        "--num-nodes",
        num_nodes,
        "--previous-node2swhid",
        f"{conf['previous_graph_path']}.node2swhid.bin",
        "--target-order",
        f"{out_prefix}-base.order",
        # conf/env were missing here, unlike every other step; pass them so
        # this subprocess gets the same TMPDIR/TZ environment.
        conf=conf,
        env=env,
    )
@_compression_step
def _bv(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Build the initial BVGraph (``-base``) from the ORC dataset."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.nodes.count.txt") as nodes_count:
        (num_nodes,) = nodes_count.readline().splitlines()
    order_args: List[str] = []
    if "previous_graph_path" in conf:
        order_args = ["--order", f"{out_prefix}-base.order"]
    return Rust(
        "swh-graph-extract",
        "bv",
        "--allowed-node-types",
        conf.get("object_types", "*"),
        "--mph-algo",
        "pthash",
        "--function",
        # NOTE(review): no ".pthash" suffix here, unlike most other steps —
        # presumably appended by the Rust side; confirm.
        out_prefix,
        "--num-nodes",
        num_nodes,
        *order_args,
        f"{conf['in_dir']}",
        f"{out_prefix}-base",
        conf=conf,
        env=env,
    )
@_compression_step
def _bv_ef(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Build the Elias-Fano offsets index for the ``-base`` graph."""
    target = f"{conf['out_dir']}/{conf['graph_name']}-base"
    return Rust("swh-graph-index", "ef", target, conf=conf, env=env)
@_compression_step
def _bfs_roots(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Compute the list of root nodes used to seed the BFS traversal."""
    args = (
        "bfs-roots",
        "--allowed-node-types",
        conf.get("object_types", "*"),
        f"{conf['in_dir']}",
        f"{conf['out_dir']}/{conf['graph_name']}-bfs.roots.txt",
    )
    return Rust("swh-graph-extract", *args, conf=conf, env=env)
@_compression_step
def _bfs(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Compute a BFS ordering (``-bfs.order``) of the ``-base`` graph."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    order_args: List[str] = []
    if "previous_graph_path" in conf:
        order_args = ["--order", f"{out_prefix}-base.order"]
    return Rust(
        "swh-graph-compress",
        "bfs",
        "--mph-algo",
        "pthash",
        "--function",
        f"{out_prefix}.pthash",
        "--init-roots",
        f"{out_prefix}-bfs.roots.txt",
        *order_args,
        f"{out_prefix}-base",
        f"{out_prefix}-bfs.order",
        conf=conf,
        env=env,
    )
@_compression_step
def _permute_and_simplify_bfs(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Apply the BFS permutation and symmetrize the graph, producing
    ``-bfs-simplified``."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    args = (
        "permute-and-symmetrize",
        f"{out_prefix}-base",
        f"{out_prefix}-bfs-simplified",
        "--permutation",
        f"{out_prefix}-bfs.order",
    )
    return Rust("swh-graph-compress", *args, conf=conf, env=env)
@_compression_step
def _bfs_ef(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Build the Elias-Fano offsets index for the ``-bfs-simplified`` graph."""
    target = f"{conf['out_dir']}/{conf['graph_name']}-bfs-simplified"
    return Rust("swh-graph-index", "ef", target, conf=conf, env=env)
@_compression_step
def _bfs_dcf(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Build the DCF index for the ``-bfs-simplified`` graph."""
    target = f"{conf['out_dir']}/{conf['graph_name']}-bfs-simplified"
    return Rust("swh-graph-index", "dcf", target, conf=conf, env=env)
@_compression_step
def _llp(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Run Layered Label Propagation on the simplified graph to produce
    ``-llp.order``."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    args = (
        "llp",
        "-g",
        f"{conf['llp_gammas']}",
        f"{out_prefix}-bfs-simplified",
        f"{out_prefix}-llp.order",
    )
    return Rust("swh-graph-compress", *args, conf=conf, env=env)
@_compression_step
def _compose_orders(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Compose the incremental base order (if any), the BFS order and the
    LLP order into the final ``.pthash.order`` permutation."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.nodes.count.txt") as nodes_count:
        (num_nodes,) = nodes_count.readline().splitlines()
    input_args: List[str] = []
    if "previous_graph_path" in conf:
        input_args += ["--input", f"{out_prefix}-base.order"]
    input_args += ["--input", f"{out_prefix}-bfs.order"]
    input_args += ["--input", f"{out_prefix}-llp.order"]
    return Rust(
        "swh-graph-compress",
        "compose-orders",
        "--num-nodes",
        num_nodes,
        *input_args,
        "--output",
        f"{out_prefix}.pthash.order",
        conf=conf,
        env=env,
    )
@_compression_step
def _permute_llp(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Apply the BFS then LLP permutations to ``-base`` to produce the final
    graph.

    ``-base.order`` (if present) must NOT be applied here, because it was
    already applied to generate ``-base.graph``. In the case where
    ``-base.order`` does not exist, we could apply ``.pthash.order`` only
    instead of applying both ``-bfs.order`` and ``-llp.order`` (it may be
    slightly faster), but we don't for the sake of simplicity.
    """
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    args = (
        "permute",
        f"{out_prefix}-base",
        out_prefix,
        "--permutation",
        f"{out_prefix}-bfs.order",
        "--permutation",
        f"{out_prefix}-llp.order",
    )
    return Rust("swh-graph-compress", *args, conf=conf, env=env)
@_compression_step
def _ef(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Build the Elias-Fano offsets index for the final graph."""
    target = f"{conf['out_dir']}/{conf['graph_name']}"
    return Rust("swh-graph-index", "ef", target, conf=conf, env=env)
@_compression_step
def _transpose(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Build the transposed graph (``-transposed``) from the final graph."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    args = ("transpose", out_prefix, f"{out_prefix}-transposed")
    return Rust("swh-graph-compress", *args, conf=conf, env=env)
@_compression_step
def _transpose_ef(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Build the Elias-Fano offsets index for the transposed graph."""
    target = f"{conf['out_dir']}/{conf['graph_name']}-transposed"
    return Rust("swh-graph-index", "ef", target, conf=conf, env=env)
@_compression_step
def _maps(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Build the node id <-> SWHID maps and the node type map."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.nodes.count.txt") as nodes_count:
        (num_nodes,) = nodes_count.readline().splitlines()
    return Rust(
        "swh-graph-compress",
        "maps",
        "--num-nodes",
        num_nodes,
        "--swhids-dir",
        f"{out_prefix}.nodes/",
        "--mph-algo",
        "pthash",
        "--function",
        f"{out_prefix}.pthash",
        "--order",
        f"{out_prefix}.pthash.order",
        "--node2swhid",
        f"{out_prefix}.node2swhid.bin",
        "--node2type",
        f"{out_prefix}.node2type.bin",
        conf=conf,
        env=env,
    )
@_compression_step
def _extract_persons(conf: Dict[str, Any], env: Dict[str, str]) -> AtomicFileSink:
    """Extract person names to a zstd-compressed CSV; writes an empty file
    when neither revisions nor releases are selected."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    sink = AtomicFileSink(Path(f"{out_prefix}.persons.csv.zst"))
    requested_types = set(conf.get("object_types", "*").split(","))
    if not requested_types & {"rel", "rev", "*"}:
        return Command.echo("") | Command.zstdmt() > sink
    extract = Rust(
        "swh-graph-extract",
        "extract-persons",
        "--allowed-node-types",
        conf.get("object_types", "*"),
        f"{conf['in_dir']}",
        conf=conf,
        env=env,
    )
    return extract | Command.zstdmt() > sink
@_compression_step
def _mph_persons(
    conf: Dict[str, Any], env: Dict[str, str]
) -> Union[Command, AtomicFileSink]:
    """Build the pthash MPH over person names; writes an empty placeholder
    file when there are no persons."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.persons.count.txt") as persons_count:
        # Tuple-unpack (as in _initial_order) instead of an assert: asserts
        # are stripped under ``python -O``.
        (num_persons,) = persons_count.readline().splitlines()
    if num_persons == "0":
        return Command.echo("") > AtomicFileSink(
            Path(f"{out_prefix}.persons.pthash")
        )
    return Rust(
        "swh-graph-compress",
        "pthash-persons",
        "--num-persons",
        num_persons,
        Command.zstdcat(f"{out_prefix}.persons.csv.zst"),
        f"{out_prefix}.persons.pthash",
        conf=conf,
        env=env,
    )
@_compression_step
def _extract_fullnames(conf: Dict[str, Any], env: Dict[str, str]) -> Optional[Command]:
    """Extract person fullnames into the sensitive output directory.

    Skipped unless revisions/releases are selected, both sensitive
    directories are configured, the persons count file and the sensitive
    person ORC dataset exist, and there is at least one person.
    """
    if {"rel", "rev", "*"}.isdisjoint(set(conf.get("object_types", "*").split(","))):
        return None
    if "sensitive_in_dir" not in conf or "sensitive_out_dir" not in conf:
        return None
    persons_count_path = Path(
        f"{conf['out_dir']}/{conf['graph_name']}.persons.count.txt"
    )
    if not (
        persons_count_path.exists()
        and Path(f"{conf['sensitive_in_dir']}/orc/person").exists()
    ):
        return None
    with open(persons_count_path) as persons_count:
        # Tuple-unpack instead of an assert: asserts are stripped under -O.
        (num_persons,) = persons_count.readline().splitlines()
    if num_persons == "0":
        return None
    sensitive_prefix = f"{conf['sensitive_out_dir']}/{conf['graph_name']}"
    return Rust(
        "swh-graph-extract",
        "extract-fullnames",
        "--person-function",
        f"{conf['out_dir']}/{conf['graph_name']}.persons.pthash",
        f"{conf['sensitive_in_dir']}/orc",
        f"{sensitive_prefix}.persons",
        f"{sensitive_prefix}.persons.lengths",
        conf=conf,
        env=env,
    )
@_compression_step
def _fullnames_ef(conf: Dict[str, Any], env: Dict[str, str]) -> Optional[Command]:
    """Build the Elias-Fano index over person fullnames.

    Skipped under the same conditions as ``_extract_fullnames``.
    """
    if {"rel", "rev", "*"}.isdisjoint(set(conf.get("object_types", "*").split(","))):
        return None
    if "sensitive_in_dir" not in conf or "sensitive_out_dir" not in conf:
        return None
    persons_count_path = Path(
        f"{conf['out_dir']}/{conf['graph_name']}.persons.count.txt"
    )
    if not (
        persons_count_path.exists()
        and Path(f"{conf['sensitive_in_dir']}/orc/person").exists()
    ):
        return None
    with open(persons_count_path) as persons_count:
        # Tuple-unpack instead of an assert: asserts are stripped under -O.
        (num_persons,) = persons_count.readline().splitlines()
    if num_persons == "0":
        return None
    sensitive_prefix = f"{conf['sensitive_out_dir']}/{conf['graph_name']}"
    return Rust(
        "swh-graph-index",
        "fullnames-ef",
        "--num-persons",
        num_persons,
        f"{sensitive_prefix}.persons",
        f"{sensitive_prefix}.persons.lengths",
        f"{sensitive_prefix}.persons.ef",
        conf=conf,
        env=env,
    )
@_compression_step
def _persons_stats(conf: Dict[str, Any], env: Dict[str, str]) -> AtomicFileSink:
    """Count extracted persons into ``<graph>.persons.count.txt``; writes a
    literal 0 when neither revisions nor releases are selected."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    count_sink = AtomicFileSink(Path(f"{out_prefix}.persons.count.txt"))
    requested_types = set(conf.get("object_types", "*").split(","))
    if not requested_types & {"rel", "rev", "*"}:
        return Command.echo("0") > count_sink
    pipeline = Command.zstdcat(f"{out_prefix}.persons.csv.zst") | Command.wc("-l")
    return pipeline > count_sink
@_compression_step
def _node_properties(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Extract per-node properties (timestamps, authors, ...) from the ORC
    dataset into the compressed-graph property files."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.nodes.count.txt") as nodes_count:
        (num_nodes,) = nodes_count.readline().splitlines()
    return Rust(
        "swh-graph-extract",
        "node-properties",
        "--format",
        "orc",
        "--allowed-node-types",
        conf.get("object_types", "*"),
        "--mph-algo",
        "pthash",
        "--function",
        out_prefix,
        "--order",
        f"{out_prefix}.pthash.order",
        "--person-function",
        f"{out_prefix}.persons.pthash",
        "--num-nodes",
        num_nodes,
        f"{conf['in_dir']}",
        out_prefix,
        conf=conf,
        env=env,
    )
@_compression_step
def _mph_labels(conf: Dict[str, Any], env: Dict[str, str]) -> Optional[Command]:
    """Build the pthash MPH over label names; skipped when no labelled node
    type is selected or there are no labels."""
    if {"dir", "snp", "*"}.isdisjoint(set(conf.get("object_types", "*").split(","))):
        return None
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.labels.count.txt") as labels_count:
        # Tuple-unpack instead of an assert: asserts are stripped under -O.
        (num_labels,) = labels_count.readline().splitlines()
    if num_labels == "0":
        return None
    return Rust(
        "swh-graph-compress",
        "pthash-labels",
        "--num-labels",
        num_labels,
        Command.zstdcat(f"{out_prefix}.labels.csv.zst"),
        f"{out_prefix}.labels.pthash",
        conf=conf,
        env=env,
    )
@_compression_step
def _labels_order(conf: Dict[str, Any], env: Dict[str, str]) -> Optional[Command]:
    """Build the label MPH output order; skipped when no labelled node type
    is selected or there are no labels."""
    if {"dir", "snp", "*"}.isdisjoint(set(conf.get("object_types", "*").split(","))):
        return None
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.labels.count.txt") as labels_count:
        # Tuple-unpack instead of an assert: asserts are stripped under -O.
        (num_labels,) = labels_count.readline().splitlines()
    if num_labels == "0":
        return None
    return Rust(
        "swh-graph-compress",
        "pthash-labels-order",
        "--num-labels",
        num_labels,
        Command.zstdcat(f"{out_prefix}.labels.csv.zst"),
        f"{out_prefix}.labels.pthash",
        f"{out_prefix}.labels.pthash.order",
        conf=conf,
        env=env,
    )
@_compression_step
def _fcl_labels(conf: Dict[str, Any], env: Dict[str, str]) -> Optional[Command]:
    """Build the front-coded list of label names.

    NOTE(review): unlike _mph_labels/_labels_order, this step is not skipped
    when the label count is 0 — presumably "fcl" handles an empty input;
    confirm.
    """
    if {"dir", "snp", "*"}.isdisjoint(set(conf.get("object_types", "*").split(","))):
        return None
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.labels.count.txt") as labels_count:
        # Tuple-unpack instead of an assert: asserts are stripped under -O.
        (num_labels,) = labels_count.readline().splitlines()
    return Rust(
        "swh-graph-compress",
        "fcl",
        "--num-lines",
        num_labels,
        Command.zstdcat(f"{out_prefix}.labels.csv.zst"),
        f"{out_prefix}.labels.fcl",
        conf=conf,
        env=env,
    )
@_compression_step
def _edge_labels(conf: Dict[str, Any], env: Dict[str, str]) -> Optional[Command]:
    """Build the labelled forward graph; skipped when no labelled edge
    source type is selected, or there are no labels or no nodes."""
    if {"dir", "snp", "ori", "*"}.isdisjoint(
        set(conf.get("object_types", "*").split(","))
    ):
        return None
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.labels.count.txt") as labels_count:
        # Tuple-unpack instead of an assert: asserts are stripped under -O.
        (num_labels,) = labels_count.readline().splitlines()
    if num_labels == "0":
        return None
    with open(f"{out_prefix}.nodes.count.txt") as nodes_count:
        (num_nodes,) = nodes_count.readline().splitlines()
    if num_nodes == "0":
        return None
    return Rust(
        "swh-graph-extract",
        "edge-labels",
        "--allowed-node-types",
        conf.get("object_types", "*"),
        "--mph-algo",
        "pthash",
        "--function",
        out_prefix,
        "--order",
        f"{out_prefix}.pthash.order",
        "--label-name-mphf",
        f"{out_prefix}.labels.pthash",
        "--label-name-order",
        f"{out_prefix}.labels.pthash.order",
        "--num-nodes",
        num_nodes,
        f"{conf['in_dir']}",
        out_prefix,
        conf=conf,
        env=env,
    )
@_compression_step
def _edge_labels_transpose(
    conf: Dict[str, Any], env: Dict[str, str]
) -> Optional[Command]:
    """Build the labelled transposed graph; skipped under the same
    conditions as ``_edge_labels``."""
    if {"dir", "snp", "ori", "*"}.isdisjoint(
        set(conf.get("object_types", "*").split(","))
    ):
        return None
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.labels.count.txt") as labels_count:
        # Tuple-unpack instead of an assert: asserts are stripped under -O.
        (num_labels,) = labels_count.readline().splitlines()
    if num_labels == "0":
        return None
    with open(f"{out_prefix}.nodes.count.txt") as nodes_count:
        (num_nodes,) = nodes_count.readline().splitlines()
    if num_nodes == "0":
        return None
    return Rust(
        "swh-graph-extract",
        "edge-labels",
        "--allowed-node-types",
        conf.get("object_types", "*"),
        "--mph-algo",
        "pthash",
        "--function",
        out_prefix,
        "--order",
        f"{out_prefix}.pthash.order",
        "--label-name-mphf",
        f"{out_prefix}.labels.pthash",
        "--label-name-order",
        f"{out_prefix}.labels.pthash.order",
        "--num-nodes",
        num_nodes,
        "--transposed",
        f"{conf['in_dir']}",
        f"{out_prefix}-transposed",
        conf=conf,
        env=env,
    )
@_compression_step
def _edge_labels_ef(conf: Dict[str, Any], env: Dict[str, str]) -> Optional[Command]:
    """Build the Elias-Fano index over the labelled forward graph."""
    requested_types = set(conf.get("object_types", "*").split(","))
    if requested_types.isdisjoint({"dir", "snp", "ori", "*"}):
        return None
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.labels.count.txt") as labels_count:
        (num_labels,) = labels_count.readline().splitlines()
    if num_labels == "0":
        return None
    with open(f"{out_prefix}.nodes.count.txt") as nodes_count:
        (num_nodes,) = nodes_count.readline().splitlines()
    return Rust(
        "swh-graph-index",
        "labels-ef",
        f"{out_prefix}-labelled",
        num_nodes,
        conf=conf,
        env=env,
    )
@_compression_step
def _edge_labels_transpose_ef(
    conf: Dict[str, Any], env: Dict[str, str]
) -> Optional[Command]:
    """Build the Elias-Fano index over the labelled transposed graph."""
    # Include "ori" in the guard set for consistency with _edge_labels,
    # _edge_labels_transpose and _edge_labels_ef, which all skip only when
    # none of dir/snp/ori/* is selected.
    if {"dir", "snp", "ori", "*"}.isdisjoint(
        set(conf.get("object_types", "*").split(","))
    ):
        return None
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    with open(f"{out_prefix}.labels.count.txt") as labels_count:
        (num_labels,) = labels_count.readline().splitlines()
    if num_labels == "0":
        return None
    with open(f"{out_prefix}.nodes.count.txt") as nodes_count:
        (num_nodes,) = nodes_count.readline().splitlines()
    return Rust(
        "swh-graph-index",
        "labels-ef",
        f"{out_prefix}-transposed-labelled",
        num_nodes,
        conf=conf,
        env=env,
    )
@_compression_step
def _stats(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Compute global statistics of the compressed graph (``.stats``)."""
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    args = ("stats", "--graph", out_prefix, "--stats", f"{out_prefix}.stats")
    return Rust("swh-graph-compress", *args, conf=conf, env=env)
@_compression_step
def _e2e_check(
    conf: Dict[str, Any], env: Dict[str, str]
) -> Callable[[logging.Logger], None]:
    """Return a Python callable running end-to-end checks on the compressed
    graph (imported lazily to keep CLI startup time under control)."""
    from swh.graph.e2e_check import run_e2e_check

    def check(step_logger: logging.Logger):
        return run_e2e_check(
            graph_name=conf["graph_name"],
            in_dir=conf["in_dir"],
            out_dir=conf["out_dir"],
            sensitive_in_dir=conf.get("sensitive_in_dir"),
            sensitive_out_dir=conf.get("sensitive_out_dir"),
            check_flavor=conf.get("check_flavor", "full"),
            profile=conf.get("profile", "release"),
            logger=step_logger,
        )

    return check
@_compression_step
def _clean_tmp(conf: Dict[str, Any], env: Dict[str, str]) -> Command:
    """Remove intermediate files and the temporary directory produced by
    earlier steps."""
    out_dir = Path(conf["out_dir"])
    out_prefix = f"{conf['out_dir']}/{conf['graph_name']}"
    targets = [
        *out_dir.glob(f"{conf['graph_name']}-base.*"),
        *out_dir.glob(f"{conf['graph_name']}-bfs-simplified.*"),
        f"{out_prefix}-bfs.order",
        f"{out_prefix}-llp.order",
        f"{out_prefix}.nodes/",
        f"{out_prefix}-bfs.roots.txt",
        f"{out_prefix}.labels.csv.zst",
        f"{out_prefix}.persons.csv.zst",
        f"{conf['tmp_dir']}",
    ]
    return Command.rm("-rf", *targets)
def do_step(step, conf, env=None) -> "List[RunResult]":
    """Run a single compression step, logging its output to a dedicated file
    under ``<out_dir>/logs/``.

    Args:
        step: the :class:`CompressionStep` to run
        conf: compression configuration (see :func:`compress`)
        env: environment for subprocesses; defaults to a copy of the current
            environment with ``TMPDIR`` and ``TZ`` overridden

    Returns:
        the subprocess results (an empty list if the step was skipped)

    Raises:
        CompressionSubprocessError: if the step fails
    """
    if env is None:
        env = os.environ.copy()
        env["TMPDIR"] = conf["tmp_dir"]
        env["TZ"] = "UTC"
    log_dir = Path(conf["out_dir"]) / "logs"
    log_dir.mkdir(exist_ok=True)
    step_logger = logger.getChild(f"steps.{step.name.lower()}")
    if not step_logger.isEnabledFor(logging.INFO):
        # Ensure that at least INFO messages are sent, because it is the level we use
        # for the stdout of Rust processes. These processes can take a long time to
        # run, and it would be very annoying to have to run them again just because
        # they crashed with no log.
        step_logger.setLevel(logging.INFO)
    log_path = log_dir / (
        f"{conf['graph_name']}"
        f"-{int(datetime.now().timestamp() * 1000)}"
        f"-{str(step).lower()}.log"
    )
    step_handler = logging.FileHandler(log_path)
    step_logger.addHandler(step_handler)
    # try/finally guarantees the file handler is detached and closed even on
    # failure (the original leaked it on the exception and Python-callable
    # paths).
    try:
        command = COMP_CMD[step](conf, env)
        if command is None:
            step_logger.info("Compression step %s skipped", step)
            return []
        step_start_time = datetime.now()
        step_logger.info("Starting compression step %s at %s", step, step_start_time)
        step_logger.info("Running: %s", str(command))
        if isinstance(command, (Command, AtomicFileSink)):
            running_command = command._run(
                stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
            )
            # Forward the subprocess's merged stdout/stderr to the step log.
            with running_command.stdout() as stdout:
                for line in stdout:
                    step_logger.info(line.rstrip().decode(errors="replace"))
            try:
                results = running_command.wait()
            except CommandException as e:
                msg = (
                    f"Compression step {step} returned non-zero exit code "
                    f"{e.returncode}"
                )
                step_logger.critical(msg)
                raise CompressionSubprocessError(msg, log_path) from e
        else:
            # This allows for calling Python functions directly
            try:
                results = command(step_logger)
            except Exception as exc:
                msg = f"Compression step {step} failed with the following error: {exc}"
                step_logger.critical(msg)
                raise CompressionSubprocessError(msg, log_path) from exc
        step_end_time = datetime.now()
        step_duration = step_end_time - step_start_time
        step_logger.info(
            "Compression step %s finished at %s (in %s)",
            step,
            step_end_time,
            step_duration,
        )
        return results
    finally:
        step_logger.removeHandler(step_handler)
        step_handler.close()
def compress(
    graph_name: str,
    in_dir: str,
    out_dir: str,
    sensitive_in_dir: Optional[str],
    sensitive_out_dir: Optional[str],
    check_flavor: Optional[str],
    steps: Optional[Set[CompressionStep]] = None,
    conf: Optional[Dict[str, str]] = None,
    progress_cb: Callable[[int, CompressionStep], None] = lambda percentage, step: None,
):
    """graph compression pipeline driver from nodes/edges files to compressed
    on-disk representation

    Args:
        graph_name: graph base name, relative to in_dir
        in_dir: input directory, where the uncompressed graph can be found
        out_dir: output directory, where the compressed graph will be stored
        sensitive_in_dir: sensitive input directory, where the uncompressed
            sensitive graph can be found
        sensitive_out_dir: sensitive output directory, where the compressed
            sensitive graph will be stored
        check_flavor: which flavor of checks to run
        steps: compression steps to run (default: all steps)
        conf: compression configuration, supporting the following keys (all are
            optional, so an empty configuration is fine and is the default)

            - tmp_dir: temporary directory, defaults to the "tmp" subdir of
              out_dir
            - object_types: comma-separated list of object types to extract
              (eg. ``ori,snp,rel,rev``). Defaults to ``*``.
        progress_cb: a callable taking a percentage and step as argument,
            which is called every time a step starts.
    """
    # None sentinels instead of mutable default arguments (a shared set/dict
    # default would be reused across calls); an empty/None `steps` means
    # "run everything", preserving the previous behavior.
    if not steps:
        steps = set(COMP_SEQ)
    if conf is None:
        conf = {}
    conf = check_config_compress(
        conf,
        graph_name,
        in_dir,
        out_dir,
        sensitive_in_dir,
        sensitive_out_dir,
        check_flavor,
    )
    compression_start_time = datetime.now()
    logger.info("Starting compression at %s", compression_start_time)
    seq_no = 0
    for step in COMP_SEQ:
        if step not in steps:
            logger.debug("Skipping compression step %s", step)
            continue
        seq_no += 1
        logger.info("Running compression step %s (%s/%s)", step, seq_no, len(steps))
        progress_cb(seq_no * 100 // len(steps), step)
        do_step(step, conf)
    compression_end_time = datetime.now()
    compression_duration = compression_end_time - compression_start_time
    logger.info("Completed compression in %s", compression_duration)