Source code for swh.storage.proxies.retry
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import traceback
from tenacity import RetryCallState, retry, stop_after_attempt, wait_random_exponential
from tenacity.wait import wait_base
from swh.core.api import TransientRemoteException
from swh.storage import StorageSpec, get_storage
from swh.storage.exc import NonRetryableException
from swh.storage.interface import StorageInterface
logger = logging.getLogger(__name__)
[docs]
def should_retry_adding(retry_state: RetryCallState) -> bool:
"""Retry if the error/exception is (probably) not about a caller error"""
attempt = retry_state.outcome
assert attempt
if attempt.failed:
error = attempt.exception()
if isinstance(error, NonRetryableException):
# Don't issue retries for persistent exceptions
return False
elif isinstance(error, (KeyboardInterrupt, SystemExit)):
return False
else:
# Other exception
module = getattr(error, "__module__", None)
if module:
error_name = error.__module__ + "." + error.__class__.__name__
else:
error_name = error.__class__.__name__
logger.warning(
"Retrying RPC call",
exc_info=False,
extra={
"swh_type": "storage_retry",
"swh_exception_type": error_name,
"swh_exception": traceback.format_exc(),
},
)
return True
else:
# No exception
return False
[docs]
class wait_transient_exceptions(wait_base):
"""Wait longer when servers return HTTP 503."""
def __init__(self, wait: float) -> None:
self.wait = wait
def __call__(self, retry_state: RetryCallState) -> float:
attempt = retry_state.outcome
assert attempt
if attempt.failed and isinstance(attempt.exception(), TransientRemoteException):
return self.wait
else:
return 0.0
swh_retry = retry(
retry=should_retry_adding,
wait=wait_random_exponential(multiplier=1, max=10) + wait_transient_exceptions(10),
stop=stop_after_attempt(3),
reraise=True,
)
[docs]
def retry_function(storage, attribute_name):
@swh_retry
def newf(*args, **kwargs):
return getattr(storage, attribute_name)(*args, **kwargs)
return newf
[docs]
class RetryingProxyStorage:
"""Storage implementation which retries adding objects when it specifically
fails (hash collision, integrity error).
"""
def __init__(self, storage: StorageSpec):
self.storage: StorageInterface = get_storage(**storage)
for attribute_name in dir(StorageInterface):
if attribute_name.startswith("_"):
continue
attribute = getattr(self.storage, attribute_name)
if hasattr(attribute, "__call__"):
setattr(
self, attribute_name, retry_function(self.storage, attribute_name)
)