Source code for swh.datasets.luigi.specific_languages_datasets

# Copyright (C) 2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging

import luigi

from swh.datasets.luigi.aggregate_datasets import (
    AggregateContentDatasets,
    ExportNodesTable,
)

logger = logging.getLogger(__name__)


class _DigestMap(luigi.ExternalTask):
    """External dependency standing for a digestmap file built elsewhere.

    This task defines no ``run`` method: it only exposes the file located at
    ``digestmap_path`` as a target that other tasks can require.
    """

    # Location of the externally produced digestmap file.
    digestmap_path = luigi.PathParameter()

    def output(self) -> luigi.LocalTarget:
        """Returns the local target located at ``digestmap_path``."""
        digestmap_file = self.digestmap_path
        return luigi.LocalTarget(digestmap_file)


class ExtractFileExtension(luigi.Task):
    """Filters the contents parquet by file extension.

    Reads the contents parquet produced by :class:`AggregateContentDatasets`,
    extracts the file extension from each filename, and writes a filtered
    parquet directory containing only rows whose extension is in
    ``extensions``.

    Input columns expected: ``id``, ``filename`` (plus any others, which are
    dropped).

    Output columns: ``id``, ``filename``, ``extension``.
    """

    aggregate_datasets_path = luigi.PathParameter()
    specific_languages_path = luigi.PathParameter()
    extensions = luigi.StrParameter(
        description=(
            "Comma-separated file extensions to keep, without leading dot "
            "(e.g. ``py,v``)."
        )
    )

    def requires(self) -> luigi.Task:
        """Returns an instance of :class:`AggregateContentDatasets`."""
        return AggregateContentDatasets(
            aggregate_datasets_path=self.aggregate_datasets_path
        )

    def output(self) -> luigi.LocalTarget:
        """Directory of Parquet files with columns ``id``, ``filename``,
        ``extension``."""
        return luigi.LocalTarget(self.specific_languages_path / "filtered_contents")

    def _wanted_extensions(self) -> list:
        """Parses ``extensions`` into the list of extensions to keep.

        Whitespace around each comma-separated entry is removed and empty
        entries (e.g. from a trailing comma) are discarded, so that a value
        such as ``"py, v,"`` selects ``py`` and ``v``. Entries are otherwise
        used as-is; no case folding is applied here.

        :raises ValueError: if no usable entry remains.
        """
        stripped = (entry.strip() for entry in self.extensions.split(","))
        wanted = [entry for entry in stripped if entry]
        if not wanted:
            raise ValueError(
                f"ExtractFileExtension: no usable extension in {self.extensions!r}"
            )
        return wanted

    def run(self) -> None:
        """Filters the contents parquet by extension and writes the result.

        The result is first written to a temporary directory created next to
        the final output, and only moved to the output path once it is known
        to contain at least one row.

        :raises ValueError: if ``extensions`` contains no usable entry, or if
            no row of the input matches any of the requested extensions.
        """
        import pathlib
        import shutil
        import tempfile

        import datafusion
        from datafusion import col, lit
        import datafusion.functions as f
        import pyarrow as pa
        import pyarrow.parquet as pq

        # Validate the parameter before doing any expensive work.
        wanted = self._wanted_extensions()

        input_path = self.input().path
        output_path = pathlib.Path(self.output().path)
        # The temporary directory below is created inside the output's parent,
        # which therefore has to exist beforehand.
        output_path.parent.mkdir(parents=True, exist_ok=True)

        ctx = datafusion.SessionContext()
        df = (
            ctx.read_parquet(input_path)
            .with_column("filename", col("filename").cast(pa.utf8()))
            .with_column(
                "extension",
                # First capture group of the pattern: the text following the
                # last dot of the filename.
                f.array_element(
                    f.regexp_match(col("filename"), lit(r"\.([^.]+)$")),
                    lit(1),
                ),
            )
            .filter(f.in_list(col("extension"), [lit(e) for e in wanted]))
        )

        with tempfile.TemporaryDirectory(dir=output_path.parent) as tmp:
            df.select(col("id"), col("filename"), col("extension")).write_parquet(tmp)
            count = sum(
                pq.read_metadata(p).num_rows
                for p in pathlib.Path(tmp).rglob("*.parquet")
            )
            if count == 0:
                # Raising here leaves the output path absent; the temporary
                # directory is discarded by the context manager.
                raise ValueError(
                    f"ExtractFileExtension: no rows matched extensions "
                    f"{self.extensions!r} in {input_path}"
                )
            # On the success path the directory has already been moved away by
            # the time the context manager exits.
            shutil.move(tmp, output_path)

        logger.info(
            "ExtractFileExtension: wrote %d rows to %s",
            count,
            output_path,
        )
