Source code for swh.datasets.luigi.specific_languages_datasets
# Copyright (C) 2026 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import luigi
from swh.datasets.luigi.aggregate_datasets import (
AggregateContentDatasets,
ExportNodesTable,
)
logger = logging.getLogger(__name__)
class _DigestMap(luigi.ExternalTask):
    """External dependency standing for an already existing digestmap file.

    This task has no ``run`` method: the file at ``digestmap_path`` is
    expected to be produced outside of this pipeline.
    """

    # Location of the externally produced digestmap file.
    digestmap_path = luigi.PathParameter()

    def output(self) -> luigi.LocalTarget:
        """Returns a local target pointing at ``digestmap_path``."""
        target = luigi.LocalTarget(self.digestmap_path)
        return target
[docs]
class ExtractFileExtension(luigi.Task):
    """Filters the contents parquet by file extension.

    Reads the contents parquet produced by :class:`AggregateContentDatasets`,
    extracts the file extension from each filename, and writes a filtered
    parquet directory containing only rows whose extension is in
    ``extensions``.

    The extension of a file is whatever follows the last ``.`` of its
    filename. Entries of ``extensions`` are trimmed of surrounding whitespace
    and of leading dots before being compared, so ``"py, .v"`` is equivalent
    to ``"py,v"``.

    Input columns expected: ``id``, ``filename`` (plus any others, which are
    dropped).

    Output columns: ``id``, ``filename``, ``extension``.
    """

    aggregate_datasets_path = luigi.PathParameter()
    specific_languages_path = luigi.PathParameter()
    extensions = luigi.StrParameter(
        description=(
            "Comma-separated file extensions to keep, without leading dot "
            "(e.g. ``py,v``)."
        )
    )

    def requires(self) -> luigi.Task:
        """Returns an instance of :class:`AggregateContentDatasets`."""
        return AggregateContentDatasets(
            aggregate_datasets_path=self.aggregate_datasets_path
        )

    def output(self) -> luigi.LocalTarget:
        """Directory of Parquet files with columns ``id``, ``filename``, ``extension``."""
        return luigi.LocalTarget(self.specific_languages_path / "filtered_contents")

    def run(self) -> None:
        """Filters the contents parquet by extension and writes the result.

        The result is first written to a temporary directory created next to
        the final output, and only moved to its final location once it is
        known to contain at least one row.

        :raises ValueError: if ``extensions`` contains no usable entry, or if
            no row of the input matches any of the requested extensions.
        """
        import pathlib
        import shutil
        import tempfile

        import datafusion
        from datafusion import col, lit
        import datafusion.functions as f
        import pyarrow as pa
        import pyarrow.parquet as pq

        # Normalise the user-supplied list: a stray space or leading dot
        # (``"py, .v"``) would otherwise produce entries that can never match
        # the extracted extension. Duplicates are dropped, order is kept.
        wanted = []
        for raw in self.extensions.split(","):
            ext = raw.strip().lstrip(".")
            if ext and ext not in wanted:
                wanted.append(ext)
        if not wanted:
            # Fail before scanning the dataset: nothing could ever match.
            raise ValueError(
                f"ExtractFileExtension: no usable extension in {self.extensions!r}"
            )

        input_path = self.input().path
        output_path = pathlib.Path(self.output().path)
        # The temporary directory is created inside the output's parent, which
        # may not exist yet when this is the first task writing there.
        output_path.parent.mkdir(parents=True, exist_ok=True)

        ctx = datafusion.SessionContext()
        df = (
            ctx.read_parquet(input_path)
            .with_column("filename", col("filename").cast(pa.utf8()))
            .with_column(
                "extension",
                # First capture group of the match: the text after the last dot.
                f.array_element(
                    f.regexp_match(col("filename"), lit(r"\.([^.]+)$")),
                    lit(1),
                ),
            )
            .filter(f.in_list(col("extension"), [lit(e) for e in wanted]))
        )

        with tempfile.TemporaryDirectory(dir=output_path.parent) as tmp:
            df.select(col("id"), col("filename"), col("extension")).write_parquet(tmp)
            # Row count is taken from the Parquet footers of the written files.
            count = sum(
                pq.read_metadata(p).num_rows
                for p in pathlib.Path(tmp).rglob("*.parquet")
            )
            if count == 0:
                raise ValueError(
                    f"ExtractFileExtension: no rows matched extensions "
                    f"{self.extensions!r} in {input_path}"
                )
            # Publish from inside the ``with`` block: the temporary directory
            # must still exist when it is moved to its final location.
            shutil.move(tmp, output_path)

        logger.info(
            "ExtractFileExtension: wrote %d rows to %s",
            count,
            output_path,
        )
