# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Luigi tasks for contribution graph
==================================
This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
driving the creation of the graph of contributions of people (pseudonymized
by default).
File layout
-----------
This assumes a local compressed graph (from :mod:`swh.graph.luigi.compressed_graph`)
is present, and generates/manipulates the following files::
base_dir/
<date>[_<flavor>]/
datasets/
contribution_graph.csv.zst
topology/
topological_order_dfs.csv.zst
And optionally::
sensitive_base_dir/
<date>[_<flavor>]/
persons_sha256_to_name.csv.zst
datasets/
contribution_graph.deanonymized.csv.zst
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
from pathlib import Path
from typing import Dict, Iterable, List, Tuple, cast
import luigi
from swh.graph.libs.luigi.topology import ComputeGenerations
from swh.graph.luigi.compressed_graph import LocalGraph
from swh.graph.luigi.utils import count_nodes
logger = logging.getLogger(__name__)
class ListOriginContributors(luigi.Task):
    """Creates two .csv.zst files from a compressed graph and its topological
    order: the origin_id <-> contributor_id map, and the list of origin URLs.

    (The origin ids in both files are ids internal to this dataset, not SWHIDs.)
    """

    local_graph_path = luigi.PathParameter()
    topological_order_dir = luigi.PathParameter()
    origin_contributors_path = luigi.PathParameter()
    origin_urls_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    max_ram_mb = luigi.IntParameter(default=500_000)

    @property
    def resources(self):
        """Returns the value of ``self.max_ram_mb``"""
        import socket

        return {f"{socket.getfqdn()}_ram_mb": self.max_ram_mb}

    def requires(self) -> Dict[str, luigi.Task]:
        """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`
        and :class:`swh.graph.libs.luigi.topology.ComputeGenerations`."""
        return {
            "graph": LocalGraph(local_graph_path=self.local_graph_path),
            "toposort": ComputeGenerations(
                local_graph_path=self.local_graph_path,
                topological_order_dir=self.topological_order_dir,
                graph_name=self.graph_name,
                direction="backward",
                object_types="rev,rel,snp,ori",
            ),
        }

    def output(self) -> List[luigi.Target]:
        """.csv.zst file that contains the origin_id<->contributor_id map
        and the list of origins"""
        return [
            luigi.LocalTarget(self.origin_contributors_path),
            luigi.LocalTarget(self.origin_urls_path),
        ]

    def run(self) -> None:
        """Runs the ``origin-contributors`` Rust tool and zstd-compresses
        both of its outputs."""
        import tempfile

        from swh.datasets.shell import Rust
        from swh.graph.luigi.utils import count_nodes
        from swh.graph.shell import AtomicFileSink, Command

        generations_path = Path(self.input()["toposort"]["topo_order"].path)

        # The tool needs the number of rev/rel/snp/ori nodes up front.
        num_nodes = count_nodes(
            self.local_graph_path, self.graph_name, "rev,rel,snp,ori"
        )

        with tempfile.NamedTemporaryFile(
            prefix="origin_urls_", suffix=".csv"
        ) as origin_urls_fd:
            # fmt: off
            # Contributors are written to stdout (compressed straight to the
            # target file); origin URLs go through the temporary file first.
            (
                Rust(
                    "origin-contributors",
                    self.local_graph_path / self.graph_name,
                    "--num-nodes",
                    str(num_nodes),
                    "--order",
                    generations_path,
                    "--origins-out",
                    origin_urls_fd.name,
                )
                | Command.zstdmt("-19")
                > AtomicFileSink(self.origin_contributors_path)
            ).run()

            (
                Command.pv(origin_urls_fd.name)
                | Command.zstdmt("-19")
                > AtomicFileSink(self.origin_urls_path)
            ).run()
            # fmt: on
