swh.datasets.luigi.file_names module#
Luigi tasks for producing the most common names of every content and datasets based on file names#
- class swh.datasets.luigi.file_names.PopularContentNames(*args, **kwargs)[source]#
Bases:
TaskCreates a CSV file that contains the most popular name(s) of each content
- local_graph_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- popular_contents_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- graph_name#
Parameter whose value is a
str.
- max_results_per_content#
Parameter whose value is an
int.
- popularity_threshold#
Parameter whose value is an
int.
- property resources#
Return the estimated RAM use of this task.
- class swh.datasets.luigi.file_names.PopularContentPaths(*args, **kwargs)[source]#
Bases:
TaskCreates a CSV file that contains the most popular path of each content/directory given as input
- local_graph_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- popular_contents_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- graph_name#
Parameter whose value is a
str.
- input_swhids#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- max_depth#
Parameter whose value is an
int.
- property resources#
Return the estimated RAM use of this task.
- class swh.datasets.luigi.file_names.PopularContentNamesOrcToS3(*args, **kwargs)[source]#
Bases:
_ParquetToS3ToAthenaTaskReads the CSV from
PopularContents, converts it to ORC, upload the ORC to S3, and create an Athena table for it.- popular_contents_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- dataset_name#
Parameter whose value is a
str.
- s3_athena_output_location#
A parameter that strip trailing slashes
- requires() PopularContentNames[source]#
Returns corresponding PopularContentNames instance
- class swh.datasets.luigi.file_names.ListFilesByName(*args, **kwargs)[source]#
Bases:
TaskFrom every refs/heads/master, refs/heads/main, or HEAD branch in any snapshot, browse the whole directory tree looking for files named <filename>, and lists them to stdout.
- local_graph_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- graph_name#
Parameter whose value is a
str.
- output_path#
Parameter whose value is a path.
In the task definition, use
class MyTask(luigi.Task): existing_file_path = luigi.PathParameter(exists=True) new_file_path = luigi.PathParameter() def run(self): # Get data from existing file with self.existing_file_path.open("r", encoding="utf-8") as f: data = f.read() # Output message in new file self.new_file_path.parent.mkdir(parents=True, exist_ok=True) with self.new_file_path.open("w", encoding="utf-8") as f: f.write("hello from a PathParameter => ") f.write(data)
At the command line, use
$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>
- file_name#
Parameter whose value is a
str.
- property resources#
Return the estimated RAM use of this task.