Source code for swh.graph.download
# Copyright (C) 2022-2026 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import logging
from pathlib import Path
import subprocess
from typing import TYPE_CHECKING, List
from swh.core.s3.downloader import S3Downloader
if TYPE_CHECKING:
from types_boto3_s3.service_resource import ObjectSummary
logger = logging.getLogger(__name__)
[docs]
class GraphDownloader(S3Downloader):
"""Utility class to download a compressed Software Heritage graph dataset
from S3 implementing a download resumption feature in case some files fail to
be downloaded (when connection errors happen for instance).
Example of use::
from swh.graph.download import GraphDownloader
# download "2025-05-18-popular-1k" graph dataset into a sub-directory of the
# current working directory named "2025-05-18-popular-1k"
graph_downloader = GraphDownloader(
local_path="2025-05-18-popular-1k",
s3_url="s3://softareheritage/graph/2025-05-18-popular-1k/compressed/"
)
while not graph_downloader.download():
continue
"""
[docs]
def filter_objects(self, objects: List[ObjectSummary]) -> List[ObjectSummary]:
# meta/compression.json file must be downloaded after all other files
return [obj for obj in objects if not obj.key.endswith("meta/compression.json")]
[docs]
def can_download_file(self, relative_path: str, local_file_path: Path) -> bool:
# do not download again a file compressed with zstd if it was locally uncompressed
return (
not relative_path.endswith(".bin.zst")
or not Path(str(local_file_path)[:-4]).exists()
)
[docs]
def post_download_file(self, relative_path: str, local_file_path: Path) -> None:
if (
relative_path.endswith(".bin.zst")
and not Path(str(local_file_path)[:-4]).exists()
):
# The file was compressed with zstd before uploading it to S3, we need it
# to be decompressed locally
subprocess.check_call(["unzstd", "-d", "-q", "--rm", local_file_path])
def _local_path_size(self) -> int:
# override _local_path_size to get accurate download progress bar
while True:
try:
size = 0
for f in self.local_path.glob("**/*"):
if f.exists() and f.is_file():
relative_path = f.relative_to(self.local_path)
obj_key = self.prefix + str(relative_path)
zst_path = self.local_path / (str(relative_path) + ".zst")
if obj_key + ".zst" in self.file_size and not zst_path.exists():
# file downloaded was compressed with zstd, return its compressed
# size once it has been uncompressed
size += self.file_size[obj_key + ".zst"]
else:
size += f.stat().st_size
return size
except FileNotFoundError:
# files can be removed or renamed by threads while
# globbing them
pass
[docs]
def post_downloads(self) -> None:
for file in self.local_path.rglob("**/*.ef"):
# silence
# https://github.com/vigna/webgraph-rs/commit/b494048f787e3f0a021f6f289d66400bdfb5d5f3
file.touch()
if not self.s3_url.startswith(
tuple(
(
f"s3://softwareheritage/graph/{year}-"
for year in (2018, 2019, 2020, 2021)
)
)
):
# skip metadata download for old graphs, they did not have that file yet
objects = list(self.bucket.objects.filter(Prefix=self.prefix))
for obj in objects:
if obj.key.endswith("meta/compression.json"):
# Write it last, to act as a stamp
self._download_file(obj.key, obj=obj)
break
else:
raise ValueError(
"did not see meta/compression.json in directory listing"
)