class ExportDeanonymizationTable(luigi.Task):
    """Exports (from swh-storage) a .csv.zst file that contains the columns:
    ``base64(sha256(full_name))``, ``base64(full_name)``, and ``escape(full_name)``.

    The first column is the anonymized full name found in :file:`graph.persons.csv.zst`
    in the compressed graph, and the latter two are the original name."""

    storage_dsn = luigi.Parameter(
        default="service=swh",
        description="postgresql DSN of the swh-storage database to read from.",
    )
    deanonymization_table_path = luigi.PathParameter()

    def output(self) -> luigi.Target:
        """.csv.zst file that contains the table."""
        return luigi.LocalTarget(self.deanonymization_table_path)

    def run(self) -> None:
        """Runs a postgresql query to compute the table."""
        import shutil

        from swh.graph.shell import AtomicFileSink, Command

        # Fail early with a clear message instead of an obscure exec error
        # from the pipeline below.
        if shutil.which("psql") is None:
            raise RuntimeError("psql CLI is not installed")

        # Trailing backslashes are line continuations inside the string, so
        # the query is sent to psql as a single line.
        query = """
            COPY (
                SELECT
                    encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, \
                    encode(fullname, 'base64') as base64, \
                    encode(fullname, 'escape') as escaped \
                FROM person \
            ) TO STDOUT CSV HEADER \
            """

        # fmt: off
        (
            Command.psql(self.storage_dsn, "-c", query)
            | Command.zstdmt("-19")
            > AtomicFileSink(self.deanonymization_table_path)
        ).run()
        # fmt: on
class DeanonymizeContributors(luigi.Task):
    """Generates a .csv.zst file that contains the columns:
    ``contributor_id``, ``contributor_base64``, and ``contributor_escaped``.

    The first column is the id of the contributor in the compressed graph, and
    the latter two are the same as in the file produced by
    :class:`ExportDeanonymizationTable`."""

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    deanonymization_table_path = luigi.PathParameter()
    mph_algo = luigi.ChoiceParameter(choices=["cmph", "pthash"], default="pthash")
    deanonymization_mapping_path = luigi.PathParameter()

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of :class:`ExportDeanonymizationTable`."""
        return [
            ExportDeanonymizationTable(
                deanonymization_table_path=self.deanonymization_table_path
            )
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file that contains the contributor_id<->contributor_name
        map, as well as the escaped version of the contributor name."""
        return luigi.LocalTarget(self.deanonymization_mapping_path)

    def run(self) -> None:
        """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset
        and the deanonymization table in memory, then uses them to map each row
        in the original (anonymized) contributors list to the deanonymized one."""
        import base64
        import csv

        import pyzstd
        import tqdm

        from swh.graph.shell import AtomicFileSink, Command, Rust, Sink

        # Load the deanonymization table, to map sha256(name) to base64(name)
        # and escape(name)
        sha256_to_names: Dict[bytes, Tuple[str, str]] = {}
        with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
            # TODO: remove that cast once we dropped Python 3.7 support
            csv_reader = csv.reader(cast(Iterable[str], fd))
            header = next(csv_reader)
            assert header == ["sha256_base64", "base64", "escaped"], header
            for csv_line in tqdm.tqdm(
                csv_reader, unit_scale=True, desc="Loading deanonymization table"
            ):
                (base64_sha256_name, base64_name, escaped_name) = csv_line
                sha256_name = base64.b64decode(base64_sha256_name)
                sha256_to_names[sha256_name] = (base64_name, escaped_name)

        # Combine with the list of sha256(name), to get the list of base64(name)
        # and escape(name)
        logger.info("Computing person ids using MPH...")
        persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
        if persons_path.exists():
            # It can exist for some graphs (e.g. 2024-08-23)
            persons_paths = [persons_path]
        else:
            # some other graphs split the persons list over several .zst files
            persons_dir = self.local_graph_path / f"{self.graph_name}.persons"
            persons_paths = list(persons_dir.glob("*.zst"))

        # Newer graphs don't have that file anymore, so we need to recreate it
        if not persons_paths:
            (
                Rust(
                    "swh-graph-extract",
                    "extract-persons",
                    self.local_graph_path.parent / "orc",
                )
                | Command.zstdmt()
                > AtomicFileSink(persons_path)
            ).run()
            persons_paths = [persons_path]

        # fmt: off
        person_ids = (
            Command.pv(*persons_paths)
            | Command.zstdcat()
            | Rust(
                "swh-graph-hash",
                "persons",
                "--mph-algo",
                self.mph_algo,
                "--mph",
                self.local_graph_path / f"{self.graph_name}.persons.{self.mph_algo}",
                *(["--workaround-2024-08-23"] if self.graph_name == "2024-08-23" else []),
            )
            > Sink()
        ).run()
        nb_persons = person_ids.stdout.count(b"\n")
        person_ids_it = iter(person_ids.stdout.decode("ascii").split("\n"))
        # fmt: on

        person_id_to_names: Dict[int, Tuple[bytes, str]] = {}
        # FIX: the temporary file is now derived from the target *path* (the
        # original used ``...path.parent``, producing a sibling of the parent
        # directory), and it is opened exactly once: reopening it in "wt" per
        # persons file truncated the rows of all previous files and repeated
        # the CSV header.
        tmp_mapping_path = Path(f"{self.deanonymization_mapping_path}.tmp")
        tmp_mapping_path.parent.mkdir(parents=True, exist_ok=True)
        with pyzstd.open(tmp_mapping_path, "wt") as output_fd:
            csv_writer = csv.writer(output_fd, lineterminator="\r\n")
            # write header
            csv_writer.writerow(("id", "base64", "escaped"))
            for persons_path in persons_paths:
                with pyzstd.open(persons_path, "rt") as fd:
                    for line in tqdm.tqdm(
                        fd,
                        unit_scale=True,
                        total=nb_persons - len(person_id_to_names),
                        desc="Getting person ids",
                    ):
                        # Names absent from the deanonymization table are kept
                        # with empty name columns.
                        base64_name, name = sha256_to_names.pop(
                            base64.b64decode(line.strip()), ("", "")
                        )
                        csv_writer.writerow(
                            (int(next(person_ids_it)), base64_name, name)
                        )

        # person_ids ends with a trailing newline, so the iterator's final
        # element must be the empty string once every input line was consumed.
        assert (
            next(person_ids_it) == ""
        ), "swh-graph-hash output has fewer lines than its input"

        tmp_mapping_path.replace(self.deanonymization_mapping_path)
