swh.graph.luigi.blobs_datasets module#

Luigi tasks for blob-centric datasets#

This module contains Luigi tasks driving the creation of derived datasets centered around a subset of content objects in the graph. Currently, this means citation blobs and license blobs, as reflected in the file layout below.

File layout#

This assumes a local compressed graph (from swh.graph.luigi.compressed_graph) is present, and generates/manipulates the following files:

base_dir/
    <date>[_<flavor>]/
        citation-blobs/
            blobs-earliest.csv.zst
            blobs-fileinfo.csv.zst
            blobs-nb-origins.csv.zst
            blobs-origins.csv.zst
            blobs-sample20k.tar.zst
            blobs.tar.zst
            import-dataset.sql
            license-blobs.csv.zst
        license-blobs/
            <same as above, plus these two:>
            blobs-scancode.csv.zst
            blobs-scancode.ndjson.zst
swh.graph.luigi.blobs_datasets.atomic_zstd_writer(result_path: Path)[source]#

Returns a file-like object, which writes to a temporary file, then atomically renames it to the result_path on success.

swh.graph.luigi.blobs_datasets.atomic_csv_zstd_writer(result_path: Path)[source]#

Returns a csv.writer object, which writes to a temporary file, then atomically renames it to the result_path on success.
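Both helpers rely on the write-to-temporary-then-rename pattern described above. A minimal sketch of that pattern, without the zstd compression layer (`atomic_write` is a hypothetical name, not part of this module):

```python
import contextlib
import os
import tempfile
from pathlib import Path


@contextlib.contextmanager
def atomic_write(result_path: Path):
    """Yield a writable text file; move it to result_path only on success."""
    # The temporary file lives in the target directory, so os.replace()
    # is a same-filesystem, atomic rename.
    fd, tmp_name = tempfile.mkstemp(dir=result_path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            yield f
        os.replace(tmp_name, result_path)  # atomic on POSIX
    except BaseException:
        os.unlink(tmp_name)  # never leave a half-written result behind
        raise
```

On failure the partially written temporary file is removed, so readers never observe a half-written `result_path`.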

swh.graph.luigi.blobs_datasets.check_csv(csv_path: Path) None[source]#
class swh.graph.luigi.blobs_datasets.SelectBlobs(*args, **kwargs)[source]#

Bases: _BaseTask

blob_filter: str = <luigi.parameter.ChoiceParameter object>#
local_export_path = <luigi.parameter.PathParameter object>#
derived_datasets_path: Path = <luigi.parameter.PathParameter object>#
known_swhids_csv = <luigi.parameter.Parameter object>#
requires() List[Task][source]#

Returns an instance of LocalExport

output() List[Target][source]#

blobs.csv.zst and stats/count.txt in self.derived_datasets_path / self.blob_filter

run() None[source]#

Runs an Athena query to get the list of blobs and writes it to blobs.csv.zst.

previous_derived_datasets_path: Path | None#
class swh.graph.luigi.blobs_datasets.DownloadBlobs(*args, **kwargs)[source]#

Bases: _BaseTask

blob_filter: str = <luigi.parameter.ChoiceParameter object>#
derived_datasets_path: Path = <luigi.parameter.PathParameter object>#
previous_derived_datasets_path: Path | None = <luigi.parameter.OptionalPathParameter object>#
parallel_downloads = <luigi.parameter.IntParameter object>#
download_url = <luigi.parameter.Parameter object>#
decompression_algo = <luigi.parameter.ChoiceParameter object>#
requires() Task[source]#

Returns an instance of SelectBlobs

output() List[Target][source]#

stats/size.txt in self.derived_datasets_path / self.blob_filter

run() None[source]#

Reads file SHA1 hashes from blobs.csv.zst and downloads them to blobs/.

session()[source]#
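The download step described above can be sketched as follows. The sharded on-disk layout (`blobs/<sha1[:2]>/<sha1[2:4]>/<sha1>`) is an assumption for illustration, and `fetch` stands in for the HTTP request built from the download_url parameter; the real task runs parallel_downloads concurrent workers.

```python
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Callable, Iterable


def shard_path(blob_dir: Path, sha1: str) -> Path:
    # Two levels of sharding keep directory sizes manageable
    # (assumed layout, for illustration).
    return blob_dir / sha1[:2] / sha1[2:4] / sha1


def download_all(
    sha1s: Iterable[str],
    blob_dir: Path,
    fetch: Callable[[str], bytes],
    parallel_downloads: int = 4,
) -> int:
    """Fetch each blob and store it under its sharded path.

    Returns the total number of bytes on disk (cf. stats/size.txt).
    """
    def one(sha1: str) -> int:
        path = shard_path(blob_dir, sha1)
        if path.exists():  # resume support: skip already-downloaded blobs
            return path.stat().st_size
        path.parent.mkdir(parents=True, exist_ok=True)
        data = fetch(sha1)
        path.write_bytes(data)
        return len(data)

    with ThreadPoolExecutor(max_workers=parallel_downloads) as pool:
        return sum(pool.map(one, sha1s))
```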
class swh.graph.luigi.blobs_datasets.MakeBlobTarball(*args, **kwargs)[source]#

Bases: _BaseTask

blob_filter: str = <luigi.parameter.ChoiceParameter object>#
derived_datasets_path: Path = <luigi.parameter.PathParameter object>#
requires() Task[source]#

Returns an instance of DownloadBlobs

output() List[Target][source]#

blobs.tar.zst in self.derived_datasets_path / self.blob_filter

run() None[source]#

Packs the downloaded blobs into blobs.tar.zst.

previous_derived_datasets_path: Path | None#
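The tarball step can be sketched with the standard tarfile module; the real task additionally zstd-compresses the archive (blobs.tar.zst), which is omitted here since zstandard is a third-party dependency, and `make_blob_tarball` is a hypothetical helper name.

```python
import tarfile
from pathlib import Path


def make_blob_tarball(blob_dir: Path, tar_path: Path) -> int:
    """Pack every blob under blob_dir into a tar archive.

    Returns the number of files added.
    """
    count = 0
    with tarfile.open(tar_path, "w") as tar:
        # Sort paths for a reproducible member order.
        for path in sorted(blob_dir.rglob("*")):
            if path.is_file():
                tar.add(path, arcname=str(path.relative_to(blob_dir)))
                count += 1
    return count
```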
class swh.graph.luigi.blobs_datasets.MakeSampleBlobTarball(*args, **kwargs)[source]#

Bases: _BaseTask

blob_filter: str = <luigi.parameter.ChoiceParameter object>#
derived_datasets_path: Path = <luigi.parameter.PathParameter object>#
requires() Task[source]#

Returns an instance of DownloadBlobs

output() List[Target][source]#

blobs-sample20k.tar.zst in self.derived_datasets_path / self.blob_filter

run() None[source]#

Selects a sample of 20k random blobs and puts them in a tarball.

previous_derived_datasets_path: Path | None#
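Selecting the 20k-blob sample amounts to uniform random sampling over the blob list; `sample_blobs` is a hypothetical helper, and the real task reads the list from blobs.csv.zst.

```python
import random
from typing import List, Sequence


def sample_blobs(swhids: Sequence[str], k: int = 20_000, seed: int = 0) -> List[str]:
    """Pick at most k distinct blobs, uniformly at random.

    A fixed seed makes the sample reproducible across runs.
    """
    if len(swhids) <= k:
        return list(swhids)
    rng = random.Random(seed)
    return rng.sample(list(swhids), k)
```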
class swh.graph.luigi.blobs_datasets.ComputeBlobFileinfo(*args, **kwargs)[source]#

Bases: _BaseTask

blob_filter: str = <luigi.parameter.ChoiceParameter object>#
derived_datasets_path: Path = <luigi.parameter.PathParameter object>#
CSV_HEADER = ('swhid', 'mime_type', 'encoding', 'line_count', 'word_count', 'size')#
READABLE_ENCODINGS = ('us-ascii', 'utf-8', 'iso-8859-1')#
requires() Task[source]#

Returns an instance of DownloadBlobs

output() List[LocalTarget][source]#

blobs-fileinfo.csv.zst in self.derived_datasets_path / self.blob_filter

run() None[source]#

Computes the MIME type, encoding, line count, word count, and size of each blob, and writes the results to blobs-fileinfo.csv.zst.

previous_derived_datasets_path: Path | None#
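The line_count, word_count, and size columns of CSV_HEADER can be computed with a sketch like the one below; MIME type and encoding detection are omitted, and `fileinfo_row` is a hypothetical helper.

```python
from pathlib import Path
from typing import Tuple


def fileinfo_row(swhid: str, path: Path) -> Tuple[str, int, int, int]:
    """Compute the line_count, word_count, and size columns for one blob.

    Counts mirror `wc`: lines are newline-terminated chunks, words are
    whitespace-separated tokens.
    """
    data = path.read_bytes()
    text = data.decode("utf-8", errors="replace")
    return (swhid, text.count("\n"), len(text.split()), len(data))
```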
class swh.graph.luigi.blobs_datasets.BlobScancode(*args, **kwargs)[source]#

Bases: _BaseTask

Runs scancode-toolkit on the blob dataset.

blob_filter: str = <luigi.parameter.ChoiceParameter object>#
derived_datasets_path: Path = <luigi.parameter.PathParameter object>#
FIELDNAMES = ['swhid', 'license', 'score']#
DEFAULT_MIN_SCORE = 0#
DEFAULT_JOBS = 1#
DEFAULT_TIMEOUT = 120#
MAP_CHUNKSIZE = 1#
WORKER_MAX_TASKS = 1000#
FIELD_SEP = ','#
READABLE_ENCODINGS = ('us-ascii', 'utf-8', 'iso-8859-1')#
requires() Task[source]#

Returns an instance of DownloadBlobs

output() List[Target][source]#

blobs-scancode.csv.zst and blobs-scancode.ndjson.zst in self.derived_datasets_path / self.blob_filter

run() None[source]#

Detects license(s) of license blobs located under blob_dir using scancode.

Saves the results in a 3-column (swhid, license, score) CSV format.

previous_derived_datasets_path: Path | None#
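The class constants above map directly onto a multiprocessing pool: DEFAULT_JOBS worker processes, input chunks of MAP_CHUNKSIZE, and workers recycled after WORKER_MAX_TASKS tasks (a common guard against memory growth in long scanning runs). A minimal sketch, with `detect_licenses` standing in for the actual scancode call:

```python
import multiprocessing
from typing import Iterable, List, Tuple

DEFAULT_JOBS = 1
MAP_CHUNKSIZE = 1
WORKER_MAX_TASKS = 1000


def detect_licenses(path: str) -> Tuple[str, str, float]:
    """Stand-in for the scancode call: returns (path, license, score)."""
    return (path, "unknown", 0.0)


def scan_all(paths: Iterable[str], jobs: int = DEFAULT_JOBS) -> List[Tuple[str, str, float]]:
    """Scan blobs, recycling workers to bound memory use."""
    paths = list(paths)
    if jobs <= 1:
        return [detect_licenses(p) for p in paths]
    # maxtasksperchild restarts each worker after WORKER_MAX_TASKS tasks.
    with multiprocessing.Pool(jobs, maxtasksperchild=WORKER_MAX_TASKS) as pool:
        return list(pool.imap_unordered(detect_licenses, paths, chunksize=MAP_CHUNKSIZE))
```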
class swh.graph.luigi.blobs_datasets.FindBlobOrigins(*args, **kwargs)[source]#

Bases: _ConcurrentCsvWritingTask

previous_derived_datasets_path: Path | None = <luigi.parameter.OptionalPathParameter object>#
output() List[Target][source]#

blobs-origins.csv.zst in self.derived_datasets_path / self.blob_filter

CSV_HEADER: Tuple[str, str] = ('swhid', 'origin_url')#
async process_one(row: Tuple[str, str, str]) Tuple[str, str][source]#
run() None[source]#

Calls the process_one() function, and writes its results as a two-column CSV to the target defined by output().

stub: TraversalServiceStub#
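The _ConcurrentCsvWritingTask pattern (process_one() maps each input row to an output row; the task writes all results as a two-column CSV) can be sketched as follows. The placeholder process_one body and the example URL are hypothetical; the real implementation queries the graph's gRPC traversal service through `stub`.

```python
import asyncio
import csv
import io
from typing import Iterable, Tuple


async def process_one(row: Tuple[str, str, str]) -> Tuple[str, str]:
    # Stand-in for the real gRPC traversal call: map a blob's input row
    # to (swhid, origin_url).  Placeholder logic for illustration only.
    swhid, _sha1, _name = row
    return (swhid, f"https://example.org/origin-of/{swhid}")


async def write_results(
    rows: Iterable[Tuple[str, str, str]],
    out: io.TextIOBase,
    header: Tuple[str, str] = ("swhid", "origin_url"),
) -> None:
    """Run process_one() concurrently and write results as a two-column CSV."""
    writer = csv.writer(out)
    writer.writerow(header)
    # gather() preserves input order, so the CSV is deterministic.
    results = await asyncio.gather(*(process_one(r) for r in rows))
    writer.writerows(results)
```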
class swh.graph.luigi.blobs_datasets.CountBlobOrigins(*args, **kwargs)[source]#

Bases: _ConcurrentCsvWritingTask

CSV_HEADER: Tuple[str, str] = ('swhid', 'origin_count')#
output() List[Target][source]#

blobs-nb-origins.csv.zst in self.derived_datasets_path / self.blob_filter

async process_one(row: Tuple[str, str, str]) Tuple[str, str][source]#
stub: TraversalServiceStub#
class swh.graph.luigi.blobs_datasets.FindEarliestRevisions(*args, **kwargs)[source]#

Bases: _BaseTask

blob_filter: str = <luigi.parameter.ChoiceParameter object>#
derived_datasets_path: Path = <luigi.parameter.PathParameter object>#
local_graph_path = <luigi.parameter.PathParameter object>#
graph_name = <luigi.parameter.Parameter object>#
requires() Task[source]#

Returns an instance of SelectBlobs

output() LocalTarget[source]#

blobs-earliest.csv.zst in self.derived_datasets_path / self.blob_filter

run() None[source]#

Finds the earliest revision containing each blob, and writes the results to blobs-earliest.csv.zst.

previous_derived_datasets_path: Path | None#
class swh.graph.luigi.blobs_datasets.RunBlobDataset(*args, **kwargs)[source]#

Bases: Task

Runs all tasks to build a blob dataset with the given filter.

blob_filter = <luigi.parameter.ChoiceParameter object>#
derived_datasets_path = <luigi.parameter.PathParameter object>#
requires() Sequence[Task][source]#

Returns a list of tasks such that every task in this module is transitively depended on.

complete() bool[source]#

Always returns False; status is checked by dependencies.

run()[source]#

Checks that all files are in the correct format and contain a well-known SWHID.
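This final check can be sketched with the core SWHID syntax (swh:1:&lt;object type&gt;:&lt;40 hex digits&gt;); `is_valid_swhid` is a hypothetical helper, not part of this module.

```python
import re

# Core SWHID syntax: swh:1:<object type>:<40 hex digits>
_SWHID_RE = re.compile(r"^swh:1:(cnt|dir|rev|rel|snp):[0-9a-f]{40}$")


def is_valid_swhid(s: str) -> bool:
    """Return True if s is a well-formed core SWHID."""
    return _SWHID_RE.match(s) is not None
```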