# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter, deque
import datetime
from functools import partial
import logging
import traceback
from typing import Callable
from typing import Counter as CounterT
from typing import Deque, Dict, Iterable, List, cast
import yaml
from swh.model.model import BaseModel, Content
from swh.storage import get_storage
from swh.storage.exc import HashCollision
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
[docs]
def now():
    """Return the current time as a timezone-aware UTC datetime."""
    # ``datetime.timezone.utc`` is the very object that ``datetime.UTC``
    # aliases, but unlike that alias (added in Python 3.11) it is also
    # available on Python 3.10, whose syntax this module otherwise sticks to.
    return datetime.datetime.now(datetime.timezone.utc)
[docs]
class RateQueue:
    """Sliding-window error counter.

    Records the outcome (ok or error) of the last ``size`` insertions and
    tells whether more than ``max_errors`` of them were errors.
    """

    def __init__(self, size: int, max_errors: int):
        """
        Args:
            size: number of most recent outcomes kept in the window
            max_errors: ``limit_reached()`` becomes true once the window holds
                strictly more errors than this

        Raises:
            AssertionError: if ``size`` is not greater than ``max_errors``
        """
        assert size > max_errors
        self._size = size
        self._max_errors = max_errors
        # True means "error"; the bounded deque evicts the oldest outcomes.
        self._errors: Deque[bool] = deque(maxlen=size)

    def add_ok(self, n_ok: int = 1) -> None:
        """Record ``n_ok`` successful insertions."""
        self._push(False, n_ok)

    def add_error(self, n_error: int = 1) -> None:
        """Record ``n_error`` failed insertions."""
        self._push(True, n_error)

    def _push(self, outcome: bool, count: int) -> None:
        """Append ``count`` copies of ``outcome`` to the window."""
        # Only the last ``size`` entries survive in the bounded deque, so never
        # build a larger temporary list (a batch may be much bigger than the
        # window); the resulting window content is the same.
        self._errors.extend([outcome] * min(count, self._size))

    def limit_reached(self) -> bool:
        """Return True if the window holds more than ``max_errors`` errors."""
        return sum(self._errors) > self._max_errors

    def reset(self):
        """Forget every recorded outcome."""
        # mainly for testing purpose
        self._errors.clear()
