Source code for swh.datasets.luigi.impact

# Copyright (C) 2025-2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

r"""
Luigi tasks to measure institutional impact
===========================================

This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
computing the impact of an institution across all origins.
Institutions are identified by a regular expression on authors' email address,
such as ``.*@softwareheritage.org``.

Output is written to the directory configured with ``impact_dir``, which contains:

* :file:`persons.csv`: a CSV file with header ``fullname_base64,email_base64``.
  It is filled from the "sensitive export", which complements public exports
  dated 2026-03-02 or newer. For older exports, it can be generated with::

      $ psql service=swh
      softwareheritage=> \copy (select translate(encode(fullname, 'base64'), E'\n', '') as fullname_base64, translate(encode(email, 'base64'), E'\n', '') as email_base64 from person where encode(email, 'escape') ~ '.*@(.*\.)?softwareheritage.org$') to '/path/to/impact/dir/persons.csv' with (delimiter ',', header true)

* :file:`raw_origins.csv.zst`: all origins that contain a commit authored or committed
  by someone listed in :file:`persons.csv`
* :file:`indexed_origins.csv.zst`: similar to :file:`raw_origins.csv.zst` but with some
  irrelevant origins filtered out, and enriched with metadata from :ref:`swh-indexer`
  and :ref:`swh-scheduler`.
* :file:`revrels`: a directory containing all revisions in any origin listed in
  :file:`indexed_origins.csv.zst`, as Parquet files
* :file:`origin_revrels`: a directory describing the many-to-many relation between origins
  listed in :file:`indexed_origins.csv.zst` and revisions in :file:`revrels`. To avoid the
  combinatorial explosion of writing every (origin, revrel) pair, the relation is decomposed
  via an intermediate "oriset" (origin set) identifier:

  * :file:`origin_revrels/orisets_of_origin`
  * :file:`origin_revrels/revrels_in_oriset`
"""  # noqa

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import datetime
import logging
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Type,
    TypedDict,
    TypeVar,
    Union,
)

import luigi

from swh.export.luigi import LocalExport
from swh.graph.libs.luigi.topology import (
    ComputeGenerations,
    CountDescendants,
    CountPaths,
)
from swh.graph.luigi.compressed_graph import LocalGraph

if TYPE_CHECKING:
    from swh.indexer.storage.model import OriginExtrinsicMetadataRow
    from swh.scheduler.interface import SchedulerInterface

    class ParsedListedOrigin(TypedDict, total=False):
        url: str
        last_update: Optional[datetime.datetime]
        enabled: bool

else:
    ParsedListedOrigin = dict

logger = logging.getLogger(__name__)


