Source code for swh.graph.download

# Copyright (C) 2022-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from __future__ import annotations

import logging
from pathlib import Path
import subprocess
from typing import TYPE_CHECKING, List

from swh.core.s3.downloader import S3Downloader

if TYPE_CHECKING:
    from types_boto3_s3.service_resource import ObjectSummary


logger = logging.getLogger(__name__)


class GraphDownloader(S3Downloader):
    """Utility class to download a compressed Software Heritage graph dataset
    from S3 implementing a download resumption feature in case some files fail
    to be downloaded (when connection errors happen for instance).

    Example of use::

        from swh.graph.download import GraphDownloader

        # download "2025-05-18-popular-1k" graph dataset into a sub-directory of the
        # current working directory named "2025-05-18-popular-1k"

        graph_downloader = GraphDownloader(
            local_path="2025-05-18-popular-1k",
            s3_url="s3://softwareheritage/graph/2025-05-18-popular-1k/compressed/"
        )

        while not graph_downloader.download():
            continue

    """

    # Key suffix of the metadata file that is fetched last, as a completion stamp.
    _COMPRESSION_STAMP_SUFFIX = "meta/compression.json"

    # Files with this suffix are stored zstd-compressed on S3 and are
    # decompressed locally right after being downloaded.
    _ZSTD_BIN_SUFFIX = ".bin.zst"

    # Graphs exported in these years did not have a meta/compression.json file yet.
    _URL_PREFIXES_WITHOUT_STAMP = tuple(
        f"s3://softwareheritage/graph/{year}-" for year in (2018, 2019, 2020, 2021)
    )

    @staticmethod
    def _decompressed_path(local_file_path: Path) -> Path:
        """Return ``local_file_path`` with its trailing ``.zst`` extension removed.

        Only meaningful for paths ending in ``.zst``: the last four characters
        are dropped unconditionally.
        """
        return Path(str(local_file_path)[: -len(".zst")])

    def filter_objects(self, objects: List[ObjectSummary]) -> List[ObjectSummary]:
        """Return ``objects`` without the ``meta/compression.json`` entry.

        That file must be downloaded after all other files; it is fetched
        separately by :meth:`post_downloads`.
        """
        return [
            obj
            for obj in objects
            if not obj.key.endswith(self._COMPRESSION_STAMP_SUFFIX)
        ]

    def can_download_file(self, relative_path: str, local_file_path: Path) -> bool:
        """Tell whether the file at ``relative_path`` should be downloaded.

        Returns:
            ``False`` only for a ``.bin.zst`` file whose decompressed counterpart
            already exists next to ``local_file_path``; ``True`` otherwise.
        """
        if not relative_path.endswith(self._ZSTD_BIN_SUFFIX):
            return True
        # do not download again a file compressed with zstd if it was locally
        # uncompressed
        return not self._decompressed_path(local_file_path).exists()

    def post_download_file(self, relative_path: str, local_file_path: Path) -> None:
        """Decompress a freshly downloaded ``.bin.zst`` file in place.

        Nothing is done for other files, or when the decompressed file already
        exists.

        Raises:
            subprocess.CalledProcessError: if ``unzstd`` exits with a non-zero
                status.
        """
        if (
            relative_path.endswith(self._ZSTD_BIN_SUFFIX)
            and not self._decompressed_path(local_file_path).exists()
        ):
            # The file was compressed with zstd before uploading it to S3, we need it
            # to be decompressed locally; --rm deletes the compressed file once
            # decompression succeeded.
            subprocess.check_call(["unzstd", "-d", "-q", "--rm", local_file_path])

    def post_downloads(self) -> None:
        """Download ``meta/compression.json`` once everything else is in place.

        The file is written last, to act as a stamp. The step is skipped for
        old graphs (2018 to 2021), which did not have that file yet.

        Raises:
            ValueError: if no ``meta/compression.json`` object is found under
                the dataset prefix.
        """
        if self.s3_url.startswith(self._URL_PREFIXES_WITHOUT_STAMP):
            # skip metadata download for old graphs, they did not have that file yet
            return

        # Iterate the listing lazily: stop at the first match instead of
        # materializing every object of the dataset beforehand.
        stamp_key = next(
            (
                obj.key
                for obj in self.bucket.objects.filter(Prefix=self.prefix)
                if obj.key.endswith(self._COMPRESSION_STAMP_SUFFIX)
            ),
            None,
        )
        if stamp_key is None:
            raise ValueError("did not see meta/compression.json in directory listing")

        # Write it last, to act as a stamp
        self._download_file(stamp_key)