swh.graph.luigi.provenance module

Luigi tasks to help compute the provenance of content blobs

This module contains Luigi tasks that drive the computation of a topological order and count the number of paths to every node.

File layout

This assumes a local compressed graph (from swh.graph.luigi.compressed_graph) is present, and generates/manipulates the following files:

base_dir/
    <date>[_<flavor>]/
        provenance/
            topological_order_dfs.csv.zst

class swh.graph.luigi.provenance.SortRevrelByDate(*args, **kwargs)[source]#

Bases: Task

Creates a file, built from a graph export, that lists the author date and SWHID of every revision and release, sorted by date.

local_export_path = <luigi.parameter.PathParameter object>#
local_graph_path = <luigi.parameter.PathParameter object>#
graph_name = <luigi.parameter.Parameter object>#
provenance_dir = <luigi.parameter.PathParameter object>#
requires() Dict[str, Task][source]#

Returns LocalGraph and LocalExport instances

output() LocalTarget[source]#

Returns {provenance_dir}/revrel_by_author_date.csv.zst

run() None[source]#

For each ORC revision or release file, read it with pyorc, produce a “date,swhid” CSV, and sort it with GNU sort. Then merge all outputs with GNU sort.
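
The sort-then-merge strategy described above can be sketched in pure Python. This is only an illustration: the task itself shells out to GNU sort, whereas the sketch substitutes sorted() for the per-file sort and heapq.merge() for the final merge. The sample rows and the ISO 8601 date format are assumptions, not taken from the real export.

```python
import heapq


def sort_chunk(rows):
    """Sort one chunk of "date,swhid" lines (stand-in for one GNU sort run)."""
    return sorted(rows)


def merge_sorted_chunks(chunks):
    """Merge already-sorted chunks into one sorted list (stand-in for the final merge)."""
    return list(heapq.merge(*chunks))


# Hypothetical rows. Dates in one fixed ISO 8601 format sort
# chronologically when compared as plain strings.
revisions = [
    "2015-03-01T00:00:00,swh:1:rev:" + "b" * 40,
    "2010-01-01T00:00:00,swh:1:rev:" + "a" * 40,
]
releases = ["2012-06-15T00:00:00,swh:1:rel:" + "c" * 40]

merged = merge_sorted_chunks([sort_chunk(revisions), sort_chunk(releases)])
for line in merged:
    print(line)
```

Sorting each file independently keeps every individual sort small, and the merge step only needs to hold one pending row per input at a time.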

static orc_to_csv()[source]#

Must be called as a CLI script. Syntax: {rev,rel} path/to/dataset/file.orc

Reads an ORC file containing revisions or releases, and writes a CSV to its stdout, containing a date and a SWHID on each row.
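
As a rough sketch of the row conversion, the following writes one "date,swhid" line per object. It deliberately leaves out the ORC reading step, so rows_to_csv, its arguments, the date format, and the rule of skipping rows without a date are all hypothetical. Only the swh:1:rev: and swh:1:rel: SWHID prefixes are standard.

```python
import csv
import io


def rows_to_csv(rows, object_type, out):
    """Write one "date,swhid" CSV row per (date, hex_id) pair.

    object_type is "rev" or "rel", mirroring the first CLI argument.
    """
    writer = csv.writer(out, lineterminator="\n")
    for date, hex_id in rows:
        if date is None:
            # Assumption: objects without an author date are skipped.
            continue
        writer.writerow([date, f"swh:1:{object_type}:{hex_id}"])


# Hypothetical input rows standing in for what an ORC reader would yield.
buf = io.StringIO()
rows_to_csv(
    [("2010-01-01T00:00:00", "a" * 40), (None, "b" * 40)],
    "rev",
    buf,
)
csv_text = buf.getvalue()
print(csv_text, end="")
```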

class swh.graph.luigi.provenance.ListEarliestRevisions(*args, **kwargs)[source]#

Bases: Task

Creates a file that contains all directory/content SWHIDs, along with the author date and SWHID of the earliest revision/release they occur in.

local_export_path = <luigi.parameter.PathParameter object>#
local_graph_path = <luigi.parameter.PathParameter object>#
graph_name = <luigi.parameter.Parameter object>#
provenance_dir = <luigi.parameter.PathParameter object>#
property resources#

Returns the value of self.max_ram_mb

requires() Dict[str, Task][source]#

Returns LocalGraph and SortRevrelByDate instances.

output() Dict[str, LocalTarget][source]#

Returns {provenance_dir}/revrel_by_author_date.csv.zst and {provenance_dir}/earliest_timestamps.bin.

run() None[source]#

Runs org.softwareheritage.graph.utils.ListEarliestRevisions
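
One way to picture the computation is to walk revisions/releases in date order and record, for each directory or content, the first one that reaches it. The sketch below works on a toy in-memory graph. It is not the Java implementation, and the names earliest_revisions and successors, as well as the node labels and integer dates, are hypothetical.

```python
def earliest_revisions(revrels_by_date, successors):
    """Map each reachable node to the (date, revrel) that first reaches it.

    revrels_by_date must already be sorted by date, oldest first.
    """
    earliest = {}
    for date, revrel in revrels_by_date:
        stack = list(successors.get(revrel, []))
        while stack:
            node = stack.pop()
            if node in earliest:
                # Already reached from an older revision/release, and so
                # were all of its descendants: no need to go further.
                continue
            earliest[node] = (date, revrel)
            stack.extend(successors.get(node, []))
    return earliest


# Hypothetical toy graph: two revisions sharing one directory.
successors = {
    "rev1": ["dirA"],
    "rev2": ["dirA", "dirB"],
    "dirA": ["cnt1"],
    "dirB": ["cnt1", "cnt2"],
}
earliest = earliest_revisions([(100, "rev1"), (200, "rev2")], successors)
print(earliest)
```

Because the input is in date order, the first visit of a node is by construction its earliest occurrence, which is why the task depends on SortRevrelByDate.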

class swh.graph.luigi.provenance.ListDirectoryMaxLeafTimestamp(*args, **kwargs)[source]#

Bases: Task

Creates a file that contains, for each directory, the maximum timestamp among its leaf contents, based on the earliest timestamps computed by ListEarliestRevisions.

local_export_path = <luigi.parameter.PathParameter object>#
local_graph_path = <luigi.parameter.PathParameter object>#
graph_name = <luigi.parameter.Parameter object>#
provenance_dir = <luigi.parameter.PathParameter object>#
topological_order_dir = <luigi.parameter.PathParameter object>#
property resources#

Returns the value of self.max_ram_mb

requires() Dict[str, Task][source]#

Returns LocalGraph, TopoSort, and ListEarliestRevisions instances.

output() LocalTarget[source]#

Returns {provenance_dir}/max_leaf_timestamps.bin

run() None[source]#

Runs org.softwareheritage.graph.utils.ListDirectoryMaxLeafTimestamp
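
Reading the task name literally, the value computed for a directory is the largest timestamp found among the contents below it. A minimal sketch under that assumption follows. It uses memoized recursion on a toy tree, whereas the real task depends on TopoSort and presumably processes nodes in topological order instead. All names and values are hypothetical.

```python
def max_leaf_timestamps(children, content_timestamps):
    """Return {directory: max timestamp over all contents below it}.

    children maps each directory to its entries. Entries that are not
    keys of children are treated as contents (leaves).
    """
    memo = {}

    def visit(directory):
        if directory not in memo:
            stamps = []
            for child in children[directory]:
                if child in children:  # subdirectory
                    sub = visit(child)
                    if sub is not None:
                        stamps.append(sub)
                else:  # content
                    stamps.append(content_timestamps[child])
            # An empty directory has no leaf, hence no timestamp.
            memo[directory] = max(stamps, default=None)
        return memo[directory]

    for directory in children:
        visit(directory)
    return memo


# Hypothetical toy tree with integer timestamps.
children = {"root": ["sub", "cnt1"], "sub": ["cnt2", "cnt3"]}
content_timestamps = {"cnt1": 100, "cnt2": 300, "cnt3": 200}
result = max_leaf_timestamps(children, content_timestamps)
print(result)
```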

class swh.graph.luigi.provenance.ComputeDirectoryFrontier(*args, **kwargs)[source]#

Bases: Task

Creates a file that contains the “directory frontier” as defined by swh-provenance.

In short, a frontier directory is a directory that directly contains a file (not only subdirectories) and that appears as a non-root directory in a revision newer than the directory timestamp computed by ListDirectoryMaxLeafTimestamp.

local_export_path = <luigi.parameter.PathParameter object>#
local_graph_path = <luigi.parameter.PathParameter object>#
graph_name = <luigi.parameter.Parameter object>#
provenance_dir = <luigi.parameter.PathParameter object>#
topological_order_dir = <luigi.parameter.PathParameter object>#
batch_size = <luigi.parameter.IntParameter object>#
property resources#

Returns the value of self.max_ram_mb

requires() Dict[str, Task][source]#

Returns LocalGraph and ListDirectoryMaxLeafTimestamp instances.

output() LocalTarget[source]#

Returns {provenance_dir}/directory_frontier.csv.zst

run() None[source]#

Runs org.softwareheritage.graph.utils.ComputeDirectoryFrontier
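
The frontier criteria stated above can be written as a single predicate. This is a sketch of the short definition given on this page only, not of the full swh-provenance algorithm. The Directory class, its fields, and is_frontier are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Directory:
    has_direct_file: bool    # directly contains at least one file
    max_leaf_timestamp: int  # as computed by ListDirectoryMaxLeafTimestamp


def is_frontier(directory, is_root_of_revision, revision_timestamp):
    """Apply the three criteria from the short definition."""
    return (
        directory.has_direct_file
        and not is_root_of_revision
        and revision_timestamp > directory.max_leaf_timestamp
    )


# Hypothetical values: a directory whose newest leaf dates from t=100.
vendor = Directory(has_direct_file=True, max_leaf_timestamp=100)
print(is_frontier(vendor, is_root_of_revision=False, revision_timestamp=200))
```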

class swh.graph.luigi.provenance.DeduplicateFrontierDirectories(*args, **kwargs)[source]#

Bases: Task

Reads the output of ComputeDirectoryFrontier, which consists of (directory, revision) pairs, and outputs the set of distinct directories in it.

local_export_path = <luigi.parameter.PathParameter object>#
local_graph_path = <luigi.parameter.PathParameter object>#
graph_name = <luigi.parameter.Parameter object>#
provenance_dir = <luigi.parameter.PathParameter object>#
topological_order_dir = <luigi.parameter.PathParameter object>#
batch_size = <luigi.parameter.IntParameter object>#
requires() Dict[str, Task][source]#

Returns LocalGraph and ListDirectoryMaxLeafTimestamp instances.

output() LocalTarget[source]#

Returns {provenance_dir}/directory_frontier.deduplicated.csv.zst

run()[source]#

Runs cut | sort --uniq to produce unique directory SWHIDs from directory_frontier.csv.zst.
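
In Python terms, the pipeline amounts to keeping one column and returning its distinct values in sorted order. The sketch below assumes the directory SWHID is the first column of each row. The actual column layout of directory_frontier.csv.zst is not specified here, and the sample rows are hypothetical.

```python
def unique_directories(frontier_rows, directory_column=0):
    """Equivalent of `cut | sort --uniq` on an in-memory list of rows."""
    return sorted({row[directory_column] for row in frontier_rows})


# Hypothetical (directory, revision) rows with one repeated directory.
dir_a = "swh:1:dir:" + "a" * 40
dir_b = "swh:1:dir:" + "b" * 40
rows = [
    (dir_b, "swh:1:rev:" + "1" * 40),
    (dir_a, "swh:1:rev:" + "1" * 40),
    (dir_b, "swh:1:rev:" + "2" * 40),
]
directories = unique_directories(rows)
print(directories)
```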

class swh.graph.luigi.provenance.ListContentsInRevisionsWithoutFrontier(*args, **kwargs)[source]#

Bases: Task

Creates a file that contains the list of (file, revision) pairs where the file is reachable from the revision without going through any “directory frontier” as defined by swh-provenance.

In short, a frontier directory is a directory that directly contains a file (not only subdirectories) and that appears as a non-root directory in a revision newer than the directory timestamp computed by ListDirectoryMaxLeafTimestamp.

local_export_path = <luigi.parameter.PathParameter object>#
local_graph_path = <luigi.parameter.PathParameter object>#
graph_name = <luigi.parameter.Parameter object>#
provenance_dir = <luigi.parameter.PathParameter object>#
topological_order_dir = <luigi.parameter.PathParameter object>#
batch_size = <luigi.parameter.IntParameter object>#
property resources#

Returns the value of self.max_ram_mb

requires() Dict[str, Task][source]#

Returns LocalGraph and ListDirectoryMaxLeafTimestamp instances.

output() LocalTarget[source]#

Returns {provenance_dir}/directory_frontier.csv.zst

run() None[source]#

Runs org.softwareheritage.graph.utils.ListContentsInRevisionsWithoutFrontier
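
A traversal that stops at frontier directories could look like the sketch below. It handles a single revision on a toy tree and takes the set of frontier directories for that revision as given. The function name, arguments, and sample data are hypothetical, and the Java implementation may be organized differently.

```python
def contents_without_frontier(root_directory, children, frontier_directories):
    """Return contents reachable from the root without crossing a frontier.

    Entries that are not keys of children are treated as contents.
    """
    contents = set()
    seen = {root_directory}
    stack = [root_directory]
    while stack:
        directory = stack.pop()
        for child in children[directory]:
            if child not in children:
                contents.add(child)
            elif child not in frontier_directories and child not in seen:
                seen.add(child)
                stack.append(child)
            # Frontier directories are not entered at all.
    return contents


# Hypothetical revision tree where "vendor" is a frontier directory.
children = {
    "root": ["README", "src", "vendor"],
    "src": ["main.c"],
    "vendor": ["lib.c"],
}
reachable = contents_without_frontier("root", children, {"vendor"})
print(sorted(reachable))
```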

class swh.graph.luigi.provenance.ListContentsInFrontierDirectories(*args, **kwargs)[source]#

Bases: Task

Enumerates all contents in all directories returned by ComputeDirectoryFrontier.

local_export_path = <luigi.parameter.PathParameter object>#
local_graph_path = <luigi.parameter.PathParameter object>#
graph_name = <luigi.parameter.Parameter object>#
provenance_dir = <luigi.parameter.PathParameter object>#
topological_order_dir = <luigi.parameter.PathParameter object>#
property resources#

Returns the value of self.max_ram_mb

requires() Dict[str, Task][source]#

Returns LocalGraph and ComputeDirectoryFrontier instances.

output() LocalTarget[source]#

Returns {provenance_dir}/contents_in_frontier_directories.csv.zst

run() None[source]#

Runs org.softwareheritage.graph.utils.ListContentsInDirectories
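
The enumeration can be pictured as one full traversal per frontier directory. The sketch below assumes that contents in nested subdirectories are included, which this page does not state explicitly. The helper names and the toy tree are hypothetical.

```python
def contents_in_directory(directory, children):
    """Return every content found below a directory, at any depth."""
    contents = set()
    seen = {directory}
    stack = [directory]
    while stack:
        current = stack.pop()
        for child in children[current]:
            if child not in children:
                contents.add(child)
            elif child not in seen:
                seen.add(child)
                stack.append(child)
    return contents


def contents_in_frontier_directories(frontier_directories, children):
    """Map each frontier directory to the contents it holds."""
    return {
        directory: contents_in_directory(directory, children)
        for directory in sorted(frontier_directories)
    }


# Hypothetical tree: "vendor" is a frontier directory with a nested one.
children = {"vendor": ["lib.c", "include"], "include": ["lib.h"]}
listing = contents_in_frontier_directories({"vendor"}, children)
print(listing)
```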

class swh.graph.luigi.provenance.RunProvenance(*args, **kwargs)[source]#

Bases: WrapperTask

(Transitively) depends on all provenance tasks

local_export_path = <luigi.parameter.PathParameter object>#
local_graph_path = <luigi.parameter.PathParameter object>#
graph_name = <luigi.parameter.Parameter object>#
provenance_dir = <luigi.parameter.PathParameter object>#
topological_order_dir = <luigi.parameter.PathParameter object>#
requires()[source]#

Returns ListContentsInFrontierDirectories and ListContentsInRevisionsWithoutFrontier
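
To see what "transitively" covers, the requires() descriptions on this page can be transcribed into a plain dictionary and walked. The sketch below does not import Luigi. REQUIRES is a hand-written transcription of the documented dependencies, and transitive_requirements is a hypothetical helper.

```python
# Direct dependencies, transcribed from the requires() descriptions above.
REQUIRES = {
    "SortRevrelByDate": ["LocalGraph", "LocalExport"],
    "ListEarliestRevisions": ["LocalGraph", "SortRevrelByDate"],
    "ListDirectoryMaxLeafTimestamp": [
        "LocalGraph", "TopoSort", "ListEarliestRevisions",
    ],
    "ComputeDirectoryFrontier": ["LocalGraph", "ListDirectoryMaxLeafTimestamp"],
    "DeduplicateFrontierDirectories": [
        "LocalGraph", "ListDirectoryMaxLeafTimestamp",
    ],
    "ListContentsInRevisionsWithoutFrontier": [
        "LocalGraph", "ListDirectoryMaxLeafTimestamp",
    ],
    "ListContentsInFrontierDirectories": [
        "LocalGraph", "ComputeDirectoryFrontier",
    ],
    "RunProvenance": [
        "ListContentsInFrontierDirectories",
        "ListContentsInRevisionsWithoutFrontier",
    ],
}


def transitive_requirements(task):
    """Return every task reachable from `task` through REQUIRES."""
    seen = set()
    stack = list(REQUIRES.get(task, []))
    while stack:
        dependency = stack.pop()
        if dependency not in seen:
            seen.add(dependency)
            stack.extend(REQUIRES.get(dependency, []))
    return seen


deps = transitive_requirements("RunProvenance")
print(sorted(deps))
```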