Source code for swh.graph.luigi.blobs_datasets

# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
Luigi tasks for blob-centric datasets
=====================================

This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
driving the creation of derived datasets centered around a subset of
content objects in the graph. Currently, this means:

* the `license dataset <https://annex.softwareheritage.org/public/dataset/license-blobs/>`_, and
* the `citation dataset <https://annex.softwareheritage.org/public/dataset/citation-blobs/>`_

File layout
-----------

This assumes a local compressed graph (from :mod:`swh.graph.luigi.compressed_graph`)
is present, and generates/manipulates the following files::

    base_dir/
        <date>[_<flavor>]/
            citation-blobs/
                blobs-earliest.csv.zst
                blobs-fileinfo.csv.zst
                blobs-nb-origins.csv.zst
                blobs-origins.csv.zst
                blobs-sample20k.tar.zst
                blobs.tar.zst
                import-dataset.sql
                license-blobs.csv.zst
            license-blobs/
                <same as above, plus these two:>
                blobs-scancode.csv.zst
                blobs-scancode.ndjson.zst
"""

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import contextlib
import functools
import hashlib
import logging
import os
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ContextManager,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Set,
    Tuple,
    TypeVar,
    cast,
)

import luigi

from swh.dataset.luigi import Format, LocalExport

if TYPE_CHECKING:
    import asyncio

    import magic
    from requests import Session

    from swh.graph.grpc.swhgraph_pb2_grpc import TraversalServiceStub


def _s3_url_to_bucket_path(s3_url: str) -> Tuple[str, str]:
    loc = _removeprefix(s3_url, "s3://")
    bucket, path = loc.split("/", 1)
    return bucket, path


# XXX in wait of Python 3.9 for PER 616...
def _removeprefix(s, prefix):
    if s.startswith(prefix):
        return s[len(prefix) :]


logger = logging.getLogger(__name__)

COMPRESS_LEVEL = 19
GRAPH_REQUEST_CONCURRENCY = 70
EMPTY_FILE_SHA1 = "da39a3ee5e6b4b0d3255bfef95601890afd80709"


SELECTION_QUERIES = {
    "citation": r"""
        SELECT
            concat('swh:1:cnt:', t1.target) AS swhid,
            t2.sha1 AS sha1,
            t1.filename AS name
        FROM (
            SELECT DISTINCT target, lower(trim(TRY_CAST(name AS VARCHAR))) AS filename
            FROM directory_entry
            WHERE (
                type='file'
                AND (
                    lower(trim(TRY_CAST(name AS VARCHAR))) = 'codemeta.json'
                    OR lower(trim(TRY_CAST(name AS VARCHAR))) = 'citation.cff'
                )
            )
        ) AS t1
        LEFT JOIN (SELECT sha1,sha1_git FROM content) AS t2
        ON (t1.target=t2.sha1_git)
        ORDER BY sha1
    """,
    "license": r"""
        SELECT
            concat('swh:1:cnt:', t1.target) AS swhid,
            t2.sha1 AS sha1,
            t1.filename AS name
        FROM (
            SELECT DISTINCT target, lower(trim(TRY_CAST(name AS VARCHAR))) AS filename
            FROM directory_entry
            WHERE (
                type = 'file' AND
                -- TODO: replace not(empty(regexp_match())) with regexp_find()
                not(empty(regexp_match(
                    lower(TRY_CAST(name AS VARCHAR)),
                    '^([a-z0-9._-]+\.)?(copying|licen(c|s)(e|ing)|notice|copyright|disclaimer|authors)(\.[a-z0-9\._-]+)?$'
                )))
            )
        ) AS t1
        LEFT JOIN (SELECT sha1,sha1_git FROM content) AS t2
        ON (t1.target=t2.sha1_git)
        ORDER BY sha1
    """,
    "readme": r"""
        SELECT
            concat('swh:1:cnt:', t1.target) AS swhid,
            t2.sha1 AS sha1,
            t1.filename AS name
        FROM (
            SELECT DISTINCT target, lower(trim(TRY_CAST(name AS VARCHAR))) AS filename
            FROM directory_entry
            WHERE (
                type = 'file' AND
                -- TODO: replace not(empty(regexp_match())) with regexp_find()
                not(empty(regexp_match(
                    lower(TRY_CAST(name AS VARCHAR)),
                    '^(readme)(\.[a-z0-9\._-]+)?$'
                )))
            )
        ) AS t1
        LEFT JOIN (SELECT sha1,sha1_git FROM content) AS t2
        ON (t1.target=t2.sha1_git)
        ORDER BY sha1
    """,
    "known_swhids": r"""
        SELECT
            concat('swh:1:cnt:', content.sha1_git) AS swhid,
            content.sha1 AS sha1,
            '<unknown>' AS name
        FROM swhids
        INNER JOIN content
        ON (swhids.swhid=concat('swh:1:cnt:', content.sha1_git))
        ORDER BY sha1
    """,
}


_mime_guesser = None


def _init_mime_guesser():
    global _mime_guesser
    if _mime_guesser is None:
        import magic

        _mime_guesser = magic.Magic(mime=True, mime_encoding=True)

    return _mime_guesser


def _guess_mime(path: str) -> Tuple[str, str]:
    _mime_guesser = _init_mime_guesser()
    info = _mime_guesser.from_file(path)
    mime_type, encoding = info.split()
    mime_type, encoding = mime_type.rstrip(";"), _removeprefix(encoding, "charset=")

    return (mime_type, encoding)


[docs] @contextlib.contextmanager def atomic_zstd_writer(result_path: Path): """Returns a file-like object, which writes to a temporary file, then atomically renames it to the ``result_path`` on success.""" import pyzstd tmp_result_path = Path(f"{result_path}.tmp") try: with pyzstd.open( tmp_result_path, "wt", level_or_option=COMPRESS_LEVEL ) as output_fd: yield output_fd tmp_result_path.replace(result_path) except BaseException: tmp_result_path.unlink() raise
[docs] @contextlib.contextmanager def atomic_csv_zstd_writer(result_path: Path): """Returns a ``csv.writer`` object, which writes to a temporary file, then atomically renames it to the ``result_path`` on success.""" import csv with atomic_zstd_writer(result_path) as output_fd: yield csv.writer(output_fd, lineterminator="\n")
# luigi.Task with some helpers to get paths class _BaseTask(luigi.Task): blob_filter: str derived_datasets_path: Path previous_derived_datasets_path: Optional[Path] def blob_count(self) -> int: """Returns the total number of selected blobs""" with self.blob_count_path().open() as fd: return int(fd.read().strip()) def blob_size(self) -> int: """Returns the total size of selected blobs""" with self.blob_size_path().open() as fd: return int(fd.read().strip()) def blob_size_path(self) -> Path: return self.derived_datasets_path / self.blob_filter / "stats" / "size.txt" def blob_count_path(self) -> Path: return self.derived_datasets_path / self.blob_filter / "stats" / "count.txt" def blob_dir(self) -> Path: return self.derived_datasets_path / self.blob_filter / "blobs" def blob_list_path(self) -> Path: return self.derived_datasets_path / self.blob_filter / "blobs.csv.zst" def blob_tarball_path(self) -> Path: return self.derived_datasets_path / self.blob_filter / "blobs.tar.zst" def previous_blob_tarball_path(self) -> Optional[Path]: if self.previous_derived_datasets_path: return ( self.previous_derived_datasets_path / self.blob_filter / "blobs.tar.zst" ) else: return None def sample_blob_tarball_path(self) -> Path: return self.derived_datasets_path / self.blob_filter / "blobs-sample20k.tar.zst" def iter_blobs( self, *, unique_sha1: bool, with_tqdm: bool = True ) -> Iterator[Tuple[str, str, str]]: """Yields ``(swhid, sha1, name)`` by reading :file:`blobs.csv.zst`, and uses tqdm for progress report. If ``unique_sha1`` is True, skips all but the first occurrence of each sha1.""" import csv import pyzstd import tqdm last_sha1 = "" * 20 with pyzstd.open(self.blob_list_path(), "rt") as fd: reader = csv.reader(cast(Iterator[str], fd)) header = next(reader) if header != ["swhid", "sha1", "name"]: raise ValueError( "Unexpected header in %s: %r", self.blob_list_path(), header ) rows_it: Iterable[List[str]] = reader if with_tqdm: rows_it = tqdm.tqdm(rows_it, total=self.blob_count()) for row in rows_it: try: (swhid, sha1, name) = row except ValueError: raise ValueError(f"Unexpected row: {row!r}") from None if sha1 < last_sha1: raise ValueError(f"Not sorted by sha1 ({last_sha1} before {sha1}") if not unique_sha1 or sha1 != last_sha1: yield tuple(row) # type: ignore[misc] last_sha1 = sha1 def blob_paths(self, sha1: str) -> Tuple[Path, Path]: """Returns ``(sharded_path, unsharded_path)``, which are the two possible paths for this blob, depending on the blob dir layout.""" sharded_path = self.blob_dir() / sha1[0:2] / sha1[2:4] / sha1 unsharded_path = self.blob_dir() / sha1 return (sharded_path, unsharded_path) def complete(self) -> bool: if not super().complete(): return False for target in self.output(): output_path = target.path if output_path.endswith(".csv.zst"): check_csv(Path(output_path)) return True _TCallable = TypeVar("_TCallable", bound=Callable) def _log_exceptions(f: _TCallable) -> _TCallable: """Decorator for functions called by asyncio that would never be awaited if they crashed, causing asyncio to silently hide the exception.""" @functools.wraps(f) def newf(*args, **kwargs): try: return f(*args, **kwargs) except BaseException: logger.exception( "Error while calling %s with %r and %r", f.__name__, args, kwargs ) raise return newf # type: ignore[return-value] class _ConcurrentCsvWritingTask(_BaseTask): """Base classes for tasks writing a CSV using asyncio. asyncio is only used for gRPC requires to swh-graph; file writes are synchronous to keep the code simpler, as performance improvements from making them async would be negligeable.""" CSV_HEADER: Tuple[str, str] blob_filter = luigi.ChoiceParameter(choices=list(SELECTION_QUERIES)) derived_datasets_path = luigi.PathParameter() grpc_api = luigi.Parameter() stub: "TraversalServiceStub" def requires(self) -> luigi.Task: """Returns an instance of :class:`SelectBlobs`""" return SelectBlobs( blob_filter=self.blob_filter, derived_datasets_path=self.derived_datasets_path, ) def run(self) -> None: """Calls the :meth:`process_one` function, and writes its results as a two-column CSV to the target defined by :meth:`output`. """ import asyncio asyncio.run(self._run_async()) async def _run_async(self) -> None: import asyncio import grpc.aio import swh.graph.grpc.swhgraph_pb2_grpc as swhgraph_grpc input_queue: asyncio.Queue[Tuple[str, str, str]] = asyncio.Queue(maxsize=20) result_queue: asyncio.Queue[Tuple[str, str]] = asyncio.Queue(maxsize=20) async with grpc.aio.insecure_channel(self.grpc_api) as channel: self.stub = swhgraph_grpc.TraversalServiceStub(channel) fill_queue_task = asyncio.create_task(self._fill_input_queue(input_queue)) write_task = asyncio.create_task(self._write_results(result_queue)) worker_tasks = [ asyncio.create_task(self._worker(input_queue, result_queue)) for _ in range(GRAPH_REQUEST_CONCURRENCY) ] await write_task # wait for workers to write everything await fill_queue_task # should be instant for task in worker_tasks: task.cancel() await asyncio.gather( fill_queue_task, write_task, *worker_tasks, return_exceptions=True, ) @_log_exceptions async def _fill_input_queue( self, input_queue: "asyncio.Queue[Tuple[str, str, str]]" ) -> None: for swhid, sha1, name in self.iter_blobs(with_tqdm=False, unique_sha1=True): if not swhid.startswith("swh:1:"): raise ValueError(f"Invalid SWHID: {swhid}") await input_queue.put((swhid, sha1, name)) @_log_exceptions async def _worker( self, input_queue: "asyncio.Queue[Tuple[str, str, str]]", result_queue: "asyncio.Queue[Tuple[str, str]]", ) -> None: while True: # exit via Task.cancel() row = await input_queue.get() (swhid, sha1, name) = row try: res = await self.process_one(row) except BaseException as e: res = (swhid, "") logger.exception("Error while processing %r", row) if not isinstance(e, Exception): # KeyboardInterrupt, ... raise await result_queue.put(res) async def _write_results( self, result_queue: "asyncio.Queue[Tuple[str, str]]" ) -> None: import tqdm.asyncio (target,) = self.output() result_path = Path(target.path) with atomic_csv_zstd_writer(result_path) as writer: writer.writerow(self.CSV_HEADER) async for i in tqdm.asyncio.trange(self.blob_count()): (swhid, result) = await result_queue.get() writer.writerow((swhid, result))
[docs] def check_csv(csv_path: Path) -> None: import re import pyzstd with cast(ContextManager[Iterator[str]], pyzstd.open(csv_path, "rt")) as fd: try: header = next(fd) except StopIteration: raise ValueError(f"{csv_path} is empty") from None except pyzstd.ZstdError: raise ValueError(f"{csv_path} could not be decompressed as zstd") from None # check the header contains no whitespace if len(header.split()) != 1: raise ValueError( f"{csv_path.name} is not comma-separated " f"(or has whitespaces in column name)" ) columns = header.split(",") if columns[0] != "swhid": raise ValueError( f"First column of {csv_path.name} is {columns[0]!r} " f"but should be 'swhid'" ) try: first_line = next(fd) except StopIteration: raise ValueError(f"{csv_path} has no content") from None if not re.match("^swh:1:cnt:[0-9a-f]{40},", first_line): raise ValueError(f"{csv_path} has unexpected first row: {first_line}")
[docs] class SelectBlobs(_BaseTask): blob_filter = luigi.ChoiceParameter(choices=list(SELECTION_QUERIES)) local_export_path = luigi.PathParameter() derived_datasets_path = luigi.PathParameter() known_swhids_csv = luigi.Parameter(default="")
[docs] def requires(self) -> List[luigi.Task]: """Returns an instance of :class:`LocalExport`""" return [ LocalExport( local_export_path=self.local_export_path, formats=[Format.orc], # type: ignore[attr-defined] ), ]
[docs] def output(self) -> List[luigi.Target]: """:file:`blobs.csv.zst` and :file:`stats/count.txt` in ``self.derived_datasets_path / self.blob_filter``""" return [ luigi.LocalTarget(self.blob_list_path()), luigi.LocalTarget(self.blob_count_path()), ]
[docs] def run(self) -> None: """Runs an Athena query to get the list of blobs and writes it to :file:`blobs.csv.zst`.""" import tempfile import textwrap import datafusion import pyarrow.dataset from ..shell import AtomicFileSink, Command ctx = datafusion.SessionContext() for table in ("directory_entry", "content"): ctx.register_dataset( table, pyarrow.dataset.dataset( self.local_export_path / "orc" / table, format="orc" ), ) if self.blob_filter == "known_swhids": assert self.known_swhids_csv, "Missing --SelectBlobs-known-swhids-csv" ctx.sql( f""" CREATE EXTERNAL TABLE swhids ( swhid VARCHAR NOT NULL ) STORED AS CSV LOCATION '{self.known_swhids_csv}' OPTIONS ( 'has_header' 'true', 'format.compression' 'zstd', ); """ ) with tempfile.NamedTemporaryFile(suffix=".csv") as sql_res: logger.info("Running query...") query = f""" COPY ({SELECTION_QUERIES[self.blob_filter]}) TO '{sql_res.name}' OPTIONS ( 'has_header' 'true' ) """ logger.debug("%s", textwrap.indent(query, " ")) df = ctx.sql(query) logger.info("Reformatting...") columns = df.schema().names assert columns == ["swhid", "sha1", "name"], columns output_path = self.blob_list_path() output_path.parent.mkdir(parents=True, exist_ok=True) df.cache() # In the 2022-04-24 license dataset, the 'egrep' command filters # just 5 entries: # # $ egrep -v '^"[^"]*","[^"]*","[^"]*"$' license-blobs.csv # "swh:1:cnt:03e1933241b8c3878d81c0184d7f2fd3d8cd6185","037d40bc6bcb42dfd740be545dbdf2df3405442f","LICENSE # " # "swh:1:cnt:65a5c662900ee946583147129720563fd4ba286d","40e9258799f752fe25d7518155c615c1c497b7ac","LICENSE.md # " # "swh:1:cnt:8751b326784c7e145b242637866a4d46e8e274a5","a6bad643d9defc1e667c708d0d9aa9f1d6752fbc","LICENSE # " # "swh:1:cnt:e69de29bb2d1d6434b8b29ae775ad8c2e48c5391","da39a3ee5e6b4b0d3255bfef95601890afd80709","license.txt # " # "swh:1:cnt:82714d7648eb4f6cda2ed88fc4768e7d05472fe6","f096063880f4d0329856d3fca51c6d8afa13af9b","LICENSE.txt # " # fmt: off ( Command.pv(sql_res.name) | Command.egrep('^[^,]*,[^,]*,[^,]*$') | Command.zstdmt("-") > AtomicFileSink(output_path) ).run() # fmt: on logger.info("Counting...") count = sum(1 for _ in self.iter_blobs(with_tqdm=False, unique_sha1=True)) self.blob_count_path().parent.mkdir(exist_ok=True, parents=True) with self.blob_count_path().open("wt") as fd: fd.write(f"{count}\n")
[docs] class DownloadBlobs(_BaseTask): blob_filter = luigi.ChoiceParameter(choices=list(SELECTION_QUERIES)) derived_datasets_path = luigi.PathParameter() previous_derived_datasets_path = luigi.OptionalPathParameter(default=None) parallel_downloads = luigi.IntParameter(default=10, significant=False) download_url = luigi.Parameter( default="https://softwareheritage.s3.amazonaws.com/content/{sha1}", description="""Where to download blobs from. {sha1} will be replaced by the file's SHA1 hexdigest. Alternative value: https://archive.softwareheritage.org/api/1/content/sha1:{sha1}/raw/""", significant=False, ) decompression_algo = luigi.ChoiceParameter( choices=["none", "gzip"], default="gzip", description="""The decompression algorithm to use after downloading. Defaults to 'none' to match the SWH API. Should be 'none' when downloading from archive.softwareheritage.org""", ) _session = None _session_pid = None
[docs] def requires(self) -> luigi.Task: """Returns an instance of :class:`SelectBlobs`""" return SelectBlobs( blob_filter=self.blob_filter, derived_datasets_path=self.derived_datasets_path, )
[docs] def output(self) -> List[luigi.Target]: """:file:`stats/size.txt` in ``self.derived_datasets_path / self.blob_filter``""" return [ luigi.LocalTarget(self.blob_dir()), luigi.LocalTarget(self.blob_size_path()), ]
@classmethod def _compute_sha1(cls, path: Path) -> str: with path.open("rb") as fd: h = hashlib.sha1() while True: data = fd.read(40960) if not data: break h.update(data) return h.hexdigest() def _download_blob(self, session: "Session", path: Path, sha1: str) -> int: """Returns the size in bytes.""" import shutil import time import requests while True: url = self.download_url.format(sha1=sha1) try: resp = session.get(url, stream=True) except requests.exceptions.ConnectionError: logger.exception("Failed to query %s, retrying in 10 seconds:", url) time.sleep(10) continue if resp.status_code == 429: rate_limit_reset = int(resp.headers["X-RateLimit-Reset"]) wait_seconds = max(10, rate_limit_reset - time.time()) logger.warning("Waiting timeout for %d seconds", wait_seconds) time.sleep(wait_seconds) continue elif 500 <= resp.status_code < 600: logger.warning("Got %d error, retrying in 10 seconds", resp.status_code) time.sleep(10) elif resp.status_code == 200: break elif resp.status_code == 404: logger.error("%s returned 404", url) return 0 else: msg = f"Unexpected status code: {resp.status_code}" logger.error(msg) logger.error(resp.text) raise Exception(msg) tmp_path = path.parent / f".tmp_{sha1}" if self.decompression_algo == "none": with tmp_path.open("wb") as fd: for chunk in resp.iter_content(chunk_size=40960): fd.write(chunk) elif self.decompression_algo == "gzip": import gzip if not hasattr(gzip, "BadGzipFile"): # Python < 3.8 BadGzipFile = OSError else: BadGzipFile = gzip.BadGzipFile try: with gzip.open(resp.raw, "rb") as compressed_fd: with tmp_path.open("wb") as decompressed_fd: shutil.copyfileobj(compressed_fd, decompressed_fd) except BadGzipFile as e: if e.args[0] == r"Not a gzipped file (b'\x00\x00')": # WTF? https://gitlab.softwareheritage.org/swh/meta/-/issues/5034 print(f"{sha1} has null bytes instead of magic value") return 0 else: raise else: assert False, f"Unexpected decompression algo: {self.decompression_algo}" if self._compute_sha1(tmp_path) != sha1: if tmp_path.stat().st_size == 0 and sha1 != EMPTY_FILE_SHA1: msg = f"Blob downloaded to {tmp_path} is empty" else: msg = f"Blob downloaded to {tmp_path} does not match its checksum" logger.error(msg) raise Exception(msg) # Atomically write to destination tmp_path.rename(path) return path.stat().st_size def _download_blob_if_missing(self, session: "Session", sha1: str) -> int: """Returns the size in bytes.""" assert set(sha1) <= set("0123456789abcdef"), sha1 assert len(sha1) == 40, sha1 (sharded_path, unsharded_path) = self.blob_paths(sha1) for path in (sharded_path, unsharded_path): if path.exists(): if self._compute_sha1(path) != sha1: msg = f"Existing blob at {path} does not match its checksum" logger.error(msg) raise Exception(msg) logger.debug("Skipping %s, already exists", sha1) return path.stat().st_size else: logger.debug("Downloading %s", sha1) return self._download_blob(session, sharded_path, sha1)
[docs] def run(self) -> None: """Reads file SHA1 hashes from :file:`blobs.csv.zst` and downloads them to :file:`blobs/`.""" import multiprocessing import multiprocessing.dummy import tqdm from ..shell import Command # Create sharded directories for the blobs for i in range(256): for j in range(256): (self.blob_dir() / f"{i:02x}" / f"{j:02x}").mkdir( exist_ok=True, parents=True ) previous_blob_tarball_path = self.previous_blob_tarball_path() if previous_blob_tarball_path: # reuse blobs from a previous version of the dataset, so we don't have # to download them one by one print(f"Unpacking previous blobs from {previous_blob_tarball_path}") # fmt: off ( Command.pv(previous_blob_tarball_path) | Command.zstdcat() | Command.tar("-x", "-C", self.blob_dir().parent) # tar root is blobs/ ).run() # fmt: on print("Done unpacking") total_size = 0 # Use a thread pool (more efficient because no IPC) if there is no compression # (because then it's IO bound), and a process pool when there is (it would # be single-thread CPU bound otherwise) Pool = multiprocessing.Pool if self.decompression_algo == "none": Pool = multiprocessing.dummy.Pool # type: ignore[assignment] with Pool(self.parallel_downloads) as pool: for size in tqdm.tqdm( pool.imap_unordered( self._worker, self.iter_blobs(unique_sha1=True, with_tqdm=False), chunksize=100, ), total=self.blob_count(), ): total_size += size with self.blob_size_path().open("wt") as fd: fd.write(f"{total_size}\n")
[docs] def session(self): if self._session_pid != os.getpid(): # we forked; create a new session for this process import requests self._session_pid = os.getpid() self._session = requests.Session() self._session.headers["User-Agent"] = ( f"SWH {self.blob_filter} Dataset blob downloader" ) auth_token = os.environ.get("SWH_AUTH_TOKEN") if auth_token: self._session.headers["Authorization"] = f"Bearer {auth_token}" return self._session
def _worker(self, args): (swhid, sha1, name) = args return self._download_blob_if_missing(self.session(), sha1)
[docs] class MakeBlobTarball(_BaseTask): blob_filter = luigi.ChoiceParameter(choices=list(SELECTION_QUERIES)) derived_datasets_path = luigi.PathParameter()
[docs] def requires(self) -> luigi.Task: """Returns an instance of :class:`DownloadBlobs`""" return DownloadBlobs( blob_filter=self.blob_filter, derived_datasets_path=self.derived_datasets_path, )
[docs] def output(self) -> List[luigi.Target]: """:file:`blobs.tar.zst` in ``self.derived_datasets_path / self.blob_filter``""" return [luigi.LocalTarget(self.blob_tarball_path())]
[docs] def run(self) -> None: """Run task.""" from ..shell import AtomicFileSink, Command approx_tarball_size = ( self.blob_size() # the content itself + 512 * self.blob_count() # assuming one header per file ) cwd = self.derived_datasets_path / self.blob_filter # fmt: off ( Command.tar("-c", "--sort=name", "blobs/", cwd=cwd) | Command.pv("--size", str(approx_tarball_size)) | Command.zstdmt(f"-{COMPRESS_LEVEL}") > AtomicFileSink(self.blob_tarball_path()) ).run()
# fmt: on
[docs] class MakeSampleBlobTarball(_BaseTask): blob_filter = luigi.ChoiceParameter(choices=list(SELECTION_QUERIES)) derived_datasets_path = luigi.PathParameter()
[docs] def requires(self) -> luigi.Task: """Returns an instance of :class:`DownloadBlobs`""" return DownloadBlobs( blob_filter=self.blob_filter, derived_datasets_path=self.derived_datasets_path, )
[docs] def output(self) -> List[luigi.Target]: """:file:`blobs.tar.zst` in ``self.derived_datasets_path / self.blob_filter``""" return [luigi.LocalTarget(self.sample_blob_tarball_path())]
[docs] def run(self) -> None: """Selects a sample of 20k random blobs and puts them in a tarball.""" from ..shell import AtomicFileSink, Command cwd = self.derived_datasets_path / self.blob_filter # fmt: off ( Command.zstdcat(self.blob_list_path()) | Command.tail("-n", "+2") | Command.cut("-d", ",", "-f", "2") | Command.uniq() | Command.shuf("--head-count=20000") | Command.sort() | Command.sed(r"s#\(\(..\)\(..\)\(..*\)\)#blobs/\2/\3/\1#") | Command.tar( "-c", "--files-from=/dev/stdin", "--transform=s/^blobs/blobs-sample20k/", cwd=cwd ) | Command.zstdmt(f"-{COMPRESS_LEVEL}") > AtomicFileSink(self.sample_blob_tarball_path()) ).run()
# fmt: on
[docs] class ComputeBlobFileinfo(_BaseTask): blob_filter = luigi.ChoiceParameter(choices=list(SELECTION_QUERIES)) derived_datasets_path = luigi.PathParameter() CSV_HEADER = ( "swhid", "mime_type", "encoding", "line_count", "word_count", "size", ) READABLE_ENCODINGS = ("us-ascii", "utf-8", "iso-8859-1")
[docs] def requires(self) -> luigi.Task: """Returns an instance of :class:`DownloadBlobs`""" return DownloadBlobs( blob_filter=self.blob_filter, derived_datasets_path=self.derived_datasets_path, )
[docs] def output(self) -> List[luigi.Target]: """:file:`blobs-fileinfo.csv.zst` in ``self.derived_datasets_path / self.blob_filter``""" return [ luigi.LocalTarget( self.derived_datasets_path / self.blob_filter / "blobs-fileinfo.csv.zst" ) ]
[docs] def run(self) -> None: """Run task.""" import multiprocessing import tqdm with atomic_csv_zstd_writer(self.output()[0].path) as writer: writer.writerow(self.CSV_HEADER) with multiprocessing.Pool() as pool: # imap instead of imap_unordered, to preserve sha1 order of blobs for row in tqdm.tqdm( pool.imap( self._analyze_blob, self.iter_blobs(unique_sha1=True, with_tqdm=False), ), total=self.blob_count(), ): writer.writerow(row)
def _analyze_blob(self, row) -> Tuple[str, str, str, str, str, str]: (swhid, sha1, name) = row (path, _) = self.blob_paths(sha1) assert path.exists(), f"{path} does not exist" size = path.stat().st_size mime_type, encoding = _guess_mime(str(path)) line_count, word_count = None, None if mime_type == "text/plain" and encoding in self.READABLE_ENCODINGS: line_count = 0 word_count = 0 try: with open(path, encoding="utf8") as f: for line in f: line_count += 1 word_count += len(line.rstrip().split()) except UnicodeDecodeError: line_count = None word_count = None return ( swhid, mime_type, encoding, str(line_count) if line_count is not None else "", str(word_count) if word_count is not None else "", str(size), # byte count )
[docs] class BlobScancode(_BaseTask): """Runs scancode-toolkit on the blob dataset""" blob_filter = luigi.ChoiceParameter(choices=list(SELECTION_QUERIES)) derived_datasets_path = luigi.PathParameter() FIELDNAMES = [ "swhid", "license", "score", ] DEFAULT_MIN_SCORE = 0 DEFAULT_JOBS = 1 DEFAULT_TIMEOUT = 120 MAP_CHUNKSIZE = 1 WORKER_MAX_TASKS = 1000 # to workaround Scancode get_licenses() memory leaks FIELD_SEP = "," READABLE_ENCODINGS = ("us-ascii", "utf-8", "iso-8859-1") def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.mime_guesser: Optional[magic.Magic] = None # set in child processes
[docs] def requires(self) -> luigi.Task: """Returns an instance of :class:`DownloadBlobs`""" return DownloadBlobs( blob_filter=self.blob_filter, derived_datasets_path=self.derived_datasets_path, )
def _csv_path(self) -> Path: return self.derived_datasets_path / self.blob_filter / "blobs-scancode.csv.zst" def _json_path(self) -> Path: return ( self.derived_datasets_path / self.blob_filter / "blobs-scancode.ndjson.zst" )
[docs] def output(self) -> List[luigi.Target]: """:file:`blobs-scancode.csv.zst` and :file:`blobs-scancode.ndjson.zst` in ``self.derived_datasets_path / self.blob_filter``""" return [ luigi.LocalTarget(self._csv_path()), luigi.LocalTarget(self._json_path()), ]
def _detect_licenses(self, row) -> Tuple[Set[Tuple[str, str, float]], str]: import json import time # needs to be initialized before importing scancode: # https://github.com/nexB/scancode-plugins/issues/30 _init_mime_guesser() from scancode.api import get_copyrights, get_licenses (swhid, sha1, name) = row (path, _) = self.blob_paths(sha1) assert path.exists(), f"{path} does not exist" mime_type, encoding = _guess_mime(str(path)) license_rows = set() res: Dict[str, Any] = {"swhid": swhid} if mime_type == "text/plain" and encoding in self.READABLE_ENCODINGS: deadline = time.time() + self.DEFAULT_TIMEOUT res["licenses"] = get_licenses( str(path), min_score=self.DEFAULT_MIN_SCORE, deadline=deadline ) license_rows = { ( swhid, lic["spdx_license_key"], lic["score"], ) for lic in res["licenses"]["licenses"] } res["copyrights"] = get_copyrights(str(path)) return (license_rows, json.dumps(res))
[docs] def run(self) -> None: """Detect license(s) of license blobs located under blob_dir using scancode. Save frequencies to csv_outfile in a 3-column (sha1, license, score) CSV format. """ import csv import multiprocessing import multiprocessing.pool import tqdm # ensure clean slate if self._csv_path().exists(): self._csv_path().unlink() if self._json_path().exists(): self._json_path().unlink() context = multiprocessing.get_context(method="spawn") with ( atomic_zstd_writer(self._csv_path()) as csvfile, atomic_zstd_writer(self._json_path()) as jsonfile, ): csv_writer = csv.writer(csvfile, delimiter=self.FIELD_SEP) csv_writer.writerow(self.FIELDNAMES) with multiprocessing.pool.Pool( maxtasksperchild=self.WORKER_MAX_TASKS, context=context, ) as pool: for license_rows, results in tqdm.tqdm( pool.imap_unordered( self._detect_licenses, self.iter_blobs(unique_sha1=True, with_tqdm=False), chunksize=self.MAP_CHUNKSIZE, ), total=self.blob_count(), ): # each detect() call can return multiple licenses, flatten them for sha1, license, score in license_rows: csv_writer.writerow([sha1, license, str(score)]) assert "\n" not in results jsonfile.write(results + "\n") print("Done")
[docs] class FindBlobOrigins(_ConcurrentCsvWritingTask): previous_derived_datasets_path = luigi.OptionalPathParameter(default=None)
[docs] def output(self) -> List[luigi.Target]: """:file:`blobs.tar.zst` in ``self.derived_datasets_path / self.blob_filter``""" return [ luigi.LocalTarget( self.derived_datasets_path / self.blob_filter / "blobs-origins.csv.zst" ) ]
CSV_HEADER = ("swhid", "origin_url")
[docs] async def process_one(self, row: Tuple[str, str, str]) -> Tuple[str, str]: from google.protobuf.field_mask_pb2 import FieldMask import swh.graph.grpc.swhgraph_pb2 as swhgraph (swhid, sha1, name) = row if not swhid.startswith("swh:1:"): raise ValueError(f"Invalid SWHID: {swhid}") # If we are running incrementally, skip the request origin_url = self.existing_swhids.get(swhid) if not origin_url: response = self.stub.Traverse( swhgraph.TraversalRequest( src=[swhid], direction=swhgraph.GraphDirection.BACKWARD, mask=FieldMask(paths=["swhid", "ori.url"]), return_nodes=swhgraph.NodeFilter( types="ori", # return only origins... ), max_matching_nodes=1, # return at most one ) ) async for item in response: origin_url = item.ori.url if origin_url: break else: print(f"{item.swhid} does not have an associated URL") else: # no origin found origin_url = "" assert origin_url is not None return (swhid, origin_url)
[docs] def run(self) -> None: import pyzstd self.existing_swhids: Dict[str, str] = {} if self.previous_derived_datasets_path: separator = "," # or "\t" for datasets before 2022-12-07 # reuse blobs from a previous version of the dataset, so we don't have # to recompute them all path = ( self.previous_derived_datasets_path / self.blob_filter / "blobs-origins.csv.zst" ) with pyzstd.open(path, "rt") as fd: self.existing_swhids = dict( line.strip().split(separator) for line in cast(Iterable[str], fd) if line[-2] != separator ) super().run()
[docs] class CountBlobOrigins(_ConcurrentCsvWritingTask): CSV_HEADER = ("swhid", "origin_count")
[docs] def output(self) -> List[luigi.Target]: """:file:`blobs.tar.zst` in ``self.derived_datasets_path / self.blob_filter``""" return [ luigi.LocalTarget( self.derived_datasets_path / self.blob_filter / "blobs-nb-origins.csv.zst" ) ]
[docs] async def process_one(self, row: Tuple[str, str, str]) -> Tuple[str, str]: from google.protobuf.field_mask_pb2 import FieldMask import swh.graph.grpc.swhgraph_pb2 as swhgraph (swhid, sha1, name) = row if not swhid.startswith("swh:1:"): raise ValueError(f"Invalid SWHID: {swhid}") response = await self.stub.CountNodes( swhgraph.TraversalRequest( src=[swhid], direction=swhgraph.GraphDirection.BACKWARD, mask=FieldMask(paths=["url"]), return_nodes=swhgraph.NodeFilter( types="ori", # count only origins ), ) ) return (swhid, response.count)
[docs] class FindEarliestRevisions(_BaseTask): blob_filter = luigi.ChoiceParameter(choices=list(SELECTION_QUERIES)) derived_datasets_path = luigi.PathParameter() local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph")
[docs] def requires(self) -> luigi.Task: """Returns an instance of :class:`SelectBlobs`""" return SelectBlobs( blob_filter=self.blob_filter, derived_datasets_path=self.derived_datasets_path, )
[docs] def output(self) -> List[luigi.Target]: """:file:`blobs-earliest.csv.zst` in ``self.derived_datasets_path / self.blob_filter``""" return [ luigi.LocalTarget( self.derived_datasets_path / self.blob_filter / "blobs-earliest.csv.zst" ) ]
[docs] def run(self) -> None: """Run task.""" from ..shell import AtomicFileSink, Command, Rust # fmt: off ( Command.zstdcat(self.blob_list_path()) | Command.sed("s/,.*//") | Command.uniq() | Rust( "find-earliest-revision", self.local_graph_path / self.graph_name, ) | Command.pv("--wait", "--line-mode", "--size", str(self.blob_count())) | Command.zstdmt(f"-{COMPRESS_LEVEL}") > AtomicFileSink(self.output()[0]) ).run()
# fmt: on
[docs] class RunBlobDataset(luigi.Task): """Runs all tasks to build a blob dataset with the given filter.""" blob_filter = luigi.ChoiceParameter(choices=list(SELECTION_QUERIES)) derived_datasets_path = luigi.PathParameter()
[docs] def requires(self) -> List[luigi.Task]: """Returns a list of task such that every task in this module are transitively depended on.""" kwargs = dict( blob_filter=self.blob_filter, derived_datasets_path=self.derived_datasets_path, ) tasks = [ DownloadBlobs(**kwargs), MakeBlobTarball(**kwargs), MakeSampleBlobTarball(**kwargs), FindBlobOrigins(**kwargs), CountBlobOrigins(**kwargs), FindEarliestRevisions(**kwargs), ComputeBlobFileinfo(**kwargs), ] if self.blob_filter in ("citation", "readme", "known_swhids"): pass elif self.blob_filter == "license": tasks.append(BlobScancode(**kwargs)) else: raise ValueError(f"Unexpected blob filter: {self.blob_filter}") return tasks
[docs] def complete(self) -> bool: """Always returns False; status is checked by dependencies.""" return False
[docs] def run(self): """Checks all files are in the correct format and contain a well-known SWHID""" dir_path = self.derived_datasets_path / self.blob_filter for csv_path in dir_path.glob("*.csv.zst"): check_csv(csv_path) if self.blob_filter == "license": # Text of the GPLv3 swhid = "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2" expected_fileinfo = f"{swhid},text/plain,us-ascii,674,5644,35147\n" min_expected_origins = 2_000_000 elif self.blob_filter == "citation": # Codemeta's own codemeta.json swhid = "swh:1:cnt:6daebd857f6f6a98dd9288ef7b942283f7fa4f0e" expected_fileinfo = f"{swhid},application/json,us-ascii,,,7173\n" min_expected_origins = 5 else: assert False, f"Unexpected blob filter: {self.blob_filter}" self._check_fileinfo( swhid, expected_fileinfo, dir_path / "blobs-fileinfo.csv.zst" ) if self.blob_filter == "license": self._check_scancode(swhid, dir_path) self._check_nb_origins( swhid, min_expected_origins, dir_path / "blobs-nb-origins.csv.zst" ) self._check_exactly_one_line(swhid, dir_path / "blobs-origins.csv.zst") self._check_exactly_one_line(swhid, dir_path / "blobs-earliest.csv.zst")
def _check_fileinfo(self, swhid: str, expected_fileinfo: str, path: Path) -> None: from ..shell import Command, Sink # fmt: off results = ( Command.zstdcat(path) | Command.grep("^" + swhid) > Sink() ).run() # fmt: on assert results.decode() == expected_fileinfo, f"Unexpected fileinfo for {swhid}" def _check_scancode(self, swhid: str, dir_path: Path) -> None: import json from ..shell import Command, Sink assert swhid == "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2" # fmt: off csv_results = ( Command.zstdcat(dir_path / "blobs-scancode.csv.zst") | Command.grep("^" + swhid) > Sink() ).run().decode() # fmt: on assert csv_results == ( f"{swhid},GPL-3.0-only,100.0\r\n" ), f"Unexpected scancode CSV for {swhid}: {csv_results!r}" # fmt: off json_results = ( Command.zstdcat(dir_path / "blobs-scancode.ndjson.zst") | Command.grep(swhid) > Sink() ).run().decode() # fmt: on assert ( json_results.count("\n") == 1 ), f"Unexpectedly number of results for {swhid} in scancode NDJSON:\n{json_results}" result = json.loads(json_results) assert result["swhid"] == swhid, result licenses = [license for license in result["licenses"]["licenses"]] assert ( len(licenses) == 1 ), f"Unexpected number of licenses for {swhid}: {licenses}" assert licenses[0]["key"] == "gpl-3.0" assert licenses[0]["score"] == 100.0 def _check_exactly_one_line(self, swhid: str, path: Path) -> None: from ..shell import Command, Sink # fmt: off results = ( Command.zstdcat(path) | Command.grep("^" + swhid) > Sink() ).run() # fmt: on assert ( results.count(b"\n") == 1 ), f"Unexpected number of line for {swhid} in {path}:\n{results}" def _check_nb_origins( self, swhid: str, min_expected_origins: int, path: Path ) -> None: from ..shell import Command, Sink # fmt: off results = ( Command.zstdcat(path) | Command.grep("^" + swhid) > Sink() ).run().decode() # fmt: on assert ( results.count("\n") == 1 ), f"Unexpected number of origin counts for {swhid}:\n{results}" count = int(results.split("\n")[0].split(",")[1]) assert ( min_expected_origins <= count <= 1_000_000_000 ), f"Unexpected number of origins for {swhid}: {count}"