swh.graph.luigi.blobs_datasets module
Luigi tasks for blob-centric datasets
This module contains Luigi tasks driving the creation of derived datasets centered around a subset of content objects in the graph. Currently, this means the license dataset and the citation dataset.
File layout
This assumes a local compressed graph (from swh.graph.luigi.compressed_graph) is present, and generates/manipulates the following files:
base_dir/
    <date>[_<flavor>]/
        citation-blobs/
            blobs-earliest.csv.zst
            blobs-fileinfo.csv.zst
            blobs-nb-origins.csv.zst
            blobs-origins.csv.zst
            blobs-sample20k.tar.zst
            blobs.tar.zst
            import-dataset.sql
            license-blobs.csv.zst
        license-blobs/
            <same as above, plus these two:>
            blobs-scancode.csv.zst
            blobs-scancode.ndjson.zst
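As a minimal sketch of how the layout above maps to paths, the following uses only `pathlib`. The helper names `dataset_dir` and `expected_files`, the date string, and the flavor value are hypothetical and are not part of this module; the file names are copied from the listing as printed.

```python
from pathlib import Path
from typing import List, Optional

# File names copied from the layout above; everything else is illustrative.
COMMON_FILES = (
    "blobs-earliest.csv.zst",
    "blobs-fileinfo.csv.zst",
    "blobs-nb-origins.csv.zst",
    "blobs-origins.csv.zst",
    "blobs-sample20k.tar.zst",
    "blobs.tar.zst",
    "import-dataset.sql",
    "license-blobs.csv.zst",
)
SCANCODE_FILES = ("blobs-scancode.csv.zst", "blobs-scancode.ndjson.zst")


def dataset_dir(
    base_dir: Path, date: str, dataset_name: str, flavor: Optional[str] = None
) -> Path:
    """Build base_dir/<date>[_<flavor>]/<dataset_name> (hypothetical helper)."""
    snapshot = date if flavor is None else f"{date}_{flavor}"
    return base_dir / snapshot / dataset_name


def expected_files(
    base_dir: Path, date: str, dataset_name: str, flavor: Optional[str] = None
) -> List[Path]:
    """List the files the layout shows for the given dataset directory."""
    names = COMMON_FILES
    if dataset_name == "license-blobs":
        # The license dataset has the two extra scancode outputs.
        names = names + SCANCODE_FILES
    directory = dataset_dir(base_dir, date, dataset_name, flavor)
    return [directory / name for name in names]


# Placeholder date and flavor values, chosen only for illustration.
for path in expected_files(Path("base_dir"), "2024-01-01", "license-blobs", "example"):
    print(path)
```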
- swh.graph.luigi.blobs_datasets.atomic_zstd_writer(result_path: Path)
Returns a file-like object, which writes to a temporary file, then atomically renames it to the result_path on success.
- swh.graph.luigi.blobs_datasets.atomic_csv_zstd_writer(result_path: Path)
Returns a csv.writer object, which writes to a temporary file, then atomically renames it to the result_path on success.
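Both helpers above describe the same write-then-rename pattern. The sketch below illustrates that pattern with the standard library only. It is not the module's implementation: zstd compression is left out, the context-manager interface is an assumption, and the names `atomic_writer_sketch` and `atomic_csv_writer_sketch` are hypothetical.

```python
import csv
import io
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import BinaryIO, Iterator


@contextmanager
def atomic_writer_sketch(result_path: Path) -> Iterator[BinaryIO]:
    """Yield a binary file; rename it to result_path only if no error occurs."""
    # The temporary file lives in the target directory so that os.replace
    # stays on a single filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=result_path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as tmp_file:
            yield tmp_file
        os.replace(tmp_name, result_path)
    except BaseException:
        # On failure, drop the partial file and leave result_path untouched.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise


@contextmanager
def atomic_csv_writer_sketch(result_path: Path):
    """Same pattern, but yield a csv.writer on top of the temporary file."""
    with atomic_writer_sketch(result_path) as binary_file:
        text_file = io.TextIOWrapper(binary_file, encoding="utf-8", newline="")
        yield csv.writer(text_file)
        # Push buffered text down, then detach so the wrapper does not
        # close the underlying file before the rename.
        text_file.flush()
        text_file.detach()


if __name__ == "__main__":
    out_dir = Path(tempfile.mkdtemp())
    with atomic_csv_writer_sketch(out_dir / "example.csv") as writer:
        writer.writerow(("swhid", "value"))
    print((out_dir / "example.csv").read_text())
```

A reader of `result_path` therefore sees either the previous file or the complete new one, never a partially written file.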
- class swh.graph.luigi.blobs_datasets.SelectBlobs(*args, **kwargs)
Bases: _BaseTask
- local_export_path = <luigi.parameter.PathParameter object>
- known_swhids_csv = <luigi.parameter.Parameter object>
- class swh.graph.luigi.blobs_datasets.DownloadBlobs(*args, **kwargs)
Bases: _BaseTask
- parallel_downloads = <luigi.parameter.IntParameter object>
- download_url = <luigi.parameter.Parameter object>
- decompression_algo = <luigi.parameter.ChoiceParameter object>
- requires() -> Task
Returns an instance of SelectBlobs
- class swh.graph.luigi.blobs_datasets.MakeBlobTarball(*args, **kwargs)
Bases: _BaseTask
- requires() -> Task
Returns an instance of DownloadBlobs
- class swh.graph.luigi.blobs_datasets.MakeSampleBlobTarball(*args, **kwargs)
Bases: _BaseTask
- requires() -> Task
Returns an instance of DownloadBlobs
- class swh.graph.luigi.blobs_datasets.ComputeBlobFileinfo(*args, **kwargs)
Bases: _BaseTask
- CSV_HEADER = ('swhid', 'mime_type', 'encoding', 'line_count', 'word_count', 'size')
- READABLE_ENCODINGS = ('us-ascii', 'utf-8', 'iso-8859-1')
- requires() -> Task
Returns an instance of DownloadBlobs
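The two constants of ComputeBlobFileinfo suggest how its output might be consumed. The sketch below parses CSV text with that header and keeps the rows whose encoding is one of READABLE_ENCODINGS. It assumes the file starts with a header row matching CSV_HEADER, which this page does not confirm; `readable_rows` is a hypothetical helper, and the sample rows are made-up placeholders rather than real dataset content.

```python
import csv
import io
from typing import Dict, List

# Constants copied from the ComputeBlobFileinfo entry above.
CSV_HEADER = ("swhid", "mime_type", "encoding", "line_count", "word_count", "size")
READABLE_ENCODINGS = ("us-ascii", "utf-8", "iso-8859-1")


def readable_rows(csv_text: str) -> List[Dict[str, str]]:
    """Return the rows whose encoding is listed in READABLE_ENCODINGS."""
    reader = csv.DictReader(io.StringIO(csv_text))
    # Assumption: the first row is a header identical to CSV_HEADER.
    if tuple(reader.fieldnames or ()) != CSV_HEADER:
        raise ValueError(f"unexpected header: {reader.fieldnames}")
    return [row for row in reader if row["encoding"] in READABLE_ENCODINGS]


# Placeholder rows for illustration only; not taken from a real dataset.
sample = (
    "swhid,mime_type,encoding,line_count,word_count,size\n"
    "swh:1:cnt:0000000000000000000000000000000000000001,text/plain,utf-8,3,12,80\n"
    "swh:1:cnt:0000000000000000000000000000000000000002,"
    "application/octet-stream,binary,0,0,512\n"
)

for row in readable_rows(sample):
    print(row["swhid"], row["mime_type"], row["size"])
```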
- class swh.graph.luigi.blobs_datasets.BlobScancode(*args, **kwargs)
Bases: _BaseTask
Runs scancode-toolkit on the blob dataset.
- FIELDNAMES = ['swhid', 'license', 'score']
- DEFAULT_MIN_SCORE = 0
- DEFAULT_JOBS = 1
- DEFAULT_TIMEOUT = 120
- MAP_CHUNKSIZE = 1
- WORKER_MAX_TASKS = 1000
- FIELD_SEP = ','
- READABLE_ENCODINGS = ('us-ascii', 'utf-8', 'iso-8859-1')
- requires() -> Task
Returns an instance of DownloadBlobs
- output() -> List[Target]
blobs-scancode.csv.zst and blobs-scancode.ndjson.zst in self.derived_datasets_path / self.blob_filter
- class swh.graph.luigi.blobs_datasets.FindBlobOrigins(*args, **kwargs)
Bases: _ConcurrentCsvWritingTask
- run() -> None
Calls the process_one() function, and writes its results as a two-column CSV to the target defined by output().
- stub: TraversalServiceStub
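The run() description for FindBlobOrigins can be pictured as applying a per-item function concurrently and writing each result as a two-column CSV row. The sketch below shows that general shape with a thread pool. It is an assumption-laden illustration: the real task's concurrency model, the signature of process_one(), and its use of the traversal stub are not documented here, and `write_two_column_csv` is a hypothetical name.

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, TextIO, Tuple


def write_two_column_csv(
    inputs: Iterable[str],
    process_one: Callable[[str], Tuple[str, str]],
    out: TextIO,
    max_workers: int = 4,
) -> None:
    """Run process_one on every input concurrently and write (key, value) rows."""
    writer = csv.writer(out)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order, so the CSV rows are
        # deterministic even though the calls run concurrently.
        for key, value in pool.map(process_one, inputs):
            writer.writerow((key, value))


def fake_process_one(item: str) -> Tuple[str, str]:
    # Stand-in for a per-blob lookup; a real task would query the graph.
    return item, str(len(item))


buffer = io.StringIO()
write_two_column_csv(["a", "bb", "ccc"], fake_process_one, buffer)
print(buffer.getvalue())
```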
- class swh.graph.luigi.blobs_datasets.CountBlobOrigins(*args, **kwargs)
Bases: _ConcurrentCsvWritingTask
- stub: TraversalServiceStub
- class swh.graph.luigi.blobs_datasets.FindEarliestRevisions(*args, **kwargs)
Bases: _BaseTask
- local_graph_path = <luigi.parameter.PathParameter object>
- graph_name = <luigi.parameter.Parameter object>
- requires() -> Task
Returns an instance of SelectBlobs
- class swh.graph.luigi.blobs_datasets.RunBlobDataset(*args, **kwargs)
Bases: Task
Runs all tasks to build a blob dataset with the given filter.
- blob_filter = <luigi.parameter.ChoiceParameter object>
- derived_datasets_path = <luigi.parameter.PathParameter object>
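The requires() entries documented on this page form a small dependency graph. The sketch below records only those documented edges and derives one valid execution order with the standard-library `graphlib` module (Python 3.9 or later). FindBlobOrigins, CountBlobOrigins, and RunBlobDataset are left out because their dependencies are not listed here; Luigi computes the real schedule itself, so this is only an illustration.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it requires, as documented above.
REQUIRES = {
    "DownloadBlobs": {"SelectBlobs"},
    "MakeBlobTarball": {"DownloadBlobs"},
    "MakeSampleBlobTarball": {"DownloadBlobs"},
    "ComputeBlobFileinfo": {"DownloadBlobs"},
    "BlobScancode": {"DownloadBlobs"},
    "FindEarliestRevisions": {"SelectBlobs"},
}

# static_order() returns the tasks so that every requirement comes
# before the task that needs it.
order = list(TopologicalSorter(REQUIRES).static_order())

for task in order:
    print(task)
```

SelectBlobs always comes first in any valid order, since every other documented task depends on it directly or through DownloadBlobs.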