swh.graph.luigi.aggregate_datasets module

Luigi tasks for producing the aggregated derived datasets

class swh.graph.luigi.aggregate_datasets.ExportNodesTable(*args, **kwargs)

Bases: Task

Creates a Parquet dataset that contains the id and SWHID of each node.

local_graph_path = <luigi.parameter.PathParameter object>
graph_name = <luigi.parameter.Parameter object>
aggregate_datasets_path = <luigi.parameter.PathParameter object>
requires() → Dict[str, Task]

Returns an instance of LocalGraph.

output() → Target

Directory of Parquet files.

run() → None

Runs export-nodes from tools/aggregate.

class swh.graph.luigi.aggregate_datasets.AggregateContentDatasets(*args, **kwargs)

Bases: Task

Creates a Parquet dataset that contains a column for each of:

  • the content id

  • the content’s length

  • the content’s most popular name

  • the number of occurrences of that name for the content

  • its date of first occurrence in a revision or release, if any

  • said revision or release, if any

  • an origin containing said revision or release, if any
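
As a rough illustration of the row shape described by the list above, the sketch below models one record as a Python dataclass. The field names and types are hypothetical and chosen only to mirror the bullets; the actual Parquet column names and types are not given on this page.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class ContentRow:
    """One row of the aggregated content dataset (hypothetical field names)."""

    content_id: int  # the content id
    length: int  # the content's length
    most_popular_name: bytes  # the content's most popular name
    name_occurrences: int  # occurrences of that name for the content
    first_occurrence_date: Optional[datetime] = None  # None if there is none
    first_occurrence_revrel: Optional[str] = None  # said revision or release
    first_occurrence_origin: Optional[str] = None  # an origin containing it


def has_provenance(row: ContentRow) -> bool:
    """True when the row records a first occurrence in a revision or release."""
    return row.first_occurrence_revrel is not None


# Made-up values, for illustration only.
example = ContentRow(
    content_id=42,
    length=1024,
    most_popular_name=b"README.md",
    name_occurrences=7,
    first_occurrence_date=datetime(2020, 1, 1, tzinfo=timezone.utc),
    first_occurrence_revrel="swh:1:rev:" + "0" * 40,
    first_occurrence_origin="https://example.org/repo",
)
```

The three "if any" columns are modelled as optional because a content may have no known first occurrence.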

local_graph_path = <luigi.parameter.PathParameter object>
graph_name = <luigi.parameter.Parameter object>
popular_content_names_path = <luigi.parameter.PathParameter object>
provenance_dir = <luigi.parameter.PathParameter object>
aggregate_datasets_path = <luigi.parameter.PathParameter object>
requires() → Dict[str, Task]

Returns an instance of LocalGraph.

output() → Target

Directory of Parquet files.

run() → None

Runs aggregate-content-datasets from tools/aggregate.

class swh.graph.luigi.aggregate_datasets.UploadNodesTable(*args, **kwargs)

Bases: _ParquetToS3ToAthenaTask

Uploads the result of ExportNodesTable to S3 and registers a table on Athena to query it.

aggregate_datasets_path = <luigi.parameter.PathParameter object>
dataset_name = <luigi.parameter.Parameter object>
s3_athena_output_location = <swh.dataset.luigi.S3PathParameter object>
requires() → Task

Returns an instance of ExportNodesTable.

create_table_extras() → str

Extra clauses to add to the CREATE EXTERNAL TABLE statement.
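
To make the role of such a hook concrete, here is a minimal sketch of how a string of extra clauses could be appended to a CREATE EXTERNAL TABLE statement. The helper `build_create_table`, the column names, the bucket, and the position of the extras after LOCATION are all assumptions made for illustration; this is not how `_ParquetToS3ToAthenaTask` is known to assemble its statement.

```python
from typing import Sequence, Tuple


def build_create_table(
    table: str,
    columns: Sequence[Tuple[str, str]],
    location: str,
    extras: str = "",
) -> str:
    """Assemble a CREATE EXTERNAL TABLE statement for a Parquet dataset.

    Hypothetical helper: `extras` plays the role of the string returned
    by a create_table_extras()-style hook and is appended verbatim.
    """
    column_sql = ",\n".join(f"  `{name}` {type_}" for name, type_ in columns)
    statement = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n"
        f"{column_sql}\n"
        f")\n"
        f"STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )
    if extras.strip():
        # Append the extra clauses on their own line, after LOCATION.
        statement += "\n" + extras.strip()
    return statement


# Hypothetical table, columns, bucket, and extras.
print(
    build_create_table(
        "nodes",
        [("id", "bigint"), ("swhid", "string")],
        "s3://example-bucket/nodes/",
        extras="TBLPROPERTIES ('parquet.compression'='SNAPPY')",
    )
)
```

With an empty `extras` string the statement ends at the LOCATION clause, which is presumably the default behaviour when a task does not override the hook.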

class swh.graph.luigi.aggregate_datasets.UploadAggregatedContentDataset(*args, **kwargs)

Bases: _ParquetToS3ToAthenaTask

Uploads the result of AggregateContentDatasets to S3 and registers a table on Athena to query it.

aggregate_datasets_path = <luigi.parameter.PathParameter object>
dataset_name = <luigi.parameter.Parameter object>
s3_athena_output_location = <swh.dataset.luigi.S3PathParameter object>
requires() → Task

Returns an instance of AggregateContentDatasets.

class swh.graph.luigi.aggregate_datasets.RunAggregatedDatasets(*args, **kwargs)

Bases: WrapperTask

Runs UploadNodesTable, UploadAggregatedContentDataset, and their recursive dependencies.

aggregate_datasets_path = <luigi.parameter.PathParameter object>
dataset_name = <luigi.parameter.Parameter object>
s3_athena_output_location = <swh.dataset.luigi.S3PathParameter object>
requires() → Dict[str, Task]

Returns instances of UploadNodesTable and UploadAggregatedContentDataset.
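
The dependency chain between the tasks on this page can be sketched with the standard library alone. The mapping below is read off the descriptions above (the entry for RunAggregatedDatasets follows its class description); it uses plain strings rather than the real Luigi classes, and it omits whatever LocalGraph or the other tasks may require beyond what is stated here.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Direct dependencies as described on this page (task name -> required tasks).
REQUIRES: Dict[str, Set[str]] = {
    "LocalGraph": set(),
    "ExportNodesTable": {"LocalGraph"},
    "AggregateContentDatasets": {"LocalGraph"},
    "UploadNodesTable": {"ExportNodesTable"},
    "UploadAggregatedContentDataset": {"AggregateContentDatasets"},
    "RunAggregatedDatasets": {
        "UploadNodesTable",
        "UploadAggregatedContentDataset",
    },
}


def recursive_dependencies(task: str) -> Set[str]:
    """Return every task reachable from `task` by following REQUIRES."""
    seen: Set[str] = set()
    stack = [task]
    while stack:
        for dep in REQUIRES[stack.pop()]:
            if dep not in seen:
                seen.add(dep)
                stack.append(dep)
    return seen


def execution_order() -> List[str]:
    """One valid order in which every task runs after its dependencies."""
    return list(TopologicalSorter(REQUIRES).static_order())


print(sorted(recursive_dependencies("RunAggregatedDatasets")))
print(execution_order())
```

In this sketch, asking for RunAggregatedDatasets pulls in all five other tasks, which is what "their recursive dependencies" amounts to under the stated assumptions.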