class SelectPersons(luigi.Task):
    """Extract persons from the output of :ref:`swh-export` based on the provided
    regexp.

    The email of each person is taken from the ``<...>`` part of its ``fullname``;
    persons whose email fully matches ``email_regexp`` are written, base64-encoded,
    to ``impact_dir / "persons.csv"``.
    """

    local_sensitive_export_path = luigi.PathParameter()
    impact_dir = luigi.PathParameter()
    email_regexp = luigi.StrParameter(
        significant=False,
        description="""
        A regular expression, in the RE2 syntax
        (https://github.com/google/re2/wiki/Syntax) to select which email
        addresses to include in this impact dataset.
        It must match the whole email. If it matches the empty string, persons
        with no parsable email are selected too.
        Example: ``.*@(softwareheritage.org|inria.fr)``
        """,
    )

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of :class:`LocalExport`"""
        return {
            "export": LocalExport(
                local_sensitive_export_path=self.local_sensitive_export_path,
            ),
        }

    def output(self) -> luigi.LocalTarget:
        """Returns ``impact_dir / "persons.csv"``"""
        return luigi.LocalTarget(self.impact_dir / "persons.csv")

    def run(self) -> None:
        """Reads the input .orc, filters persons matching the regexp, and writes
        them to the output .csv

        Rows are first written to ``<output>.tmp``, which is renamed to the final
        path once every ORC stripe has been processed.
        """
        import base64
        import csv
        import os

        import pyarrow.compute
        import pyarrow.orc
        import tqdm

        # TODO: rewrite this using a single Datafusion SQL query, once we switch
        # from ORC to Parquet

        # Force a whole-string match. The user pattern is wrapped in a
        # non-capturing group so that a top-level alternation ("a@x|b@y") is
        # anchored on both sides of *every* alternative, and so that a pattern
        # already carrying its own ^/$ (or ending with an escaped "\$") keeps its
        # meaning. Built once: it does not depend on the stripe being processed.
        full_match_pattern = f"^(?:{self.email_regexp})$"

        tmp_path = Path(f"{self.output().path}.tmp")
        tmp_path.parent.mkdir(parents=True, exist_ok=True)

        with tmp_path.open("wt") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(("fullname_base64", "email_base64"))

            persons_dir = self.local_sensitive_export_path / "orc" / "person"
            files = [pyarrow.orc.ORCFile(path) for path in persons_dir.iterdir()]
            total_rows = sum(file.nrows for file in files)

            with tqdm.tqdm(total=total_rows, desc="Filtering persons") as pbar:
                for file in files:
                    for stripe_id in range(file.nstripes):
                        stripe = file.read_stripe(stripe_id, columns=["fullname"])
                        # the email is whatever sits between the last "<" and the
                        # following ">" of the full name
                        emails = pyarrow.compute.extract_regex(
                            stripe.column("fullname"), pattern=".*<(?P<email>.*?)>"
                        ).field("email")
                        mask = pyarrow.compute.match_substring_regex(
                            emails, pattern=full_match_pattern
                        )
                        emails = emails.filter(mask)
                        fullnames = stripe.column("fullname").filter(mask)
                        writer.writerows(
                            (
                                base64.b64encode(fullname.as_py()).decode(),
                                base64.b64encode(email.as_py()).decode(),
                            )
                            for fullname, email in zip(fullnames, emails)
                        )
                        pbar.update(stripe.num_rows)

        os.rename(tmp_path, self.output().path)
class ComputeRawImpact(luigi.Task):
    """Creates a file that lists all origins containing revrels from a given set
    of persons, as well as the number of revrels and first/latest timestamp for
    each origin.
    """

    local_graph_path = luigi.PathParameter()
    local_sensitive_graph_path = luigi.OptionalPathParameter(default=None)
    graph_name = luigi.StrParameter(default="graph")
    impact_dir = luigi.PathParameter()
    output_emails = luigi.BoolParameter(default=False)
    include_ranges = luigi.StrParameter(
        default="",
        description="Space-separated list of ISO8601 dates ('YYYY-MM-DD') or "
        "date intervals ('YYYY-MM-DD/YYYY-MM-DD', inclusive/exclusive). "
        "If provided, only revisions and releases with an author *or* committer date in "
        "one of these ranges are considered.",
    )
    exclude_ranges = luigi.StrParameter(
        default="",
        description="Space-separated list of ISO8601 dates ('YYYY-MM-DD') or "
        "date intervals ('YYYY-MM-DD/YYYY-MM-DD', inclusive/exclusive). "
        "Author and committer dates in these ranges are not considered for --include-ranges.",
    )

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of
        :class:`swh.graph.luigi.compressed_graph.LocalGraph`, of
        :class:`SelectPersons`, and two instances of each of
        :class:`swh.graph.libs.luigi.topology.CountPaths` and
        :class:`swh.graph.libs.luigi.topology.CountDescendants`
        (all object types, and history/hosting object types only), forward only.
        """
        # in this task, we only use forward counts, because we only annotate origins,
        # which have no predecessor, so their backward counts would always be 1
        direction = "forward"
        shared_kwargs = dict(
            local_graph_path=self.local_graph_path,
            graph_name=self.graph_name,
            write_parquet=False,
            direction=direction,
        )

        deps: Dict[str, luigi.Task] = {
            "graph": LocalGraph(
                local_graph_path=self.local_graph_path,
                local_sensitive_graph_path=self.local_sensitive_graph_path,
            ),
            "persons": SelectPersons(
                impact_dir=self.impact_dir,
            ),
        }
        # (dependency key prefix, object types the counts are restricted to)
        for prefix, object_types in (
            ("", "cnt,dir,rev,rel,snp,ori"),
            ("histhost_", "rev,rel,snp,ori"),
        ):
            deps[f"{prefix}{direction}_paths"] = CountPaths(
                object_types=object_types, **shared_kwargs
            )
            deps[f"{prefix}{direction}_descendants"] = CountDescendants(
                object_types=object_types, **shared_kwargs
            )
        return deps

    def output(self) -> Dict[str, luigi.LocalTarget]:
        """``{"origin_csv": self.impact_dir / "raw_origins.csv.zst"}``"""
        target = luigi.LocalTarget(self.impact_dir / "raw_origins.csv.zst")
        return {"origin_csv": target}

    def run(self) -> None:
        """Runs 'impact' on the selected persons and zstd-compresses its output"""
        from swh.datasets.shell import Rust
        from swh.graph.shell import AtomicFileSink, Command

        inputs = self.input()

        args = ["impact", "--graph", self.local_graph_path / self.graph_name]
        if self.local_sensitive_graph_path:
            args += [
                "--sensitive-graph",
                self.local_sensitive_graph_path / self.graph_name,
            ]
        if self.output_emails:
            args.append("--output-emails")

        # one flag per whitespace-separated range; an empty parameter adds nothing
        for flag, ranges in (
            ("--include-range", self.include_ranges),
            ("--exclude-range", self.exclude_ranges),
        ):
            for range_ in ranges.split():
                args += [flag, range_]

        direction = "forward"  # only forward counts are required by this task
        for prefix in ("", "histhost_"):
            label = f"{prefix}{direction}"
            paths_dir = inputs[f"{label}_paths"]["arrays"].path
            descendants_dir = inputs[f"{label}_descendants"]["bitfieldvec"].path
            args += [
                "--paths-dir",
                f"{label}={paths_dir}",
                "--descendants-dir",
                f"{label}={descendants_dir}",
            ]

        # fmt: off
        (
            Command.cat(inputs["persons"])
            | Rust(*args)
            | Command.zstdmt("-19")
            > AtomicFileSink(self.output()["origin_csv"].path)
        ).run()
        # fmt: on
