swh.datasets.luigi.aggregate_datasets module

Luigi tasks for producing the aggregated derived datasets

class swh.datasets.luigi.aggregate_datasets.ExportNodesTable(*args, **kwargs)

Bases: Task

Creates a Parquet dataset that contains the id and SWHID of each node.
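
To make the "id and SWHID" pairing concrete, here is a minimal sketch of what one row could look like and how the object type can be read from a SWHID. The NodeRow class, its field names, and the swhid_type helper are hypothetical and are not taken from this module; accepting the "ori" type is also an assumption, based on graph exports having origin nodes.

```python
import re
from typing import NamedTuple

# Core SWHID shape: swh:1:<object type>:<40 lowercase hex digits>.
# Accepting "ori" (origin nodes) is an assumption made for this sketch.
SWHID_RE = re.compile(r"^swh:1:(cnt|dir|rev|rel|snp|ori):([0-9a-f]{40})$")


class NodeRow(NamedTuple):
    """Hypothetical shape of one row of the nodes table."""

    node_id: int
    swhid: str


def swhid_type(swhid: str) -> str:
    """Return the object type encoded in a SWHID, or raise ValueError."""
    match = SWHID_RE.match(swhid)
    if match is None:
        raise ValueError(f"not a valid SWHID: {swhid!r}")
    return match.group(1)


row = NodeRow(node_id=0, swhid="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2")
print(row.node_id, swhid_type(row.swhid))  # prints: 0 cnt
```

The actual column names and types of the Parquet dataset should be checked against the files produced by the task.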

local_graph_path

Parameter whose value is a path.

In the task definition, use

import luigi

class MyTask(luigi.Task):
    existing_file_path = luigi.PathParameter(exists=True)
    new_file_path = luigi.PathParameter()

    def run(self):
        # Get data from existing file
        with self.existing_file_path.open("r", encoding="utf-8") as f:
            data = f.read()

        # Output message in new file
        self.new_file_path.parent.mkdir(parents=True, exist_ok=True)
        with self.new_file_path.open("w", encoding="utf-8") as f:
            f.write("hello from a PathParameter => ")
            f.write(data)

At the command line, use

$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
graph_name

Parameter whose value is a str, and a base class for other parameter types.

Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:

import luigi

class MyTask(luigi.Task):
    foo = luigi.Parameter()

class RequiringTask(luigi.Task):
    def requires(self):
        return MyTask(foo="hello")

    def run(self):
        print(self.requires().foo)  # prints "hello"

This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). Each task will then have its foo attribute set accordingly.

When a task is instantiated, any argument passed to the constructor is used first as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When no value is passed, it is resolved in this order of decreasing priority:

  • Any value provided on the command line:

    • To the root task (e.g. --param xyz)

    • Then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).

  • Any value provided in the configuration file, using the [TASK_NAME]>PARAM_NAME: <serialized value> syntax. See ParamConfigIngestion.

  • Any default value set using the default flag.
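
The resolution order above can be sketched as a small stand-alone function. This is a toy model only, not Luigi's implementation: resolve_param and all of its arguments are hypothetical names, and the command line and configuration file are represented by plain dictionaries.

```python
from typing import Any, Mapping, Optional

_MISSING = object()  # sentinel, so that None remains a legal parameter value


def resolve_param(
    task_name: str,
    param_name: str,
    explicit: Any = _MISSING,
    root_cli: Optional[Mapping[str, Any]] = None,
    qualified_cli: Optional[Mapping[str, Any]] = None,
    config: Optional[Mapping[str, Mapping[str, Any]]] = None,
    default: Any = _MISSING,
) -> Any:
    """Toy model of the priority order described above."""
    # 1. A value passed to the constructor always wins.
    if explicit is not _MISSING:
        return explicit
    # 2. Command line, addressed to the root task (--param xyz).
    if root_cli and param_name in root_cli:
        return root_cli[param_name]
    # 3. Command line, qualified task name syntax (--TaskA-param xyz).
    qualified_key = f"{task_name}-{param_name}"
    if qualified_cli and qualified_key in qualified_cli:
        return qualified_cli[qualified_key]
    # 4. Configuration file: section [TASK_NAME], key PARAM_NAME.
    if config and param_name in config.get(task_name, {}):
        return config[task_name][param_name]
    # 5. Default declared on the parameter.
    if default is not _MISSING:
        return default
    raise ValueError(f"no value for {task_name}.{param_name}")


print(resolve_param("TaskA", "x", explicit=44))  # prints 44
print(resolve_param("TaskA", "x", config={"TaskA": {"x": 2}}, default=3))  # prints 2
print(resolve_param("TaskA", "x", default=3))  # prints 3
```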

Parameter objects may be reused, but you must then set the positional=False flag.

aggregate_datasets_path

Parameter whose value is a path.

requires() → Dict[str, Task]

Returns an instance of LocalGraph.

output() → LocalTarget

Directory of Parquet files.

run() → None

Runs export-nodes from tools/aggregate.

class swh.datasets.luigi.aggregate_datasets.AggregateContentDatasets(*args, **kwargs)

Bases: Task

Creates a Parquet dataset that contains a column for each of:

  • the content id

  • the content’s length

  • the content’s most popular name

  • the number of occurrences of that name for the content

  • its date of first occurrence in a revision or release, if any

  • said revision or release, if any

  • an origin containing said revision or release, if any
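
As an illustration of the "most popular name" and "number of occurrences" columns, the sketch below picks the most frequent name from a list of names seen for one content. The most_popular_name helper and the sample names are hypothetical; the task itself delegates this work to the aggregate-content-datasets tool, whose tie-breaking and data types may differ.

```python
from collections import Counter
from typing import Iterable, Optional, Tuple


def most_popular_name(names: Iterable[bytes]) -> Optional[Tuple[bytes, int]]:
    """Return (name, occurrences) for the most frequent name, or None if empty."""
    counts = Counter(names)
    if not counts:
        return None
    # most_common(1) returns a one-element list: [(name, occurrences)]
    return counts.most_common(1)[0]


# Hypothetical file names under which a single content was found
names = [b"README.md", b"README", b"README.md"]
print(most_popular_name(names))  # prints (b'README.md', 2)
```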

local_graph_path

Parameter whose value is a path.

graph_name

Parameter whose value is a str, and a base class for other parameter types.

popular_content_names_path

Parameter whose value is a path.

provenance_dir

Parameter whose value is a path.

aggregate_datasets_path

Parameter whose value is a path.

requires() → Dict[str, Task]

Returns an instance of LocalGraph.

output() → LocalTarget

Directory of Parquet files.

run() → None

Runs aggregate-content-datasets from tools/aggregate.

class swh.datasets.luigi.aggregate_datasets.UploadNodesTable(*args, **kwargs)

Bases: _ParquetToS3ToAthenaTask

Uploads the result of ExportNodesTable to S3 and registers a table on Athena to query it.

aggregate_datasets_path

Parameter whose value is a path.

dataset_name

Parameter whose value is a str, and a base class for other parameter types.

s3_athena_output_location

A parameter that strips trailing slashes.
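
The normalization this parameter performs can be sketched with a plain string operation. The function name and the bucket URL below are hypothetical, and the real parameter class may apply the stripping at a different point, for example when parsing the command-line value.

```python
def strip_trailing_slashes(location: str) -> str:
    """Remove every trailing '/' so that 'prefix/' and 'prefix' compare equal."""
    return location.rstrip("/")


print(strip_trailing_slashes("s3://example-bucket/athena-output///"))
# prints s3://example-bucket/athena-output
```

Normalizing the value once means later code can append "/" plus a key name without producing doubled slashes.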

requires() → Task

Returns an instance of ExportNodesTable.

create_table_extras() → str

Extra clauses to add to the CREATE EXTERNAL TABLE statement.

class swh.datasets.luigi.aggregate_datasets.UploadAggregatedContentDataset(*args, **kwargs)

Bases: _ParquetToS3ToAthenaTask

Uploads the result of AggregateContentDatasets to S3 and registers a table on Athena to query it.

aggregate_datasets_path

Parameter whose value is a path.

dataset_name

Parameter whose value is a str, and a base class for other parameter types.

s3_athena_output_location

A parameter that strips trailing slashes.

requires() → Task

Returns an instance of AggregateContentDatasets.

class swh.datasets.luigi.aggregate_datasets.RunAggregatedDatasets(*args, **kwargs)

Bases: WrapperTask

Runs UploadNodesTable, UploadAggregatedContentDataset, and their recursive dependencies.
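
To show what "recursive dependencies" means here, the sketch below models the task graph as a dictionary and computes one valid execution order with the standard library. It lists only the dependencies named on this page, so the real tasks may require more, and it does not use Luigi's scheduler.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it requires, as described on this page.
DEPENDENCIES = {
    "RunAggregatedDatasets": {"UploadNodesTable", "UploadAggregatedContentDataset"},
    "UploadNodesTable": {"ExportNodesTable"},
    "UploadAggregatedContentDataset": {"AggregateContentDatasets"},
    "ExportNodesTable": {"LocalGraph"},
    "AggregateContentDatasets": {"LocalGraph"},
}

# static_order() yields every task after all of the tasks it requires.
order = list(TopologicalSorter(DEPENDENCIES).static_order())
print(order)
```

Whatever order is printed, LocalGraph comes first and RunAggregatedDatasets comes last, because every other task sits between them in the graph.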

aggregate_datasets_path

Parameter whose value is a path.

dataset_name

Parameter whose value is a str, and a base class for other parameter types.

s3_athena_output_location

A parameter that strips trailing slashes.

requires() → Dict[str, Task]

Returns instances of UploadNodesTable and UploadAggregatedContentDataset.