# Source code for swh.datasets.luigi.origin_contributors

# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
Luigi tasks for contribution graph
==================================

This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
driving the creation of the graph of contributions of people (pseudonymized
by default).

File layout
-----------

This assumes a local compressed graph (from :mod:`swh.graph.luigi.compressed_graph`)
is present, and generates/manipulates the following files::

    base_dir/
        <date>[_<flavor>]/
            datasets/
                contribution_graph.csv.zst
            topology/
                topological_order_dfs.csv.zst

And optionally::

    sensitive_base_dir/
        <date>[_<flavor>]/
            persons_sha256_to_name.csv.zst
            datasets/
                contribution_graph.deanonymized.csv.zst
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
from pathlib import Path
from typing import Dict, Iterable, List, Tuple, cast

import luigi

from swh.graph.libs.luigi.topology import ComputeGenerations
from swh.graph.luigi.compressed_graph import LocalGraph
from swh.graph.luigi.utils import count_nodes

logger = logging.getLogger(__name__)


class ListOriginContributors(luigi.Task):
    """Creates two .csv.zst files from a compressed graph: the
    origin_id<->contributor_id map and the list of origin URLs.

    (The previous docstring was copy-pasted from the topological-order task and
    described the wrong output.)
    """

    local_graph_path = luigi.PathParameter()
    topological_order_dir = luigi.PathParameter()
    origin_contributors_path = luigi.PathParameter()
    origin_urls_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    max_ram_mb = luigi.IntParameter(default=500_000)

    @property
    def resources(self):
        """Returns the value of ``self.max_ram_mb``"""
        import socket

        return {f"{socket.getfqdn()}_ram_mb": self.max_ram_mb}

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`
        and :class:`swh.graph.libs.luigi.topology.ComputeGenerations`."""
        return {
            "graph": LocalGraph(local_graph_path=self.local_graph_path),
            "toposort": ComputeGenerations(
                local_graph_path=self.local_graph_path,
                topological_order_dir=self.topological_order_dir,
                graph_name=self.graph_name,
                direction="backward",
                object_types="rev,rel,snp,ori",
            ),
        }

    def output(self) -> List[luigi.Target]:
        """.csv.zst file that contains the origin_id<->contributor_id map
        and the list of origins"""
        return [
            luigi.LocalTarget(self.origin_contributors_path),
            luigi.LocalTarget(self.origin_urls_path),
        ]

    def run(self) -> None:
        """Runs org.softwareheritage.graph.utils.ListOriginContributors and compresses"""
        import tempfile

        from swh.datasets.shell import Rust
        from swh.graph.shell import AtomicFileSink, Command

        generations_path = Path(self.input()["toposort"]["topo_order"].path)

        # count_nodes is already imported at module level; the redundant
        # function-local re-import was dropped.
        num_nodes = count_nodes(
            self.local_graph_path, self.graph_name, "rev,rel,snp,ori"
        )
        with tempfile.NamedTemporaryFile(
            prefix="origin_urls_", suffix=".csv"
        ) as origin_urls_fd:
            # fmt: off
            (
                Rust(
                    "origin-contributors",
                    self.local_graph_path / self.graph_name,
                    "--num-nodes", str(num_nodes),
                    "--order", generations_path,
                    "--origins-out", origin_urls_fd.name,
                )
                | Command.zstdmt("-19")
                > AtomicFileSink(self.origin_contributors_path)
            ).run()
            (
                Command.pv(origin_urls_fd.name)
                | Command.zstdmt("-19")
                > AtomicFileSink(self.origin_urls_path)
            ).run()
            # fmt: on
class ExportDeanonymizationTable(luigi.Task):
    """Exports (from swh-storage) a .csv.zst file that contains the columns:
    ``base64(sha256(full_name))``, ``base64(full_name)``, and ``escape(full_name)``.

    The first column is the anonymized full name found in
    :file:`graph.persons.csv.zst` in the compressed graph, and the latter two are
    the original name."""

    storage_dsn = luigi.Parameter(
        default="service=swh",
        description="postgresql DSN of the swh-storage database to read from.",
    )
    deanonymization_table_path = luigi.PathParameter()

    def output(self) -> luigi.Target:
        """.csv.zst file that contains the table."""
        return luigi.LocalTarget(self.deanonymization_table_path)

    def run(self) -> None:
        """Runs a postgresql query to compute the table."""
        import shutil

        from swh.graph.shell import AtomicFileSink, Command

        # Fail early, with a clear message, when the psql client is missing.
        if shutil.which("psql") is None:
            raise RuntimeError("psql CLI is not installed")

        query = """
            COPY (
                SELECT encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, \
                    encode(fullname, 'base64') as base64, \
                    encode(fullname, 'escape') as escaped \
                FROM person \
            ) TO STDOUT CSV HEADER \
        """

        # Stream the query result through zstd into the target file.
        # fmt: off
        pipeline = (
            Command.psql(self.storage_dsn, "-c", query)
            | Command.zstdmt("-19")
            > AtomicFileSink(self.deanonymization_table_path)
        )
        # fmt: on
        pipeline.run()
class DeanonymizeContributors(luigi.Task):
    """Generates a .csv.zst file that contains the columns: ``contributor_id``,
    ``contributor_base64``, and ``contributor_escaped``.

    The first column is the id of the contributor in the compressed graph, and the
    latter two are the same as in the file produced by
    :class:`ExportDeanonymizationTable`."""

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    deanonymization_table_path = luigi.PathParameter()
    mph_algo = luigi.ChoiceParameter(choices=["cmph", "pthash"], default="pthash")
    deanonymization_mapping_path = luigi.PathParameter()

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of :class:`ExportDeanonymizationTable`."""
        return [
            ExportDeanonymizationTable(
                deanonymization_table_path=self.deanonymization_table_path
            )
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file that contains the contributor_id<->contributor_name map,
        as well as the escaped version of the contributor name."""
        return luigi.LocalTarget(self.deanonymization_mapping_path)

    def run(self) -> None:
        """Loads the list of persons (``graph.persons.csv.zst`` in the graph
        dataset) and the deanonymization table in memory, then uses them to map
        each row in the original (anonymized) contributors list to the
        deanonymized one."""
        import base64
        import csv

        import pyzstd
        import tqdm

        from swh.graph.shell import AtomicFileSink, Command, Rust, Sink

        # Load the deanonymization table, to map sha256(name) to base64(name)
        # and escape(name)
        sha256_to_names: Dict[bytes, Tuple[str, str]] = {}
        with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
            # TODO: remove that cast once we dropped Python 3.7 support
            csv_reader = csv.reader(cast(Iterable[str], fd))
            header = next(csv_reader)
            assert header == ["sha256_base64", "base64", "escaped"], header
            for csv_line in tqdm.tqdm(
                csv_reader, unit_scale=True, desc="Loading deanonymization table"
            ):
                (base64_sha256_name, base64_name, escaped_name) = csv_line
                sha256_name = base64.b64decode(base64_sha256_name)
                sha256_to_names[sha256_name] = (base64_name, escaped_name)

        # Combine with the list of sha256(name), to get the list of base64(name)
        # and escape(name)
        logger.info("Computing person ids using MPH...")
        persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
        if persons_path.exists():
            # It can exist for some graphs (e.g. 2024-08-23)
            persons_paths = [persons_path]
        else:
            # some other graphs
            persons_dir = self.local_graph_path / f"{self.graph_name}.persons"
            persons_paths = list(persons_dir.glob("*.zst"))

        # Newer graphs don't have that file anymore, so we need to recreate it
        if not persons_paths:
            (
                Rust(
                    "swh-graph-extract",
                    "extract-persons",
                    self.local_graph_path.parent / "orc",
                )
                | Command.zstdmt()
                > AtomicFileSink(persons_path)
            ).run()
            persons_paths = [persons_path]

        # fmt: off
        person_ids = (
            Command.pv(*persons_paths)
            | Command.zstdcat()
            | Rust(
                "swh-graph-hash", "persons",
                "--mph-algo", self.mph_algo,
                "--mph",
                self.local_graph_path / f"{self.graph_name}.persons.{self.mph_algo}",
                *(
                    ["--workaround-2024-08-23"]
                    if self.graph_name == "2024-08-23"
                    else []
                ),
            )
            > Sink()
        ).run()
        # fmt: on
        nb_persons = person_ids.stdout.count(b"\n")
        person_ids_it = iter(person_ids.stdout.decode("ascii").split("\n"))

        # FIX: the temporary file is now named after the target file
        # (f"{path}.tmp"), not after its *parent directory* — the previous
        # naming put it one level up, where replace() could cross filesystems.
        tmp_mapping_path = Path(f"{self.deanonymization_mapping_path}.tmp")
        tmp_mapping_path.parent.mkdir(parents=True, exist_ok=True)

        nb_written = 0  # persons already written; only used for progress display
        # FIX: open the output once.  Reopening it in "wt" mode inside the
        # per-file loop truncated it for every persons file after the first,
        # losing all rows but the last file's.
        with pyzstd.open(tmp_mapping_path, "wt") as output_fd:
            csv_writer = csv.writer(output_fd, lineterminator="\r\n")
            # write header
            csv_writer.writerow(("id", "base64", "escaped"))
            for persons_path in persons_paths:
                with pyzstd.open(persons_path, "rt") as fd:
                    for line in tqdm.tqdm(
                        fd,
                        unit_scale=True,
                        total=nb_persons - nb_written,
                        desc="Getting person ids",
                    ):
                        # Unknown sha256 -> keep the person anonymous
                        base64_name, name = sha256_to_names.pop(
                            base64.b64decode(line.strip()), ("", "")
                        )
                        csv_writer.writerow(
                            (int(next(person_ids_it)), base64_name, name)
                        )
                        nb_written += 1
        assert (
            next(person_ids_it) == ""
        ), "swh-graph-hash output has fewer lines than its input"
        tmp_mapping_path.replace(self.deanonymization_mapping_path)
class DeanonymizeOriginContributors(luigi.Task):
    """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
    but with ``contributor_base64`` and ``contributor_escaped`` columns in
    addition to ``contributor_id``.

    This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
    instead of names); which may not be true depending on how the swh-dataset export
    was configured.
    """

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    origin_contributors_path = luigi.PathParameter()
    deanonymization_table_path = luigi.PathParameter()
    deanonymized_origin_contributors_path = luigi.PathParameter()
    mph_algo = luigi.ChoiceParameter(choices=["cmph", "pthash"], default="pthash")
    deanonymization_mapping_path = luigi.PathParameter()

    def requires(self) -> List[luigi.Task]:
        """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
        :class:`ExportDeanonymizationTable` and :class:`DeanonymizeContributors`."""
        return [
            LocalGraph(local_graph_path=self.local_graph_path),
            ListOriginContributors(
                local_graph_path=self.local_graph_path,
                origin_contributors_path=self.origin_contributors_path,
            ),
            ExportDeanonymizationTable(
                deanonymization_table_path=self.deanonymization_table_path,
            ),
            DeanonymizeContributors(
                local_graph_path=self.local_graph_path,
                graph_name=self.graph_name,
                deanonymization_table_path=self.deanonymization_table_path,
                mph_algo=self.mph_algo,
                deanonymization_mapping_path=self.deanonymization_mapping_path,
            ),
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
        but with ``contributor_base64`` and ``contributor_escaped`` columns in
        addition to ``contributor_id``"""
        return luigi.LocalTarget(self.deanonymized_origin_contributors_path)

    def run(self) -> None:
        """Loads the id->name mapping produced by :class:`DeanonymizeContributors`
        in memory, then uses it to write out the (deanonymized) names of every
        contributor id referenced by the origin-contributors table."""
        # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
        # was configured to do so); this should add support for it.
        import base64
        import csv

        import pyzstd
        import tqdm

        # Step 1: collect every contributor id referenced by the main table.
        referenced_ids = set()
        with pyzstd.open(self.origin_contributors_path, "rt") as contributors_fd:
            # TODO: remove that cast once we dropped Python 3.7 support
            rows = csv.reader(cast(Iterable[str], contributors_fd))
            header = next(rows)
            assert header == ["origin_id", "contributor_id", "years"], header
            for origin_id, person_id_str, years in tqdm.tqdm(
                rows, unit_scale=True, desc="Reading set of contributor ids"
            ):
                if person_id_str == "null":
                    # FIXME: workaround for a bug in contribution graphs generated
                    # before 2022-12-01. Those were only used in tests and never
                    # published, so the conditional can be removed when this is
                    # productionized
                    continue
                referenced_ids.add(int(person_id_str))

        # Step 2: load the full id -> (name, escaped name) mapping.
        names_by_id: Dict[int, Tuple[bytes, str]] = {}
        with pyzstd.open(self.deanonymization_mapping_path, "rt") as mapping_fd:
            # TODO: remove that cast once we dropped Python 3.7 support
            rows = csv.reader(cast(Iterable[str], mapping_fd))
            header = next(rows)
            assert header == ["id", "base64", "escaped"], header
            for contributor_id, base64_name, escaped in tqdm.tqdm(
                rows, unit_scale=True, desc="Reading mapping from id to name"
            ):
                names_by_id[int(contributor_id)] = (
                    base64.b64decode(base64_name),
                    escaped,
                )

        # Step 3: write a new table of all referenced persons, atomically
        # (write to a sibling .tmp file, then rename over the target).
        tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
        tmp_output_path.parent.mkdir(parents=True, exist_ok=True)
        with pyzstd.open(tmp_output_path, "wt") as output_fd:
            csv_writer = csv.writer(output_fd, lineterminator="\r\n")
            # write header
            csv_writer.writerow(
                ("contributor_id", "contributor_base64", "contributor_escaped")
            )
            for contributor_id in tqdm.tqdm(
                sorted(referenced_ids),
                unit_scale=True,
                desc="Writing contributor names",
            ):
                entry = names_by_id.get(contributor_id)
                if entry is None:
                    logger.error("No person with id %s", contributor_id)
                    continue
                (name, escaped_name) = entry
                csv_writer.writerow(
                    (
                        contributor_id,
                        base64.b64encode(name).decode("ascii"),
                        escaped_name,
                    )
                )
        tmp_output_path.replace(self.deanonymized_origin_contributors_path)
class RunOriginContributors(luigi.Task):
    """Runs :class:`ListOriginContributors` and
    :class:`DeanonymizeOriginContributors`, then sanity-checks the produced
    datasets against a well-known example origin/contributor."""

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    origin_urls_path = luigi.PathParameter()
    origin_contributors_path = luigi.PathParameter()
    deanonymized_origin_contributors_path = luigi.PathParameter()
    skip_integrity_check = luigi.BoolParameter()
    test_origin = luigi.Parameter(
        default="https://forge.softwareheritage.org/source/swh-graph.git"
    )
    test_person = luigi.Parameter(default="vlorentz")
    test_years = luigi.Parameter(default="2021 2022")

    def requires(self) -> List[luigi.Task]:
        """Returns instances of :class:`ListOriginContributors` and
        :class:`DeanonymizeOriginContributors`.

        (The previous docstring listed the wrong task classes.)"""
        return [
            ListOriginContributors(
                graph_name=self.graph_name,
                origin_urls_path=self.origin_urls_path,
                origin_contributors_path=self.origin_contributors_path,
            ),
            DeanonymizeOriginContributors(
                graph_name=self.graph_name,
                deanonymized_origin_contributors_path=self.deanonymized_origin_contributors_path,
            ),
        ]

    def run(self) -> None:
        """Checks integrity of the produced dataset using a well-known example"""
        import base64
        import csv

        import pyzstd
        import tqdm

        if self.skip_integrity_check:
            return

        origin_count = count_nodes(self.local_graph_path, self.graph_name, "ori")
        person_count = int(
            (self.local_graph_path / f"{self.graph_name}.persons.count.txt")
            .read_text()
            .strip()
        )

        # 1. Find the id of the test origin in the origin URLs file.
        origin_id = None
        with pyzstd.open(self.origin_urls_path, "rt") as fd:
            reader = csv.reader(cast(Iterable[str], fd))
            header = next(reader)
            assert header == ["origin_id", "origin_url_base64"], header
            encoded_origin_url = base64.b64encode(self.test_origin.encode()).decode()
            for line in tqdm.tqdm(
                reader, unit_scale=True, desc="Reading origin URLs", total=origin_count
            ):
                if line[1] == encoded_origin_url:
                    assert (
                        origin_id is None
                    ), f"Duplicate origin {self.test_origin}: has ids {origin_id} and {line[0]}"
                    origin_id = line[0]
        # FIX: was `if origin_id is None: assert f"..."`, which asserts a
        # non-empty (truthy) string and therefore could never fail.
        assert (
            origin_id is not None
        ), f"{self.test_origin} is absent from the list of origins"

        # 2. Collect the contributor ids (and their years) for the test origin.
        approx_contributors_per_origin = 8.5  # in 2022-12-07

        contributors_by_id = {}
        with pyzstd.open(self.origin_contributors_path, "rt") as fd:
            reader = csv.reader(cast(Iterable[str], fd))
            header = next(reader)
            assert header == ["origin_id", "contributor_id", "years"], header
            for line in tqdm.tqdm(
                reader,
                unit_scale=True,
                desc="Reading contributors",
                total=origin_count * approx_contributors_per_origin,
            ):
                if line[0] == origin_id:
                    contributors_by_id[line[1]] = line[2]

        # FIX: both messages below were missing the f prefix, so the
        # placeholders were never interpolated.
        assert (
            len(contributors_by_id) < 10000
        ), f"Unexpectedly many contributors to {self.test_origin}"
        assert (
            len(contributors_by_id) > 10
        ), f"Unexpectedly few contributors to {self.test_origin}: {contributors_by_id}"

        # 3. Resolve the contributor names and check the test person is there.
        years = set()
        contributors = []
        with pyzstd.open(self.deanonymized_origin_contributors_path, "rt") as fd:
            reader = csv.reader(cast(Iterable[str], fd))
            header = next(reader)
            assert header == [
                "contributor_id",
                "contributor_base64",
                "contributor_escaped",
            ], header
            for line in tqdm.tqdm(
                reader,
                unit_scale=True,
                desc="Reading person names",
                total=person_count,  # reasonably-tight upper bound
            ):
                if line[0] in contributors_by_id:
                    # FIX: match the test person against the escaped name
                    # (line[2]), not the numeric contributor id (line[0]); and
                    # read with [] rather than pop(), since the entry is deleted
                    # unconditionally below (pop + del raised KeyError whenever
                    # this branch fired).
                    if self.test_person in line[2]:
                        years |= set(contributors_by_id[line[0]].split(" "))
                    contributors.append(line[2])
                    del contributors_by_id[line[0]]

        assert (
            not contributors_by_id
        ), f"Person ids with no person: {contributors_by_id} (all contributors: {contributors})"
        assert any(
            self.test_person in contributor for contributor in contributors
        ), f"{self.test_person} is not among the contributors to {self.test_origin}"

        # NOTE(review): this checks that the years observed in the dataset are a
        # subset of test_years; confirm that is the intended direction.
        missing_years = years - set(self.test_years.split())
        assert not missing_years, (
            f"{missing_years} absent from {self.test_person}'s years: {years!r} "
            f"(contributor_id={line[0]}, origin_id={origin_id})"
        )