Source code for swh.storage.proxies.buffer

# Copyright (C) 2019-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from functools import partial
import logging
from typing import Dict, Iterable, Mapping, Sequence, Tuple, cast

from typing_extensions import Literal

from swh.core.utils import grouper
from swh.model.model import (
    BaseModel,
    Content,
    Directory,
    RawExtrinsicMetadata,
    Release,
    Revision,
    SkippedContent,
)
from swh.storage import get_storage
from swh.storage.interface import StorageInterface

logger = logging.getLogger(__name__)

LObjectType = Literal[
    "raw_extrinsic_metadata",
    "content",
    "skipped_content",
    "directory",
    "revision",
    "release",
    "snapshot",
    "extid",
]

# In the order objects are flushed
OBJECT_TYPES: Tuple[LObjectType, ...] = (
    "raw_extrinsic_metadata",
    "content",
    "skipped_content",
    "directory",
    "revision",
    "release",
    "snapshot",
    "extid",
)

DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = {
    "content": 10000,
    "content_bytes": 100 * 1024 * 1024,
    "skipped_content": 10000,
    "directory": 25000,
    "directory_entries": 200000,
    "raw_extrinsic_metadata": 1000,
    "revision": 100000,
    "revision_parents": 200000,
    "revision_bytes": 100 * 1024 * 1024,
    "release": 100000,
    "release_bytes": 100 * 1024 * 1024,
    "snapshot": 25000,
    "extid": 10000,
}