def opt_int(s: str) -> Optional[int]:
    """Parse ``s`` as an integer, mapping the empty string to ``None``."""
    return None if s == "" else int(s)
D = TypeVar("D")
[docs] def typeddict_parser(dt: Type[D]) -> Callable[[Dict[str, str]], D]: import inspect converters = {} for key, type_ in inspect.get_annotations(dt).items(): if type_ is str: converters[key] = lambda x: x elif type_ is int: converters[key] = int elif type_ is Optional[int]: converters[key] = opt_int elif type_ is float: converters[key] = float else: raise TypeError(f"Unsupported type {type_} for key {key} of {dt.__name__}") def parse(d): parsed = {} unknown_keys = d.keys() - converters.keys() missing_keys = converters.keys() - d.keys() if unknown_keys: raise TypeError(f"Dict has extra keys for {dt.__name__}: {unknown_keys}") if missing_keys: raise TypeError(f"Dict is missing keys for {dt.__name__}: {missing_keys}") for key, value in d.items(): if key in converters: parsed[key] = converters[key](value) return parsed return parse
class OriginWithEmails(TypedDict):
    """Typed row of the raw origins CSV, in its variant with author/committer
    columns (used by :class:`ComputeIndexedImpact` when ``output_emails`` is set).

    Field order is significant: it is reused as the leading part of the output
    CSV header.
    """

    # origin identification
    origin_id: int
    origin_url: str

    # revision/release counts: contributed vs. total
    num_contributed_revs: int
    num_contributed_rels: int
    num_contributed_revs_in_main_branch: Optional[int]
    total_revs: int
    total_rels: int
    total_revs_in_main_branch: Optional[int]

    # first contributed revision/release
    first_contributed_revrel: str
    first_contributed_revrel_ts: Optional[int]
    first_contributed_revrel_author: str
    first_contributed_revrel_committer: str

    # last contributed revision/release
    last_contributed_revrel: str
    last_contributed_revrel_ts: Optional[int]
    last_contributed_revrel_author: str
    last_contributed_revrel_committer: str

    # timestamps over all revisions/releases of the origin
    first_revrel_ts: Optional[int]
    last_revrel_ts: Optional[int]

    # metadata files
    codemeta_json: str
    citation_cff: str

    # graph topology counts (all object types, then "histhost_" variants)
    forward_paths: float
    forward_paths_to_roots: float
    forward_descendants: int
    histhost_forward_paths: float
    histhost_forward_paths_to_roots: float
    histhost_forward_descendants: int
