# Copyright (C) 2025-2026 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
r"""
Luigi tasks to measure institutional impact
===========================================
This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
computing the impact of an institution across all origins.
Institutions are identified by a regular expression on authors' email address,
such as ``.*@softwareheritage.org``.
Output is written to the directory configured with ``impact_dir``, which contains:
* :file:`persons.csv`: a CSV file with header ``fullname_base64,email_base64``.
It is filled from the "sensitive export", which complements public exports
dated 2026-03-02 or newer. For older exports, it can be generated with::
$ psql service=swh
softwareheritage=> \copy (select translate(encode(fullname, 'base64'), E'\n', '') as fullname_base64, translate(encode(email, 'base64'), E'\n', '') as email_base64 from person where encode(email, 'escape') ~ '.*@(.*\.)?softwareheritage.org$') to '/path/to/impact/dir/persons.csv' with (delimiter ',', header true)
* :file:`raw_origins.csv.zst`: all origins that contain a commit authored or committed
by someone listed in :file:`persons.csv`
* :file:`indexed_origins.csv.zst`: similar to :file:`raw_origins.csv.zst` but with some
irrelevant origins filtered out, and enriched with metadata from :ref:`swh-indexer`
and :ref:`swh-scheduler`.
* :file:`revrels`: a directory containing all revisions in any origin listed in
:file:`indexed_origins.csv.zst`, as Parquet files
* :file:`origin_revrels`: a directory describing the many-to-many relation between origins
listed in :file:`indexed_origins.csv.zst` and revisions in :file:`revrels`. To avoid the
combinatorial explosion of writing every (origin, revrel) pair, the relation is decomposed
via an intermediate "oriset" (origin set) identifier:
* :file:`origin_revrels/orisets_of_origin`
* :file:`origin_revrels/revrels_in_oriset`
""" # noqa
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import datetime
import logging
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Type,
TypedDict,
TypeVar,
Union,
)
import luigi
from swh.export.luigi import LocalExport
from swh.graph.libs.luigi.topology import (
ComputeGenerations,
CountDescendants,
CountPaths,
)
from swh.graph.luigi.compressed_graph import LocalGraph
# These names are only needed by type checkers; keeping them out of the runtime
# import path follows the module-level warning about CLI startup time.
if TYPE_CHECKING:
    from swh.indexer.storage.model import OriginExtrinsicMetadataRow
    from swh.scheduler.interface import SchedulerInterface

    # ``total=False`` makes every key optional, so an empty dict is a valid
    # value (it is used as the fallback for origins the scheduler does not list).
    class ParsedListedOrigin(TypedDict, total=False):
        url: str
        last_update: Optional[datetime.datetime]
        enabled: bool

else:
    # At runtime a plain dict stands in for the TypedDict, so annotations that
    # mention ParsedListedOrigin still evaluate.
    ParsedListedOrigin = dict
