swh.datasets.luigi.blobs_datasets module

Luigi tasks for blob-centric datasets

This module contains Luigi tasks driving the creation of derived datasets centered around a subset of content objects in the graph. Currently, this means:

  • citation files (citation-blobs/)

  • license files (license-blobs/)

File layout

The tasks assume that a local compressed graph (from swh.graph.luigi.compressed_graph) is present, and they generate or manipulate the following files:

base_dir/
    <date>[_<flavor>]/
        citation-blobs/
            blobs-earliest.csv.zst
            blobs-fileinfo.csv.zst
            blobs-nb-origins.csv.zst
            blobs-origins.csv.zst
            blobs-sample20k.tar.zst
            blobs.tar.zst
            import-dataset.sql
            license-blobs.csv.zst
        license-blobs/
            <same as above, plus these two:>
            blobs-scancode.csv.zst
            blobs-scancode.ndjson.zst
swh.datasets.luigi.blobs_datasets.atomic_zstd_writer(result_path: Path)

Returns a file-like object, which writes to a temporary file, then atomically renames it to the result_path on success.
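The write-to-a-temporary-file-then-rename behaviour described above can be sketched with the standard library. This is a minimal sketch, not the module's implementation: zstd is not available in the standard library of most deployed Python versions, so the hypothetical `atomic_writer` takes an `opener` callable that defaults to plain `open`. A zstd-aware opener (for example `zstandard.open` from the third-party python-zstandard package) would be passed in its place.

```python
import contextlib
import os
import tempfile
from pathlib import Path
from typing import IO, Any, Callable, Iterator


@contextlib.contextmanager
def atomic_writer(
    result_path: Path, opener: Callable[..., IO[Any]] = open
) -> Iterator[IO[Any]]:
    """Yield a binary file object; publish it as result_path only on success.

    Hypothetical helper: `opener` stands in for a zstd-aware open() such as
    zstandard.open; the default writes uncompressed bytes.
    """
    # Create the temporary file next to the destination, so that the final
    # os.replace() is a same-filesystem rename, which POSIX makes atomic.
    fd, tmp_name = tempfile.mkstemp(
        dir=result_path.parent, prefix=f".{result_path.name}.", suffix=".tmp"
    )
    os.close(fd)
    try:
        with opener(tmp_name, "wb") as f:
            yield f
        # Reached only if the with-body did not raise.
        os.replace(tmp_name, result_path)
    except BaseException:
        # Leave no partial output behind, then propagate the error.
        os.unlink(tmp_name)
        raise
```

Usage: `with atomic_writer(Path("out.bin")) as f: f.write(b"...")`. Creating the temporary file in the destination directory rather than in the system temp directory matters: os.replace cannot rename across filesystems, and only a same-filesystem rename is atomic.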

swh.datasets.luigi.blobs_datasets.atomic_csv_zstd_writer(result_path: Path)

Returns a csv.writer object, which writes to a temporary file, then atomically renames it to the result_path on success.
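Wrapping such a file in csv.writer gives the CSV variant. The sketch below is hypothetical and uncompressed: `atomic_csv_writer` writes text to a fixed sibling temporary file (`.<name>.tmp`) and renames it into place only when the block exits without an exception.

```python
import contextlib
import csv
from pathlib import Path
from typing import Any, Iterator


@contextlib.contextmanager
def atomic_csv_writer(result_path: Path) -> Iterator[Any]:
    """Yield a csv.writer whose output appears at result_path only on success.

    Hypothetical, uncompressed stand-in for atomic_csv_zstd_writer.
    """
    tmp_path = result_path.with_name(f".{result_path.name}.tmp")
    try:
        # newline="" is what the csv module expects of the underlying file.
        with tmp_path.open("w", newline="", encoding="utf-8") as f:
            yield csv.writer(f)
        # Atomic on POSIX when both paths are on the same filesystem.
        tmp_path.replace(result_path)
    except BaseException:
        # Discard the partial file and propagate the error.
        tmp_path.unlink(missing_ok=True)
        raise
```

A fixed temporary name keeps the sketch short, but two writers targeting the same result_path would collide; a unique name from tempfile.mkstemp avoids that.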

swh.datasets.luigi.blobs_datasets.check_csv(csv_path: Path) → None

class swh.datasets.luigi.blobs_datasets.SelectBlobs(*args, **kwargs)

Bases: _BaseTask

blob_filter: str

A luigi.ChoiceParameter: a parameter whose value must be one of a fixed set of choices. Its constructor takes two arguments:
  1. choices, an iterable of the allowed values, and

  2. var_type, the class the values are converted to.

In the task definition, use

class MyTask(luigi.Task):
    my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)

At the command line, use

$ luigi --module my_tasks MyTask --my-param 0.1

Consider using EnumParameter for a typed, structured alternative. ChoiceParameter can fill the same role when all choices have the same type and the parameter value should remain readable on the command line.

local_export_path

A luigi.PathParameter: a parameter whose value is a path.

In the task definition, use

class MyTask(luigi.Task):
    existing_file_path = luigi.PathParameter(exists=True)
    new_file_path = luigi.PathParameter()

    def run(self):
        # Get data from existing file
        with self.existing_file_path.open("r", encoding="utf-8") as f:
            data = f.read()

        # Output message in new file
        self.new_file_path.parent.mkdir(parents=True, exist_ok=True)
        with self.new_file_path.open("w", encoding="utf-8") as f:
            f.write("hello from a PathParameter => ")
            f.write(data)

At the command line, use

$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>

derived_datasets_path: Path

A luigi.PathParameter, whose value is a path.

known_swhids_csv

A luigi.Parameter: a parameter whose value is a str. Parameter is also the base class for the other parameter types.

Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:

class MyTask(luigi.Task):
    foo = luigi.Parameter()

class RequiringTask(luigi.Task):
    def requires(self):
        return MyTask(foo="hello")

    def run(self):
        print(self.requires().foo)  # prints "hello"

This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'), each with its foo attribute set accordingly.

When a task is instantiated, any argument passed to it is used as the value of the parameter: if you instantiate a = TaskA(x=44), then a.x == 44. When no value is provided, it is resolved in the following order of decreasing priority:

  • Any value provided on the command line:

    • to the root task (e.g. --param xyz);

    • then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).

  • A value set in the luigi configuration, as PARAM_NAME: <serialized value> under a [TASK_NAME] section. See ParamConfigIngestion.

  • Any default value set using the default argument.

Parameter objects may be reused, but you must then set the positional=False flag.

requires() → List[Task]

Returns an instance of LocalExport

output() → List[Target]

blobs.csv.zst and stats/count.txt in self.derived_datasets_path / self.blob_filter

run() → None

Runs an Athena query to get the list of blobs and writes it to blobs.csv.zst.

previous_derived_datasets_path: Path | None

class swh.datasets.luigi.blobs_datasets.DownloadBlobs(*args, **kwargs)

Bases: _BaseTask

blob_filter: str

A luigi.ChoiceParameter, whose value must be one of a fixed set of choices.

derived_datasets_path: Path

A luigi.PathParameter, whose value is a path.

previous_derived_datasets_path: Path | None

A luigi.OptionalPathParameter: a path parameter that may be left unset (None).

parallel_downloads

A luigi.IntParameter, whose value is an int.

download_url

A luigi.Parameter, whose value is a str.

decompression_algo

A luigi.ChoiceParameter, whose value must be one of a fixed set of choices.

requires() → Task

Returns an instance of SelectBlobs

output() → List[Target]

stats/size.txt in self.derived_datasets_path / self.blob_filter

run() → None

Reads file SHA1 hashes from blobs.csv.zst and downloads them to blobs/.
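The download loop described above can be sketched with a thread pool sized by parallel_downloads. This sketch is hypothetical: the `fetch` callable stands in for an HTTP request against download_url, the SHA1s are passed as a list rather than read from blobs.csv.zst, each blob is stored flat as blobs/<sha1>, and decompression (decompression_algo) is left out. The actual task's URL scheme and on-disk layout are not documented here.

```python
import concurrent.futures
from pathlib import Path
from typing import Callable, Iterable


def download_blobs(
    sha1s: Iterable[str],
    fetch: Callable[[str], bytes],
    blobs_dir: Path,
    parallel_downloads: int = 4,
) -> int:
    """Download every blob not already present; return the total size in bytes."""
    blobs_dir.mkdir(parents=True, exist_ok=True)

    def download_one(sha1: str) -> int:
        path = blobs_dir / sha1
        if not path.exists():  # skip blobs fetched by a previous run
            tmp = path.with_suffix(".part")
            tmp.write_bytes(fetch(sha1))
            tmp.replace(path)  # never leave a truncated blob under its final name
        return path.stat().st_size

    with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_downloads) as pool:
        # map() re-raises the first exception raised by a worker.
        return sum(pool.map(download_one, sha1s))
```

Threads suit this workload because downloads are I/O-bound. The returned total is the kind of figure one might record in stats/size.txt, though how the real task computes that file is not stated here.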

session()

class swh.datasets.luigi.blobs_datasets.MakeBlobTarball(*args, **kwargs)

Bases: _BaseTask

blob_filter: str

A luigi.ChoiceParameter, whose value must be one of a fixed set of choices.

derived_datasets_path: Path

A luigi.PathParameter, whose value is a path.

requires() → Task

Returns an instance of DownloadBlobs

output() → List[Target]

blobs.tar.zst in self.derived_datasets_path / self.blob_filter

run() → None

Packs the downloaded blobs into blobs.tar.zst.

previous_derived_datasets_path: Path | None

class swh.datasets.luigi.blobs_datasets.MakeSampleBlobTarball(*args, **kwargs)

Bases: _BaseTask

blob_filter: str

A luigi.ChoiceParameter, whose value must be one of a fixed set of choices.

derived_datasets_path: Path

A luigi.PathParameter, whose value is a path.

requires() → Task

Returns an instance of DownloadBlobs

output() → List[Target]

blobs-sample20k.tar.zst in self.derived_datasets_path / self.blob_filter

run() → None

Selects a sample of 20k random blobs and puts them in a tarball.
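Selecting the sample and writing the tarball can be sketched with random and tarfile. Assumptions: blobs are individual files directly under blobs_dir, the tarball is written uncompressed (the real output, blobs-sample20k.tar.zst, is zstd-compressed), and the `seed` argument and in-archive `blobs/` prefix of the hypothetical `make_sample_tarball` are illustrative.

```python
import random
import tarfile
from pathlib import Path
from typing import List, Optional


def make_sample_tarball(
    blobs_dir: Path, tarball_path: Path, k: int = 20_000, seed: Optional[int] = None
) -> List[str]:
    """Archive a uniform random sample of at most k blobs; return their names."""
    # Sort first so that a fixed seed gives the same sample regardless of the
    # order in which the filesystem lists the directory.
    blobs = sorted(p for p in blobs_dir.iterdir() if p.is_file())
    sample = sorted(random.Random(seed).sample(blobs, min(k, len(blobs))))
    # Plain tar for the sketch; the task's real output is .tar.zst.
    with tarfile.open(tarball_path, "w") as tar:
        for path in sample:
            tar.add(path, arcname=f"blobs/{path.name}")
    return [p.name for p in sample]
```

random.sample draws without replacement, so no blob appears twice, and capping k at the number of available blobs keeps the sketch from raising when fewer than 20k blobs were downloaded.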

previous_derived_datasets_path: Path | None

class swh.datasets.luigi.blobs_datasets.ComputeBlobFileinfo(*args, **kwargs)

Bases: _BaseTask

blob_filter: str

A luigi.ChoiceParameter, whose value must be one of a fixed set of choices.

derived_datasets_path: Path

A luigi.PathParameter, whose value is a path.

CSV_HEADER = ('swhid', 'mime_type', 'encoding', 'line_count', 'word_count', 'size')
READABLE_ENCODINGS = ('us-ascii', 'utf-8', 'iso-8859-1')

requires() → Task

Returns an instance of DownloadBlobs

output() → List[LocalTarget]

blobs-fileinfo.csv.zst in self.derived_datasets_path / self.blob_filter

run() → None

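Computes, for each downloaded blob, the fields listed in CSV_HEADER, and writes them to blobs-fileinfo.csv.zst.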
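One row of blobs-fileinfo.csv.zst can be sketched from CSV_HEADER and READABLE_ENCODINGS above. Assumptions: the hypothetical `fileinfo_row` receives the MIME type and encoding as inputs (how the task detects them is not documented here), and it computes line and word counts, roughly as `wc -l` and `wc -w` would, only when the encoding is one of READABLE_ENCODINGS; otherwise both counts are left empty (None).

```python
from typing import Optional, Tuple

CSV_HEADER = ("swhid", "mime_type", "encoding", "line_count", "word_count", "size")
READABLE_ENCODINGS = ("us-ascii", "utf-8", "iso-8859-1")


def fileinfo_row(
    swhid: str, content: bytes, mime_type: str, encoding: str
) -> Tuple[str, str, str, Optional[int], Optional[int], int]:
    """Build one row matching CSV_HEADER for a single blob (hypothetical helper).

    mime_type and encoding are inputs; detecting them is outside this sketch.
    """
    line_count: Optional[int] = None
    word_count: Optional[int] = None
    if encoding in READABLE_ENCODINGS:
        # errors="replace" keeps a mislabeled blob from aborting the run.
        text = content.decode(encoding, errors="replace")
        line_count = text.count("\n")  # newline characters, like `wc -l`
        word_count = len(text.split())  # whitespace-separated tokens, like `wc -w`
    # size is the raw byte length, whatever the encoding.
    return (swhid, mime_type, encoding, line_count, word_count, len(content))
```

Restricting the counts to a small set of text encodings avoids reporting meaningless "lines" and "words" for binary blobs.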

previous_derived_datasets_path: Path | None

class swh.datasets.luigi.blobs_datasets.BlobScancode(*args, **kwargs)

Bases: _BaseTask

Runs scancode-toolkit on the blob dataset

blob_filter: str

A luigi.ChoiceParameter, whose value must be one of a fixed set of choices.

derived_datasets_path: Path

A luigi.PathParameter, whose value is a path.

FIELDNAMES = ['swhid', 'license', 'score']
DEFAULT_MIN_SCORE = 0
DEFAULT_JOBS = 1
DEFAULT_TIMEOUT = 120
MAP_CHUNKSIZE = 1
WORKER_MAX_TASKS = 1000
FIELD_SEP = ','
READABLE_ENCODINGS = ('us-ascii', 'utf-8', 'iso-8859-1')

requires() → Task

Returns an instance of DownloadBlobs

output() → List[Target]

blobs-scancode.csv.zst and blobs-scancode.ndjson.zst in self.derived_datasets_path / self.blob_filter

run() → None

Detects the license(s) of the license blobs located under blob_dir using scancode.

Saves the detected licenses to csv_outfile as a 3-column (swhid, license, score) CSV, matching FIELDNAMES.
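The CSV half of this output can be sketched with csv.DictWriter and the FIELDNAMES and DEFAULT_MIN_SCORE constants above. Running scancode itself is out of scope: the hypothetical `write_license_csv` takes already-computed (swhid, license, score) detections, and treating DEFAULT_MIN_SCORE as an inclusive lower bound on score is an assumption.

```python
import csv
from typing import Iterable, TextIO, Tuple

FIELDNAMES = ["swhid", "license", "score"]
DEFAULT_MIN_SCORE = 0


def write_license_csv(
    detections: Iterable[Tuple[str, str, float]],
    out: TextIO,
    min_score: float = DEFAULT_MIN_SCORE,
) -> int:
    """Write (swhid, license, score) detections as CSV; return the rows written.

    Hypothetical helper: `detections` stands in for scancode's license matches.
    """
    writer = csv.DictWriter(out, fieldnames=FIELDNAMES)
    writer.writeheader()
    written = 0
    for swhid, license_key, score in detections:
        # Assumed filter: keep matches scoring at least min_score.
        if score >= min_score:
            writer.writerow({"swhid": swhid, "license": license_key, "score": score})
            written += 1
    return written
```

DictWriter ties each value to a named column, so the row layout cannot drift from FIELDNAMES if the tuple order of the detections ever changes.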

previous_derived_datasets_path: Path | None

class swh.datasets.luigi.blobs_datasets.FindBlobOrigins(*args, **kwargs)

Bases: _ConcurrentCsvWritingTask

previous_derived_datasets_path: Path | None

A luigi.OptionalPathParameter: a path parameter that may be left unset (None).

output() → List[Target]

blobs-origins.csv.zst in self.derived_datasets_path / self.blob_filter

CSV_HEADER: Tuple[str, str] = ('swhid', 'origin_url')
EXCLUDE_MISSING_SHA1: bool = False
async process_one(row: Tuple[str, str, str]) → Tuple[str, str]
run() → None

Calls the process_one() function, and writes its results as a two-column CSV to the target defined by output().
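The concurrent pattern behind process_one() can be sketched with asyncio: run the coroutine over every input row with a bounded number of calls in flight, then write the two-column results under the header. This is a hypothetical sketch, not the _ConcurrentCsvWritingTask implementation: `write_concurrently` and its `max_concurrency` knob are illustrative, the 3-tuple rows mirror the process_one() signature without assuming what their fields mean, and a fake coroutine stands in for calls to the traversal service (the stub attribute).

```python
import asyncio
import csv
from typing import Awaitable, Callable, Iterable, Sequence, TextIO, Tuple

Row = Tuple[str, str, str]


async def write_concurrently(
    rows: Iterable[Row],
    process_one: Callable[[Row], Awaitable[Tuple[str, str]]],
    out: TextIO,
    header: Sequence[str],
    max_concurrency: int = 10,
) -> None:
    """Run process_one over rows with bounded concurrency; write a 2-column CSV."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(row: Row) -> Tuple[str, str]:
        async with semaphore:  # at most max_concurrency calls in flight
            return await process_one(row)

    # gather() returns results in input order, so the CSV is deterministic.
    results = await asyncio.gather(*(bounded(row) for row in rows))
    writer = csv.writer(out)
    writer.writerow(header)
    writer.writerows(results)
```

asyncio.gather schedules one coroutine per row up front, which is fine for a sketch; for millions of rows, a fixed pool of worker coroutines reading from an asyncio.Queue keeps memory bounded.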

stub: TraversalServiceStub

class swh.datasets.luigi.blobs_datasets.CountBlobOrigins(*args, **kwargs)

Bases: _ConcurrentCsvWritingTask

CSV_HEADER: Tuple[str, str] = ('swhid', 'origin_count')
EXCLUDE_MISSING_SHA1: bool = False
output() → List[Target]

blobs-nb-origins.csv.zst in self.derived_datasets_path / self.blob_filter

async process_one(row: Tuple[str, str, str]) → Tuple[str, str]
stub: TraversalServiceStub

class swh.datasets.luigi.blobs_datasets.FindEarliestRevisions(*args, **kwargs)

Bases: _BaseTask

blob_filter: str

A luigi.ChoiceParameter, whose value must be one of a fixed set of choices.

derived_datasets_path: Path

A luigi.PathParameter, whose value is a path.

local_graph_path

A luigi.PathParameter, whose value is a path.

graph_name

A luigi.Parameter, whose value is a str.

requires() → Task

Returns an instance of SelectBlobs

output() → LocalTarget

blobs-earliest.csv.zst in self.derived_datasets_path / self.blob_filter

run() → None

Writes blobs-earliest.csv.zst, which records the earliest revision found for each selected blob.

previous_derived_datasets_path: Path | None

class swh.datasets.luigi.blobs_datasets.RunBlobDataset(*args, **kwargs)

Bases: Task

Runs all tasks to build a blob dataset with the given filter.

blob_filter

A luigi.ChoiceParameter, whose value must be one of a fixed set of choices.

derived_datasets_path

A luigi.PathParameter, whose value is a path.

requires() → Sequence[Task]

Returns a list of tasks such that every task in this module is transitively depended on.

complete() → bool

Always returns False; status is checked by dependencies.

run()

Checks that all files are in the correct format and contain a well-known SWHID.
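A per-file check of the kind described above can be sketched with csv and a regular expression for core SWHIDs (swh:1:<type>:<40 hex digits>, with type one of cnt, dir, rev, rel, snp). Assumptions: the hypothetical `check_swhid_csv` expects a header row whose first column is swhid, as in the CSV_HEADER constants of this module, and it requires at least one SWHID from a caller-supplied set (for instance, one read from known_swhids_csv) to appear in the file.

```python
import csv
import re
from typing import Iterable, TextIO

# Core SWHID syntax: swh:1:<object type>:<40 lowercase hex digits>
SWHID_RE = re.compile(r"swh:1:(?:cnt|dir|rev|rel|snp):[0-9a-f]{40}")


def check_swhid_csv(f: TextIO, known_swhids: Iterable[str]) -> None:
    """Raise ValueError unless f is a CSV with an swhid first column whose
    values are well-formed and include at least one known SWHID."""
    reader = csv.reader(f)
    header = next(reader, None)
    if not header or header[0] != "swhid":
        raise ValueError(f"unexpected header: {header!r}")
    known = set(known_swhids)
    found_known = False
    # Data rows start on line 2, after the header.
    for lineno, row in enumerate(reader, start=2):
        if not row or not SWHID_RE.fullmatch(row[0]):
            raise ValueError(f"line {lineno}: malformed SWHID {row[:1]!r}")
        found_known = found_known or row[0] in known
    if not found_known:
        raise ValueError("none of the known SWHIDs appears in the file")
```

Checking for a well-known SWHID catches a file that is syntactically valid but was generated from the wrong selection, which a format check alone would miss.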