swh.datasets.luigi.blobs_datasets module#
Luigi tasks for blob-centric datasets#
This module contains Luigi tasks driving the creation of derived datasets centered around a subset of content objects in the graph. Currently, this means:
the license dataset, and
the citation dataset
File layout#
This assumes a local compressed graph (from swh.graph.luigi.compressed_graph)
is present, and generates/manipulates the following files:
base_dir/
<date>[_<flavor>]/
citation-blobs/
blobs-earliest.csv.zst
blobs-fileinfo.csv.zst
blobs-nb-origins.csv.zst
blobs-origins.csv.zst
blobs-sample20k.tar.zst
blobs.tar.zst
import-dataset.sql
license-blobs.csv.zst
license-blobs/
<same as above, plus these two:>
blobs-scancode.csv.zst
blobs-scancode.ndjson.zst
- swh.datasets.luigi.blobs_datasets.atomic_zstd_writer(result_path: Path)[source]#
Returns a file-like object, which writes to a temporary file, then atomically renames it to the
result_pathon success.
- swh.datasets.luigi.blobs_datasets.atomic_csv_zstd_writer(result_path: Path)[source]#
Returns a
csv.writerobject, which writes to a temporary file, then atomically renames it to theresult_pathon success.
- class swh.datasets.luigi.blobs_datasets.SelectBlobs(*args, **kwargs)[source]#
Bases:
_BaseTask- blob_filter: str#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- local_export_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- derived_datasets_path: Path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- known_swhids_csv#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- class swh.datasets.luigi.blobs_datasets.DownloadBlobs(*args, **kwargs)[source]#
Bases:
_BaseTask- blob_filter: str#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- derived_datasets_path: Path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- parallel_downloads#
Parameter whose value is an
int.
- download_url#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- decompression_algo#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- requires() Task[source]#
Returns an instance of
SelectBlobs
- class swh.datasets.luigi.blobs_datasets.MakeBlobTarball(*args, **kwargs)[source]#
Bases:
_BaseTask- blob_filter: str#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- derived_datasets_path: Path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- requires() Task[source]#
Returns an instance of
DownloadBlobs
- class swh.datasets.luigi.blobs_datasets.MakeSampleBlobTarball(*args, **kwargs)[source]#
Bases:
_BaseTask- blob_filter: str#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- derived_datasets_path: Path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- requires() Task[source]#
Returns an instance of
DownloadBlobs
- class swh.datasets.luigi.blobs_datasets.ComputeBlobFileinfo(*args, **kwargs)[source]#
Bases:
_BaseTask- blob_filter: str#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- derived_datasets_path: Path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- CSV_HEADER = ('swhid', 'mime_type', 'encoding', 'line_count', 'word_count', 'size')#
- READABLE_ENCODINGS = ('us-ascii', 'utf-8', 'iso-8859-1')#
- requires() Task[source]#
Returns an instance of
DownloadBlobs
- class swh.datasets.luigi.blobs_datasets.BlobScancode(*args, **kwargs)[source]#
Bases:
_BaseTaskRuns scancode-toolkit on the blob dataset
- blob_filter: str#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- derived_datasets_path: Path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- FIELDNAMES = ['swhid', 'license', 'score']#
- DEFAULT_MIN_SCORE = 0#
- DEFAULT_JOBS = 1#
- DEFAULT_TIMEOUT = 120#
- MAP_CHUNKSIZE = 1#
- WORKER_MAX_TASKS = 1000#
- FIELD_SEP = ','#
- READABLE_ENCODINGS = ('us-ascii', 'utf-8', 'iso-8859-1')#
- requires() Task[source]#
Returns an instance of
DownloadBlobs
- output() List[Target][source]#
blobs-scancode.csv.zstandblobs-scancode.ndjson.zstinself.derived_datasets_path / self.blob_filter
- class swh.datasets.luigi.blobs_datasets.FindBlobOrigins(*args, **kwargs)[source]#
Bases:
_ConcurrentCsvWritingTask- run() None[source]#
Calls the
process_one()function, and writes its results as a two-column CSV to the target defined byoutput().
- stub: TraversalServiceStub#
- class swh.datasets.luigi.blobs_datasets.CountBlobOrigins(*args, **kwargs)[source]#
Bases:
_ConcurrentCsvWritingTask- stub: TraversalServiceStub#
- class swh.datasets.luigi.blobs_datasets.FindEarliestRevisions(*args, **kwargs)[source]#
Bases:
_BaseTask- blob_filter: str#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- derived_datasets_path: Path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- local_graph_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- graph_name#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- requires() Task[source]#
Returns an instance of
SelectBlobs
- class swh.datasets.luigi.blobs_datasets.RunBlobDataset(*args, **kwargs)[source]#
Bases:
TaskRuns all tasks to build a blob dataset with the given filter.
- blob_filter#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- derived_datasets_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>