swh.graph.libs.luigi.topology module

swh.graph.libs.luigi.topology.inverse_direction(direction: str) → str
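inverse_direction has no docstring. A minimal sketch of what it presumably does follows. It assumes that "forward" and "backward" are the only accepted values (the two values CountDescendants documents for --direction). This is a hypothetical re-implementation, not the module's source.

```python
def inverse_direction(direction: str) -> str:
    """Hypothetical sketch: map a traversal direction to its opposite."""
    # Assumption: "forward" and "backward" are the only accepted values.
    opposites = {"forward": "backward", "backward": "forward"}
    try:
        return opposites[direction]
    except KeyError:
        raise ValueError(f"unknown direction: {direction!r}") from None


print(inverse_direction("forward"))  # backward
```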
class swh.graph.libs.luigi.topology.TopoSort(*args, **kwargs)

Bases: Task

Creates a file that contains all SWHIDs in topological order from a compressed graph.

local_graph_path = <luigi.parameter.PathParameter object>
topology_dir = <luigi.parameter.PathParameter object>
graph_name = <luigi.parameter.Parameter object>
object_types = <luigi.parameter.Parameter object>
direction = <luigi.parameter.ChoiceParameter object>
algorithm = <luigi.parameter.ChoiceParameter object>
requires() → List[Task]

Returns an instance of LocalGraph.

output() → LocalTarget

.csv.zst file that contains the topological order.

run() → None

Runs the ‘toposort’ command from tools/topology and compresses its output.
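TopoSort delegates the actual work to the toposort tool. Its algorithm parameter suggests that more than one ordering strategy is available, but the choices are not documented here. To illustrate the kind of ordering it produces, the sketch below runs Kahn's algorithm, one standard topological sort, over a hypothetical in-memory adjacency dict. The short node names are made-up placeholders, not real SWHIDs. With direction set to backward, one would presumably feed it the reversed edges.

```python
from collections import deque


def toposort_kahn(successors: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm: every edge u -> v has u before v in the result."""
    # Collect every node, including those that only appear as edge targets.
    nodes = set(successors)
    for targets in successors.values():
        nodes.update(targets)
    indegree = {node: 0 for node in nodes}
    for targets in successors.values():
        for target in targets:
            indegree[target] += 1
    # Start from nodes without incoming edges; sorted() makes the output deterministic.
    queue = deque(sorted(n for n in nodes if indegree[n] == 0))
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for target in successors.get(node, []):
            indegree[target] -= 1
            if indegree[target] == 0:  # all predecessors already emitted
                queue.append(target)
    if len(order) != len(nodes):
        raise ValueError("graph has a cycle")
    return order


# Hypothetical toy graph in the forward direction (placeholder names).
example = {
    "rev2": ["rev1", "dir2"],
    "rev1": ["dir1"],
    "dir2": ["dir1", "cnt2"],
    "dir1": ["cnt1"],
}
print(toposort_kahn(example))
# ['rev2', 'rev1', 'dir2', 'dir1', 'cnt2', 'cnt1']
```

A real graph of this size would not fit in a Python dict. That is one reason the task shells out to a dedicated tool over the compressed graph rather than doing this in-process.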

class swh.graph.libs.luigi.topology.ComputeGenerations(*args, **kwargs)

Bases: Task

Creates a file that contains all SWHIDs in topological order from a compressed graph.

local_graph_path = <luigi.parameter.PathParameter object>
topology_dir = <luigi.parameter.PathParameter object>
graph_name = <luigi.parameter.Parameter object>
object_types = <luigi.parameter.Parameter object>
direction = <luigi.parameter.ChoiceParameter object>
requires() → List[Task]

Returns an instance of LocalGraph.

output() → dict[str, Target]

.csv.zst file that contains the topological order.

run() → None

Runs ‘toposort’ command from tools/topology and compresses

class swh.graph.libs.luigi.topology.UploadGenerationsToS3(*args, **kwargs)

Bases: Task

Uploads the output of ComputeGenerations to S3.

local_graph_path = <luigi.parameter.PathParameter object>
topology_dir = <luigi.parameter.PathParameter object>
dataset_name = <luigi.parameter.Parameter object>
graph_name = <luigi.parameter.Parameter object>
object_types = <luigi.parameter.Parameter object>
direction = <luigi.parameter.ChoiceParameter object>
requires() → Task

Returns an instance of ComputeGenerations.

output() → List[Target]

Returns .bitstream and .bin paths on S3.

run() → None

Copies the files to S3.

class swh.graph.libs.luigi.topology.CountPaths(*args, **kwargs)

Bases: Task

Creates a file that lists:

  • the number of paths leading to each node and starting from any leaf, and

  • the number of paths leading to each node and starting from any other node.

Counts include the trivial self-path (zero-length path from a node to itself).

local_graph_path = <luigi.parameter.PathParameter object>
topology_dir = <luigi.parameter.PathParameter object>
graph_name = <luigi.parameter.Parameter object>
write_parquet = <luigi.parameter.BoolParameter object>
object_types = <luigi.parameter.Parameter object>
direction = <luigi.parameter.ChoiceParameter object>
property resources

Return the estimated RAM use of this task.

requires() → Dict[str, Task]

Returns an instance of LocalGraph and one of ComputeGenerations.

output() → dict[str, LocalTarget]

Parquet directory (if enabled) and binary arrays with path counts.

run() → None

Runs the ‘count_paths’ command from tools/topology.
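The two counts satisfy a simple recurrence over a topological order, which the Python sketch below illustrates. It makes several assumptions. Edges are given in whichever traversal direction direction selects. A "leaf" is a node with no incoming edge in that direction, since the docstring does not define the term. Nodes are visited in an order produced by the standard library's graphlib.TopologicalSorter, standing in for whatever order the real task obtains from ComputeGenerations. Python ints never overflow; the numeric type of the real binary arrays is not stated here.

```python
from graphlib import TopologicalSorter


def count_paths(successors: dict[str, list[str]]) -> dict[str, tuple[int, int]]:
    """Return, for each node v, (paths from a leaf to v, paths from any node to v).

    Both counts include the zero-length path from v to itself.
    """
    # Invert the adjacency dict so every node, even a pure target, has an entry.
    predecessors: dict[str, list[str]] = {}
    for src, targets in successors.items():
        predecessors.setdefault(src, [])
        for dst in targets:
            predecessors.setdefault(dst, []).append(src)

    from_leaves: dict[str, int] = {}
    from_all: dict[str, int] = {}
    # static_order() yields each node only after all of its predecessors.
    for node in TopologicalSorter(predecessors).static_order():
        preds = predecessors[node]
        # Assumption: a "leaf" has no predecessor in this direction, so its
        # only path from a leaf is the self-path.
        from_leaves[node] = sum(from_leaves[p] for p in preds) if preds else 1
        # Each path ending at a predecessor extends by one edge; +1 for the self-path.
        from_all[node] = 1 + sum(from_all[p] for p in preds)
    return {node: (from_leaves[node], from_all[node]) for node in predecessors}


# Diamond: a -> b -> d and a -> c -> d.
print(count_paths({"a": ["b", "c"], "b": ["d"], "c": ["d"]}))
# {'a': (1, 1), 'b': (1, 2), 'c': (1, 2), 'd': (2, 5)}
```

For d, the five paths are the self-path, b→d, c→d, a→b→d and a→c→d. Only the last two start from the leaf a.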

class swh.graph.libs.luigi.topology.CountDescendants(*args, **kwargs)

Bases: Task

Counts the number of unique descendants (or ancestors) of each node using a probabilistic HyperLogLog counter.

With --direction forward, counts, for each node, the unique nodes reachable from it. With --direction backward, counts, for each node, the unique nodes that can reach it.

local_graph_path = <luigi.parameter.PathParameter object>
topology_dir = <luigi.parameter.PathParameter object>
graph_name = <luigi.parameter.Parameter object>
object_types = <luigi.parameter.Parameter object>
write_parquet = <luigi.parameter.BoolParameter object>
direction = <luigi.parameter.ChoiceParameter object>
rsd = <luigi.parameter.FloatParameter object>
seed = <luigi.parameter.OptionalIntParameter object>
property resources

Return the estimated RAM use of this task.

requires() → Dict[str, Task]

Returns an instance of LocalGraph and one of ComputeGenerations.

output() → dict[str, LocalTarget]

Parquet directory (if enabled) and binary array with descendant counts.

run() → None

Runs the ‘count_descendants’ command from tools/topology.
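The sketch below shows why HyperLogLog suits this task. Each node's counter is the union of its successors' counters plus the node itself, and a union of HyperLogLog counters is just a register-wise maximum. Processing nodes so that successors come first therefore needs a single pass. This is a self-written, pure-Python HyperLogLog (blake2b hashing, trailing-zero ranks, linear-counting correction for small counts), not the tool's implementation. Three mappings are assumptions: rsd to the register count via the usual 1.04/√m error formula, seed to a hash salt, and the default rsd=0.05. The sketch also counts each node as its own descendant, which the docstring does not specify. For backward counts, one would pass the reversed edges.

```python
import hashlib
import math
from graphlib import TopologicalSorter
from typing import Optional


class HyperLogLog:
    """Minimal HyperLogLog counter; illustrative, not the tool's implementation."""

    def __init__(self, rsd: float, seed: Optional[int] = None) -> None:
        # HyperLogLog's relative standard deviation is about 1.04 / sqrt(m), so
        # pick the smallest power-of-two register count m (at least 128) meeting rsd.
        self.log2m = max(7, math.ceil(math.log2((1.04 / rsd) ** 2)))
        self.m = 1 << self.log2m
        self.salt = (seed or 0).to_bytes(16, "little")  # assumption: seed salts the hash
        self.registers = [0] * self.m

    def add(self, item: str) -> None:
        digest = hashlib.blake2b(item.encode(), digest_size=8, salt=self.salt).digest()
        h = int.from_bytes(digest, "little")  # 64-bit hash
        index = h & (self.m - 1)  # low bits select the register
        rest = h >> self.log2m  # remaining bits give a geometric "rank"
        # Rank = 1 + number of trailing zero bits of `rest`.
        rank = (rest & -rest).bit_length() if rest else 64 - self.log2m + 1
        self.registers[index] = max(self.registers[index], rank)

    def merge(self, other: "HyperLogLog") -> None:
        # The counter of a union of sets is the register-wise maximum.
        self.registers = [max(a, b) for a, b in zip(self.registers, other.registers)]

    def estimate(self) -> float:
        alpha = 0.7213 / (1 + 1.079 / self.m)  # bias constant, valid for m >= 128
        raw = alpha * self.m**2 / sum(2.0**-r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:
            return self.m * math.log(self.m / zeros)  # small-range linear counting
        return raw


def count_descendants(
    successors: dict[str, list[str]], rsd: float = 0.05, seed: Optional[int] = None
) -> dict[str, float]:
    """Estimate, for each node, how many distinct nodes it reaches (itself included)."""
    counters: dict[str, HyperLogLog] = {}
    # Passing successors where graphlib expects predecessors makes static_order()
    # yield every node after all of its successors.
    for node in TopologicalSorter(successors).static_order():
        counter = HyperLogLog(rsd, seed)
        counter.add(node)  # this sketch counts each node as its own descendant
        for succ in successors.get(node, []):
            counter.merge(counters[succ])  # union with the successor's descendants
        counters[node] = counter
    return {node: c.estimate() for node, c in counters.items()}


diamond = {"a": ["b", "c"], "b": ["d"], "c": ["d"]}
# Exact values, counting each node itself: a=4, b=2, c=2, d=1.
print({node: round(est) for node, est in count_descendants(diamond).items()})
```

Because d is reachable from a through both b and c, adding the two successor counts would give 5. The max-merge counts d only once, and that deduplication is what the HyperLogLog union provides.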

class swh.graph.libs.luigi.topology.PathCountsParquetToS3(*args, **kwargs)

Bases: _ParquetToS3ToAthenaTask

Reads the CSV from CountPaths, converts it to ORC, uploads the ORC to S3, and creates an Athena table for it.

topology_dir = <luigi.parameter.PathParameter object>
object_types = <luigi.parameter.Parameter object>
direction = <luigi.parameter.ChoiceParameter object>
dataset_name = <luigi.parameter.Parameter object>
s3_athena_output_location = <swh.export.luigi.S3PathParameter object>
requires() → CountPaths

Returns the corresponding CountPaths instance.