class DeanonymizeOriginContributors(luigi.Task):
    """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
    but with ``contributor_base64`` and ``contributor_escaped`` columns in addition to
    ``contributor_id``.

    This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
    instead of names); which may not be true depending on how the swh-dataset export
    was configured.
    """

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    origin_contributors_path = luigi.PathParameter()
    deanonymization_table_path = luigi.PathParameter()
    deanonymized_origin_contributors_path = luigi.PathParameter()
    mph_algo = luigi.ChoiceParameter(choices=["cmph", "pthash"], default="pthash")
    deanonymization_mapping_path = luigi.PathParameter()

    def requires(self) -> List[luigi.Task]:
        """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
        :class:`ExportDeanonymizationTable` and :class:`DeanonymizeContributors`."""
        return [
            LocalGraph(local_graph_path=self.local_graph_path),
            ListOriginContributors(
                local_graph_path=self.local_graph_path,
                origin_contributors_path=self.origin_contributors_path,
            ),
            ExportDeanonymizationTable(
                deanonymization_table_path=self.deanonymization_table_path,
            ),
            DeanonymizeContributors(
                local_graph_path=self.local_graph_path,
                graph_name=self.graph_name,
                deanonymization_table_path=self.deanonymization_table_path,
                mph_algo=self.mph_algo,
                deanonymization_mapping_path=self.deanonymization_mapping_path,
            ),
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
        but with ``contributor_base64`` and ``contributor_escaped`` columns in addition
        to ``contributor_id``"""
        return luigi.LocalTarget(self.deanonymized_origin_contributors_path)

    def run(self) -> None:
        """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset
        and the deanonymization table in memory, then uses them to map each row
        in the original (anonymized) contributors list to the deanonymized one."""
        # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
        # was configured to do so); this should add support for it.
        import base64
        import csv

        import pyzstd
        import tqdm

        # Read the set of person ids from the main table
        person_ids = set()
        with pyzstd.open(self.origin_contributors_path, "rt") as input_fd:
            # TODO: remove that cast once we dropped Python 3.7 support
            csv_reader = csv.reader(cast(Iterable[str], input_fd))
            header = next(csv_reader)
            assert header == ["origin_id", "contributor_id", "years"], header
            for origin_id, person_id_str, years in tqdm.tqdm(
                csv_reader, unit_scale=True, desc="Reading set of contributor ids"
            ):
                if person_id_str == "null":
                    # FIXME: workaround for a bug in contribution graphs generated
                    # before 2022-12-01. Those were only used in tests and never
                    # published, so the conditional can be removed when this is
                    # productionized
                    continue
                person_ids.add(int(person_id_str))

        person_id_to_names: Dict[int, Tuple[bytes, str]] = {}
        with pyzstd.open(self.deanonymization_mapping_path, "rt") as mapping_fd:
            # TODO: remove that cast once we dropped Python 3.7 support
            csv_reader = csv.reader(cast(Iterable[str], mapping_fd))
            header = next(csv_reader)
            assert header == ["id", "base64", "escaped"], header
            # named id_str (not "id") to avoid shadowing the builtin
            for id_str, base64_name, escaped in tqdm.tqdm(
                csv_reader, unit_scale=True, desc="Reading mapping from id to name"
            ):
                person_id_to_names[int(id_str)] = (
                    base64.b64decode(base64_name),
                    escaped,
                )

        # Finally, write a new table of all persons.
        tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
        tmp_output_path.parent.mkdir(parents=True, exist_ok=True)
        with pyzstd.open(tmp_output_path, "wt") as output_fd:
            csv_writer = csv.writer(output_fd, lineterminator="\r\n")
            # write header
            csv_writer.writerow(
                ("contributor_id", "contributor_base64", "contributor_escaped")
            )
            for person_id in tqdm.tqdm(
                sorted(person_ids), unit_scale=True, desc="Writing contributor names"
            ):
                v = person_id_to_names.get(person_id)
                if v is None:
                    logger.error("No person with id %s", person_id)
                    continue
                (name, escaped_name) = v
                base64_name = base64.b64encode(name).decode("ascii")
                csv_writer.writerow((person_id, base64_name, escaped_name))

        tmp_output_path.replace(self.deanonymized_origin_contributors_path)
class RunOriginContributors(luigi.Task):
    """Drives the whole origin-contributors pipeline, then checks the produced
    dataset contains a well-known (origin, contributor, years) triple."""

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    origin_urls_path = luigi.PathParameter()
    origin_contributors_path = luigi.PathParameter()
    deanonymized_origin_contributors_path = luigi.PathParameter()
    skip_integrity_check = luigi.BoolParameter()
    test_origin = luigi.Parameter(
        default="https://forge.softwareheritage.org/source/swh-graph.git"
    )
    test_person = luigi.Parameter(default="vlorentz")
    test_years = luigi.Parameter(default="2021 2022")

    def requires(self) -> List[luigi.Task]:
        """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
        and :class:`ExportDeanonymizationTable`."""
        return [
            ListOriginContributors(
                graph_name=self.graph_name,
                origin_urls_path=self.origin_urls_path,
                origin_contributors_path=self.origin_contributors_path,
            ),
            DeanonymizeOriginContributors(
                graph_name=self.graph_name,
                deanonymized_origin_contributors_path=self.deanonymized_origin_contributors_path,
            ),
        ]

    def run(self) -> None:
        """Checks integrity of the produced dataset using a well-known example"""
        import base64
        import csv

        import pyzstd
        import tqdm

        if self.skip_integrity_check:
            return

        origin_count = count_nodes(self.local_graph_path, self.graph_name, "ori")
        person_count = int(
            (self.local_graph_path / f"{self.graph_name}.persons.count.txt")
            .read_text()
            .strip()
        )

        # Find the dataset-internal id of the test origin.
        origin_id = None
        with pyzstd.open(self.origin_urls_path, "rt") as fd:
            reader = csv.reader(cast(Iterable[str], fd))
            header = next(reader)
            assert header == ["origin_id", "origin_url_base64"], header
            encoded_origin_url = base64.b64encode(self.test_origin.encode()).decode()
            for line in tqdm.tqdm(
                reader, unit_scale=True, desc="Reading origin URLs", total=origin_count
            ):
                if line[1] == encoded_origin_url:
                    assert (
                        origin_id is None
                    ), f"Duplicate origin {self.test_origin}: has ids {origin_id} and {line[0]}"
                    origin_id = line[0]

        # FIX: the original `assert f"..."` asserted a non-empty (truthy)
        # string, so it could never fail; actually check the origin was found.
        assert (
            origin_id is not None
        ), f"{self.test_origin} is absent from the list of origins"

        approx_contributors_per_origin = 8.5  # in 2022-12-07

        # Collect the (anonymized) contributor ids of the test origin,
        # mapping each to its space-separated list of years.
        contributors_by_id = {}
        with pyzstd.open(self.origin_contributors_path, "rt") as fd:
            reader = csv.reader(cast(Iterable[str], fd))
            header = next(reader)
            assert header == ["origin_id", "contributor_id", "years"], header
            for line in tqdm.tqdm(
                reader,
                unit_scale=True,
                desc="Reading contributors",
                total=origin_count * approx_contributors_per_origin,
            ):
                if line[0] == origin_id:
                    contributors_by_id[line[1]] = line[2]

        # FIX: added the f-prefix missing from the first message.
        assert (
            len(contributors_by_id) < 10000
        ), f"Unexpectedly many contributors to {self.test_origin}"
        assert (
            len(contributors_by_id) > 10
        ), f"Unexpectedly few contributors to {self.test_origin}: {contributors_by_id}"

        years = set()
        contributors = []
        with pyzstd.open(self.deanonymized_origin_contributors_path, "rt") as fd:
            reader = csv.reader(cast(Iterable[str], fd))
            header = next(reader)
            assert header == [
                "contributor_id",
                "contributor_base64",
                "contributor_escaped",
            ], header
            for line in tqdm.tqdm(
                reader,
                unit_scale=True,
                desc="Reading person names",
                total=person_count,  # reasonably-tight upper bound
            ):
                if line[0] in contributors_by_id:
                    # FIX: match the test person against the escaped *name*
                    # (line[2]); line[0] is the numeric contributor id, so the
                    # original check could never succeed. Also dropped the
                    # pop() that, combined with the unconditional del below,
                    # raised KeyError whenever the person did match.
                    if self.test_person in line[2]:
                        years |= set(contributors_by_id[line[0]].split(" "))
                    contributors.append(line[2])
                    del contributors_by_id[line[0]]

        assert (
            not contributors_by_id
        ), f"Person ids with no person: {contributors_by_id} (all contributors: {contributors})"
        # FIX: added the f-prefix missing from this message.
        assert any(
            self.test_person in contributor for contributor in contributors
        ), f"{self.test_person} is not among the contributors to {self.test_origin}"

        # FIX: the subtraction was reversed (observed - expected); we want the
        # *expected* years that are absent from the observed ones, so that
        # extra contribution years do not fail the check and genuinely missing
        # ones do.
        missing_years = set(self.test_years.split()) - years
        assert not missing_years, (
            f"{missing_years} absent from {self.test_person}'s years: {years!r} "
            f"(contributor_id={line[0]}, origin_id={origin_id})"
        )