swh.datasets.luigi.origin_contributors module#
Luigi tasks for contribution graph#
This module contains Luigi tasks driving the creation of the graph of contributions of people (pseudonymized by default).
File layout#
This assumes a local compressed graph (from swh.graph.luigi.compressed_graph)
is present, and generates/manipulates the following files:
base_dir/
<date>[_<flavor>]/
datasets/
contribution_graph.csv.zst
topology/
topological_order_dfs.csv.zst
And optionally:
sensitive_base_dir/
<date>[_<flavor>]/
persons_sha256_to_name.csv.zst
datasets/
contribution_graph.deanonymized.csv.zst
- class swh.datasets.luigi.origin_contributors.ListOriginContributors(*args, **kwargs)[source]#
Bases:
TaskCreates a file that contains all SWHIDs in topological order from a compressed graph.
- local_graph_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- topological_order_dir#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- origin_contributors_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- origin_urls_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- graph_name#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- max_ram_mb#
Parameter whose value is an
int.
- property resources#
Returns the value of
self.max_ram_mb
- requires() Dict[str, Task][source]#
Returns an instance of
swh.graph.luigi.compressed_graph.LocalGraphandswh.graph.libs.luigi.topology.ComputeGenerations.
- class swh.datasets.luigi.origin_contributors.ExportDeanonymizationTable(*args, **kwargs)[source]#
Bases:
TaskExports (from swh-storage) a .csv.zst file that contains the columns:
base64(sha256(full_name))`, ``base64(full_name), andescape(full_name).The first column is the anonymized full name found in
graph.persons.csv.zstin the compressed graph, and the latter two are the original name.- storage_dsn#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- deanonymization_table_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- class swh.datasets.luigi.origin_contributors.DeanonymizeContributors(*args, **kwargs)[source]#
Bases:
TaskGenerates a .csv.zst file that contains the columns:
contributor_id,contributor_base64, andcontributor_escaped.The first column is the id of the contributor in the compressed graph, and the latter two are the same as in the file produced by
ExportDeanonymizationTable.- local_graph_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- graph_name#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- deanonymization_table_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- mph_algo#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- deanonymization_mapping_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- requires() List[Task][source]#
Returns an instance of
ExportDeanonymizationTable.
- class swh.datasets.luigi.origin_contributors.DeanonymizeOriginContributors(*args, **kwargs)[source]#
Bases:
TaskGenerates a .csv.zst file similar to
ListOriginContributors’s, but withcontributor_base64andcontributor_escapedcolumns in addition tocontributor_id.This assumes that
graph.persons.csv.zstis anonymized (SHA256 of names instead of names); which may not be true depending on how the swh-dataset export was configured.- local_graph_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- graph_name#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- origin_contributors_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- deanonymization_table_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- deanonymized_origin_contributors_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- mph_algo#
- A parameter which takes two values:
an instance of
Iterableandthe class of the variables to convert to.
In the task definition, use
class MyTask(luigi.Task): my_param = luigi.ChoiceParameter(choices=[0.1, 0.2, 0.3], var_type=float)
At the command line, use
$ luigi --module my_tasks MyTask --my-param 0.1
Consider using
EnumParameterfor a typed, structured alternative. This class can perform the same role when all choices are the same type and transparency of parameter value on the command line is desired.
- deanonymization_mapping_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- requires() List[Task][source]#
Returns instances of
LocalGraph,ListOriginContributors,ExportDeanonymizationTableandDeanonymizeContributors.
- output() Target[source]#
.csv.zst file similar to
ListOriginContributors.output()’s, but withcontributor_base64andcontributor_escapedcolumns in addition tocontributor_id
- class swh.datasets.luigi.origin_contributors.RunOriginContributors(*args, **kwargs)[source]#
Bases:
Task- local_graph_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- graph_name#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- origin_urls_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- origin_contributors_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- deanonymized_origin_contributors_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- skip_integrity_check#
A Parameter whose value is a
bool. This parameter has an implicit default value ofFalse. For the command line interface this means that the value isFalseunless you add"--the-bool-parameter"to your command without giving a parameter value. This is considered implicit parsing (the default). However, in some situations one might want to give the explicit bool value ("--the-bool-parameter true|false"), e.g. when you configure the default value to beTrue. This is called explicit parsing. When omitting the parameter value, it is still consideredTruebut to avoid ambiguities during argument parsing, make sure to always place bool parameters behind the task family on the command line when using explicit parsing.You can toggle between the two parsing modes on a per-parameter base via
class MyTask(luigi.Task): implicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.IMPLICIT_PARSING) explicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.EXPLICIT_PARSING)
or globally by
luigi.BoolParameter.parsing = luigi.BoolParameter.EXPLICIT_PARSING
for all bool parameters instantiated after this line.
- test_origin#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- test_person#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- test_years#
Parameter whose value is a
str, and a base class for other parameter types.Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
class MyTask(luigi.Task): foo = luigi.Parameter() class RequiringTask(luigi.Task): def requires(self): return MyTask(foo="hello") def run(self): print(self.requires().foo) # prints "hello"
This makes it possible to instantiate multiple tasks, eg
MyTask(foo='bar')andMyTask(foo='baz'). The task will then have thefooattribute set appropriately.When a task is instantiated, it will first use any argument as the value of the parameter, eg. if you instantiate
a = TaskA(x=44)thena.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:Any value provided on the command line:
To the root task (eg.
--param xyz)Then to the class, using the qualified task name syntax (eg.
--TaskA-param xyz).
With
[TASK_NAME]>PARAM_NAME: <serialized value>syntax. See ParamConfigIngestionAny default value set using the
defaultflag.
Parameter objects may be reused, but you must then set the
positional=Falseflag.
- requires() List[Task][source]#
Returns instances of
LocalGraph,ListOriginContributors, andExportDeanonymizationTable.