# Source code for swh.objstorage.backends.http
# Copyright (C) 2021-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
import logging
from typing import Dict, Iterator, Optional
from urllib.parse import urljoin
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from swh.model import hashutil
from swh.objstorage.constants import ID_HASH_ALGO
from swh.objstorage.exc import (
NonIterableObjStorageError,
ObjNotFoundError,
ReadOnlyObjStorageError,
)
from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import (
DEFAULT_LIMIT,
CompressionFormat,
ObjStorage,
objid_to_default_hex,
timed,
)
# Module-level logger named after this module. Its level is forced to ERROR
# at import time, so records below ERROR emitted through LOGGER are dropped.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.ERROR)
# ("[docs]" link marker left over from the HTML rendering of this module; not code.)
class HTTPReadOnlyObjStorage(ObjStorage):
    """Simple ObjStorage retrieving objects from an HTTP server.

    For example, can be used to retrieve objects from S3::

        objstorage:
          cls: http
          url: https://softwareheritage.s3.amazonaws.com/content/

    Retry strategy can be defined via the 'retry' configuration, e.g.::

        objstorage:
          cls: http
          url: https://softwareheritage.s3.amazonaws.com/content/
          retry:
            total: 5
            backoff_factor: 0.2
            status_forcelist:
              - 404
              - 500

    See
    https://urllib3.readthedocs.io/en/stable/reference/urllib3.util.html#urllib3.util.Retry
    for more details on the possible configuration entries.

    This backend is read-only and non-iterable: every mutating method raises
    ``ReadOnlyObjStorageError`` and every enumeration method raises
    ``NonIterableObjStorageError``.
    """

    name: str = "http"

    def __init__(
        self,
        url: Optional[str] = None,
        compression: CompressionFormat = "none",
        **kwargs,
    ):
        """Set up the HTTP session used to fetch objects.

        Args:
            url: base URL under which objects are exposed; a trailing ``/``
                is appended when missing. Required despite the ``None``
                default.
            compression: compression format, stored on the instance as
                ``self.compression``.
            **kwargs: forwarded unchanged to the parent constructor. An
                optional ``retry`` entry (a dict) is additionally expanded
                into ``urllib3.util.Retry(**retry)``.

        Raises:
            ValueError: if ``url`` is not provided.
        """
        super().__init__(**kwargs)
        if url is None:
            # Fail with an explicit message instead of the AttributeError
            # that ``None.endswith`` would produce below.
            raise ValueError("HTTPReadOnlyObjStorage requires a 'url'")

        self.session = Session()
        self.root_path = url if url.endswith("/") else url + "/"
        self.compression = compression

        # Always define the attribute so that reading it never raises
        # AttributeError when no retry strategy was configured.
        self.retries_cfg: Optional[Retry] = None
        retry: Optional[Dict] = kwargs.get("retry")
        if retry is not None:
            self.retries_cfg = Retry(**retry)
            # The adapter is mounted on the root URL prefix only, so the
            # retry policy applies to requests made under that prefix.
            self.session.mount(
                self.root_path, HTTPAdapter(max_retries=self.retries_cfg)
            )

    def check_config(self, *, check_write):
        """Check the configuration for this object storage.

        Returns:
            True only when ``check_write`` is ``False``; this backend cannot
            be written to.
        """
        return check_write is False

    @timed
    def __contains__(self, obj_id: ObjId) -> bool:
        """Tell whether a HEAD request on the object's URL answers 200."""
        resp = self.session.head(self._path(obj_id))
        return resp.status_code == 200

    def __iter__(self) -> Iterator[CompositeObjId]:
        """Unsupported: always raises ``NonIterableObjStorageError``."""
        raise NonIterableObjStorageError("__iter__")

    def __len__(self):
        """Unsupported: always raises ``NonIterableObjStorageError``."""
        raise NonIterableObjStorageError("__len__")

    @timed
    def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None:
        """Unsupported: always raises ``ReadOnlyObjStorageError``."""
        raise ReadOnlyObjStorageError("add")

    def delete(self, obj_id: ObjId):
        """Unsupported: always raises ``ReadOnlyObjStorageError``."""
        raise ReadOnlyObjStorageError("delete")

    def restore(self, content: bytes, obj_id: ObjId) -> None:
        """Unsupported: always raises ``ReadOnlyObjStorageError``."""
        raise ReadOnlyObjStorageError("restore")

    def list_content(
        self,
        last_obj_id: Optional[ObjId] = None,
        limit: Optional[int] = DEFAULT_LIMIT,
    ) -> Iterator[CompositeObjId]:
        """Unsupported: always raises ``NonIterableObjStorageError``."""
        # Report this method's own name, like the other stubs do (the
        # original passed "__len__" here by mistake).
        raise NonIterableObjStorageError("list_content")

    @timed
    def get(self, obj_id: ObjId) -> bytes:
        """Fetch an object over HTTP and return its decompressed content.

        Args:
            obj_id: identifier of the object to retrieve.

        Returns:
            The response body passed through ``self.decompress``.

        Raises:
            ObjNotFoundError: if building the URL, performing the request or
                checking the response status raises any exception.
        """
        try:
            resp = self.session.get(self._path(obj_id))
            resp.raise_for_status()
        except Exception as exc:
            # NOTE(review): every failure (not only 404) is reported as
            # "not found"; the original error is kept as __cause__.
            raise ObjNotFoundError(obj_id) from exc
        return self.decompress(resp.content, objid_to_default_hex(obj_id))

    def download_url(
        self,
        obj_id: ObjId,
        content_disposition: Optional[str] = None,
        expiry: Optional[timedelta] = None,
    ) -> Optional[str]:
        """Return the URL of the object under the configured root URL.

        ``content_disposition`` and ``expiry`` are accepted but not used by
        this backend.
        """
        return self._path(obj_id)

    def _hash(self, obj_id: ObjId) -> bytes:
        """Return the ``ID_HASH_ALGO`` entry of a dict id, or the id itself."""
        if isinstance(obj_id, dict):
            return obj_id[ID_HASH_ALGO]
        return obj_id

    def _path(self, obj_id: ObjId) -> str:
        """Build the object's URL: root URL joined with its hex hash."""
        return urljoin(self.root_path, hashutil.hash_to_hex(self._hash(obj_id)))