swh.graph.luigi.subdataset module#

class swh.graph.luigi.subdataset.SelectTopGithubOrigins(*args, **kwargs)[source]#

Bases: Task

Writes a list of origins selected from popular GitHub repositories

local_export_path#

Parameter whose value is a path.

In the task definition, use

import luigi

class MyTask(luigi.Task):
    existing_file_path = luigi.PathParameter(exists=True)
    new_file_path = luigi.PathParameter()

    def run(self):
        # Get data from existing file
        with self.existing_file_path.open("r", encoding="utf-8") as f:
            data = f.read()

        # Output message in new file
        self.new_file_path.parent.mkdir(parents=True, exist_ok=True)
        with self.new_file_path.open("w", encoding="utf-8") as f:
            f.write("hello from a PathParameter => ")
            f.write(data)

At the command line, use

$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>

num_origins#

Parameter whose value is an int.

query#

Parameter whose value is a str.

output() → LocalTarget[source]#

Text file with a list of origin URLs

run() → None[source]#

Sends a query to the GitHub API to get a list of origins
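As an illustration of the kind of request this step involves, here is a minimal sketch that builds a GitHub repository search URL and extracts repository URLs from a response body. The helper names, the `stars:>1000` query, and the sample response are assumptions made for this example; the query the task actually sends is set by its `query` parameter and is not shown here.

```python
import json
from typing import List
from urllib.parse import urlencode

# Public GitHub REST endpoint for repository search.
GITHUB_SEARCH_URL = "https://api.github.com/search/repositories"


def build_search_url(query: str, per_page: int = 100, page: int = 1) -> str:
    """Build a search URL that sorts repositories by star count, descending."""
    params = {
        "q": query,
        "sort": "stars",
        "order": "desc",
        "per_page": per_page,
        "page": page,
    }
    return f"{GITHUB_SEARCH_URL}?{urlencode(params)}"


def extract_origin_urls(response_body: str, num_origins: int) -> List[str]:
    """Return at most num_origins repository URLs from a search response."""
    items = json.loads(response_body).get("items", [])
    return [item["html_url"] for item in items[:num_origins]]


# Hypothetical response body, trimmed to the single field used above.
sample_body = json.dumps(
    {
        "items": [
            {"html_url": "https://github.com/example/a"},
            {"html_url": "https://github.com/example/b"},
        ]
    }
)

print(build_search_url("stars:>1000", per_page=2))
print(extract_origin_urls(sample_body, num_origins=1))
```

The sketch performs no network access, so it can be run offline; sending the request and handling pagination or rate limits are left out.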

class swh.graph.luigi.subdataset.SubdatasetOriginsFromFile(*args, **kwargs)[source]#

Bases: Task

Reads a list of origins from a local file, computed externally to Luigi.

local_export_path#

Parameter whose value is a path.

path#

Parameter whose value is a str.

output() → LocalTarget[source]#

Text file with a list of origin URLs

run() → None[source]#

Does nothing
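Because the file is produced outside Luigi, this task only exposes it as a target for downstream tasks. The sketch below shows one way a consumer might read such a file, assuming one origin URL per line (the exact file layout is an assumption, as is the helper name `read_origins`).

```python
import tempfile
from pathlib import Path
from typing import List


def read_origins(path: Path) -> List[str]:
    """Read origin URLs from a text file, skipping blank lines."""
    with path.open("r", encoding="utf-8") as f:
        return [line.strip() for line in f if line.strip()]


# Write a small hypothetical origins file, then read it back.
with tempfile.TemporaryDirectory() as tmp:
    origins_file = Path(tmp) / "origins.txt"
    origins_file.write_text(
        "https://github.com/example/a\n\nhttps://github.com/example/b\n",
        encoding="utf-8",
    )
    origins = read_origins(origins_file)

print(origins)
```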

class swh.graph.luigi.subdataset.ListSwhidsForSubdataset(*args, **kwargs)[source]#

Bases: Task

Lists all SWHIDs reachable from a set of origins

select_task#

A parameter which takes two values:

  1. an instance of Iterable and
  2. the class of the variables to convert to.

In the task definition, use

import luigi

class MyTask(luigi.Task):
    my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)

At the command line, use

$ luigi --module my_tasks MyTask --my-param 0.1

Consider using EnumParameter for a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.

local_export_path#

Parameter whose value is a path.

grpc_api#

Parameter whose value is a str.

requires() → Task[source]#

Returns an instance of self.select_task

output() → LocalTarget[source]#

Text file with a list of SWHIDs

run() → None[source]#

Builds the list
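Listing everything reachable from a set of origins is a graph traversal. The sketch below illustrates the idea with a breadth-first search over a small in-memory adjacency map. It is not the task's implementation, which queries the graph service configured by `grpc_api`; the node labels are shortened placeholders rather than real SWHIDs, and `reachable_nodes` is a hypothetical helper.

```python
from collections import deque
from typing import Dict, Iterable, List, Set


def reachable_nodes(successors: Dict[str, List[str]], origins: Iterable[str]) -> Set[str]:
    """Return every node reachable from the given origins, origins included."""
    seen = set(origins)
    queue = deque(seen)
    while queue:
        node = queue.popleft()
        for nxt in successors.get(node, ()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


# Hypothetical toy graph: origin -> snapshot -> revision -> directory -> content.
toy_graph = {
    "ori:A": ["snp:1"],
    "snp:1": ["rev:1"],
    "rev:1": ["dir:1"],
    "dir:1": ["cnt:1"],
    "ori:B": ["snp:2"],
    "snp:2": ["rev:2"],
}

print(sorted(reachable_nodes(toy_graph, ["ori:A"])))
```

Nodes that are only reachable from `ori:B` are left out of the result, which is what makes the output usable as a filter for a subdataset.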

class swh.graph.luigi.subdataset.CreateSubdatasetOnAthena(*args, **kwargs)[source]#

Bases: Task

Generates an ORC export from an existing ORC export, filtering out SWHIDs not in the given list.

local_export_path#

Parameter whose value is a path.

s3_parent_export_path#

A parameter that strips trailing slashes

s3_export_path#

A parameter that strips trailing slashes

s3_athena_output_location#

A parameter that strips trailing slashes
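The normalization applied to the three S3 parameters above can be pictured with a minimal sketch. The function name and the bucket paths are hypothetical, and the actual parameter class may be implemented differently.

```python
def strip_trailing_slashes(value: str) -> str:
    """Remove every trailing '/' so that paths can be joined predictably."""
    return value.rstrip("/")


print(strip_trailing_slashes("s3://example-bucket/export/"))
print(strip_trailing_slashes("s3://example-bucket/export"))
```

Normalizing once at parameter-parsing time means that later code can append `/<suffix>` without producing a double slash.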

athena_db_name#

Parameter whose value is a str.

athena_parent_db_name#

Parameter whose value is a str.

object_types#

A parameter whose value is a comma-separated list of Enum. Values should come from the same enum.

Values are taken to be a list, i.e. order is preserved, duplicates may occur, and empty list is possible.
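The list semantics described above (order preserved, duplicates kept, empty list allowed) can be illustrated with a small stand-alone parser. This is a sketch of the behavior, not Luigi's own code, and `parse_enum_list` is a hypothetical helper.

```python
import enum
from typing import List, Type


class Model(enum.Enum):
    Honda = 1
    Volvo = 2


def parse_enum_list(text: str, enum_cls: Type[enum.Enum]) -> List[enum.Enum]:
    """Turn a comma-separated string of member names into a list of members."""
    if text == "":
        return []
    # Lookup by name raises KeyError for names that are not members.
    return [enum_cls[name] for name in text.split(",")]


print(parse_enum_list("Honda,Volvo,Honda", Model))
print(parse_enum_list("", Model))
```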

In the task definition, use

import enum

import luigi

class Model(enum.Enum):
    Honda = 1
    Volvo = 2

class MyTask(luigi.Task):
    my_param = luigi.EnumListParameter(enum=Model)

At the command line, use

$ luigi --module my_tasks MyTask --my-param Honda,Volvo

requires() → Dict[str, Task][source]#

Returns an instance of ListSwhidsForSubdataset and one of CreateAthena

output() → Dict[str, Target][source]#

Returns the S3 location and Athena database for the subdataset

run() → None[source]#

Runs a query on Athena, producing files on S3
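Conceptually, the filtering keeps only the rows of the parent export whose identifier appears in the SWHID list. The sketch below shows that idea on in-memory rows; it does not reproduce the Athena query the task runs, and the row layout, the `id` column name, and the shortened identifiers are assumptions made for illustration.

```python
from typing import Dict, Iterable, List, Set


def filter_rows(rows: Iterable[Dict[str, str]], allowed: Set[str], key: str = "id") -> List[Dict[str, str]]:
    """Keep only the rows whose identifier is in the allowed set."""
    return [row for row in rows if row[key] in allowed]


# Hypothetical rows from a parent export and a hypothetical SWHID list.
parent_rows = [
    {"id": "cnt:1", "length": "12"},
    {"id": "cnt:2", "length": "34"},
    {"id": "cnt:3", "length": "56"},
]
allowed_ids = {"cnt:1", "cnt:3"}

subset = filter_rows(parent_rows, allowed_ids)
print(subset)
```

Using a set for the allowed identifiers keeps each membership test constant-time on average, which matters when the list of SWHIDs is large.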