class OriginWithoutEmails(TypedDict):
    """Typed row of the raw origins CSV, in its variant without author/committer
    columns (used by :class:`ComputeIndexedImpact` when ``output_emails`` is
    unset).

    Field order is significant: it is reused as the leading part of the output
    CSV header.
    """

    # origin identification
    origin_id: int
    origin_url: str

    # revision/release counts: contributed vs. total
    num_contributed_revs: int
    num_contributed_rels: int
    num_contributed_revs_in_main_branch: Optional[int]
    total_revs: int
    total_rels: int
    total_revs_in_main_branch: Optional[int]

    # first and last contributed revision/release
    first_contributed_revrel: str
    first_contributed_revrel_ts: Optional[int]
    last_contributed_revrel: str
    last_contributed_revrel_ts: Optional[int]

    # timestamps over all revisions/releases of the origin
    first_revrel_ts: Optional[int]
    last_revrel_ts: Optional[int]

    # metadata files
    codemeta_json: str
    citation_cff: str

    # graph topology counts (all object types, then "histhost_" variants)
    forward_paths: float
    forward_paths_to_roots: float
    forward_descendants: int
    histhost_forward_paths: float
    histhost_forward_paths_to_roots: float
    histhost_forward_descendants: int
class ComputeIndexedImpact(luigi.Task):
    """Removes forks from :class:`ComputeRawImpact`'s output, unless they contain
    more revrels (or older/newer ones) than the upstream origin.

    Every kept origin is also enriched with extrinsic metadata fetched from the
    indexer storage and with visit information fetched from the scheduler.
    """

    indexer_storage_url = luigi.StrParameter(significant=False)
    swh_scheduler_url = luigi.StrParameter(significant=False)

    FORK_FILTERS = [
        "all",
        "none",
        "without-upstream-contribution",
        "with-original-content",
    ]

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.StrParameter(default="graph")
    impact_dir = luigi.PathParameter()
    output_emails = luigi.BoolParameter(default=False)
    # NOTE(review): the wording for 'without-upstream-contribution' below reads as
    # the opposite of what _keep_origin() does (it drops forks whose upstream *is*
    # among the contributed origins) -- confirm which one is intended.
    fork_filter = luigi.ChoiceParameter(
        choices=FORK_FILTERS,
        default="with-original-content",
        description="'all' includes all forks. 'none' excludes all forks. "
        "'without-upstream-contribution' excludes forks of an upstream that does not have "
        "original content from one of the persons. "
        "'with-original-content' extends this to include forks that have more commits "
        "from the persons than the upstream origin.",
    )

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of
        :class:`swh.graph.luigi.compressed_graph.LocalGraph` and
        :class:`ComputeRawImpact`."""
        return {
            "graph": LocalGraph(local_graph_path=self.local_graph_path),
            "raw_impact": ComputeRawImpact(
                local_graph_path=self.local_graph_path,
                graph_name=self.graph_name,
                impact_dir=self.impact_dir,
                output_emails=self.output_emails,
            ),
        }

    def output(self) -> Dict[str, luigi.Target]:
        """``{"origin_csv": self.impact_dir / "indexed_origins.csv.zst"}``"""
        return {
            "origin_csv": luigi.LocalTarget(self.impact_dir / "indexed_origins.csv.zst")
        }

    def run(self) -> None:
        """Reads input CSV, optionally filters out some lines (forks), adds columns,
        writes it again.

        The output is first written to ``<output>.tmp`` and renamed once complete.
        """
        import csv
        import inspect
        import os

        import pyzstd
        import tqdm

        with open(self.input()["raw_impact"]["origin_csv"].path, "rb") as in_fd:
            with pyzstd.open(in_fd, "rt") as f:
                reader = csv.DictReader(f)
                origins: Union[
                    Dict[str, OriginWithEmails],
                    Dict[str, OriginWithoutEmails],
                ]
                if self.output_emails:
                    header = list(inspect.get_annotations(OriginWithEmails))
                    parse_origin_with_emails = typeddict_parser(OriginWithEmails)
                    origins = {
                        row["origin_url"]: parse_origin_with_emails(row)
                        for row in tqdm.tqdm(reader, desc="Reading raw origin stats")
                    }
                else:
                    header = list(inspect.get_annotations(OriginWithoutEmails))
                    parse_origin_without_emails = typeddict_parser(OriginWithoutEmails)
                    origins = {
                        row["origin_url"]: parse_origin_without_emails(row)
                        for row in tqdm.tqdm(reader, desc="Reading raw origin stats")
                    }

        origins_metadata = self._get_origins_metadata(list(origins))
        listed_origins = self._get_listed_origins(list(origins))

        # same path as the declared output, so both cannot drift apart
        output_path = self.output()["origin_csv"].path
        tmp_output_path = f"{output_path}.tmp"

        with open(tmp_output_path, "wb") as out_fd:
            with pyzstd.open(out_fd, "wt") as f:
                writer = csv.DictWriter(
                    f,
                    (
                        *header,
                        "last_visit_ts",
                        "visits_enabled",
                        "num_forks",
                        "num_likes",
                        "num_followers",
                        "creation_date",
                        "modification_date",
                        "licenses",
                        "keywords",
                        "programming_languages",
                        "description",
                        "forked_from",
                        "is_fork_with_newer_contributions",
                    ),
                )
                writer.writeheader()
                for origin_url in tqdm.tqdm(
                    origins,
                    desc=(
                        "Writing"
                        if self.fork_filter == "all"
                        else "Filtering out uninteresting forks and writing"
                    ),
                ):
                    origin_metadata = origins_metadata.get(origin_url)
                    extrinsic_metadata: Dict[str, Union[str, int]]
                    if origin_metadata is None:
                        extrinsic_metadata = {}
                    else:
                        extrinsic_metadata = self._parse_origin_metadata(
                            origin_url, origin_metadata.metadata
                        )

                    upstream_origin_url = extrinsic_metadata.get("forked_from")
                    num_likes = extrinsic_metadata.get("num_likes", 0)
                    num_followers = extrinsic_metadata.get("num_followers", 0)
                    assert upstream_origin_url is None or isinstance(
                        upstream_origin_url, str
                    ), (origin_url, upstream_origin_url)
                    assert isinstance(num_likes, int), (origin_url, num_likes)
                    assert isinstance(num_followers, int), (origin_url, num_followers)

                    origin = origins[origin_url]
                    listed_origin: ParsedListedOrigin = listed_origins.get(
                        origin_url, {}
                    )

                    is_fork_with_newer_contributions: Optional[bool]
                    if upstream_origin_url is None:
                        is_fork_with_newer_contributions = None
                    else:
                        upstream = origins.get(upstream_origin_url)
                        if upstream is None:
                            # unknown upstream, so let's assume the fork is
                            # interesting
                            is_fork_with_newer_contributions = True
                        else:
                            is_fork_with_newer_contributions = (
                                self._has_newer_contributions(origin, upstream)
                            )

                    if not self._keep_origin(
                        upstream_origin_url,
                        upstream_origin_url in origins,
                        num_likes,
                        num_followers,
                        is_fork_with_newer_contributions,
                    ):
                        continue

                    writer.writerow(
                        {
                            **origin,
                            "last_visit_ts": listed_origin.get("last_update", ""),
                            "visits_enabled": listed_origin.get("enabled", ""),
                            **extrinsic_metadata,
                            "forked_from": upstream_origin_url or "",
                            # fmt: off
                            "is_fork_with_newer_contributions": is_fork_with_newer_contributions,
                            # fmt: on
                        }
                    )

        os.rename(tmp_output_path, output_path)  # atomic write

    @staticmethod
    def _has_newer_contributions(origin, upstream) -> bool:
        """Tells whether the fork ``origin`` has more contributed revisions or
        releases than ``upstream``, or a more recent last contributed revrel."""
        origin_ts = origin["last_contributed_revrel_ts"]
        upstream_ts = upstream["last_contributed_revrel_ts"]
        return (
            origin["num_contributed_revs"] > upstream["num_contributed_revs"]
            or origin["num_contributed_rels"] > upstream["num_contributed_rels"]
            # a dated contribution beats an undated one, or an older one
            or (
                origin_ts is not None
                and (upstream_ts is None or origin_ts > upstream_ts)
            )
        )

    def _keep_origin(
        self,
        upstream_origin_url: Optional[str],
        upstream_has_contributions: bool,
        num_likes: int,
        num_followers: int,
        is_fork_with_newer_contributions: Optional[bool],
    ) -> bool:
        """Applies ``self.fork_filter`` to one origin; returns whether to write it."""
        if upstream_origin_url is None:
            return True  # not a fork, we keep it
        if self.fork_filter == "none":
            return False  # exclude all forks
        if self.fork_filter == "all":
            return True  # include all forks
        if self.fork_filter == "without-upstream-contribution":
            # dropped when the upstream is itself one of the contributed origins
            return not upstream_has_contributions
        if self.fork_filter == "with-original-content":
            # a fork is uninteresting when (almost) nobody likes or follows it
            # and it brings nothing newer than its upstream
            return bool(
                num_likes > 1 or num_followers > 1 or is_fork_with_newer_contributions
            )
        # explicit raise rather than "assert False", which "python -O" removes
        raise AssertionError(f"unknown fork filter: {self.fork_filter!r}")

    def _parse_origin_metadata(self, origin_url, json_ld) -> Dict[str, Union[str, int]]:
        """Extracts the CSV columns this task writes from the JSON-LD extrinsic
        metadata ``json_ld`` of ``origin_url``.

        Only triples whose subject is ``origin_url`` are read. Simple literal and
        collection-size columns are present only when found; the concatenated
        columns (languages, licenses, keywords) are always present.
        """
        import json

        import rdflib

        from swh.indexer.codemeta import expand
        from swh.indexer.namespaces import ACTIVITYSTREAMS, FORGEFED, SCHEMA

        origin = rdflib.term.URIRef(origin_url)
        metadata: Dict[str, Union[str, int]] = {}
        g: Any = rdflib.Graph().parse(
            data=json.dumps(expand(json_ld)),
            format="json-ld",
            # replace invalid URIs with blank node ids, instead of discarding
            # whole nodes:
            generalized_rdf=True,
        )

        # Simple string literals
        for term, column_name in [
            (FORGEFED.forkedFrom, "forked_from"),
            (FORGEFED.dateCreated, "creation_date"),
            (FORGEFED.dateModified, "modification_date"),
            (SCHEMA.description, "description"),
        ]:
            for _, _, obj in g.triples((origin, term, None)):
                metadata[column_name] = str(obj)

        # Concatenated literals
        for term, column_name, joiner in [
            (SCHEMA.programmingLanguage, "programming_languages", " "),
            (SCHEMA.license, "licenses", " "),
            (SCHEMA.keywords, "keywords", ", "),
        ]:
            metadata[column_name] = joiner.join(
                str(obj) for _, _, obj in g.triples((origin, term, None))
            )

        # Collections
        for collection_term, column_name in [
            (FORGEFED.forks, "num_forks"),
            (ACTIVITYSTREAMS.likes, "num_likes"),
            (ACTIVITYSTREAMS.followers, "num_followers"),
        ]:
            for _, _, collection in g.triples((origin, collection_term, None)):
                for _, _, total_items in g.triples(
                    (collection, ACTIVITYSTREAMS.totalItems, None)
                ):
                    metadata[column_name] = total_items.value

        # TODO: add number of issues

        return metadata

    def _get_origins_metadata(
        self, origins: List[str]
    ) -> Dict[str, "OriginExtrinsicMetadataRow"]:
        """Fetches extrinsic metadata of ``origins`` from the indexer storage, in
        concurrent batches, and returns the rows keyed by their ``id``."""
        import itertools
        import math
        import multiprocessing.dummy

        import tqdm

        from swh.core.utils import grouper
        from swh.indexer.storage import get_indexer_storage
        from swh.storage.proxies.retry import swh_retry

        idx_storage = get_indexer_storage("remote", url=self.indexer_storage_url)

        num_concurrent_requests = 10
        num_origins_per_request = 100
        fetch_batch = swh_retry(idx_storage.origin_extrinsic_metadata_get)
        with multiprocessing.dummy.Pool(num_concurrent_requests) as p:
            return {
                row.id: row
                for row in itertools.chain.from_iterable(
                    tqdm.tqdm(
                        p.imap_unordered(
                            fetch_batch,
                            grouper(origins, num_origins_per_request),
                        ),
                        desc="Fetching origin metadata",
                        total=math.ceil(len(origins) / num_origins_per_request),
                    ),
                )
            }

    def _get_listed_origins(self, origins: List[str]) -> Dict[str, ParsedListedOrigin]:
        """Fetches scheduler information about ``origins``, in concurrent batches,
        and returns it keyed by origin URL."""
        import functools
        import itertools
        import math
        import multiprocessing.dummy

        import tqdm

        from swh.core.utils import grouper
        from swh.scheduler import get_scheduler

        scheduler = get_scheduler("remote", url=self.swh_scheduler_url)

        num_concurrent_requests = 10
        num_origins_per_request = 100
        with multiprocessing.dummy.Pool(num_concurrent_requests) as p:
            return {
                listed_origin["url"]: listed_origin
                for listed_origin in itertools.chain.from_iterable(
                    tqdm.tqdm(
                        p.imap_unordered(
                            functools.partial(
                                self._get_listed_origins_group, scheduler=scheduler
                            ),
                            grouper(origins, num_origins_per_request),
                        ),
                        desc="Fetching scheduler info",
                        total=math.ceil(len(origins) / num_origins_per_request),
                    ),
                )
            }

    def _get_listed_origins_group(
        self, origins: List[str], scheduler: "SchedulerInterface"
    ) -> List[ParsedListedOrigin]:
        """Queries the scheduler for one batch of origins and merges the listed
        origins sharing a URL: the latest ``last_update`` is kept, and ``enabled``
        is true if any of them is enabled."""
        from swh.core.api.classes import stream_results

        results: Dict[str, ParsedListedOrigin] = {}
        for listed_origin in stream_results(scheduler.get_listed_origins, urls=origins):
            origin_result = results.setdefault(
                listed_origin.url,
                {"url": listed_origin.url, "last_update": None, "enabled": False},
            )
            if listed_origin.last_update:
                if origin_result["last_update"] is None:
                    origin_result["last_update"] = listed_origin.last_update
                else:
                    origin_result["last_update"] = max(
                        origin_result["last_update"], listed_origin.last_update
                    )
            origin_result["enabled"] = origin_result["enabled"] or listed_origin.enabled
        return list(results.values())
class DenormalizeImpactedRevrels(luigi.Task):
    """Denormalizes :class:`ComputeIndexedImpact`'s output by writing Parquet
    files listing individual revrels and origin-to-revrel mappings."""

    indexer_storage_url = luigi.StrParameter(significant=False)
    swh_scheduler_url = luigi.StrParameter(significant=False)
    local_graph_path = luigi.PathParameter()
    local_sensitive_graph_path = luigi.OptionalPathParameter(default=None)
    graph_name = luigi.StrParameter(default="graph")
    impact_dir = luigi.PathParameter()
    write_revrels = luigi.BoolParameter(
        default=True,
        description="Whether to export revision and release properties to a Parquet table",
    )
    write_origin_revrels = luigi.BoolParameter(
        default=True,
        description="Whether to enumerate all revision and releases in each origin to "
        "a Parquet table",
    )
    revrel_filter = luigi.ChoiceParameter(
        choices=["contributed", "in-origin"],
        default="contributed",
        description="'contributed' (the default) includes only revrels contributed "
        "by the institution. "
        "'in-origin' includes all revrels in any origin that contain an other "
        "revrel that was contributed by the institution, which can be a very large set.",
    )
    algorithm = luigi.ChoiceParameter(
        choices=["oriset", "bfs"],
        default="oriset",
        description="Denormalization backend. 'oriset' (the default) writes compact "
        "orisets_of_origin + revrels_in_oriset tables via a topological map-reduce "
        "(O(num_ori + num_revrel) storage, but single-threaded and slow per run). "
        "'bfs' writes flat (origin, revrel) pairs via a per-origin parallel traversal "
        "(larger output, but much faster per run, and needs no topological order).",
    )
    include_ranges = luigi.StrParameter(
        default="",
        description="Space-separated list of ISO8601 dates ('YYYY-MM-DD') or "
        "date intervals ('YYYY-MM-DD/YYYY-MM-DD', inclusive/exclusive). "
        "If provided, only revisions and releases with an author *or* committer date in "
        "one of these ranges are considered.",
    )
    exclude_ranges = luigi.StrParameter(
        default="",
        description="Space-separated list of ISO8601 dates ('YYYY-MM-DD') or "
        "date intervals ('YYYY-MM-DD/YYYY-MM-DD', inclusive/exclusive). "
        "Author and committer dates in these ranges are not considered for include_ranges.",
    )

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of
        :class:`swh.graph.luigi.compressed_graph.LocalGraph`,
        :class:`SelectPersons` and :class:`ComputeIndexedImpact`, plus four
        instances of each of :class:`swh.graph.libs.luigi.topology.CountPaths` and
        :class:`swh.graph.libs.luigi.topology.CountDescendants` (forward and
        backward; all object types and history/hosting object types only).
        :class:`swh.graph.libs.luigi.topology.ComputeGenerations` is added only
        for ``algorithm="oriset"``.
        """
        graph_kwargs = dict(
            local_graph_path=self.local_graph_path,
            graph_name=self.graph_name,
        )

        deps: Dict[str, luigi.Task] = {
            "graph": LocalGraph(
                local_graph_path=self.local_graph_path,
                local_sensitive_graph_path=self.local_sensitive_graph_path,
            ),
            "persons": SelectPersons(
                impact_dir=self.impact_dir,
            ),
            "indexed_impact": ComputeIndexedImpact(
                indexer_storage_url=self.indexer_storage_url,
                swh_scheduler_url=self.swh_scheduler_url,
                impact_dir=self.impact_dir,
                **graph_kwargs,
            ),
        }

        # Only the oriset backend consumes a topological order.
        if self.algorithm == "oriset":
            deps["toposort"] = ComputeGenerations(
                direction="forward",
                object_types="rev,rel,snp,ori",
                **graph_kwargs,
            )

        # (dependency key prefix, object types the counts are restricted to)
        count_variants = (
            ("", "cnt,dir,rev,rel,snp,ori"),
            ("histhost_", "rev,rel,snp,ori"),
        )
        for direction in ("forward", "backward"):
            for prefix, object_types in count_variants:
                count_kwargs = dict(
                    object_types=object_types, direction=direction, **graph_kwargs
                )
                deps[f"{prefix}{direction}_paths"] = CountPaths(**count_kwargs)
                deps[f"{prefix}{direction}_descendants"] = CountDescendants(
                    **count_kwargs
                )

        return deps

    def output(self) -> Dict[str, luigi.LocalTarget]:
        """``self.impact_dir / "revrels"`` and/or ``self.impact_dir / "origin_revrels"``
        depending on ``self.write_revrels`` and ``self.write_origin_revrels``.

        For ``algorithm="oriset"``, ``origin_revrels`` is a directory containing two
        subdirectories, ``orisets_of_origin`` and ``revrels_in_oriset``, that together
        encode the many-to-many origin->revrel relation. For ``algorithm="bfs"`` it
        contains flat ``(origin, revrel)`` Parquet files directly."""
        enabled = (
            ("revrels", self.write_revrels),
            ("origin_revrels", self.write_origin_revrels),
        )
        return {
            name: luigi.LocalTarget(self.impact_dir / name)
            for name, wanted in enabled
            if wanted
        }

    def run(self) -> None:
        """Runs impact-denormalize on the origins kept by
        :class:`ComputeIndexedImpact`"""
        import csv
        import tempfile

        import pyzstd

        from swh.datasets.shell import Rust
        from swh.graph.shell import Command

        inputs = self.input()
        output = self.output()

        args = [
            "impact-denormalize",
            "--graph",
            self.local_graph_path / self.graph_name,
            "--revrel-filter",
            self.revrel_filter,
            "--algorithm",
            self.algorithm,
        ]

        # The topological order is only consumed by the oriset backend; bfs ignores it.
        if self.algorithm == "oriset":
            args += ["--order", inputs["toposort"]["topo_order"].path]

        if self.local_sensitive_graph_path:
            args += [
                "--sensitive-graph",
                self.local_sensitive_graph_path / self.graph_name,
            ]

        for direction in ("forward", "backward"):
            for prefix in ("", "histhost_"):
                label = f"{prefix}{direction}"
                paths_dir = inputs[f"{label}_paths"]["arrays"].path
                descendants_dir = inputs[f"{label}_descendants"]["bitfieldvec"].path
                args += [
                    "--paths-dir",
                    f"{label}={paths_dir}",
                    "--descendants-dir",
                    f"{label}={descendants_dir}",
                ]

        # one flag per whitespace-separated range; an empty parameter adds nothing
        for flag, ranges in (
            ("--include-range", self.include_ranges),
            ("--exclude-range", self.exclude_ranges),
        ):
            for range_ in ranges.split():
                args += [flag, range_]

        for target_name, flag in (
            ("revrels", "--revrels-out"),
            ("origin_revrels", "--origin-revrels-out"),
        ):
            if target_name in output:
                args += [flag, output[target_name].path]

        # The temporary file must outlive the command, hence the pipeline runs
        # inside this "with" block.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".txt") as origins:
            # Extract origin URLs from the indexed impact CSV
            with open(inputs["indexed_impact"]["origin_csv"].path, "rb") as f:
                with pyzstd.open(f, "rt") as zf:
                    origins.writelines(
                        row["origin_url"] + "\n" for row in csv.DictReader(zf)
                    )
            origins.flush()

            args += ["--origins", origins.name]

            # fmt: off
            (
                Command.cat(inputs["persons"])
                | Rust(*args)
            ).run()
            # fmt: on