[docs]
class JoinFilteredContentsWithNodes(luigi.Task):
    """Attaches a SWHID and a SHA1 to every row of the filtered contents.

    The parquet written by :class:`ExtractFileExtension` is inner-joined on
    ``id`` with the nodes parquet written by :class:`ExportNodesTable`, which
    provides the ``swhid`` column. Each SWHID is then resolved to a SHA1
    through :class:`swh.digestmap.DigestMap`, and the result is written as a
    single parquet directory.

    Output columns: ``id``, ``filename``, ``extension``, ``swhid``, ``sha1``.
    """

    aggregate_datasets_path = luigi.PathParameter()
    specific_languages_path = luigi.PathParameter()
    digestmap_path = luigi.PathParameter()
    extensions = luigi.StrParameter(
        description=(
            "Comma-separated file extensions to keep, without leading dot "
            "(e.g. ``py,v``)."
        )
    )

    def requires(self):
        """Returns the three dependencies, keyed by role: ``contents``
        (:class:`ExtractFileExtension`), ``nodes`` (:class:`ExportNodesTable`)
        and ``digestmap`` (:class:`_DigestMap`)."""
        dependencies = {}
        dependencies["contents"] = ExtractFileExtension(
            aggregate_datasets_path=self.aggregate_datasets_path,
            specific_languages_path=self.specific_languages_path,
            extensions=self.extensions,
        )
        dependencies["nodes"] = ExportNodesTable(
            aggregate_datasets_path=self.aggregate_datasets_path,
        )
        dependencies["digestmap"] = _DigestMap(digestmap_path=self.digestmap_path)
        return dependencies

    def output(self) -> luigi.LocalTarget:
        """Directory of Parquet files with columns
        ``id``, ``filename``, ``extension``, ``swhid``, ``sha1``.
        """
        destination = self.specific_languages_path / "contents_with_sha1"
        return luigi.LocalTarget(destination)

    def run(self) -> None:
        """Joins filtered contents with nodes, resolves SWHIDs to SHA1 via
        digestmap, and writes the result.

        The result goes to a temporary directory next to the final output and
        is moved into place only when it holds at least one row.

        :raises ValueError: if the join produces no row.
        """
        import pathlib
        import shutil
        import tempfile

        import datafusion
        from datafusion import col, udf
        import pyarrow as pa
        import pyarrow.parquet as pq

        from swh.digestmap import DigestMap

        digest_map = DigestMap(self.digestmap_path)
        swhid_to_sha1 = udf(
            digest_map.sha1_from_swhid_arrow,  # type: ignore[type-var]
            # https://github.com/apache/datafusion-python/issues/1516
            [pa.large_utf8()],
            pa.large_utf8(),
            volatility="volatile",
        )

        destination = pathlib.Path(self.output().path)
        session = datafusion.SessionContext()

        inputs = self.input()
        contents = session.read_parquet(inputs["contents"].path)
        nodes = session.read_parquet(inputs["nodes"].path)

        # The node key is renamed so that it does not clash with the ``id``
        # column of the contents table in the join.
        node_swhids = nodes.select(col("id").alias("node_id"), col("swhid"))
        joined = contents.join(
            node_swhids,
            join_keys=(["id"], ["node_id"]),
            how="inner",
        )
        projected = joined.select(
            col("id"), col("filename"), col("extension"), col("swhid")
        )
        resolved = projected.with_column("sha1", swhid_to_sha1(col("swhid")))

        with tempfile.TemporaryDirectory(dir=destination.parent) as scratch:
            resolved.write_parquet(scratch)

            # Total the row counts recorded in the written Parquet files.
            count = 0
            for parquet_file in pathlib.Path(scratch).rglob("*.parquet"):
                count += pq.read_metadata(parquet_file).num_rows

            if count == 0:
                raise ValueError(
                    f"JoinFilteredContentsWithNodes: no rows after join for extensions "
                    f"{self.extensions!r}"
                )
            # Moved while the temporary directory still exists.
            shutil.move(scratch, destination)

        logger.info(
            "JoinFilteredContentsWithNodes: wrote %d rows to %s", count, destination
        )