Source code for swh.datasets.luigi.impact

# Copyright (C) 2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
Luigi tasks to measure institutional impact
===========================================

This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
computing the impact of an institution across all origins
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import datetime
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypedDict, Union

import luigi

from swh.graph.luigi.compressed_graph import LocalGraph

if TYPE_CHECKING:
    from swh.indexer.storage.model import OriginExtrinsicMetadataRow
    from swh.scheduler.interface import SchedulerInterface

    class ParsedListedOrigin(TypedDict, total=False):
        url: str
        last_update: datetime.datetime
        enabled: bool

else:
    ParsedListedOrigin = dict

logger = logging.getLogger(__name__)


[docs] class ComputeRawImpact(luigi.Task): """Creates a file that list all origins that contains revrels from a given set of persons, as well as the number of revrels and first/latest timestamp for each origin. """ local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") persons_path = luigi.PathParameter() raw_impact_path = luigi.PathParameter() output_emails = luigi.BoolParameter(default=False) include_ranges = luigi.Parameter( default="", description="Space-separated list of ISO8601 dates ('YYYY-MM-DD') or " "date intervals ('YYYY-MM-DD/YYYY-MM-DD', inclusive/exclusive). " "If provided, only revisions and releases with an author *or* committer date in " "one of these ranges are considered.", ) exclude_ranges = luigi.Parameter( default="", description="Space-separated list of ISO8601 dates ('YYYY-MM-DD') or " "date intervals ('YYYY-MM-DD/YYYY-MM-DD', inclusive/exclusive). " "Author and committer dates in these ranges are not considered for --include-ranges.", )
[docs] def requires(self) -> Dict[str, luigi.Task]: """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph` and :class:`swh.graph.libs.luigi.topology.ComputeGenerations`.""" class PersonsTask(luigi.ExternalTask): def output(self2): return luigi.LocalTarget(self.persons_path) return { "graph": LocalGraph(local_graph_path=self.local_graph_path), "persons": PersonsTask(), }
[docs] def output(self) -> List[luigi.Target]: """.csv.zst file that contains the origin_id<->contributor_id map and the list of origins""" return [ luigi.LocalTarget(self.raw_impact_path), ]
[docs] def run(self) -> None: """Runs org.softwareheritage.graph.utils.ListOriginContributors and compresses""" import itertools from swh.datasets.shell import Rust from swh.graph.shell import AtomicFileSink, Command # fmt: off ( Command.cat(self.persons_path) | Rust( "impact", self.local_graph_path / self.graph_name, *(["--output-emails"] if self.output_emails else []), *itertools.chain.from_iterable( ["--include-range", range_] for range_ in self.include_ranges.split() ), *itertools.chain.from_iterable( ["--exclude-range", range_] for range_ in self.exclude_ranges.split() ), ) | Command.zstdmt("-19") > AtomicFileSink(self.raw_impact_path) ).run()
# fmt: on
[docs] class ComputeIndexedImpact(luigi.Task): """Removes forks from :class:`ComputeRawImpact`'s output, unless they contain more revrels (or older/newer ones) than the upstream origin.""" indexer_storage_url = luigi.Parameter(significant=False) swh_scheduler_url = luigi.Parameter(significant=False) FORK_FILTERS = [ "all", "none", "without-upstream-contribution", "with-original-content", ] local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") persons_path = luigi.PathParameter() raw_impact_path = luigi.PathParameter() output_emails = luigi.BoolParameter(default=False) indexed_impact_path = luigi.PathParameter() fork_filter = luigi.ChoiceParameter( choices=FORK_FILTERS, default="with-original-content", description="'all' includes all forks. 'none' excludes all forks. " "'without-known-upstream' excludes forks of an upstream that does not have " "original content from one of the persons." "'with-original-content' extends this to include forks that have more commits " "from the persons than the upstream origin.", )
[docs] def requires(self) -> Dict[str, luigi.Task]: """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph` and :class:`swh.graph.libs.luigi.topology.ComputeGenerations`.""" return { "graph": LocalGraph(local_graph_path=self.local_graph_path), "raw_impact": ComputeRawImpact( local_graph_path=self.local_graph_path, graph_name=self.graph_name, persons_path=self.persons_path, raw_impact_path=self.raw_impact_path, output_emails=self.output_emails, ), }
[docs] def output(self) -> List[luigi.Target]: """.csv.zst file that contains the origin_id<->contributor_id map and the list of origins""" return [ luigi.LocalTarget(self.indexed_impact_path), ]
[docs] def run(self) -> None: """Runs org.softwareheritage.graph.utils.ListOriginContributors and compresses""" import csv import os import pyzstd import tqdm def opt_int(s: str) -> Optional[int]: if s == "": return None else: return int(s) with open(self.raw_impact_path, "rb") as in_fd: with pyzstd.open(in_fd, "rt") as f: reader = csv.reader(f) header = next(reader) origins: Union[ Dict[ str, Tuple[ str, # origin_id int, # num_contributed_revrels Optional[int], # num_contributed_revs_in_main_branch int, # total_revrels Optional[int], # total_revs_in_main_branch str, # first_contributed_revrel Optional[int], # first_contributed_revrel_ts str, # first_contributed_revrel_author str, # first_contributed_revrel_committer str, # last_contributed_revrel Optional[int], # last_contributed_revrel_ts str, # last_contributed_revrel_author str, # last_contributed_revrel_committer Optional[int], # first_revrel_ts Optional[int], # last_revrel_ts ], ], Dict[ str, Tuple[ str, # origin_id int, # num_contributed_revrels Optional[int], # num_contributed_revs_in_main_branch int, # total_revrels Optional[int], # total_revs_in_main_branch str, # first_contributed_revrel Optional[int], # first_contributed_revrel_ts None, # first_contributed_revrel_author None, # first_contributed_revrel_committer str, # last_contributed_revrel Optional[int], # last_contributed_revrel_ts None, # last_contributed_revrel_author None, # last_contributed_revrel_committer Optional[int], # first_revrel_ts Optional[int], # last_revrel_ts ], ], ] if self.output_emails: assert header == [ "origin_id", "origin_url", "num_contributed_revrels", "num_contributed_revs_in_main_branch", "total_revrels", "total_revs_in_main_branch", "first_contributed_revrel", "first_contributed_revrel_ts", "first_contributed_revrel_author", "first_contributed_revrel_committer", "last_contributed_revrel", "last_contributed_revrel_ts", "last_contributed_revrel_author", "last_contributed_revrel_committer", "first_revrel_ts", "last_revrel_ts", ], f"Unexpected header: {header}" origins = { origin_url: ( origin_id, int(num_contributed_revrels), opt_int(num_contributed_revs_in_main_branch), int(total_revrels), opt_int(total_revs_in_main_branch), first_contributed_revrel, opt_int(first_contributed_revrel_ts), first_contributed_revrel_author, first_contributed_revrel_committer, last_contributed_revrel, opt_int(last_contributed_revrel_ts), last_contributed_revrel_author, last_contributed_revrel_committer, opt_int(first_revrel_ts), opt_int(last_revrel_ts), ) for ( origin_id, origin_url, num_contributed_revrels, num_contributed_revs_in_main_branch, total_revrels, total_revs_in_main_branch, first_contributed_revrel, first_contributed_revrel_ts, first_contributed_revrel_author, first_contributed_revrel_committer, last_contributed_revrel, last_contributed_revrel_ts, last_contributed_revrel_author, last_contributed_revrel_committer, first_revrel_ts, last_revrel_ts, ) in tqdm.tqdm(reader, desc="Reading raw origin stats") } else: assert header == [ "origin_id", "origin_url", "num_contributed_revrels", "num_contributed_revs_in_main_branch", "total_revrels", "total_revs_in_main_branch", "first_contributed_revrel", "first_contributed_revrel_ts", "last_contributed_revrel", "last_contributed_revrel_ts", "first_revrel_ts", "last_revrel_ts", ], f"Unexpected header: {header}" origins = { origin_url: ( origin_id, int(num_contributed_revrels), opt_int(num_contributed_revs_in_main_branch), int(total_revrels), opt_int(total_revs_in_main_branch), first_contributed_revrel, opt_int(first_contributed_revrel_ts), None, # first_contributed_revrel_author None, # first_contributed_revrel_committer last_contributed_revrel, opt_int(last_contributed_revrel_ts), None, # last_contributed_revrel_author None, # last_contributed_revrel_committer opt_int(first_revrel_ts), opt_int(last_revrel_ts), ) for ( origin_id, origin_url, num_contributed_revrels, num_contributed_revs_in_main_branch, total_revrels, total_revs_in_main_branch, first_contributed_revrel, first_contributed_revrel_ts, last_contributed_revrel, last_contributed_revrel_ts, first_revrel_ts, last_revrel_ts, ) in tqdm.tqdm(reader, desc="Reading raw origin stats") } origins_metadata = self._get_origins_metadata(list(origins)) listed_origins = self._get_listed_origins(list(origins)) tmp_path = f"{self.indexed_impact_path}.tmp" with open(tmp_path, "wb") as out_fd: with pyzstd.open(out_fd, "wt") as f: writer = csv.writer(f) writer.writerow( ( *header, "last_visit_ts", "visits_enabled", "num_forks", "num_stars", "num_followers", "forked_from", ) ) for origin_url in tqdm.tqdm( origins, desc=( "Writing" if self.fork_filter == "all" else "Filtering out uninteresting forks and writing" ), ): origin_metadata = origins_metadata.get(origin_url) if origin_metadata is None: upstream_origin_url = None num_forks = num_likes = num_followers = 0 else: (upstream_origin_url, num_forks, num_likes, num_followers) = ( self._parse_origin_metadata( origin_url, origin_metadata.metadata ) ) ( origin_id, num_contributed_revrels, num_contributed_revs_in_main_branch, total_revrels, total_revs_in_main_branch, first_contributed_revrel, first_contributed_revrel_ts, first_contributed_revrel_author, first_contributed_revrel_committer, last_contributed_revrel, last_contributed_revrel_ts, last_contributed_revrel_author, last_contributed_revrel_committer, first_revrel_ts, last_revrel_ts, ) = origins[origin_url] listed_origin: ParsedListedOrigin = listed_origins.get( origin_url, {} ) if upstream_origin_url is None: pass # not a fork, we keep it elif self.fork_filter == "none": continue # exclude all forks elif self.fork_filter == "all": pass # include all forks elif self.fork_filter == "without-upstream-contribution": if upstream_origin_url in origins: continue # upstream has contributions else: pass # upstream does not else: # check for new content try: ( _origin_id, num_upstream_revrels, _num_contributed_revs_in_main_branch, _total_revrels, _total_revs_in_main_branch, _first_contributed_revrel, first_upstream_revrel_ts, _first_upstream_contributed_revrel_author, _first_upstream_contributed_revrel_committer, _last_upstream_contributed_revrel, last_upstream_contributed_revrel_ts, _last_upstream_contributed_revrel_author, _last_upstream_contributed_revrel_committer, _first_revrel_ts, _last_revrel_ts, ) = origins[upstream_origin_url] except KeyError: pass # unknown upstream, so let's keep the fork else: if ( # the fork does not have more revrels num_contributed_revrels <= num_upstream_revrels and ( ( # the fork does not have a newer revrel last_contributed_revrel_ts is not None and last_upstream_contributed_revrel_ts is not None and last_contributed_revrel_ts <= last_upstream_contributed_revrel_ts ) or ( # the fork has no timestamped revrel, but upstream does last_contributed_revrel_ts is None and last_upstream_contributed_revrel_ts is not None ) ) and num_likes <= 1 and num_followers <= 1 ): # uninteresting fork continue if self.output_emails: writer.writerow( ( origin_id, origin_url, num_contributed_revrels, num_contributed_revs_in_main_branch, total_revrels, total_revs_in_main_branch, first_contributed_revrel, first_contributed_revrel_ts, first_contributed_revrel_author, first_contributed_revrel_committer, last_contributed_revrel, last_contributed_revrel_ts, last_contributed_revrel_author, last_contributed_revrel_committer, first_revrel_ts, last_revrel_ts, listed_origin.get("last_update", ""), listed_origin.get("enabled", ""), num_forks, num_likes, num_followers, upstream_origin_url or "", ) ) else: writer.writerow( ( origin_id, origin_url, num_contributed_revrels, num_contributed_revs_in_main_branch, total_revrels, total_revs_in_main_branch, first_contributed_revrel, first_contributed_revrel_ts, last_contributed_revrel, last_contributed_revrel_ts, first_revrel_ts, last_revrel_ts, listed_origin.get("last_update", ""), listed_origin.get("enabled", ""), num_forks, num_likes, num_followers, upstream_origin_url or "", ) ) os.rename(tmp_path, self.indexed_impact_path) # atomic write
def _parse_origin_metadata( self, origin_url, json_ld ) -> Tuple[Optional[str], int, int, int]: import json import rdflib from swh.indexer.codemeta import expand from swh.indexer.namespaces import ACTIVITYSTREAMS, FORGEFED origin = rdflib.term.URIRef(origin_url) g: Any = rdflib.Graph().parse( data=json.dumps(expand(json_ld)), format="json-ld", # replace invalid URIs with blank node ids, instead of discarding # whole nodes: generalized_rdf=True, ) for _, _, forked_from in g.triples((origin, FORGEFED.forkedFrom, None)): upstream_origin_url = str(forked_from) break else: # not a fork, as far as we know upstream_origin_url = None num_forks = 0 for _, _, forks in g.triples((origin, FORGEFED.forks, None)): for _, _, total_items in g.triples( (forks, ACTIVITYSTREAMS.totalItems, None) ): num_forks = total_items.value num_likes = 0 for _, _, likes in g.triples((origin, ACTIVITYSTREAMS.likes, None)): for _, _, total_items in g.triples( (likes, ACTIVITYSTREAMS.totalItems, None) ): num_likes = total_items.value num_followers = 0 for _, _, followers in g.triples((origin, ACTIVITYSTREAMS.followers, None)): for _, _, total_items in g.triples( (followers, ACTIVITYSTREAMS.totalItems, None) ): num_followers = total_items.value return (upstream_origin_url, num_forks, num_likes, num_followers) def _get_origins_metadata( self, origins: List[str] ) -> Dict[str, "OriginExtrinsicMetadataRow"]: import itertools import math import multiprocessing.dummy import tqdm from swh.core.utils import grouper from swh.indexer.storage import get_indexer_storage idx_storage = get_indexer_storage("remote", url=self.indexer_storage_url) num_concurrent_requests = 10 num_origins_per_request = 100 with multiprocessing.dummy.Pool(num_concurrent_requests) as p: return { row.id: row for row in itertools.chain.from_iterable( tqdm.tqdm( p.imap_unordered( idx_storage.origin_extrinsic_metadata_get, grouper(origins, num_origins_per_request), ), desc="Fetching origin metadata", total=math.ceil(len(origins) / num_origins_per_request), ), ) } def _get_listed_origins(self, origins: List[str]) -> Dict[str, ParsedListedOrigin]: import functools import itertools import math import multiprocessing.dummy import tqdm from swh.core.utils import grouper from swh.scheduler import get_scheduler scheduler = get_scheduler("remote", url=self.swh_scheduler_url) num_concurrent_requests = 10 num_origins_per_request = 100 with multiprocessing.dummy.Pool(num_concurrent_requests) as p: return { listed_origin["url"]: listed_origin for listed_origin in itertools.chain.from_iterable( tqdm.tqdm( p.imap_unordered( functools.partial( self._get_listed_origins_group, scheduler=scheduler ), grouper(origins, num_origins_per_request), ), desc="Fetching scheduler info", total=math.ceil(len(origins) / num_origins_per_request), ), ) } def _get_listed_origins_group( self, origins: List[str], scheduler: "SchedulerInterface" ) -> List[ParsedListedOrigin]: import datetime from swh.core.api.classes import stream_results results: Dict[str, ParsedListedOrigin] = {} for listed_origin in stream_results(scheduler.get_listed_origins, urls=origins): origin_result = results.setdefault( listed_origin.url, { "url": listed_origin.url, "last_update": datetime.datetime.max.replace( tzinfo=datetime.timezone.utc ), "enabled": False, }, ) if listed_origin.last_update: origin_result["last_update"] = min( origin_result["last_update"], listed_origin.last_update ) origin_result["enabled"] = origin_result["enabled"] or listed_origin.enabled return list(results.values())