swh.datasets.luigi.specific_languages_datasets module#

class swh.datasets.luigi.specific_languages_datasets.ExtractFileExtension(*args, **kwargs)[source]#

Bases: Task

Filters the contents parquet by file extension.

Reads the contents parquet produced by AggregateContentDatasets, extracts the file extension from each filename, and writes a filtered parquet directory containing only rows whose extension is in extensions.

Expected input columns: id, filename. Any other columns are dropped.

Output columns: id, filename, extension.
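
The row-level logic described above can be sketched in plain Python. This is only an illustration, not the task's actual implementation, which reads and writes parquet. The helper names `file_extension` and `filter_by_extension` are hypothetical, and the sketch assumes that the extension is the text after the last dot, compared case-insensitively.

```python
from pathlib import PurePosixPath


def file_extension(filename: str) -> str:
    """Return the lowercased text after the last dot, or "" if there is none.

    Assumption: this is how "extension" is defined; the real task may differ.
    """
    # PurePosixPath.suffix includes the leading dot (".py"), so strip it.
    return PurePosixPath(filename).suffix.lstrip(".").lower()


def filter_by_extension(rows, extensions):
    """Keep rows whose extension is wanted; emit only id, filename, extension."""
    wanted = {ext.lower().lstrip(".") for ext in extensions}
    result = []
    for row in rows:
        ext = file_extension(row["filename"])
        if ext in wanted:
            # Any extra input columns (here: "length") are dropped.
            result.append(
                {"id": row["id"], "filename": row["filename"], "extension": ext}
            )
    return result


# Made-up sample rows standing in for the contents parquet.
rows = [
    {"id": 1, "filename": "main.py", "length": 120},
    {"id": 2, "filename": "README", "length": 40},
    {"id": 3, "filename": "lib.RS", "length": 300},
    {"id": 4, "filename": "archive.tar.gz", "length": 9000},
]
filtered = filter_by_extension(rows, ["py", "rs"])
```

With this sample input, only the rows with ids 1 and 3 survive, each reduced to the three output columns.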

aggregate_datasets_path#

Parameter whose value is a path.

In the task definition, use

import luigi


class MyTask(luigi.Task):
    existing_file_path = luigi.PathParameter(exists=True)
    new_file_path = luigi.PathParameter()

    def run(self):
        # Get data from existing file
        with self.existing_file_path.open("r", encoding="utf-8") as f:
            data = f.read()

        # Output message in new file
        self.new_file_path.parent.mkdir(parents=True, exist_ok=True)
        with self.new_file_path.open("w", encoding="utf-8") as f:
            f.write("hello from a PathParameter => ")
            f.write(data)

At the command line, use

$ luigi --module my_tasks MyTask --existing-file-path <path> --new-file-path <path>

specific_languages_path#

Parameter whose value is a path.

extensions#

Parameter whose value is a str.

requires() → Task[source]#

Returns an instance of AggregateContentDatasets.

output() → LocalTarget[source]#

Directory of Parquet files with columns id, filename, extension.

run() → None[source]#

Filters the contents parquet by extension and writes the result.

class swh.datasets.luigi.specific_languages_datasets.JoinFilteredContentsWithNodes(*args, **kwargs)[source]#

Bases: Task

Joins the filtered contents parquet with the nodes parquet to obtain SWHIDs, then resolves each SWHID to a SHA1 hash using swh.digestmap.

Reads the output of ExtractFileExtension and the nodes parquet produced by ExportNodesTable, joins them on id, resolves SWHIDs to SHA1 via swh.digestmap.DigestMap, and writes a single parquet directory.

Output columns: id, filename, extension, swhid, sha1.
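
The join-then-resolve flow can be sketched with plain Python structures. This is a sketch under stated assumptions, not the task's implementation: the function `join_and_resolve` is hypothetical, the nodes table is assumed to carry `id` and `swhid` columns, the join is assumed to be an inner join, and a plain dict stands in for swh.digestmap.DigestMap, whose real API is not shown in this document. How the task handles SWHIDs that cannot be resolved is also not stated; the sketch simply stores None.

```python
def join_and_resolve(contents, nodes, digestmap):
    """Join contents with nodes on id, then look up a SHA1 for each SWHID."""
    # Index the nodes table by id so each content row is matched in O(1).
    swhid_by_id = {node["id"]: node["swhid"] for node in nodes}

    joined = []
    for row in contents:
        swhid = swhid_by_id.get(row["id"])
        if swhid is None:
            # Assumed inner join: rows with no matching node are skipped.
            continue
        # `digestmap` is a dict stand-in (assumption); None means unresolved.
        sha1 = digestmap.get(swhid)
        joined.append({**row, "swhid": swhid, "sha1": sha1})
    return joined


# Made-up sample data; the hex digests are placeholders, not real hashes.
contents = [
    {"id": 1, "filename": "main.py", "extension": "py"},
    {"id": 3, "filename": "lib.rs", "extension": "rs"},
    {"id": 7, "filename": "orphan.py", "extension": "py"},
]
nodes = [
    {"id": 1, "swhid": "swh:1:cnt:" + "a" * 40},
    {"id": 3, "swhid": "swh:1:cnt:" + "c" * 40},
]
digestmap = {"swh:1:cnt:" + "a" * 40: "b" * 40}

result = join_and_resolve(contents, nodes, digestmap)
```

In this sample, the row with id 7 has no node and is dropped, and the row with id 3 keeps its SWHID but has no SHA1 because the stand-in map does not contain it.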

aggregate_datasets_path#

Parameter whose value is a path.

specific_languages_path#

Parameter whose value is a path.

digestmap_path#

Parameter whose value is a path.

extensions#

Parameter whose value is a str.

requires()[source]#

Returns instances of ExtractFileExtension, ExportNodesTable, and _DigestMap.

output() → LocalTarget[source]#

Directory of Parquet files with columns id, filename, extension, swhid, sha1.

run() → None[source]#

Joins filtered contents with nodes, resolves SWHIDs to SHA1 via digestmap, and writes the result.