[docs]
class TenaciousProxyStorage:
    """Storage proxy that has a tenacious insertion behavior.

    When an xxx_add method is called, it's first attempted as is against the backend
    storage. If a failure occurs, split the list of inserted objects in pieces until
    erroneous objects have been identified, so all the valid objects are guaranteed to
    be inserted.

    Also provides an error-rate limit feature: if more than n errors occurred during
    the insertion of the last p (window_size) objects, stop accepting any insertion
    (a ``RuntimeError`` is raised).

    The number of insertion attempts for a single object can be specified via
    the 'retries' parameter.

    This proxy is mainly intended to be used in a replayer configuration (aka a
    mirror stack), where insertion errors are mostly unexpected (which explains
    the low default ratio errors/window_size).

    Conversely, it should not be used in a loader configuration, as it may
    drop objects without stopping the loader, which leads to holes in the graph.

    Deployments using this proxy should carefully monitor their logs to check any
    failure is expected (because the failed object is corrupted),
    not because of transient errors or issues with the storage backend.

    Sample configuration use case for tenacious storage:

    .. code-block:: yaml

      storage:
        cls: tenacious
        storage:
          cls: remote
          url: http://storage.internal.staging.swh.network:5002/
        error_rate_limit:
          errors: 10
          window_size: 1000
    """

    # Maps each proxied ``*_add`` method name to the type of objects it
    # inserts (used in log messages and in the result counter names).
    tenacious_methods: Dict[str, str] = {
        "content_add": "content",
        "content_add_metadata": "content",
        "skipped_content_add": "skipped_content",
        "directory_add": "directory",
        "revision_add": "revision",
        "extid_add": "extid",
        "release_add": "release",
        "snapshot_add": "snapshot",
        "origin_add": "origin",
    }

    def __init__(
        self,
        storage,
        error_rate_limit: Dict[str, int] | None = None,
        retries: int = 3,
        error_reporter: Callable[[str, bytes], None] | None = None,
    ):
        """
        Args:
            storage: configuration of the backend storage, passed as keyword
                arguments to ``get_storage``
            error_rate_limit: dict with ``errors`` and ``window_size`` keys;
                defaults to 10 errors per window of 1000 objects
            retries: number of insertion attempts for a single-object batch
                before the object is dropped (values below 1 behave like 1)
            error_reporter: optional callable receiving, for each dropped
                object, a ``"<isoformat timestamp>/<object type>"`` key and a
                YAML document (as bytes) describing the object and the error
        """
        self.storage = get_storage(**storage)
        if error_rate_limit is None:
            error_rate_limit = {"errors": 10, "window_size": 1000}
        assert "errors" in error_rate_limit
        assert "window_size" in error_rate_limit
        self.rate_queue = RateQueue(
            size=error_rate_limit["window_size"],
            max_errors=error_rate_limit["errors"],
        )
        self._single_object_retries: int = retries
        self.error_reporter = error_reporter

    def __getattr__(self, key):
        if key == "storage":
            # Only reached when ``storage`` has not been set yet (e.g. on an
            # instance built without running __init__, as copy/pickle do);
            # without this guard the fallback below would recurse forever.
            raise AttributeError(key)
        if key in self.tenacious_methods:
            return partial(self._tenacious_add, key)
        return getattr(self.storage, key)

    def _tenacious_add(self, func_name, objects: Iterable[BaseModel]) -> Dict[str, int]:
        """Try hard to add as many objects as possible to the backend storage.

        Args:
            func_name: name of the backend method to call, one of the keys of
                ``tenacious_methods``
            objects: the objects to insert; duplicates are submitted only once

        Returns:
            the sum of the counters returned by the successful backend calls,
            plus a ``<object_type>:add:errors`` counter for dropped objects

        Raises:
            RuntimeError: if the error-rate limit is reached
        """
        add_function = getattr(self.storage, func_name)
        object_type = self.tenacious_methods[func_name]
        # list of lists of objects; note this to_add list is consumed from the
        # tail, so the batch appended last is the next one to be tried. The
        # initial batch is deduplicated (while keeping the order of the
        # elements, using the list(dict.fromkeys(...)) trick) to ensure we
        # don't get hit by unicity constraint errors, depending on the actual
        # storage backend we have...
        to_add: List[List[BaseModel]] = [list(dict.fromkeys(objects))]
        n_objs: int = len(to_add[0])
        results: CounterT[str] = Counter()
        # remaining attempts for the single-object batch being retried; reset
        # to the full budget whenever a different batch comes up next
        retries: int = self._single_object_retries
        while to_add:
            if self.rate_queue.limit_reached():
                logger.error(
                    "Too many insertion errors have been detected; "
                    "disabling insertions"
                )
                raise RuntimeError(
                    "Too many insertion errors have been detected; "
                    "disabling insertions"
                )
            objs = to_add.pop()
            try:
                r = add_function(objs)
                results.update(r)
                self.rate_queue.add_ok(len(objs))
                # a success must not eat into the attempts of the next
                # single-object batch
                retries = self._single_object_retries
            except Exception as exc:
                if isinstance(exc, HashCollision) and self._drop_colliding(
                    func_name, object_type, objs, exc, to_add, results
                ):
                    # colliding objects dropped, the others queued again
                    retries = self._single_object_retries
                elif len(objs) > 1:
                    logger.info(
                        "%s: failed to insert a batch of %s %s objects, splitting",
                        func_name,
                        len(objs),
                        object_type,
                    )
                    # reinsert objs split in 2 parts at the end of to_add
                    to_add.append(objs[(len(objs) // 2) :])
                    to_add.append(objs[: (len(objs) // 2)])
                    # each time we append a batch in the to_add bag, reset the
                    # one-object-batch retries counter
                    retries = self._single_object_retries
                else:
                    assert len(objs) == 1
                    obj = objs[0]
                    retries -= 1
                    # ``> 0`` rather than plain truthiness: with a configured
                    # value of 0 or less the counter would never reach 0 and
                    # the object would be retried forever
                    if retries > 0:
                        logger.info(
                            "%s: failed to insert an %s, retrying",
                            func_name,
                            object_type,
                        )
                        # give it another chance (popped next: to_add is LIFO)
                        to_add.append(objs)
                    else:
                        logger.error(
                            "%s: failed to insert an object, excluding %s (from a batch of %s)",
                            func_name,
                            obj,
                            n_objs,
                        )
                        logger.error(
                            "Exception was: %s",
                            repr(exc),
                            exc_info=not isinstance(exc, HashCollision),
                        )
                        results.update({f"{object_type}:add:errors": 1})
                        self._report_error(object_type, obj, exc)
                        self.rate_queue.add_error()
                        # reset the retries counter (needed in case the next
                        # batch is also 1 element only)
                        retries = self._single_object_retries
        return dict(results)

    def _drop_colliding(
        self,
        func_name: str,
        object_type: str,
        objs: List[BaseModel],
        exc: HashCollision,
        to_add: List[List[BaseModel]],
        results: CounterT[str],
    ) -> bool:
        """Drop the objects of a batch that ``exc`` reports as colliding.

        In case we have a HashCollision error and the batch of inserted
        contents only has a few of them (usually only one), we can be a bit
        smarter than the generic split logic: the exception gives us the hashes
        of the colliding objects, so use them instead of splitting the batch
        over and over again.

        The non-colliding objects are queued again on ``to_add``; the dropped
        ones are logged, passed to the error reporter and counted in
        ``results``.

        Returns:
            True if at least one object was dropped. False if no object of the
            batch matches the reported collision; ``to_add`` and ``results``
            are then left untouched and the caller must use the generic failure
            handling, since resubmitting the very same batch would most likely
            fail the same way again, forever.
        """
        assert object_type in ("content", "skipped_content")
        # evaluated once for the whole batch, and copied into a list so that
        # the repeated membership tests below work whatever it returns
        colliding = list(exc.colliding_content_hashes())
        kept: List[BaseModel] = []
        dropped_objs: List[BaseModel] = []
        for obj in cast(List[Content], objs):
            if obj.hashes() in colliding:
                dropped_objs.append(obj)
            else:
                kept.append(obj)
        if not dropped_objs:
            return False
        # NOTE: the remainder is queued even when empty; the resulting (empty)
        # backend call still contributes its counters to ``results``.
        to_add.append(kept)
        n_dropped = len(dropped_objs)
        logger.info(
            "%s: dropping %s %s objects (hash collision)",
            func_name,
            n_dropped,
            object_type,
        )
        for dropped in dropped_objs:
            logger.info("dropped %s", dropped)
            self._report_error(object_type, dropped, exc)
        # NOTE(review): unlike the generic failure path in _tenacious_add,
        # dropped colliding objects are not recorded in the error-rate queue;
        # confirm this is intended.
        results.update({f"{object_type}:add:errors": n_dropped})
        return True

    def _report_error(
        self, object_type: str, obj: BaseModel, exc: BaseException
    ) -> None:
        """Send a dropped object and its exception to the error reporter, if any."""
        if not self.error_reporter:
            return
        key = f"{now().isoformat()}/{object_type}"
        value = {
            "obj": obj.to_dict(),
            "exc": traceback.format_exception(exc),
        }
        self.error_reporter(key, yaml.dump(value).encode())

    def reset(self):
        """Forget every outcome recorded by the error-rate limiter."""
        self.rate_queue.reset()