# Source code for swh.objstorage.backends.http

# Copyright (C) 2021-2025  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import asyncio
from datetime import timedelta
import logging
from typing import Dict, Iterable, Iterator, List, Optional
from urllib.parse import urljoin

import aiohttp
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

from swh.model import hashutil
from swh.objstorage.constants import LiteralPrimaryHash
from swh.objstorage.exc import ObjNotFoundError, ReadOnlyObjStorageError
from swh.objstorage.interface import HashDict
from swh.objstorage.objstorage import (
    CompressionFormat,
    ObjStorage,
    objid_to_default_hex,
    timed,
)

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)


class HTTPReadOnlyObjStorage(ObjStorage):
    """Simple ObjStorage retrieving objects from an HTTP server.

    For example, can be used to retrieve objects from S3::

        objstorage:
          cls: http
          url: https://softwareheritage.s3.amazonaws.com/content/
          compression: gzip

    Retry strategy can be defined via the 'retry' configuration, e.g.::

        objstorage:
          cls: http
          url: https://softwareheritage.s3.amazonaws.com/content/
          compression: gzip
          retry:
            total: 5
            backoff_factor: 0.2
            status_forcelist:
              - 404
              - 500

    See
    https://urllib3.readthedocs.io/en/stable/reference/urllib3.util.html#urllib3.util.Retry
    for more details on the possible configuration entries.

    The :meth:`get_batch` method is implemented with ``aiohttp`` to improve
    the performance of object downloads. The maximum number of simultaneous
    connections can be set using the ``batch_max_connections`` parameter of
    that class (defaults to 100). The maximum number of simultaneous
    connections to the same host can be set using the
    ``batch_max_connections_per_host`` parameter of that class (defaults to 0
    for no limit).
    """

    primary_hash: LiteralPrimaryHash = "sha1"
    name: str = "http"

    def __init__(
        self,
        url=None,
        compression: CompressionFormat | None = None,
        batch_max_connections: int = 100,
        batch_max_connections_per_host: int = 0,
        **kwargs,
    ):
        """Set up the HTTP session used to read objects.

        Args:
            url: base URL under which objects are exposed; a trailing ``/``
                is appended when missing. Required despite the ``None``
                default.
            compression: compression format of the stored objects; when
                ``None``, a deprecation warning is logged and ``"none"`` is
                used.
            batch_max_connections: connection limit handed to the
                ``aiohttp`` connector used by :meth:`get_batch`.
            batch_max_connections_per_host: per-host connection limit handed
                to the same connector.
            **kwargs: forwarded to the parent constructor; an optional
                ``retry`` mapping found there is used to build a
                :class:`urllib3.util.Retry` policy for the session.

        Raises:
            ValueError: if ``url`` is not provided.
        """
        # Fail early with an explicit message instead of an AttributeError on
        # ``None.endswith`` once a Session has already been created.
        if url is None:
            raise ValueError(
                "The 'url' parameter is required by the http objstorage backend"
            )
        super().__init__(**kwargs)
        self.session = Session()
        self.root_path = url
        # _path() relies on urljoin, which would drop the last path component
        # of a base URL that lacks a trailing slash.
        if not self.root_path.endswith("/"):
            self.root_path += "/"
        if compression is None:
            LOGGER.warning(
                "Deprecated: compression is undefined. "
                "Defaulting to none, but please set it explicitly."
            )
            compression = "none"
        self.compression = compression
        self.batch_max_connections = batch_max_connections
        self.batch_max_connections_per_host = batch_max_connections_per_host
        retry: Optional[Dict] = kwargs.get("retry")
        if retry is not None:
            # Only requests going to URLs under root_path get the retry policy.
            self.retries_cfg = Retry(**retry)
            self.session.mount(
                self.root_path, HTTPAdapter(max_retries=self.retries_cfg)
            )

    def check_config(self, *, check_write):
        """Check the configuration for this object storage.

        Returns:
            ``True`` only when ``check_write`` is ``False``, as this backend
            rejects every write operation.
        """
        return check_write is False

    @timed
    def __contains__(self, obj_id: HashDict) -> bool:
        """Tell whether a HEAD request on the object URL answers with 200."""
        resp = self.session.head(self._path(obj_id))
        return resp.status_code == 200

    @timed
    def add(
        self, content: bytes, obj_id: HashDict, check_presence: bool = True
    ) -> None:
        """Unsupported operation: always raise ReadOnlyObjStorageError."""
        raise ReadOnlyObjStorageError("add")

    def delete(self, obj_id: HashDict):
        """Unsupported operation: always raise ReadOnlyObjStorageError."""
        raise ReadOnlyObjStorageError("delete")

    def restore(self, content: bytes, obj_id: HashDict) -> None:
        """Unsupported operation: always raise ReadOnlyObjStorageError."""
        raise ReadOnlyObjStorageError("restore")

    @timed
    def get(self, obj_id: HashDict) -> bytes:
        """Download one object and return its decompressed content.

        Raises:
            ObjNotFoundError: if building the URL, performing the request or
                checking the response status fails for any reason; the
                original exception is attached as ``__cause__``.
        """
        try:
            resp = self.session.get(self._path(obj_id))
            resp.raise_for_status()
        except Exception as exc:
            # Deliberately broad: every retrieval failure is reported to
            # callers as a missing object.
            raise ObjNotFoundError(obj_id) from exc
        return self.decompress(
            resp.content, objid_to_default_hex(obj_id, self.primary_hash)
        )

    @timed
    def get_batch(self, obj_ids: Iterable[HashDict]) -> Iterator[Optional[bytes]]:
        """Download several objects concurrently.

        Returns:
            An iterator over the decompressed contents, in the same order as
            ``obj_ids``; ``None`` stands for an object that could not be
            fetched or processed.
        """
        ids = list(obj_ids)
        if not ids:
            # Nothing to fetch: skip the event loop and client session setup.
            return iter([])
        return iter(asyncio.run(self._contents_get(ids)))

    def download_url(
        self,
        obj_id: HashDict,
        content_disposition: Optional[str] = None,
        expiry: Optional[timedelta] = None,
    ) -> Optional[str]:
        """Return the URL of the object on the HTTP server.

        ``content_disposition`` and ``expiry`` are accepted but not used by
        this backend.
        """
        return self._path(obj_id)

    def _hash(self, obj_id: HashDict) -> bytes:
        """Return the entry of ``obj_id`` matching ``primary_hash``."""
        return obj_id[self.primary_hash]

    def _path(self, obj_id):
        """Return the object URL: ``root_path`` joined with the hex hash."""
        return urljoin(self.root_path, hashutil.hash_to_hex(self._hash(obj_id)))

    async def _content_get(
        self,
        obj_id: HashDict,
        session: aiohttp.ClientSession,
    ) -> Optional[bytes]:
        """Fetch and decompress one object using the shared aiohttp session.

        Returns:
            The decompressed content, or ``None`` when anything goes wrong;
            the failure is logged at debug level.
        """
        # Bound before the try block so that the except handler can always
        # log it, even when _path() itself raises (e.g. missing primary hash).
        # Otherwise the handler would raise UnboundLocalError and abort the
        # whole batch.
        url: Optional[str] = None
        try:
            url = self._path(obj_id)
            async with session.get(url) as response:
                response.raise_for_status()
                content = await response.read()
            return self.decompress(
                content, objid_to_default_hex(obj_id, self.primary_hash)
            )
        except Exception as e:
            LOGGER.debug(
                "Unable to fetch or process content from URL %s due to %s.", url, str(e)
            )
            return None

    async def _contents_get(self, obj_ids: List[HashDict]) -> List[Optional[bytes]]:
        """Fetch all ``obj_ids`` concurrently through a single client session.

        The connector limits come from ``batch_max_connections`` and
        ``batch_max_connections_per_host``. Results are returned in the order
        of ``obj_ids``.
        """
        async with aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(
                limit=self.batch_max_connections,
                limit_per_host=self.batch_max_connections_per_host,
            )
        ) as session:
            return await asyncio.gather(
                *(self._content_get(obj_id, session) for obj_id in obj_ids)
            )