logger = logging.getLogger(__name__)
[docs]
class SelectPersons(luigi.Task):
    """Extract persons from the output of :ref:`swh-export` based on the provided regexp"""

    local_sensitive_export_path = luigi.PathParameter()
    impact_dir = luigi.PathParameter()
    email_regexp = luigi.StrParameter(
        significant=False,
        description="""
A regular expression, in the RE2 syntax (https://github.com/google/re2/wiki/Syntax)
to select which email addresses to include in this impact dataset.
It must match the whole email.
If it matches the empty string, persons with no parsable email are selected too.
Example: ``.*@(softwareheritage.org|inria.fr)``
""",
    )

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of :class:`LocalExport`"""
        return {
            "export": LocalExport(
                local_sensitive_export_path=self.local_sensitive_export_path,
            ),
        }

    def output(self) -> luigi.LocalTarget:
        """Returns ``impact_dir / "persons.csv"``"""
        return luigi.LocalTarget(self.impact_dir / "persons.csv")

    def run(self) -> None:
        """Reads the input .orc, filters persons matching the regexp, and writes them to
        the output .csv

        The CSV is first written to ``<output>.tmp`` and renamed once complete, so
        the final path never holds a partially written file.
        """
        import base64
        import csv
        import os

        import pyarrow.compute
        import pyarrow.orc
        import tqdm

        # TODO: rewrite this using a single Datafusion SQL query, once we switch
        # from ORC to Parquet
        tmp_path = Path(f"{self.output().path}.tmp")
        tmp_path.parent.mkdir(parents=True, exist_ok=True)

        # Anchor the user-provided regexp so that it must match the whole email.
        # The non-capturing group is required: without it, a top-level alternation
        # such as ``a|b`` would become ``^a|b$``, which anchors each alternative
        # on one side only. Anchors already present in the regexp are harmless
        # inside the group, so they do not need to be stripped.
        full_match_pattern = f"^(?:{self.email_regexp})$"

        # newline="" is what the csv module asks for on files it writes to
        with tmp_path.open("wt", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(("fullname_base64", "email_base64"))

            persons_dir = self.local_sensitive_export_path / "orc" / "person"
            files = [pyarrow.orc.ORCFile(path) for path in persons_dir.iterdir()]
            total_rows = sum(file.nrows for file in files)

            with tqdm.tqdm(total=total_rows, desc="Filtering persons") as pbar:
                for file in files:
                    for stripe_id in range(file.nstripes):
                        stripe = file.read_stripe(stripe_id, columns=["fullname"])
                        fullname_column = stripe.column("fullname")

                        # The email is whatever sits between the last '<' of
                        # the fullname and the first '>' following it.
                        # NOTE(review): when no '<...>' is present the
                        # extraction presumably yields a null email; confirm
                        # how such rows should interact with regexps that match
                        # the empty string.
                        emails = pyarrow.compute.extract_regex(
                            fullname_column, pattern=".*<(?P<email>.*?)>"
                        ).field("email")
                        mask = pyarrow.compute.match_substring_regex(
                            emails, pattern=full_match_pattern
                        )

                        selected_emails = emails.filter(mask)
                        selected_fullnames = fullname_column.filter(mask)
                        for fullname, email in zip(
                            selected_fullnames, selected_emails
                        ):
                            writer.writerow(
                                (
                                    base64.b64encode(fullname.as_py()).decode(),
                                    base64.b64encode(email.as_py()).decode(),
                                )
                            )

                        pbar.update(stripe.num_rows)

        # Only publish the file once it is fully written and closed
        os.rename(tmp_path, self.output().path)
[docs]
class ComputeRawImpact(luigi.Task):
    """Writes a CSV listing every origin that contains revrels from a given set of
    persons, together with the number of revrels and the first/latest timestamp
    for each origin.
    """

    local_graph_path = luigi.PathParameter()
    local_sensitive_graph_path = luigi.OptionalPathParameter(default=None)
    graph_name = luigi.StrParameter(default="graph")
    impact_dir = luigi.PathParameter()
    output_emails = luigi.BoolParameter(default=False)
    include_ranges = luigi.StrParameter(
        default="",
        description="Space-separated list of ISO8601 dates ('YYYY-MM-DD') or "
        "date intervals ('YYYY-MM-DD/YYYY-MM-DD', inclusive/exclusive). "
        "If provided, only revisions and releases with an author *or* committer date in "
        "one of these ranges are considered.",
    )
    exclude_ranges = luigi.StrParameter(
        default="",
        description="Space-separated list of ISO8601 dates ('YYYY-MM-DD') or "
        "date intervals ('YYYY-MM-DD/YYYY-MM-DD', inclusive/exclusive). "
        "Author and committer dates in these ranges are not considered for --include-ranges.",
    )

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`,
        one of :class:`SelectPersons`, and forward-direction instances of
        :class:`swh.graph.libs.luigi.topology.CountPaths` and
        :class:`swh.graph.libs.luigi.topology.CountDescendants`, each computed once
        on all object types and once on the ``histhost_`` subset
        (``rev,rel,snp,ori``).
        """
        # in this task, we only use forward counts, because we only annotate origins,
        # which have no predecessor, so their backward counts would always be 1
        direction = "forward"
        counting_kwargs = dict(
            local_graph_path=self.local_graph_path,
            graph_name=self.graph_name,
            write_parquet=False,
            direction=direction,
        )
        object_types_by_prefix = {
            "": "cnt,dir,rev,rel,snp,ori",
            "histhost_": "rev,rel,snp,ori",
        }

        deps: Dict[str, luigi.Task] = {
            "graph": LocalGraph(
                local_graph_path=self.local_graph_path,
                local_sensitive_graph_path=self.local_sensitive_graph_path,
            ),
            "persons": SelectPersons(
                impact_dir=self.impact_dir,
            ),
        }
        for prefix, object_types in object_types_by_prefix.items():
            deps[f"{prefix}{direction}_paths"] = CountPaths(
                object_types=object_types, **counting_kwargs
            )
            deps[f"{prefix}{direction}_descendants"] = CountDescendants(
                object_types=object_types, **counting_kwargs
            )
        return deps

    def output(self) -> Dict[str, luigi.LocalTarget]:
        """``{"origin_csv": self.impact_dir / "raw_origins.csv.zst"}``"""
        csv_path = self.impact_dir / "raw_origins.csv.zst"
        return {"origin_csv": luigi.LocalTarget(csv_path)}

    def run(self) -> None:
        """Pipes the selected persons into the 'impact' Rust command and writes its
        zstd-compressed output to the target through an atomic file sink."""
        from swh.datasets.shell import Rust
        from swh.graph.shell import AtomicFileSink, Command

        inputs = self.input()

        args = ["impact", "--graph", self.local_graph_path / self.graph_name]
        if self.local_sensitive_graph_path:
            args += [
                "--sensitive-graph",
                self.local_sensitive_graph_path / self.graph_name,
            ]
        if self.output_emails:
            args.append("--output-emails")

        # Splitting an empty parameter yields no items, so unset ranges add no flag
        for flag, ranges in (
            ("--include-range", self.include_ranges),
            ("--exclude-range", self.exclude_ranges),
        ):
            for range_ in ranges.split():
                args += [flag, range_]

        direction = "forward"  # same reason as in requires()
        for prefix in ("", "histhost_"):
            label = f"{prefix}{direction}"
            paths_dir = inputs[f"{label}_paths"]["arrays"].path
            descendants_dir = inputs[f"{label}_descendants"]["bitfieldvec"].path
            args += [
                "--paths-dir",
                f"{label}={paths_dir}",
                "--descendants-dir",
                f"{label}={descendants_dir}",
            ]

        # fmt: off
        (
            Command.cat(inputs["persons"])
            | Rust(*args)
            | Command.zstdmt("-19")
            > AtomicFileSink(self.output()["origin_csv"].path)
        ).run()
        # fmt: on