class JoinFilteredContentsWithNodes(luigi.Task):
    """Attaches a SWHID and a SHA1 to every filtered content row.

    The filtered contents written by :class:`ExtractFileExtension` are joined
    on ``id`` with the nodes parquet produced by :class:`ExportNodesTable`,
    which provides the ``swhid`` column. Each SWHID is then resolved to a SHA1
    through :class:`swh.digestmap.DigestMap`, and the result is written as a
    single parquet directory.

    Output columns: ``id``, ``filename``, ``extension``, ``swhid``, ``sha1``.
    """

    aggregate_datasets_path = luigi.PathParameter()
    specific_languages_path = luigi.PathParameter()
    digestmap_path = luigi.PathParameter()
    extensions = luigi.StrParameter(
        description=(
            "Comma-separated file extensions to keep, without leading dot "
            "(e.g. ``py,v``)."
        )
    )

    def requires(self):
        """Returns the :class:`ExtractFileExtension`, :class:`ExportNodesTable`
        and :class:`_DigestMap` instances this task depends on, keyed by
        ``contents``, ``nodes`` and ``digestmap`` respectively."""
        filtered_contents = ExtractFileExtension(
            aggregate_datasets_path=self.aggregate_datasets_path,
            specific_languages_path=self.specific_languages_path,
            extensions=self.extensions,
        )
        nodes_table = ExportNodesTable(
            aggregate_datasets_path=self.aggregate_datasets_path,
        )
        digestmap_file = _DigestMap(digestmap_path=self.digestmap_path)
        return {
            "contents": filtered_contents,
            "nodes": nodes_table,
            "digestmap": digestmap_file,
        }

    def output(self) -> luigi.LocalTarget:
        """Directory of Parquet files with columns ``id``, ``filename``,
        ``extension``, ``swhid``, ``sha1``.
        """
        target_dir = self.specific_languages_path / "contents_with_sha1"
        return luigi.LocalTarget(target_dir)

    def run(self) -> None:
        """Joins filtered contents with nodes, resolves SWHIDs to SHA1 via
        digestmap, and writes the result.

        The result is staged in a temporary directory created next to the
        final output and moved into place only if it holds at least one row;
        otherwise :class:`ValueError` is raised.
        """
        import pathlib
        import shutil
        import tempfile

        import datafusion
        from datafusion import col, udf
        import pyarrow as pa
        import pyarrow.parquet as pq

        from swh.digestmap import DigestMap

        digest_map = DigestMap(self.digestmap_path)
        # Expose the digestmap lookup as a DataFusion UDF (declared volatile).
        swhid_to_sha1 = udf(
            digest_map.sha1_from_swhid_arrow,  # type: ignore[type-var]
            # https://github.com/apache/datafusion-python/issues/1516
            [pa.large_utf8()],
            pa.large_utf8(),
            volatility="volatile",
        )

        destination = pathlib.Path(self.output().path)
        session = datafusion.SessionContext()
        contents = session.read_parquet(self.input()["contents"].path)
        nodes = session.read_parquet(self.input()["nodes"].path)

        # The nodes' ``id`` is renamed so both sides of the join have distinct
        # key names; the renamed key is dropped again by the projection below.
        node_swhids = nodes.select(col("id").alias("node_id"), col("swhid"))
        joined = contents.join(
            node_swhids,
            join_keys=(["id"], ["node_id"]),
            how="inner",
        )
        projected = joined.select(
            col("id"), col("filename"), col("extension"), col("swhid")
        )
        resolved = projected.with_column("sha1", swhid_to_sha1(col("swhid")))

        with tempfile.TemporaryDirectory(dir=destination.parent) as staging:
            resolved.write_parquet(staging)

            # Row total taken from the metadata of every parquet file written.
            row_total = 0
            for parquet_file in pathlib.Path(staging).rglob("*.parquet"):
                row_total += pq.read_metadata(parquet_file).num_rows

            if not row_total:
                message = (
                    "JoinFilteredContentsWithNodes: no rows after join for extensions "
                    f"{self.extensions!r}"
                )
                raise ValueError(message)

            shutil.move(staging, destination)

        logger.info(
            "JoinFilteredContentsWithNodes: wrote %d rows to %s",
            row_total,
            destination,
        )