Source code for swh.graph.luigi.origin_contributors

# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
Luigi tasks for the contribution graph
=======================================

This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
driving the creation of the graph of contributions of people (pseudonymized
by default).

File layout
-----------

This assumes a local compressed graph (from :mod:`swh.graph.luigi.compressed_graph`)
is present, and generates/manipulates the following files::

    base_dir/
        <date>[_<flavor>]/
            datasets/
                contribution_graph.csv.zst
            topology/
                topological_order_dfs.csv.zst

And optionally::

    sensitive_base_dir/
        <date>[_<flavor>]/
            persons_sha256_to_name.csv.zst
            datasets/
                contribution_graph.deanonymized.csv.zst
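
These tasks are parameterized like any other Luigi tasks; for instance, through
a ``luigi.cfg`` section. A hypothetical configuration (all paths and the date
are made up for illustration) could look like::

    [ListOriginContributors]
    local_graph_path=base_dir/2022-12-07/compressed
    topological_order_dir=base_dir/2022-12-07/topology
    origin_contributors_path=base_dir/2022-12-07/datasets/contribution_graph.csv.zst
    origin_urls_path=base_dir/2022-12-07/datasets/origin_urls.csv.zst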
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
from pathlib import Path
from typing import Dict, Iterable, List, Tuple, cast

import luigi

from .compressed_graph import LocalGraph
from .topology import ComputeGenerations
from .utils import count_nodes

logger = logging.getLogger(__name__)


class ListOriginContributors(luigi.Task):
    """Creates files that list the contributors to each origin, as well as the
    list of origin URLs, from a compressed graph."""

    local_graph_path = luigi.PathParameter()
    topological_order_dir = luigi.PathParameter()
    origin_contributors_path = luigi.PathParameter()
    origin_urls_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    max_ram_mb = luigi.IntParameter(default=500_000)

    @property
    def resources(self):
        """Returns the value of ``self.max_ram_mb``"""
        import socket

        return {f"{socket.getfqdn()}_ram_mb": self.max_ram_mb}
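
    # Luigi's scheduler sums the ``resources`` declared by running tasks and
    # caps them against its ``[resources]`` configuration section; a sketch of
    # a matching scheduler entry (host name and value are hypothetical):
    #
    #     [resources]
    #     worker1.example.org_ram_mb=500000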

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`
        and :class:`swh.graph.luigi.topology.ComputeGenerations`."""
        return {
            "graph": LocalGraph(local_graph_path=self.local_graph_path),
            "toposort": ComputeGenerations(
                local_graph_path=self.local_graph_path,
                topological_order_dir=self.topological_order_dir,
                graph_name=self.graph_name,
                direction="backward",
                object_types="rev,rel,snp,ori",
            ),
        }

    def output(self) -> List[luigi.Target]:
        """.csv.zst files that contain the origin_id <-> contributor_id map
        and the list of origins"""
        return [
            luigi.LocalTarget(self.origin_contributors_path),
            luigi.LocalTarget(self.origin_urls_path),
        ]

    def run(self) -> None:
        """Runs the ``origin-contributors`` command on the compressed graph,
        and zstd-compresses its outputs."""
        import tempfile

        from ..shell import AtomicFileSink, Command, Rust

        generations_path = Path(self.input()["toposort"]["topo_order"].path)

        num_nodes = count_nodes(
            self.local_graph_path, self.graph_name, "rev,rel,snp,ori"
        )

        with tempfile.NamedTemporaryFile(
            prefix="origin_urls_", suffix=".csv"
        ) as origin_urls_fd:
            # fmt: off
            (
                Rust(
                    "origin-contributors",
                    self.local_graph_path / self.graph_name,
                    "--num-nodes", str(num_nodes),
                    "--order", generations_path,
                    "--origins-out", origin_urls_fd.name,
                )
                | Command.zstdmt("-19")
                > AtomicFileSink(self.origin_contributors_path)
            ).run()
            (
                Command.pv(origin_urls_fd.name)
                | Command.zstdmt("-19")
                > AtomicFileSink(self.origin_urls_path)
            ).run()
            # fmt: on
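
# A minimal sketch (hypothetical path) of consuming the file produced above,
# following the ``origin_id,contributor_id,years`` layout that
# DeanonymizeOriginContributors expects:
#
#     import csv
#     import pyzstd
#     with pyzstd.open("contribution_graph.csv.zst", "rt") as fd:
#         reader = csv.reader(fd)
#         assert next(reader) == ["origin_id", "contributor_id", "years"]
#         for origin_id, contributor_id, years in reader:
#             ...  # one row per (origin, contributor) pair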


class ExportDeanonymizationTable(luigi.Task):
    """Exports (from swh-storage) a .csv.zst file that contains the columns:
    ``base64(sha256(full_name))``, ``base64(full_name)``, and
    ``escape(full_name)``.

    The first column is the anonymized full name found in
    :file:`graph.persons.csv.zst` in the compressed graph, and the latter two
    are the original name."""

    storage_dsn = luigi.Parameter(
        default="service=swh",
        description="postgresql DSN of the swh-storage database to read from.",
    )
    deanonymization_table_path = luigi.PathParameter()

    def output(self) -> luigi.Target:
        """.csv.zst file that contains the table."""
        return luigi.LocalTarget(self.deanonymization_table_path)

    def run(self) -> None:
        """Runs a postgresql query to compute the table."""
        import shutil

        from ..shell import AtomicFileSink, Command

        if shutil.which("psql") is None:
            raise RuntimeError("psql CLI is not installed")

        query = """
            COPY (
                SELECT
                    encode(digest(fullname, 'sha256'), 'base64') as sha256_base64,
                    encode(fullname, 'base64') as base64,
                    encode(fullname, 'escape') as escaped
                FROM person
            ) TO STDOUT CSV HEADER
        """

        # fmt: off
        (
            Command.psql(self.storage_dsn, "-c", query)
            | Command.zstdmt("-19")
            > AtomicFileSink(self.deanonymization_table_path)
        ).run()
        # fmt: on
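
# Sketch of decoding one row of the table produced above back to the original
# full name (the row values here are made up for illustration):
#
#     import base64
#     sha256_b64 = "..."           # base64(sha256(b"John Doe")), anonymized key
#     name_b64 = "Sm9obiBEb2U="    # base64(b"John Doe")
#     full_name = base64.b64decode(name_b64)  # b"John Doe"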


class DeanonymizeOriginContributors(luigi.Task):
    """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
    but with ``contributor_base64`` and ``contributor_escaped`` columns in
    addition to ``contributor_id``.

    This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of
    names instead of names), which may not be true depending on how the
    swh-dataset export was configured.
    """

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    origin_contributors_path = luigi.PathParameter()
    deanonymization_table_path = luigi.PathParameter()
    deanonymized_origin_contributors_path = luigi.PathParameter()
    mph_algo = luigi.ChoiceParameter(choices=["cmph", "pthash"], default="pthash")

    def requires(self) -> List[luigi.Task]:
        """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
        and :class:`ExportDeanonymizationTable`."""
        return [
            LocalGraph(local_graph_path=self.local_graph_path),
            ListOriginContributors(
                local_graph_path=self.local_graph_path,
                origin_contributors_path=self.origin_contributors_path,
            ),
            ExportDeanonymizationTable(
                deanonymization_table_path=self.deanonymization_table_path,
            ),
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
        but with ``contributor_base64`` and ``contributor_escaped`` columns
        in addition to ``contributor_id``"""
        return luigi.LocalTarget(self.deanonymized_origin_contributors_path)

    def run(self) -> None:
        """Loads the list of persons (``graph.persons.csv.zst`` in the graph
        dataset) and the deanonymization table in memory, then uses them to map
        each row in the original (anonymized) contributors list to the
        deanonymized one."""
        # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset
        # export was configured to do so); this should add support for it.
        import base64
        import csv

        import pyzstd
        import tqdm

        from ..shell import Command, Rust, Sink

        # Load the deanonymization table, to map sha256(name) to base64(name)
        # and escape(name)
        sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
        with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
            # TODO: remove that cast once we drop Python 3.7 support
            csv_reader = csv.reader(cast(Iterable[str], fd))
            header = next(csv_reader)
            assert header == ["sha256_base64", "base64", "escaped"], header
            for csv_line in tqdm.tqdm(
                csv_reader, unit_scale=True, desc="Loading deanonymization table"
            ):
                (base64_sha256_name, base64_name, escaped_name) = csv_line
                sha256_name = base64.b64decode(base64_sha256_name)
                name = base64.b64decode(base64_name)
                sha256_to_names[sha256_name] = (name, escaped_name)

        # Combine with the list of sha256(name), to get the list of base64(name)
        # and escape(name)
        logger.info("Computing person ids using MPH...")
        persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
        if persons_path.exists():
            # 2024-08-23 graph
            persons_paths = [persons_path]
        else:
            # other graphs (before and after)
            persons_dir = self.local_graph_path / f"{self.graph_name}.persons"
            persons_paths = list(persons_dir.glob("*.zst"))
            assert persons_paths, f"{persons_dir} is empty"

        # Feed every (anonymized) person name through the graph's minimal
        # perfect hash function, to get the matching person ids.
        # fmt: off
        person_ids = (
            Command.pv(*persons_paths)
            | Command.zstdcat()
            | Rust(
                "swh-graph-hash",
                "persons",
                "--mph-algo", self.mph_algo,
                "--mph",
                self.local_graph_path / f"{self.graph_name}.persons.{self.mph_algo}",
                *(
                    ["--workaround-2024-08-23"]
                    if self.graph_name == "2024-08-23"
                    else []
                ),
            )
            > Sink()
        ).run()
        nb_persons = person_ids.count(b"\n")
        person_ids_it = iter(person_ids.decode("ascii").split("\n"))
        # fmt: on

        person_id_to_names: Dict[int, Tuple[bytes, str]] = {}
        for persons_path in persons_paths:
            with pyzstd.open(persons_path, "rb") as fd:
                for line in tqdm.tqdm(
                    fd,
                    unit_scale=True,
                    total=nb_persons - len(person_id_to_names),
                    desc="Getting person ids",
                ):
                    person_id_to_names[int(next(person_ids_it))] = sha256_to_names.pop(
                        base64.b64decode(line.strip()), (b"", "")
                    )
        assert (
            next(person_ids_it) == ""
        ), "swh-graph-hash output has more lines than its input"

        # Read the set of person ids from the main table
        person_ids = set()
        with pyzstd.open(self.origin_contributors_path, "rt") as input_fd:
            # TODO: remove that cast once we drop Python 3.7 support
            csv_reader = csv.reader(cast(Iterable[str], input_fd))
            header = next(csv_reader)
            assert header == ["origin_id", "contributor_id", "years"], header
            for origin_id, person_id_str, years in tqdm.tqdm(
                csv_reader, unit_scale=True, desc="Reading set of contributor ids"
            ):
                if person_id_str == "null":
                    # FIXME: workaround for a bug in contribution graphs generated
                    # before 2022-12-01. Those were only used in tests and never
                    # published, so the conditional can be removed when this is
                    # productionized
                    continue
                person_ids.add(int(person_id_str))

        # Finally, write a new table of all persons.
        tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
        tmp_output_path.parent.mkdir(parents=True, exist_ok=True)
        with pyzstd.open(tmp_output_path, "wt") as output_fd:
            csv_writer = csv.writer(output_fd, lineterminator="\r\n")
            # write header
            csv_writer.writerow(
                ("contributor_id", "contributor_base64", "contributor_escaped")
            )
            for person_id in tqdm.tqdm(
                sorted(person_ids), unit_scale=True, desc="Writing contributor names"
            ):
                v = person_id_to_names.get(person_id)
                if v is None:
                    logger.error("No person with id %s", person_id)
                    continue
                (name, escaped_name) = v
                base64_name = base64.b64encode(name).decode("ascii")
                csv_writer.writerow((person_id, base64_name, escaped_name))
        tmp_output_path.replace(self.deanonymized_origin_contributors_path)
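
# An illustrative row of the file written above (the values are made up;
# ``contributor_escaped`` uses PostgreSQL's ``escape`` encoding):
#
#     contributor_id,contributor_base64,contributor_escaped
#     42,Sm9obiBEb2U=,John Doe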


class RunOriginContributors(luigi.Task):
    """Runs :class:`ListOriginContributors` and
    :class:`DeanonymizeOriginContributors`, then checks the produced dataset
    against a well-known example."""

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    origin_urls_path = luigi.PathParameter()
    origin_contributors_path = luigi.PathParameter()
    deanonymized_origin_contributors_path = luigi.PathParameter()
    skip_integrity_check = luigi.BoolParameter()
    test_origin = luigi.Parameter(
        default="https://forge.softwareheritage.org/source/swh-graph.git"
    )
    test_person = luigi.Parameter(default="vlorentz")
    test_years = luigi.Parameter(default="2021 2022")

    def requires(self) -> List[luigi.Task]:
        """Returns instances of :class:`ListOriginContributors`
        and :class:`DeanonymizeOriginContributors`."""
        return [
            ListOriginContributors(
                graph_name=self.graph_name,
                origin_urls_path=self.origin_urls_path,
                origin_contributors_path=self.origin_contributors_path,
            ),
            DeanonymizeOriginContributors(
                graph_name=self.graph_name,
                deanonymized_origin_contributors_path=self.deanonymized_origin_contributors_path,
            ),
        ]

    def run(self) -> None:
        """Checks integrity of the produced dataset using a well-known example"""
        import base64
        import csv

        import pyzstd
        import tqdm

        if self.skip_integrity_check:
            return

        origin_count = count_nodes(self.local_graph_path, self.graph_name, "ori")
        person_count = int(
            (self.local_graph_path / f"{self.graph_name}.persons.count.txt")
            .read_text()
            .strip()
        )

        # Find the id of the test origin in the list of origin URLs
        origin_id = None
        with pyzstd.open(self.origin_urls_path, "rt") as fd:
            reader = csv.reader(cast(Iterable[str], fd))
            header = next(reader)
            assert header == ["origin_id", "origin_url_base64"], header
            encoded_origin_url = base64.b64encode(self.test_origin.encode()).decode()
            for line in tqdm.tqdm(
                reader, unit_scale=True, desc="Reading origin URLs", total=origin_count
            ):
                if line[1] == encoded_origin_url:
                    assert origin_id is None, (
                        f"Duplicate origin {self.test_origin}: "
                        f"has ids {origin_id} and {line[0]}"
                    )
                    origin_id = line[0]
        assert (
            origin_id is not None
        ), f"{self.test_origin} is absent from the list of origins"

        approx_contributors_per_origin = 8.5  # in 2022-12-07

        # Collect the (anonymized) contributors to the test origin
        contributors_by_id = {}
        with pyzstd.open(self.origin_contributors_path, "rt") as fd:
            reader = csv.reader(cast(Iterable[str], fd))
            header = next(reader)
            assert header == ["origin_id", "contributor_id", "years"], header
            for line in tqdm.tqdm(
                reader,
                unit_scale=True,
                desc="Reading contributors",
                total=origin_count * approx_contributors_per_origin,
            ):
                if line[0] == origin_id:
                    contributors_by_id[line[1]] = line[2]

        assert (
            len(contributors_by_id) < 10000
        ), f"Unexpectedly many contributors to {self.test_origin}"
        assert (
            len(contributors_by_id) > 10
        ), f"Unexpectedly few contributors to {self.test_origin}: {contributors_by_id}"

        # Resolve contributor ids to names, and collect the test person's years
        years = set()
        contributors = []
        with pyzstd.open(self.deanonymized_origin_contributors_path, "rt") as fd:
            reader = csv.reader(cast(Iterable[str], fd))
            header = next(reader)
            assert header == [
                "contributor_id",
                "contributor_base64",
                "contributor_escaped",
            ], header
            for line in tqdm.tqdm(
                reader,
                unit_scale=True,
                desc="Reading person names",
                total=person_count,  # reasonably-tight upper bound
            ):
                if line[0] in contributors_by_id:
                    if self.test_person in line[2]:
                        years |= set(contributors_by_id[line[0]].split(" "))
                    contributors.append(line[2])
                    del contributors_by_id[line[0]]

        assert not contributors_by_id, (
            f"Person ids with no person: {contributors_by_id} "
            f"(all contributors: {contributors})"
        )
        assert any(
            self.test_person in contributor for contributor in contributors
        ), f"{self.test_person} is not among the contributors to {self.test_origin}"

        missing_years = set(self.test_years.split()) - years
        assert not missing_years, (
            f"{missing_years} absent from {self.test_person}'s years: {years!r} "
            f"(origin_id={origin_id})"
        )