[docs]
def opt_int(s: str) -> Optional[int]:
if s == "":
return None
else:
return int(s)
D = TypeVar("D")
[docs]
def typeddict_parser(dt: Type[D]) -> Callable[[Dict[str, str]], D]:
import inspect
converters = {}
for key, type_ in inspect.get_annotations(dt).items():
if type_ is str:
converters[key] = lambda x: x
elif type_ is int:
converters[key] = int
elif type_ is Optional[int]:
converters[key] = opt_int
elif type_ is float:
converters[key] = float
else:
raise TypeError(f"Unsupported type {type_} for key {key} of {dt.__name__}")
def parse(d):
parsed = {}
unknown_keys = d.keys() - converters.keys()
missing_keys = converters.keys() - d.keys()
if unknown_keys:
raise TypeError(f"Dict has extra keys for {dt.__name__}: {unknown_keys}")
if missing_keys:
raise TypeError(f"Dict is missing keys for {dt.__name__}: {missing_keys}")
for key, value in d.items():
if key in converters:
parsed[key] = converters[key](value)
return parsed
return parse
[docs]
class OriginWithEmails(TypedDict):
    """Shape of a parsed row of the raw origins CSV when emails are included.

    :meth:`ComputeIndexedImpact.run` parses rows into this shape (through
    :func:`typeddict_parser`) when its ``output_emails`` parameter is set. It also
    uses the declaration order of the fields below as the leading columns of the
    CSV it writes, so reordering fields changes that file's header.

    Compared to :class:`OriginWithoutEmails`, this adds the ``*_author`` and
    ``*_committer`` fields of the first and last contributed revrel.
    """

    origin_id: int
    origin_url: str
    num_contributed_revs: int
    num_contributed_rels: int
    # Optional[int] fields are None when the CSV cell is empty (see opt_int)
    num_contributed_revs_in_main_branch: Optional[int]
    total_revs: int
    total_rels: int
    total_revs_in_main_branch: Optional[int]
    first_contributed_revrel: str
    first_contributed_revrel_ts: Optional[int]
    first_contributed_revrel_author: str
    first_contributed_revrel_committer: str
    last_contributed_revrel: str
    last_contributed_revrel_ts: Optional[int]
    last_contributed_revrel_author: str
    last_contributed_revrel_committer: str
    first_revrel_ts: Optional[int]
    last_revrel_ts: Optional[int]
    codemeta_json: str
    citation_cff: str
    forward_paths: float
    forward_paths_to_roots: float
    forward_descendants: int
    histhost_forward_paths: float
    histhost_forward_paths_to_roots: float
    histhost_forward_descendants: int