[docs] def estimate_revision_size(revision: Revision) -> int: """Estimate the size of a revision, by summing the size of variable length fields""" s = 20 * len(revision.parents) if revision.message: s += len(revision.message) if revision.author is not None: s += len(revision.author.fullname) if revision.committer is not None: s += len(revision.committer.fullname) s += sum(len(h) + len(v) for h, v in revision.extra_headers) return s
[docs] def estimate_release_size(release: Release) -> int: """Estimate the size of a release, by summing the size of variable length fields""" s = 0 if release.message: s += len(release.message) if release.author: s += len(release.author.fullname) return s
[docs] class BufferingProxyStorage: """ Storage implementation in charge of accumulating objects prior to discussing with the "main" storage. When the number of objects of any given type exceeds the configure threshold, then objects of this type (and of other types, see the 'Flush order' below) are flushed to the backend. Deduplicates values based on a tuple of keys depending on the object type. Sample configuration use case for buffering storage: .. code-block:: yaml storage: cls: buffer args: storage: cls: remote args: http://storage.internal.staging.swh.network:5002/ min_batch_size: content: 10000 content_bytes: 100000000 skipped_content: 10000 directory: 5000 directory_entries: 100000 raw_extrinsic_metadata: 1000 revision: 1000 revision_parents: 2000 revision_bytes: 100000000 release: 10000 release_bytes: 100000000 snapshot: 5000 Flush order =========== In order not to create holes when the process crashes (eg. by adding a revision but crashing before adding its root directory), objects are always flushed in reverse topological order of their types: {flush_order} This guarantees not to create holes between objects of different types. However, holes may still be created between objects of the same type when using a backend storage which inserts neither sequentially not transactionally, such as the :mod:`Cassandra <swh.storage.cassandra>` backend or the :mod:`tenacious <swh.storage.proxies.tenacious>` proxy, so this is mostly best effort, and relies on both of these inserting objects **mostly** in sequential order. """.format( flush_order="\n ".join("#. " + object_type for object_type in OBJECT_TYPES) ) def __init__(self, storage: Mapping, min_batch_size: Mapping = {}): self.storage: StorageInterface = get_storage(**storage) self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size} self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = { k: {} for k in OBJECT_TYPES } self._sizes: Dict[LObjectType, int] = {k: 0 for k in OBJECT_TYPES} self._directory_entries: int = 0 self._revision_parents: int = 0 def __getattr__(self, key: str): if key.endswith("_add"): object_type = key.rsplit("_", 1)[0] if object_type in OBJECT_TYPES: return partial( self.object_add, object_type=object_type, keys=["id"], ) if key == "storage": raise AttributeError(key) return getattr(self.storage, key)
[docs] def content_add(self, contents: Sequence[Content]) -> Dict[str, int]: """Push contents to write to the storage in the buffer. Following policies apply: - if the buffer's threshold is hit, flush content to the storage. - otherwise, if the total size of buffered contents's threshold is hit, flush content to the storage. """ stats = self.object_add( contents, object_type="content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) if not stats: # We did not flush based on number of objects; check total size self._sizes["content"] += sum(c.length for c in contents) if self._sizes["content"] >= self._buffer_thresholds["content_bytes"]: return self.flush(["raw_extrinsic_metadata", "content"]) return stats
[docs] def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict[str, int]: return self.object_add( contents, object_type="skipped_content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], )
[docs] def directory_add(self, directories: Sequence[Directory]) -> Dict[str, int]: stats = self.object_add(directories, object_type="directory", keys=["id"]) if not stats: # We did not flush based on number of objects; check the number of entries self._directory_entries += sum(len(d.entries) for d in directories) if self._directory_entries >= self._buffer_thresholds["directory_entries"]: return self.flush(["raw_extrinsic_metadata", "content", "directory"]) return stats
[docs] def revision_add(self, revisions: Sequence[Revision]) -> Dict[str, int]: stats = self.object_add(revisions, object_type="revision", keys=["id"]) if not stats: # We did not flush based on number of objects; check the number of # parents and estimated size self._revision_parents += sum(len(r.parents) for r in revisions) self._sizes["revision"] += sum(estimate_revision_size(r) for r in revisions) if ( self._revision_parents >= self._buffer_thresholds["revision_parents"] or self._sizes["revision"] >= self._buffer_thresholds["revision_bytes"] ): return self.flush( ["raw_extrinsic_metadata", "content", "directory", "revision"] ) return stats
[docs] def release_add(self, releases: Sequence[Release]) -> Dict[str, int]: stats = self.object_add(releases, object_type="release", keys=["id"]) if not stats: # We did not flush based on number of objects; check the estimated size self._sizes["release"] += sum(estimate_release_size(r) for r in releases) if self._sizes["release"] >= self._buffer_thresholds["release_bytes"]: return self.flush( [ "raw_extrinsic_metadata", "content", "directory", "revision", "release", ] ) return stats
[docs] def object_add( self, objects: Sequence[BaseModel], *, object_type: LObjectType, keys: Iterable[str], ) -> Dict[str, int]: """Push objects to write to the storage in the buffer. Flushes the buffer to the storage if the threshold is hit. """ buffer_ = self._objects[object_type] for obj in objects: obj_key = tuple(getattr(obj, key) for key in keys) buffer_[obj_key] = obj if len(buffer_) >= self._buffer_thresholds[object_type]: return self.flush() return {}
[docs] def flush( self, object_types: Sequence[LObjectType] = OBJECT_TYPES ) -> Dict[str, int]: summary: Dict[str, int] = {} def update_summary(stats): for k, v in stats.items(): summary[k] = v + summary.get(k, 0) for object_type in object_types: buffer_ = self._objects[object_type] if not buffer_: continue if logger.isEnabledFor(logging.DEBUG): log = "Flushing %s objects of type %s" log_args = [len(buffer_), object_type] if object_type == "content": log += " (%s bytes)" log_args.append( sum(cast(Content, c).length for c in buffer_.values()) ) elif object_type == "directory": log += " (%s entries)" log_args.append( sum(len(cast(Directory, d).entries) for d in buffer_.values()) ) elif object_type == "revision": log += " (%s parents, %s estimated bytes)" log_args.extend( ( sum( len(cast(Revision, r).parents) for r in buffer_.values() ), sum( estimate_revision_size(cast(Revision, r)) for r in buffer_.values() ), ) ) elif object_type == "release": log += " (%s estimated bytes)" log_args.append( sum( estimate_release_size(cast(Release, r)) for r in buffer_.values() ) ) elif object_type == "raw_extrinsic_metadata": log += " (%s estimated bytes)" log_args.append( sum( len(cast(RawExtrinsicMetadata, r).metadata) for r in buffer_.values() ) ) logger.debug(log, *log_args) batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) for batch in batches: add_fn = getattr(self.storage, "%s_add" % object_type) stats = add_fn(list(batch)) update_summary(stats) # Flush underlying storage stats = self.storage.flush(object_types) update_summary(stats) self.clear_buffers(object_types) return summary
[docs] def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None: """Clear objects from current buffer. WARNING: data that has not been flushed to storage will be lost when this method is called. This should only be called when `flush` fails and you want to continue your processing. """ for object_type in object_types: buffer_ = self._objects[object_type] buffer_.clear() self._sizes[object_type] = 0 if object_type == "directory": self._directory_entries = 0 elif object_type == "revision": self._revision_parents = 0 self.storage.clear_buffers(object_types)