# Source code for swh.core.s3.downloader

# Copyright (C) 2025-2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from __future__ import annotations

import concurrent
import concurrent.futures  # explicit: `import concurrent` alone does not load the submodule
import logging
import math
from operator import attrgetter
from pathlib import Path
import shutil
import threading
import time
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Set

if TYPE_CHECKING:
    from types_boto3_s3.service_resource import ObjectSummary

STREAM_CHUNK_SIZE = 256 * 1024  # 256 KiB

logger = logging.getLogger(__name__)


[docs] class S3Downloader: """Utility class to recursively download the content of a directory on S3. It also implements a download resumption feature in case some files fail to be downloaded (when connection errors happen for instance). Args: local_path: path of directory where files will be downloaded s3_url: URL of directory in a S3 bucket (``s3://<bucket_name>/<path>/``) parallelism: maximum number of threads for downloading files multipart_download_chunk_size: if a file to download has its size greater than that value (default to 1Gib), it will be downloaded concurrently by chunks of that size and re-assembled afterwards as it improves download time Example of use:: from swh.core.s3.downloader import S3Downloader # download "2025-05-18-popular-1k" datasets (ORC and compressed graph) # into a sub-directory of the current working directory named "2025-05-18-popular-1k" s3_downloader = S3Downloader( local_path="2025-05-18-popular-1k", s3_url="s3://softareheritage/graph/2025-05-18-popular-1k/", ) while not s3_downloader.download(): continue """ def __init__( self, local_path: Path, s3_url: str, parallelism: int = 5, multipart_download_chunk_size: int = 1024 * 1024 * 1024, # 1Gib ) -> None: if not s3_url.startswith("s3://"): raise ValueError("Unsupported S3 URL") import boto3 import botocore from botocore.handlers import disable_signing # silence noisy debug logs we are not interested about for module in ("boto3", "botocore", "s3transfer", "urllib3"): logging.getLogger(module).setLevel(logging.WARNING) self.s3 = boto3.resource("s3") # don't require credentials to list the bucket self.s3.meta.client.meta.events.register("choose-signer.s3.*", disable_signing) self.client = boto3.client( "s3", config=botocore.client.Config( # https://github.com/boto/botocore/issues/619 max_pool_connections=10 * parallelism, # don't require credentials to download files signature_version=botocore.UNSIGNED, ), ) self.local_path = local_path self.s3_url = s3_url self.bucket_name, 
self.prefix = self.s3_url[len("s3://") :].split("/", 1) self.parallelism = parallelism self.multipart_download_chunk_size = multipart_download_chunk_size self.next_chunk_to_write: Dict[str, int] = {} self.file_size: Dict[str, int] = {} self.chunks_write_lock = threading.Lock() self.bucket = self.s3.Bucket(self.bucket_name) def _download_file( self, obj_key: str, # threading event to gracefully terminate thread when a download failed shutdown_event: Optional[threading.Event] = None, local_file_path: Optional[Path] = None, prefix: Optional[str] = None, obj: Optional[ObjectSummary] = None, chunk_id: Optional[int] = None, ) -> str: prefix = prefix or self.prefix assert obj_key.startswith(prefix) relative_path = obj_key.removeprefix(prefix).lstrip("/") if local_file_path is None: local_file_path = self.local_path / relative_path local_file_path.parent.mkdir(parents=True, exist_ok=True) if obj is not None: file_size = obj.size else: # fetch size of object to download object_metadata = self.client.head_object( Bucket=self.bucket_name, Key=obj_key ) file_size = object_metadata["ContentLength"] if local_file_path.exists(): # file already downloaded, we check if it has the correct size and # trigger a new download if it is not local_file_size = local_file_path.stat().st_size if local_file_size != file_size: logger.debug( "File %s exists but has incorrect size, forcing a new download", obj_key, ) local_file_path.unlink() return self._download_file( obj_key, shutdown_event, local_file_path, prefix, obj=obj ) if chunk_id is None: logger.debug("File %s already downloaded, nothing to do", obj_key) else: logger.debug( "File %s already downloaded, no need to download chunk %s", obj_key, chunk_id, ) return relative_path # download or resume download of a file elif self.can_download_file(relative_path, local_file_path): local_chunk_file = None if chunk_id is not None: range_start = chunk_id * self.multipart_download_chunk_size range_end = min( (chunk_id + 1) * 
self.multipart_download_chunk_size, file_size ) local_chunk_file = self._chunk_file_path(local_file_path, chunk_id) file_part_path = Path(str(local_chunk_file) + ".part") else: range_start = 0 range_end = file_size file_part_path = Path(str(local_file_path) + ".part") download_size = range_end - range_start if local_chunk_file and local_chunk_file.exists(): # chunk file already downloaded, we check if it has the correct size and # trigger a new download if it is not if local_chunk_file.stat().st_size != download_size: local_chunk_file.unlink() return self._download_file( obj_key, shutdown_event, local_file_path, prefix, obj=obj, chunk_id=chunk_id, ) logger.debug( "Chunk file %s already downloaded, nothing to do", local_chunk_file.name, ) return local_chunk_file.name elif file_part_path.exists(): # resume previous download that failed by fetching only the missing bytes filename = file_part_path.name[: -len(".part")] logger.debug("File %s was partially downloaded", filename) file_part_size = file_part_path.stat().st_size logger.debug( "Resuming download of %s from byte %s", filename, file_part_size, ) range_start += file_part_size else: file_part_size = 0 logger.debug("Downloading file %s", file_part_path.name) if file_part_size < download_size: with file_part_path.open("ab") as file_part: assert ( range_start <= range_end ), f"range [{range_start}-{range_end}] is invalid" object_ = self.client.get_object( Bucket=self.bucket_name, Key=obj_key, Range=f"bytes={range_start}-{range_end - 1}", ) for chunk in object_["Body"].iter_chunks(STREAM_CHUNK_SIZE): file_part.write(chunk) if shutdown_event and shutdown_event.is_set(): # some files failed to be downloaded so abort current download to # save state and inform user early by returning False in # download_graph method file_part.flush() break if file_part_path.stat().st_size == download_size: # file fully downloaded, rename it file_path = Path(str(file_part_path)[: -len(".part")]) file_part_path.rename(file_path) 
logger.debug("Downloaded file %s", file_path.name) if chunk_id is None: self.post_download_file(relative_path, local_file_path) elif chunk_id is not None: # remove partial download of chunk file file_part_path.unlink() else: logger.debug( "File %s already downloaded and uncompressed, nothing to do", obj_key, ) return relative_path def _chunks_append_worker(self, shutdown_event: threading.Event): # this method is executed in a thread and takes care of re-assembling # files downloaded by chunks while self.next_chunk_to_write: if shutdown_event.is_set(): return for obj_key, next_chunk in list(self.next_chunk_to_write.items()): relative_path = obj_key.removeprefix(self.prefix).lstrip("/") local_file_path = self.local_path / relative_path file_part_path = Path(str(local_file_path) + ".part") next_chunk_file_path = self._chunk_file_path( local_file_path, next_chunk ) with open(file_part_path, "ab") as part_file: while next_chunk_file_path.exists(): # while consecutive chunk files are fully downloaded, append # their bytes to final file with open(next_chunk_file_path, "rb") as next_chunk_file: shutil.copyfileobj(next_chunk_file, part_file) next_chunk_file_path.unlink() next_chunk += 1 next_chunk_file_path = self._chunk_file_path( local_file_path, next_chunk ) # keep track of next chunk to append self.next_chunk_to_write[obj_key] = next_chunk if file_part_path.stat().st_size == self.file_size[obj_key]: # final file fully downloaded and assembled, rename it file_path = Path(str(file_part_path)[: -len(".part")]) file_part_path.rename(file_path) self.next_chunk_to_write.pop(obj_key) logger.debug("Assembled file %s", file_path.name) self.post_download_file(relative_path, local_file_path) time.sleep(1) def _chunk_file_path(self, file_path: Path, chunk_id: int) -> Path: return Path(str(file_path) + f".chunk{chunk_id}") def _local_path_size(self) -> int: while True: try: return sum( f.stat().st_size for f in self.local_path.glob("**/*") if f.exists() and f.is_file() ) except 
FileNotFoundError: # files can be removed or renamed by threads while # globbing them pass
[docs] def download( self, progress_percent_cb: Callable[[int], None] = lambda _: None, progress_status_cb: Callable[[str], None] = lambda _: None, ) -> bool: """Execute the download of files from S3 in parallel using a pool of threads. Args: progress_percent_cb: Optional callback function to report the overall progress of the downloads progress_status_cb: Optional callback function to get status messages related to downloaded files Returns: :const:`True` if all files were successfully downloaded, :const:`False` if an error occurred while downloading a file, in that case calling that method again will resume such incomplete downloads """ import tqdm executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.parallelism) shutdown_event = threading.Event() try: # recursively copy local files to S3 objects = self.filter_objects( list(self.bucket.objects.filter(Prefix=self.prefix)) ) objects_total_size = 0 for obj in objects: objects_total_size += obj.size self.file_size[obj.key] = obj.size chunks_append_future = None with tqdm.tqdm( total=objects_total_size, desc="Downloading", unit="iB", unit_scale=True, unit_divisor=1024, ) as progress: futures: Set[concurrent.futures.Future] = set() for obj in sorted(objects, key=attrgetter("size")): nb_chunks = math.ceil(obj.size / self.multipart_download_chunk_size) if nb_chunks > 1: # file is large so it will be downloaded concurrently by chunks # and re-assembled afterwards as it improves download time rpath = obj.key.removeprefix(self.prefix).lstrip("/") local_file_part_path = self.local_path / (rpath + ".part") if not local_file_part_path.exists(): self.next_chunk_to_write[obj.key] = 0 else: # file partially assembled in a previous run that failed, # compute first chunk number to continue assembling it part_file_size = local_file_part_path.stat().st_size self.next_chunk_to_write[obj.key] = ( part_file_size // self.multipart_download_chunk_size ) if chunks_append_future is None: # start thread whose goal is to 
re-assemble files downloaded # by chunks prior download threads chunks_append_future = executor.submit( self._chunks_append_worker, shutdown_event ) futures.add(chunks_append_future) futures.update( executor.submit( self._download_file, obj.key, shutdown_event, obj=obj, chunk_id=chunk_id, ) for chunk_id in range( self.next_chunk_to_write[obj.key], nb_chunks ) ) else: futures.add( executor.submit( self._download_file, obj.key, shutdown_event, obj=obj ) ) not_done = futures last_local_path_size = self._local_path_size() progress.update(last_local_path_size) while not_done: # poll future states every second in order to abort downloads # on first detected error done, not_done = concurrent.futures.wait( not_done, timeout=1, return_when=concurrent.futures.FIRST_COMPLETED, ) local_path_size = min(self._local_path_size(), objects_total_size) progress.update(local_path_size - last_local_path_size) last_local_path_size = local_path_size for future in done: progress_percent_cb(int(progress.n * 100 / progress.total)) progress_status_cb(f"Downloaded {future.result()}") concurrent.futures.wait(futures) self.post_downloads() except BaseException as e: logger.exception("Error occurred while downloading") # notify download threads to immediately terminate shutdown_event.set() # shutdown pool of download threads executor.shutdown(cancel_futures=True) # iterate on downloaded files to log partial ones for relative_path in self.local_path.rglob("**/*"): if relative_path.match("*.part"): logger.debug( "Downloaded bytes for %s dumped in %s to resume download later", relative_path.name[:-5], relative_path, ) if isinstance(e, (KeyboardInterrupt, SystemExit)): # ensure to terminate with error on keyboard interrupt raise SystemExit(1) return False return True
[docs] def filter_objects(self, objects: List[ObjectSummary]) -> List[ObjectSummary]: """Method that can be overridden in derived classes to filter files to download, return all files by default. Args: objects: list of files recursively discovered from the S3 directory Returns: filtered list of files to download """ return objects
[docs] def can_download_file(self, relative_path: str, local_file_path: Path) -> bool: """Method that can be overridden in derived classes to prevent download of a file under certain conditions, download all files by default. Args: relative_path: path of file relative to the S3 directory local_file_path: local path where the file is downloaded Returns: whether to download the file or not """ return True
[docs] def post_download_file(self, relative_path: str, local_file_path: Path) -> None: """Method that can be overridden in derived classes to execute a post processing on a downloaded file (uncompress it for instance). Args: relative_path: path of file relative to the S3 directory local_file_path: local path where the file is downloaded """ pass
[docs] def post_downloads(self) -> None: """Method that can be overridden in derived classes to execute a post processing after all files were downloaded.""" pass