[docs]
class OriginWithoutEmails(TypedDict):
    """Shape of a parsed row of the raw origins CSV when emails are not included.

    :meth:`ComputeIndexedImpact.run` parses rows into this shape (through
    :func:`typeddict_parser`) when its ``output_emails`` parameter is unset. It
    also uses the declaration order of the fields below as the leading columns of
    the CSV it writes, so reordering fields changes that file's header.
    """

    origin_id: int
    origin_url: str
    num_contributed_revs: int
    num_contributed_rels: int
    # Optional[int] fields are None when the CSV cell is empty (see opt_int)
    num_contributed_revs_in_main_branch: Optional[int]
    total_revs: int
    total_rels: int
    total_revs_in_main_branch: Optional[int]
    first_contributed_revrel: str
    first_contributed_revrel_ts: Optional[int]
    last_contributed_revrel: str
    last_contributed_revrel_ts: Optional[int]
    first_revrel_ts: Optional[int]
    last_revrel_ts: Optional[int]
    codemeta_json: str
    citation_cff: str
    forward_paths: float
    forward_paths_to_roots: float
    forward_descendants: int
    histhost_forward_paths: float
    histhost_forward_paths_to_roots: float
    histhost_forward_descendants: int
[docs]
class ComputeIndexedImpact(luigi.Task):
    """Filters forks out of :class:`ComputeRawImpact`'s output according to
    ``fork_filter``, and adds columns fetched from the indexer storage and the
    scheduler."""

    indexer_storage_url = luigi.StrParameter(significant=False)
    swh_scheduler_url = luigi.StrParameter(significant=False)

    FORK_FILTERS = [
        "all",
        "none",
        "without-upstream-contribution",
        "with-original-content",
    ]

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.StrParameter(default="graph")
    impact_dir = luigi.PathParameter()
    output_emails = luigi.BoolParameter(default=False)
    fork_filter = luigi.ChoiceParameter(
        choices=FORK_FILTERS,
        default="with-original-content",
        description="'all' includes all forks. 'none' excludes all forks. "
        "'without-upstream-contribution' excludes forks of an upstream that already "
        "has contributions from one of the persons. "
        "'with-original-content' extends this to include forks that have more commits "
        "from the persons than the upstream origin.",
    )

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`
        and :class:`ComputeRawImpact`."""
        return {
            "graph": LocalGraph(local_graph_path=self.local_graph_path),
            "raw_impact": ComputeRawImpact(
                local_graph_path=self.local_graph_path,
                graph_name=self.graph_name,
                impact_dir=self.impact_dir,
                output_emails=self.output_emails,
            ),
        }

    def output(self) -> Dict[str, luigi.Target]:
        """``{"origin_csv": self.impact_dir / "indexed_origins.csv.zst"}``"""
        return {
            "origin_csv": luigi.LocalTarget(self.impact_dir / "indexed_origins.csv.zst")
        }

    def run(self) -> None:
        """Reads input CSV, optionally filters out some lines (forks), adds columns,
        writes it again.

        The output is written to a ``.tmp`` file that is renamed once complete.
        """
        import csv
        import inspect
        import os

        import pyzstd
        import tqdm

        # The raw CSV has a different set of columns depending on whether emails
        # were requested; the matching TypedDict provides both the parser and
        # the leading columns of the output header.
        row_type: Union[Type[OriginWithEmails], Type[OriginWithoutEmails]]
        if self.output_emails:
            row_type = OriginWithEmails
        else:
            row_type = OriginWithoutEmails
        header = list(inspect.get_annotations(row_type))
        parse_origin = typeddict_parser(row_type)

        origins: Dict[str, Any]
        with open(self.input()["raw_impact"]["origin_csv"].path, "rb") as in_fd:
            with pyzstd.open(in_fd, "rt") as f:
                reader = csv.DictReader(f)
                origins = {
                    row["origin_url"]: parse_origin(row)
                    for row in tqdm.tqdm(reader, desc="Reading raw origin stats")
                }

        origins_metadata = self._get_origins_metadata(list(origins))
        listed_origins = self._get_listed_origins(list(origins))

        output_path = self.impact_dir / "indexed_origins.csv.zst"
        tmp_output_path = f"{output_path}.tmp"

        with open(tmp_output_path, "wb") as out_fd:
            with pyzstd.open(out_fd, "wt") as f:
                writer = csv.DictWriter(
                    f,
                    (
                        *header,
                        "last_visit_ts",
                        "visits_enabled",
                        "num_forks",
                        "num_likes",
                        "num_followers",
                        "creation_date",
                        "modification_date",
                        "licenses",
                        "keywords",
                        "programming_languages",
                        "description",
                        "forked_from",
                        "is_fork_with_newer_contributions",
                    ),
                )
                writer.writeheader()

                for origin_url in tqdm.tqdm(
                    origins,
                    desc=(
                        "Writing"
                        if self.fork_filter == "all"
                        else "Filtering out uninteresting forks and writing"
                    ),
                ):
                    origin_metadata = origins_metadata.get(origin_url)
                    if origin_metadata is None:
                        extrinsic_metadata = {}
                    else:
                        extrinsic_metadata = self._parse_origin_metadata(
                            origin_url, origin_metadata.metadata
                        )

                    upstream_origin_url = extrinsic_metadata.get("forked_from")
                    num_likes = extrinsic_metadata.get("num_likes", 0)
                    num_followers = extrinsic_metadata.get("num_followers", 0)
                    assert upstream_origin_url is None or isinstance(
                        upstream_origin_url, str
                    ), (origin_url, upstream_origin_url)
                    assert isinstance(num_likes, int), (origin_url, num_likes)
                    assert isinstance(num_followers, int), (origin_url, num_followers)

                    origin = origins[origin_url]
                    listed_origin: ParsedListedOrigin = listed_origins.get(
                        origin_url, {}
                    )

                    is_fork_with_newer_contributions: Optional[bool]
                    if upstream_origin_url is None:
                        # not a fork, we keep it
                        is_fork_with_newer_contributions = None
                    else:
                        # ``origins`` only holds origins present in the raw
                        # impact CSV, so an upstream missing from it has no
                        # recorded contribution
                        upstream = origins.get(upstream_origin_url)
                        if upstream is None:
                            # unknown upstream, so let's assume the fork is interesting
                            is_fork_with_newer_contributions = True
                        else:
                            is_fork_with_newer_contributions = (
                                self._has_newer_contributions(origin, upstream)
                            )

                        if not self._keeps_fork(
                            upstream_has_contributions=upstream is not None,
                            num_likes=num_likes,
                            num_followers=num_followers,
                            has_newer_contributions=is_fork_with_newer_contributions,
                        ):
                            continue

                    writer.writerow(
                        {
                            **origin,
                            "last_visit_ts": listed_origin.get("last_update", ""),
                            "visits_enabled": listed_origin.get("enabled", ""),
                            **extrinsic_metadata,
                            "forked_from": upstream_origin_url or "",
                            # fmt: off
                            "is_fork_with_newer_contributions":
                                is_fork_with_newer_contributions,
                            # fmt: on
                        }
                    )

        os.rename(tmp_output_path, output_path)  # atomic write

    @staticmethod
    def _has_newer_contributions(origin, upstream) -> bool:
        """Returns whether a fork has more contributed revisions or releases than
        its upstream, or a more recent contributed revrel."""
        origin_ts = origin["last_contributed_revrel_ts"]
        upstream_ts = upstream["last_contributed_revrel_ts"]
        return (
            origin["num_contributed_revs"] > upstream["num_contributed_revs"]
            or origin["num_contributed_rels"] > upstream["num_contributed_rels"]
            # a fork with a dated contribution beats an upstream with none
            or (
                origin_ts is not None
                and (upstream_ts is None or origin_ts > upstream_ts)
            )
        )

    def _keeps_fork(
        self,
        *,
        upstream_has_contributions: bool,
        num_likes: int,
        num_followers: int,
        has_newer_contributions: bool,
    ) -> bool:
        """Applies ``fork_filter`` to a fork, returning whether it should be written."""
        if self.fork_filter == "none":
            return False  # exclude all forks
        if self.fork_filter == "all":
            return True  # include all forks
        if self.fork_filter == "without-upstream-contribution":
            return not upstream_has_contributions
        if self.fork_filter == "with-original-content":
            # a fork is uninteresting when it is neither popular nor ahead of
            # its upstream
            return num_likes > 1 or num_followers > 1 or has_newer_contributions
        # raised explicitly so the check survives ``python -O``
        raise AssertionError(f"unknown fork filter: {self.fork_filter!r}")

    def _parse_origin_metadata(self, origin_url, json_ld) -> Dict[str, Union[str, int]]:
        """Extracts CSV columns about ``origin_url`` from its JSON-LD metadata.

        Returns a dict keyed by output column name. The three concatenated
        columns are always present (possibly empty); the others are only present
        when the corresponding triple exists.
        """
        import json

        import rdflib

        from swh.indexer.codemeta import expand
        from swh.indexer.namespaces import ACTIVITYSTREAMS, FORGEFED, SCHEMA

        origin = rdflib.term.URIRef(origin_url)
        metadata: Dict[str, Union[str, int]] = {}

        g: Any = rdflib.Graph().parse(
            data=json.dumps(expand(json_ld)),
            format="json-ld",
            # replace invalid URIs with blank node ids, instead of discarding
            # whole nodes:
            generalized_rdf=True,
        )

        # Simple string literals (if several values exist, the last one wins)
        for term, column_name in [
            (FORGEFED.forkedFrom, "forked_from"),
            (FORGEFED.dateCreated, "creation_date"),
            (FORGEFED.dateModified, "modification_date"),
            (SCHEMA.description, "description"),
        ]:
            for _, _, obj in g.triples((origin, term, None)):
                metadata[column_name] = str(obj)

        # Concatenated literals
        for term, column_name, joiner in [
            (SCHEMA.programmingLanguage, "programming_languages", " "),
            (SCHEMA.license, "licenses", " "),
            (SCHEMA.keywords, "keywords", ", "),
        ]:
            metadata[column_name] = joiner.join(
                str(obj) for _, _, obj in g.triples((origin, term, None))
            )

        # Collections, summarized by their number of items
        for collection_term, column_name in [
            (FORGEFED.forks, "num_forks"),
            (ACTIVITYSTREAMS.likes, "num_likes"),
            (ACTIVITYSTREAMS.followers, "num_followers"),
        ]:
            for _, _, collection in g.triples((origin, collection_term, None)):
                for _, _, total_items in g.triples(
                    (collection, ACTIVITYSTREAMS.totalItems, None)
                ):
                    metadata[column_name] = total_items.value

        # TODO: add number of issues
        return metadata

    def _get_origins_metadata(
        self, origins: List[str]
    ) -> Dict[str, "OriginExtrinsicMetadataRow"]:
        """Fetches extrinsic metadata of the given origins from the indexer storage,
        in concurrent batches, and returns the rows keyed by their ``id``."""
        import itertools
        import math
        import multiprocessing.dummy

        import tqdm

        from swh.core.utils import grouper
        from swh.indexer.storage import get_indexer_storage
        from swh.storage.proxies.retry import swh_retry

        idx_storage = get_indexer_storage("remote", url=self.indexer_storage_url)
        num_concurrent_requests = 10
        num_origins_per_request = 100
        fetch_batch = swh_retry(idx_storage.origin_extrinsic_metadata_get)

        # multiprocessing.dummy provides a pool of threads, not processes
        with multiprocessing.dummy.Pool(num_concurrent_requests) as p:
            return {
                row.id: row
                for row in itertools.chain.from_iterable(
                    tqdm.tqdm(
                        p.imap_unordered(
                            fetch_batch,
                            grouper(origins, num_origins_per_request),
                        ),
                        desc="Fetching origin metadata",
                        total=math.ceil(len(origins) / num_origins_per_request),
                    ),
                )
            }

    def _get_listed_origins(self, origins: List[str]) -> Dict[str, ParsedListedOrigin]:
        """Fetches scheduler information about the given origins, in concurrent
        batches, and returns it keyed by origin URL."""
        import functools
        import itertools
        import math
        import multiprocessing.dummy

        import tqdm

        from swh.core.utils import grouper
        from swh.scheduler import get_scheduler

        scheduler = get_scheduler("remote", url=self.swh_scheduler_url)
        num_concurrent_requests = 10
        num_origins_per_request = 100

        # multiprocessing.dummy provides a pool of threads, not processes
        with multiprocessing.dummy.Pool(num_concurrent_requests) as p:
            return {
                listed_origin["url"]: listed_origin
                for listed_origin in itertools.chain.from_iterable(
                    tqdm.tqdm(
                        p.imap_unordered(
                            functools.partial(
                                self._get_listed_origins_group, scheduler=scheduler
                            ),
                            grouper(origins, num_origins_per_request),
                        ),
                        desc="Fetching scheduler info",
                        total=math.ceil(len(origins) / num_origins_per_request),
                    ),
                )
            }

    def _get_listed_origins_group(
        self, origins: List[str], scheduler: "SchedulerInterface"
    ) -> List[ParsedListedOrigin]:
        """Queries the scheduler for one batch of origin URLs and merges the listed
        origins sharing a URL into a single entry: the most recent ``last_update``,
        and ``enabled`` if any of them is enabled."""
        from swh.core.api.classes import stream_results

        results: Dict[str, ParsedListedOrigin] = {}
        for listed_origin in stream_results(scheduler.get_listed_origins, urls=origins):
            origin_result = results.setdefault(
                listed_origin.url,
                {"url": listed_origin.url, "last_update": None, "enabled": False},
            )
            if listed_origin.last_update:
                if origin_result["last_update"] is None:
                    origin_result["last_update"] = listed_origin.last_update
                else:
                    origin_result["last_update"] = max(
                        origin_result["last_update"], listed_origin.last_update
                    )
            origin_result["enabled"] = origin_result["enabled"] or listed_origin.enabled
        return list(results.values())
[docs]
class DenormalizeImpactedRevrels(luigi.Task):
    """Denormalizes :class:`ComputeIndexedImpact`'s output into Parquet files that
    list individual revrels and the origin-to-revrel mappings."""

    indexer_storage_url = luigi.StrParameter(significant=False)
    swh_scheduler_url = luigi.StrParameter(significant=False)
    local_graph_path = luigi.PathParameter()
    local_sensitive_graph_path = luigi.OptionalPathParameter(default=None)
    graph_name = luigi.StrParameter(default="graph")
    impact_dir = luigi.PathParameter()
    write_revrels = luigi.BoolParameter(
        default=True,
        description="Whether to export revision and release properties to a Parquet table",
    )
    write_origin_revrels = luigi.BoolParameter(
        default=True,
        description="Whether to enumerate all revision and releases in each origin to "
        "a Parquet table",
    )
    revrel_filter = luigi.ChoiceParameter(
        choices=["contributed", "in-origin"],
        default="contributed",
        description="'contributed' (the default) includes only revrels contributed "
        "by the institution. "
        "'in-origin' includes all revrels in any origin that contain an other "
        "revrel that was contributed by the institution, which can be a very large set.",
    )
    algorithm = luigi.ChoiceParameter(
        choices=["oriset", "bfs"],
        default="oriset",
        description="Denormalization backend. 'oriset' (the default) writes compact "
        "orisets_of_origin + revrels_in_oriset tables via a topological map-reduce "
        "(O(num_ori + num_revrel) storage, but single-threaded and slow per run). "
        "'bfs' writes flat (origin, revrel) pairs via a per-origin parallel traversal "
        "(larger output, but much faster per run, and needs no topological order).",
    )
    include_ranges = luigi.StrParameter(
        default="",
        description="Space-separated list of ISO8601 dates ('YYYY-MM-DD') or "
        "date intervals ('YYYY-MM-DD/YYYY-MM-DD', inclusive/exclusive). "
        "If provided, only revisions and releases with an author *or* committer date in "
        "one of these ranges are considered.",
    )
    exclude_ranges = luigi.StrParameter(
        default="",
        description="Space-separated list of ISO8601 dates ('YYYY-MM-DD') or "
        "date intervals ('YYYY-MM-DD/YYYY-MM-DD', inclusive/exclusive). "
        "Author and committer dates in these ranges are not considered for include_ranges.",
    )

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`,
        :class:`SelectPersons` and :class:`ComputeIndexedImpact`; forward and
        backward instances of :class:`swh.graph.libs.luigi.topology.CountPaths` and
        :class:`swh.graph.libs.luigi.topology.CountDescendants`; and, for the
        ``oriset`` algorithm only, an instance of
        :class:`swh.graph.libs.luigi.topology.ComputeGenerations`.
        """
        graph_kwargs = dict(
            local_graph_path=self.local_graph_path,
            graph_name=self.graph_name,
        )
        deps: Dict[str, luigi.Task] = {
            "graph": LocalGraph(
                local_graph_path=self.local_graph_path,
                local_sensitive_graph_path=self.local_sensitive_graph_path,
            ),
            "persons": SelectPersons(
                impact_dir=self.impact_dir,
            ),
            "indexed_impact": ComputeIndexedImpact(
                indexer_storage_url=self.indexer_storage_url,
                swh_scheduler_url=self.swh_scheduler_url,
                impact_dir=self.impact_dir,
                **graph_kwargs,
            ),
        }

        # Only the oriset backend consumes a topological order.
        if self.algorithm == "oriset":
            deps["toposort"] = ComputeGenerations(
                direction="forward",
                object_types="rev,rel,snp,ori",
                **graph_kwargs,
            )

        object_types_by_prefix = {
            "": "cnt,dir,rev,rel,snp,ori",
            "histhost_": "rev,rel,snp,ori",
        }
        for direction in ("forward", "backward"):
            for prefix, object_types in object_types_by_prefix.items():
                deps[f"{prefix}{direction}_paths"] = CountPaths(
                    object_types=object_types, direction=direction, **graph_kwargs
                )
                deps[f"{prefix}{direction}_descendants"] = CountDescendants(
                    object_types=object_types, direction=direction, **graph_kwargs
                )
        return deps

    def output(self) -> Dict[str, luigi.LocalTarget]:
        """``self.impact_dir / "revrels"`` and/or ``self.impact_dir / "origin_revrels"``
        depending on ``self.write_revrels`` and ``self.write_origin_revrels``.

        For ``algorithm="oriset"``, ``origin_revrels`` is a directory containing two
        subdirectories, ``orisets_of_origin`` and ``revrels_in_oriset``, that together
        encode the many-to-many origin->revrel relation. For ``algorithm="bfs"`` it
        contains flat ``(origin, revrel)`` Parquet files directly."""
        requested = (
            ("revrels", self.write_revrels),
            ("origin_revrels", self.write_origin_revrels),
        )
        return {
            name: luigi.LocalTarget(self.impact_dir / name)
            for name, enabled in requested
            if enabled
        }

    def run(self) -> None:
        """Runs impact-denormalize, feeding it the selected persons on its standard
        input and the origin URLs of the indexed impact CSV through a temporary
        file."""
        import csv
        import tempfile

        import pyzstd

        from swh.datasets.shell import Rust
        from swh.graph.shell import Command

        inputs = self.input()
        output = self.output()

        args = [
            "impact-denormalize",
            "--graph",
            self.local_graph_path / self.graph_name,
            "--revrel-filter",
            self.revrel_filter,
            "--algorithm",
            self.algorithm,
        ]

        # The topological order is only consumed by the oriset backend; bfs ignores it.
        if self.algorithm == "oriset":
            args += ["--order", inputs["toposort"]["topo_order"].path]

        if self.local_sensitive_graph_path:
            args += [
                "--sensitive-graph",
                self.local_sensitive_graph_path / self.graph_name,
            ]

        for direction in ("forward", "backward"):
            for prefix in ("", "histhost_"):
                label = f"{prefix}{direction}"
                paths_dir = inputs[f"{label}_paths"]["arrays"].path
                descendants_dir = inputs[f"{label}_descendants"]["bitfieldvec"].path
                args += [
                    "--paths-dir",
                    f"{label}={paths_dir}",
                    "--descendants-dir",
                    f"{label}={descendants_dir}",
                ]

        # Splitting an empty parameter yields no items, so unset ranges add no flag
        for flag, ranges in (
            ("--include-range", self.include_ranges),
            ("--exclude-range", self.exclude_ranges),
        ):
            for range_ in ranges.split():
                args += [flag, range_]

        for flag, name in (
            ("--revrels-out", "revrels"),
            ("--origin-revrels-out", "origin_revrels"),
        ):
            if name in output:
                args += [flag, output[name].path]

        # The temporary file is removed when the block exits, so the command has
        # to run inside of it.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".txt") as origins:
            # Extract origin URLs from the indexed impact CSV
            with open(inputs["indexed_impact"]["origin_csv"].path, "rb") as f:
                with pyzstd.open(f, "rt") as zf:
                    origins.writelines(
                        row["origin_url"] + "\n" for row in csv.DictReader(zf)
                    )
            origins.flush()

            args += ["--origins", origins.name]

            # fmt: off
            (
                Command.cat(inputs["persons"])
                | Rust(*args)
            ).run()
            # fmt: on