# Copyright (C) 2025-2026 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations

import concurrent.futures
import logging
import math
from operator import attrgetter
from pathlib import Path
import shutil
import threading
import time
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Set

if TYPE_CHECKING:
    from types_boto3_s3.service_resource import ObjectSummary
# size of chunks streamed from S3 response bodies when writing downloaded bytes
STREAM_CHUNK_SIZE = 256 * 1024  # 256 KiB

logger = logging.getLogger(__name__)
class S3Downloader:
    """Utility class to recursively download the content of a directory on S3.

    It also implements a download resumption feature in case some files fail to
    be downloaded (when connection errors happen for instance).

    Args:
        local_path: path of directory where files will be downloaded
        s3_url: URL of directory in a S3 bucket (``s3://<bucket_name>/<path>/``)
        parallelism: maximum number of threads for downloading files
        multipart_download_chunk_size: if a file to download has its size greater
            than that value (default to 1 GiB), it will be downloaded concurrently
            by chunks of that size and re-assembled afterwards as it improves
            download time

    Example of use::

        from swh.core.s3.downloader import S3Downloader

        # download "2025-05-18-popular-1k" datasets (ORC and compressed graph)
        # into a sub-directory of the current working directory named
        # "2025-05-18-popular-1k"
        s3_downloader = S3Downloader(
            local_path="2025-05-18-popular-1k",
            s3_url="s3://softwareheritage/graph/2025-05-18-popular-1k/",
        )
        while not s3_downloader.download():
            continue
    """

    def __init__(
        self,
        local_path: Path,
        s3_url: str,
        parallelism: int = 5,
        multipart_download_chunk_size: int = 1024 * 1024 * 1024,  # 1 GiB
    ) -> None:
        if not s3_url.startswith("s3://"):
            raise ValueError("Unsupported S3 URL")

        # imported lazily so the module can be loaded without boto3 installed
        import boto3
        import botocore
        from botocore.handlers import disable_signing

        # silence noisy debug logs we are not interested about
        for module in ("boto3", "botocore", "s3transfer", "urllib3"):
            logging.getLogger(module).setLevel(logging.WARNING)

        self.s3 = boto3.resource("s3")
        # don't require credentials to list the bucket
        self.s3.meta.client.meta.events.register("choose-signer.s3.*", disable_signing)
        self.client = boto3.client(
            "s3",
            config=botocore.client.Config(
                # https://github.com/boto/botocore/issues/619
                max_pool_connections=10 * parallelism,
                # don't require credentials to download files
                signature_version=botocore.UNSIGNED,
            ),
        )
        self.local_path = local_path
        self.s3_url = s3_url
        self.bucket_name, self.prefix = self.s3_url[len("s3://") :].split("/", 1)
        self.parallelism = parallelism
        self.multipart_download_chunk_size = multipart_download_chunk_size
        # for each object key downloaded by chunks, id of the next chunk file
        # whose bytes must be appended to the re-assembled file
        self.next_chunk_to_write: Dict[str, int] = {}
        # maps object keys to their sizes in bytes
        self.file_size: Dict[str, int] = {}
        self.chunks_write_lock = threading.Lock()
        self.bucket = self.s3.Bucket(self.bucket_name)

    def _download_file(
        self,
        obj_key: str,
        # threading event to gracefully terminate thread when a download failed
        shutdown_event: Optional[threading.Event] = None,
        local_file_path: Optional[Path] = None,
        prefix: Optional[str] = None,
        obj: Optional[ObjectSummary] = None,
        chunk_id: Optional[int] = None,
    ) -> str:
        """Download a single S3 object, or one chunk of it, to the local directory.

        Partial downloads are written to a ``<target>.part`` file; when such a
        file already exists only the missing byte range is fetched from S3.

        Args:
            obj_key: key of the object in the bucket
            shutdown_event: when set, the streaming loop is aborted early so
                current state can be saved and the download resumed later
            local_file_path: local target path, derived from ``obj_key`` when
                not provided
            prefix: S3 prefix stripped from ``obj_key`` to compute the relative
                path, defaults to the prefix extracted from the S3 URL
            obj: object summary, used to avoid an extra ``HeadObject`` request
            chunk_id: when set, only download that chunk of the object (chunks
                are ``multipart_download_chunk_size`` bytes long)

        Returns:
            path of the downloaded file relative to the S3 prefix (or the chunk
            file name when an already fully downloaded chunk was found)
        """
        prefix = prefix or self.prefix
        assert obj_key.startswith(prefix)
        relative_path = obj_key.removeprefix(prefix).lstrip("/")
        if local_file_path is None:
            local_file_path = self.local_path / relative_path
        local_file_path.parent.mkdir(parents=True, exist_ok=True)

        if obj is not None:
            file_size = obj.size
        else:
            # fetch size of object to download
            object_metadata = self.client.head_object(
                Bucket=self.bucket_name, Key=obj_key
            )
            file_size = object_metadata["ContentLength"]

        if local_file_path.exists():
            # file already downloaded, we check if it has the correct size and
            # trigger a new download if it is not
            local_file_size = local_file_path.stat().st_size
            if local_file_size != file_size:
                logger.debug(
                    "File %s exists but has incorrect size, forcing a new download",
                    obj_key,
                )
                local_file_path.unlink()
                return self._download_file(
                    obj_key, shutdown_event, local_file_path, prefix, obj=obj
                )
            if chunk_id is None:
                logger.debug("File %s already downloaded, nothing to do", obj_key)
            else:
                logger.debug(
                    "File %s already downloaded, no need to download chunk %s",
                    obj_key,
                    chunk_id,
                )
            return relative_path
        # download or resume download of a file
        elif self.can_download_file(relative_path, local_file_path):
            local_chunk_file = None
            if chunk_id is not None:
                # compute the byte range of the chunk to fetch
                range_start = chunk_id * self.multipart_download_chunk_size
                range_end = min(
                    (chunk_id + 1) * self.multipart_download_chunk_size, file_size
                )
                local_chunk_file = self._chunk_file_path(local_file_path, chunk_id)
                file_part_path = Path(str(local_chunk_file) + ".part")
            else:
                # whole file downloaded in one request
                range_start = 0
                range_end = file_size
                file_part_path = Path(str(local_file_path) + ".part")
            download_size = range_end - range_start

            if local_chunk_file and local_chunk_file.exists():
                # chunk file already downloaded, we check if it has the correct
                # size and trigger a new download if it is not
                if local_chunk_file.stat().st_size != download_size:
                    local_chunk_file.unlink()
                    return self._download_file(
                        obj_key,
                        shutdown_event,
                        local_file_path,
                        prefix,
                        obj=obj,
                        chunk_id=chunk_id,
                    )
                logger.debug(
                    "Chunk file %s already downloaded, nothing to do",
                    local_chunk_file.name,
                )
                return local_chunk_file.name
            elif file_part_path.exists():
                # resume previous download that failed by fetching only the
                # missing bytes
                filename = file_part_path.name[: -len(".part")]
                logger.debug("File %s was partially downloaded", filename)
                file_part_size = file_part_path.stat().st_size
                logger.debug(
                    "Resuming download of %s from byte %s",
                    filename,
                    file_part_size,
                )
                range_start += file_part_size
            else:
                file_part_size = 0
                logger.debug("Downloading file %s", file_part_path.name)

            if file_part_size < download_size:
                with file_part_path.open("ab") as file_part:
                    assert (
                        range_start <= range_end
                    ), f"range [{range_start}-{range_end}] is invalid"
                    object_ = self.client.get_object(
                        Bucket=self.bucket_name,
                        Key=obj_key,
                        # HTTP Range header bounds are inclusive
                        Range=f"bytes={range_start}-{range_end - 1}",
                    )
                    for chunk in object_["Body"].iter_chunks(STREAM_CHUNK_SIZE):
                        file_part.write(chunk)
                        if shutdown_event and shutdown_event.is_set():
                            # some files failed to be downloaded so abort current
                            # download to save state and inform user early by
                            # returning False in the download method
                            file_part.flush()
                            break

            if file_part_path.stat().st_size == download_size:
                # file fully downloaded, rename it
                file_path = Path(str(file_part_path)[: -len(".part")])
                file_part_path.rename(file_path)
                logger.debug("Downloaded file %s", file_path.name)
                if chunk_id is None:
                    self.post_download_file(relative_path, local_file_path)
            elif chunk_id is not None:
                # remove partial download of chunk file
                file_part_path.unlink()
        else:
            logger.debug(
                "File %s already downloaded and uncompressed, nothing to do",
                obj_key,
            )
        return relative_path

    def _chunks_append_worker(self, shutdown_event: threading.Event):
        """Re-assemble files downloaded by chunks.

        This method is executed in a thread: as long as some file still has
        chunks pending, it appends every consecutive fully-downloaded chunk file
        to the final ``<file>.part`` file, renaming it once its size matches the
        object size on S3.
        """
        while self.next_chunk_to_write:
            if shutdown_event.is_set():
                return
            for obj_key, next_chunk in list(self.next_chunk_to_write.items()):
                relative_path = obj_key.removeprefix(self.prefix).lstrip("/")
                local_file_path = self.local_path / relative_path
                file_part_path = Path(str(local_file_path) + ".part")
                next_chunk_file_path = self._chunk_file_path(
                    local_file_path, next_chunk
                )
                with open(file_part_path, "ab") as part_file:
                    while next_chunk_file_path.exists():
                        # while consecutive chunk files are fully downloaded,
                        # append their bytes to final file
                        with open(next_chunk_file_path, "rb") as next_chunk_file:
                            shutil.copyfileobj(next_chunk_file, part_file)
                        next_chunk_file_path.unlink()
                        next_chunk += 1
                        next_chunk_file_path = self._chunk_file_path(
                            local_file_path, next_chunk
                        )
                    # keep track of next chunk to append
                    self.next_chunk_to_write[obj_key] = next_chunk
                if file_part_path.stat().st_size == self.file_size[obj_key]:
                    # final file fully downloaded and assembled, rename it
                    file_path = Path(str(file_part_path)[: -len(".part")])
                    file_part_path.rename(file_path)
                    self.next_chunk_to_write.pop(obj_key)
                    logger.debug("Assembled file %s", file_path.name)
                    self.post_download_file(relative_path, local_file_path)
            time.sleep(1)

    def _chunk_file_path(self, file_path: Path, chunk_id: int) -> Path:
        """Return the local path where chunk ``chunk_id`` of ``file_path`` is stored."""
        return Path(str(file_path) + f".chunk{chunk_id}")

    def _local_path_size(self) -> int:
        """Return the total size in bytes of the files downloaded so far."""
        while True:
            try:
                return sum(
                    f.stat().st_size
                    for f in self.local_path.glob("**/*")
                    if f.exists() and f.is_file()
                )
            except FileNotFoundError:
                # files can be removed or renamed by threads while
                # globbing them, retry from scratch in that case
                pass

    def download(
        self,
        progress_percent_cb: Callable[[int], None] = lambda _: None,
        progress_status_cb: Callable[[str], None] = lambda _: None,
    ) -> bool:
        """Execute the download of files from S3 in parallel using a pool of threads.

        Args:
            progress_percent_cb: Optional callback function to report the overall
                progress of the downloads
            progress_status_cb: Optional callback function to get status messages
                related to downloaded files

        Returns:
            :const:`True` if all files were successfully downloaded, :const:`False`
            if an error occurred while downloading a file, in that case calling that
            method again will resume such incomplete downloads
        """
        import tqdm

        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.parallelism)
        shutdown_event = threading.Event()
        try:
            # recursively list the files to download from the S3 directory
            objects = self.filter_objects(
                list(self.bucket.objects.filter(Prefix=self.prefix))
            )
            objects_total_size = 0
            for obj in objects:
                objects_total_size += obj.size
                self.file_size[obj.key] = obj.size
            chunks_append_future = None
            with tqdm.tqdm(
                total=objects_total_size,
                desc="Downloading",
                unit="iB",
                unit_scale=True,
                unit_divisor=1024,
            ) as progress:
                futures: Set[concurrent.futures.Future] = set()
                for obj in sorted(objects, key=attrgetter("size")):
                    nb_chunks = math.ceil(obj.size / self.multipart_download_chunk_size)
                    if nb_chunks > 1:
                        # file is large so it will be downloaded concurrently by chunks
                        # and re-assembled afterwards as it improves download time
                        rpath = obj.key.removeprefix(self.prefix).lstrip("/")
                        local_file_part_path = self.local_path / (rpath + ".part")
                        if not local_file_part_path.exists():
                            self.next_chunk_to_write[obj.key] = 0
                        else:
                            # file partially assembled in a previous run that failed,
                            # compute first chunk number to continue assembling it
                            part_file_size = local_file_part_path.stat().st_size
                            self.next_chunk_to_write[obj.key] = (
                                part_file_size // self.multipart_download_chunk_size
                            )
                        if chunks_append_future is None:
                            # start thread whose goal is to re-assemble files downloaded
                            # by chunks prior download threads
                            chunks_append_future = executor.submit(
                                self._chunks_append_worker, shutdown_event
                            )
                            futures.add(chunks_append_future)
                        futures.update(
                            executor.submit(
                                self._download_file,
                                obj.key,
                                shutdown_event,
                                obj=obj,
                                chunk_id=chunk_id,
                            )
                            for chunk_id in range(
                                self.next_chunk_to_write[obj.key], nb_chunks
                            )
                        )
                    else:
                        futures.add(
                            executor.submit(
                                self._download_file, obj.key, shutdown_event, obj=obj
                            )
                        )
                not_done = futures
                last_local_path_size = self._local_path_size()
                progress.update(last_local_path_size)
                while not_done:
                    # poll future states every second in order to abort downloads
                    # on first detected error
                    done, not_done = concurrent.futures.wait(
                        not_done,
                        timeout=1,
                        return_when=concurrent.futures.FIRST_COMPLETED,
                    )
                    local_path_size = min(self._local_path_size(), objects_total_size)
                    progress.update(local_path_size - last_local_path_size)
                    last_local_path_size = local_path_size
                    for future in done:
                        progress_percent_cb(int(progress.n * 100 / progress.total))
                        # future.result() re-raises any exception from the thread
                        progress_status_cb(f"Downloaded {future.result()}")
            concurrent.futures.wait(futures)
            self.post_downloads()
        except BaseException as e:
            logger.exception("Error occurred while downloading")
            # notify download threads to immediately terminate
            shutdown_event.set()
            # shutdown pool of download threads
            executor.shutdown(cancel_futures=True)
            # iterate on downloaded files to log partial ones
            for relative_path in self.local_path.rglob("**/*"):
                if relative_path.match("*.part"):
                    logger.debug(
                        "Downloaded bytes for %s dumped in %s to resume download later",
                        relative_path.name[:-5],
                        relative_path,
                    )
            if isinstance(e, (KeyboardInterrupt, SystemExit)):
                # ensure to terminate with error on keyboard interrupt
                raise SystemExit(1)
            return False
        finally:
            # also reclaim worker threads on the success path; shutdown is
            # idempotent so the extra call after the except branch is harmless
            executor.shutdown(cancel_futures=True)
        return True

    def filter_objects(self, objects: List[ObjectSummary]) -> List[ObjectSummary]:
        """Method that can be overridden in derived classes to filter files to download,
        return all files by default.

        Args:
            objects: list of files recursively discovered from the S3 directory

        Returns:
            filtered list of files to download
        """
        return objects

    def can_download_file(self, relative_path: str, local_file_path: Path) -> bool:
        """Method that can be overridden in derived classes to prevent download of
        a file under certain conditions, download all files by default.

        Args:
            relative_path: path of file relative to the S3 directory
            local_file_path: local path where the file is downloaded

        Returns:
            whether to download the file or not
        """
        return True

    def post_download_file(self, relative_path: str, local_file_path: Path) -> None:
        """Method that can be overridden in derived classes to execute a post processing
        on a downloaded file (uncompress it for instance).

        Args:
            relative_path: path of file relative to the S3 directory
            local_file_path: local path where the file is downloaded
        """
        pass

    def post_downloads(self) -> None:
        """Method that can be overridden in derived classes to execute a post processing
        after all files were downloaded."""
        pass