Source code for swh.graph.download
# Copyright (C) 2022-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
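
"""Downloads a compressed Software Heritage graph from S3 to a local
directory, decompressing ``.bin.zst`` files on the fly."""
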
import multiprocessing.dummy
import subprocess
from pathlib import Path
from typing import Callable

import boto3
import botocore
import tqdm


class GraphDownloader:
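    """Downloads a compressed graph from S3 to a local directory,
    decompressing ``.bin.zst`` files with ``zstdmt`` as they are fetched.

    A minimal usage sketch (the dataset path below is illustrative, not a
    guaranteed dataset name)::

        GraphDownloader(
            local_graph_path=Path("/srv/graph"),
            s3_graph_path="s3://softwareheritage/graph/2021-03-23/compressed",
            parallelism=4,
        ).download_graph(
            progress_percent_cb=lambda percent: None,
            progress_status_cb=print,
        )
    """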
def __init__(
self,
local_graph_path: Path,
s3_graph_path: str,
parallelism: int,
) -> None:
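        """Arguments:
            local_graph_path: directory the graph is downloaded to
            s3_graph_path: ``s3://`` URL the graph is downloaded from
            parallelism: number of concurrent download threads
        """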
        if not s3_graph_path.startswith("s3://"):
            raise ValueError(f"Unsupported S3 URL: {s3_graph_path}")
self.s3 = boto3.resource("s3")
# don't require credentials to list the bucket
self.s3.meta.client.meta.events.register(
"choose-signer.s3.*", botocore.handlers.disable_signing
)
self.client = boto3.client(
"s3",
config=botocore.client.Config(
# https://github.com/boto/botocore/issues/619
max_pool_connections=10 * parallelism,
# don't require credentials to download files
signature_version=botocore.UNSIGNED,
),
)
self.local_graph_path = local_graph_path
self.s3_graph_path = s3_graph_path
self.bucket_name, self.prefix = self.s3_graph_path[len("s3://") :].split("/", 1)
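        # Set by _download_file() when it sees meta/compression.json in the
        # listing, so download_graph() can write it last.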
self.compression_metadata_obj = None
self.parallelism = parallelism

    def _download_file(self, obj, write_if_stamp=False):
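        """Downloads a single S3 object to its place under
        ``self.local_graph_path``, decompressing it on the fly if its key
        ends with ``.bin.zst``.

        ``meta/compression.json`` is not downloaded (only remembered for
        later) unless ``write_if_stamp`` is true, so that it can be written
        last as a completion stamp.
        """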
assert obj.key.startswith(self.prefix)
relative_path = obj.key.removeprefix(self.prefix).lstrip("/")
if relative_path == "meta/compression.json" and not write_if_stamp:
# Will copy it last
self.compression_metadata_obj = obj
return
local_path = self.local_graph_path / relative_path
local_path.parent.mkdir(parents=True, exist_ok=True)
if relative_path.endswith(".bin.zst"):
            # The file was compressed before being uploaded to S3, so it
            # needs to be decompressed locally. Write to a temporary location
            # first, so we don't end up with a partial file if the download
            # or decompression is interrupted.
tmp_local_path = local_path.with_suffix(".tmp")
proc = subprocess.Popen(
["zstdmt", "--force", "-d", "-", "-o", str(tmp_local_path)],
stdin=subprocess.PIPE,
)
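            # Stream the object's body to zstdmt's stdin in 100 KiB chunks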
for chunk in obj.get()["Body"].iter_chunks(102400):
proc.stdin.write(chunk)
proc.stdin.close()
ret = proc.wait()
if ret != 0:
raise ValueError(f"zstdmt could not decompress {local_path}")
            # Move the decompressed file into place, dropping the .zst suffix
            tmp_local_path.rename(local_path.with_suffix(""))
else:
self.client.download_file(
Bucket=self.bucket_name,
Key=obj.key,
Filename=str(local_path),
)
return relative_path
def download_graph(
self,
progress_percent_cb: Callable[[int], None],
progress_status_cb: Callable[[str], None],
):
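        """Downloads all objects under the graph's S3 prefix in parallel.

        Arguments:
            progress_percent_cb: called with an integer percentage (0-100)
                after each file is downloaded
            progress_status_cb: called with a human-readable status message
                after each file is downloaded
        """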
bucket = self.s3.Bucket(self.bucket_name)
        # Recursively download all files under the graph's prefix; the
        # compression metadata is downloaded last (see below).
objects = list(bucket.objects.filter(Prefix=self.prefix))
with multiprocessing.dummy.Pool(self.parallelism) as p:
for i, relative_path in tqdm.tqdm(
enumerate(p.imap_unordered(self._download_file, objects)),
total=len(objects),
desc="Downloading",
):
                progress_percent_cb((i + 1) * 100 // len(objects))
                if relative_path is not None:
                    progress_status_cb(f"Downloaded {relative_path}")
        if not self.s3_graph_path.startswith(
            tuple(
                f"s3://softwareheritage/graph/{year}-"
                for year in (2018, 2019, 2020, 2021)
            )
        ):
            # Graphs from 2018 to 2021 did not have compression metadata yet,
            # so it is only expected (and downloaded) for later graphs.
            assert (
                self.compression_metadata_obj is not None
            ), "did not see meta/compression.json in directory listing"
            # Write it last, so it acts as a stamp that the download completed
            self._download_file(self.compression_metadata_obj, write_if_stamp=True)