swh.datasets.luigi.aggregate_datasets module#
Luigi tasks for producing the aggregated derived datasets#
- class swh.datasets.luigi.aggregate_datasets.ExportNodesTable(*args, **kwargs)[source]#
Bases:
TaskCreates a Parquet dataset that contains the id and SWHID of each node
- local_graph_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- graph_name#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- aggregate_datasets_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- class swh.datasets.luigi.aggregate_datasets.AggregateContentDatasets(*args, **kwargs)[source]#
Bases:
TaskCreates a Parquet dataset that contains a column for each of:
the content id
the content’s length
the most popular name of each content
number of occurrences of that name for the content
its date of first occurrence in a revision or release, if any
said revision or release, if any
an origin containing said revision or release, if any
- local_graph_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- graph_name#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- popular_content_names_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- provenance_dir#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- aggregate_datasets_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- class swh.datasets.luigi.aggregate_datasets.UploadNodesTable(*args, **kwargs)[source]#
Bases:
_ParquetToS3ToAthenaTaskUploads the result of
AggregateContentDatasetsto S3 and registers a table on Athena to query it- aggregate_datasets_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- dataset_name#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- s3_athena_output_location#
A parameter that strip trailing slashes
- requires() Task[source]#
Returns an instance of
ExportNodesTable.
- class swh.datasets.luigi.aggregate_datasets.UploadAggregatedContentDataset(*args, **kwargs)[source]#
Bases:
_ParquetToS3ToAthenaTaskUploads the result of
AggregateContentDatasetsto S3 and registers a table on Athena to query it- aggregate_datasets_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- dataset_name#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- s3_athena_output_location#
A parameter that strip trailing slashes
- requires() Task[source]#
Returns an instance of
AggregateContentDatasets.
- class swh.datasets.luigi.aggregate_datasets.RunAggregatedDatasets(*args, **kwargs)[source]#
Bases:
WrapperTaskRuns
UploadNodesTable,UploadAggregatedContentDataset, and their recursive dependencies.- aggregate_datasets_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- dataset_name#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- s3_athena_output_location#
A parameter that strip trailing slashes
- requires() Dict[str, Task][source]#
Returns an instance of
AggregateContentDatasets.