Source code for swh.storage.proxies.buffer

# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from functools import partial
import logging
from typing import Dict, Iterable, Mapping, Sequence, Tuple, cast

from typing_extensions import Literal

from swh.core.utils import grouper
from swh.model.model import (
    BaseModel,
    Content,
    Directory,
    Release,
    Revision,
    SkippedContent,
)
from swh.storage import get_storage
from swh.storage.interface import StorageInterface

logger = logging.getLogger(__name__)

LObjectType = Literal[
    "content",
    "skipped_content",
    "directory",
    "revision",
    "release",
    "snapshot",
    "extid",
]
OBJECT_TYPES: Tuple[LObjectType, ...] = (
    "content",
    "skipped_content",
    "directory",
    "revision",
    "release",
    "snapshot",
    "extid",
)

DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = {
    "content": 10000,
    "content_bytes": 100 * 1024 * 1024,
    "skipped_content": 10000,
    "directory": 25000,
    "directory_entries": 200000,
    "revision": 100000,
    "revision_parents": 200000,
    "revision_bytes": 100 * 1024 * 1024,
    "release": 100000,
    "release_bytes": 100 * 1024 * 1024,
    "snapshot": 25000,
    "extid": 10000,
}
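
# The BufferingProxyStorage constructor merges its ``min_batch_size`` argument
# over these defaults key by key, so a partial override keeps every other
# threshold at its default value, e.g. (hypothetical backend and value):
#
#   BufferingProxyStorage(
#       storage={"cls": "memory"}, min_batch_size={"content": 1000}
#   )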


def estimate_revision_size(revision: Revision) -> int:
    """Estimate the size of a revision, by summing the size of variable length
    fields"""
    s = 20 * len(revision.parents)

    if revision.message:
        s += len(revision.message)
    if revision.author is not None:
        s += len(revision.author.fullname)
    if revision.committer is not None:
        s += len(revision.committer.fullname)
    s += sum(len(h) + len(v) for h, v in revision.extra_headers)
    return s
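
# Worked example for the estimate above (all values hypothetical): a revision
# with 2 parents, a 40-byte message, 25-byte author and committer fullnames,
# and a single (b"key", b"value") extra header is estimated at
# 2 * 20 + 40 + 25 + 25 + (3 + 5) = 138 bytes.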


def estimate_release_size(release: Release) -> int:
    """Estimate the size of a release, by summing the size of variable length
    fields"""
    s = 0
    if release.message:
        s += len(release.message)
    if release.author:
        s += len(release.author.fullname)
    return s
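
# Similarly (hypothetical values): a release with a 30-byte message and a
# 20-byte author fullname is estimated at 30 + 20 = 50 bytes.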


class BufferingProxyStorage:
    """Storage implementation in charge of accumulating objects prior to
    writing them to the "main" storage.

    Deduplicates values based on a tuple of keys depending on the object type.

    Sample configuration use case for buffering storage:

    .. code-block:: yaml

        storage:
          cls: buffer
          args:
            storage:
              cls: remote
              args: http://storage.internal.staging.swh.network:5002/
            min_batch_size:
              content: 10000
              content_bytes: 100000000
              skipped_content: 10000
              directory: 5000
              directory_entries: 100000
              revision: 1000
              revision_parents: 2000
              revision_bytes: 100000000
              release: 10000
              release_bytes: 100000000
              snapshot: 5000

    """

    def __init__(self, storage: Mapping, min_batch_size: Mapping = {}):
        self.storage: StorageInterface = get_storage(**storage)
        self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size}

        self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = {
            k: {} for k in OBJECT_TYPES
        }
        self._contents_size: int = 0
        self._directory_entries: int = 0
        self._revision_parents: int = 0
        self._revision_size: int = 0
        self._release_size: int = 0

    def __getattr__(self, key: str):
        if key.endswith("_add"):
            object_type = key.rsplit("_", 1)[0]
            if object_type in OBJECT_TYPES:
                return partial(
                    self.object_add,
                    object_type=object_type,
                    keys=["id"],
                )
        if key == "storage":
            raise AttributeError(key)
        return getattr(self.storage, key)

    def content_add(self, contents: Sequence[Content]) -> Dict[str, int]:
        """Push contents to write to the storage in the buffer.

        The following policies apply:

        - if the buffer's object-count threshold is hit, flush contents to
          the storage;
        - otherwise, if the total size of the buffered contents hits the size
          threshold, flush contents to the storage.

        """
        stats = self.object_add(
            contents,
            object_type="content",
            keys=["sha1", "sha1_git", "sha256", "blake2s256"],
        )
        if not stats:
            # We did not flush based on number of objects; check total size
            self._contents_size += sum(c.length for c in contents)
            if self._contents_size >= self._buffer_thresholds["content_bytes"]:
                return self.flush(["content"])

        return stats
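
    # Example of the size-based policy (using the default thresholds above):
    # 1,024 buffered contents of 100 KiB each sum to exactly 100 MiB, the
    # default "content_bytes" threshold, and trigger a flush even though the
    # 10,000-object "content" threshold is far from reached.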

    def skipped_content_add(
        self, contents: Sequence[SkippedContent]
    ) -> Dict[str, int]:
        return self.object_add(
            contents,
            object_type="skipped_content",
            keys=["sha1", "sha1_git", "sha256", "blake2s256"],
        )

    def directory_add(self, directories: Sequence[Directory]) -> Dict[str, int]:
        stats = self.object_add(directories, object_type="directory", keys=["id"])

        if not stats:
            # We did not flush based on number of objects; check the number of entries
            self._directory_entries += sum(len(d.entries) for d in directories)
            if self._directory_entries >= self._buffer_thresholds["directory_entries"]:
                return self.flush(["content", "directory"])

        return stats

    def revision_add(self, revisions: Sequence[Revision]) -> Dict[str, int]:
        stats = self.object_add(revisions, object_type="revision", keys=["id"])

        if not stats:
            # We did not flush based on number of objects; check the number of
            # parents and estimated size
            self._revision_parents += sum(len(r.parents) for r in revisions)
            self._revision_size += sum(estimate_revision_size(r) for r in revisions)
            if (
                self._revision_parents >= self._buffer_thresholds["revision_parents"]
                or self._revision_size >= self._buffer_thresholds["revision_bytes"]
            ):
                return self.flush(["content", "directory", "revision"])

        return stats
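
    # Note: the flush lists passed above grow along the object graph
    # ("content" before "directory" before "revision", etc.), and flush()
    # sends buffers in that order, presumably so that referenced objects
    # reach the backend no later than the objects referencing them.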

    def release_add(self, releases: Sequence[Release]) -> Dict[str, int]:
        stats = self.object_add(releases, object_type="release", keys=["id"])

        if not stats:
            # We did not flush based on number of objects; check the estimated size
            self._release_size += sum(estimate_release_size(r) for r in releases)
            if self._release_size >= self._buffer_thresholds["release_bytes"]:
                return self.flush(["content", "directory", "revision", "release"])

        return stats

    def object_add(
        self,
        objects: Sequence[BaseModel],
        *,
        object_type: LObjectType,
        keys: Iterable[str],
    ) -> Dict[str, int]:
        """Push objects to write to the storage in the buffer. Flushes the
        buffer to the storage if the threshold is hit.
        """
        buffer_ = self._objects[object_type]
        for obj in objects:
            obj_key = tuple(getattr(obj, key) for key in keys)
            buffer_[obj_key] = obj

        if len(buffer_) >= self._buffer_thresholds[object_type]:
            return self.flush()

        return {}

    def flush(
        self, object_types: Sequence[LObjectType] = OBJECT_TYPES
    ) -> Dict[str, int]:
        summary: Dict[str, int] = {}

        def update_summary(stats):
            for k, v in stats.items():
                summary[k] = v + summary.get(k, 0)

        for object_type in object_types:
            buffer_ = self._objects[object_type]
            if not buffer_:
                continue

            if logger.isEnabledFor(logging.DEBUG):
                log = "Flushing %s objects of type %s"
                log_args = [len(buffer_), object_type]
                if object_type == "content":
                    log += " (%s bytes)"
                    log_args.append(
                        sum(cast(Content, c).length for c in buffer_.values())
                    )
                elif object_type == "directory":
                    log += " (%s entries)"
                    log_args.append(
                        sum(len(cast(Directory, d).entries) for d in buffer_.values())
                    )
                elif object_type == "revision":
                    log += " (%s parents, %s estimated bytes)"
                    log_args.extend(
                        (
                            sum(
                                len(cast(Revision, r).parents)
                                for r in buffer_.values()
                            ),
                            sum(
                                estimate_revision_size(cast(Revision, r))
                                for r in buffer_.values()
                            ),
                        )
                    )
                elif object_type == "release":
                    log += " (%s estimated bytes)"
                    log_args.append(
                        sum(
                            estimate_release_size(cast(Release, r))
                            for r in buffer_.values()
                        )
                    )
                logger.debug(log, *log_args)

            batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type])
            for batch in batches:
                add_fn = getattr(self.storage, "%s_add" % object_type)
                stats = add_fn(list(batch))
                update_summary(stats)

        # Flush underlying storage
        stats = self.storage.flush(object_types)
        update_summary(stats)

        self.clear_buffers(object_types)

        return summary

    def clear_buffers(
        self, object_types: Sequence[LObjectType] = OBJECT_TYPES
    ) -> None:
        """Clear objects from current buffer.

        WARNING: data that has not been flushed to storage will be lost when
        this method is called. This should only be called when `flush` fails
        and you want to continue your processing.

        """
        for object_type in object_types:
            buffer_ = self._objects[object_type]
            buffer_.clear()
            if object_type == "content":
                self._contents_size = 0
            elif object_type == "directory":
                self._directory_entries = 0
            elif object_type == "revision":
                self._revision_parents = 0
                self._revision_size = 0
            elif object_type == "release":
                self._release_size = 0

        self.storage.clear_buffers(object_types)
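
# A minimal usage sketch (the "memory" backend, the threshold value and the
# content_a / content_b swh.model.model.Content instances are illustrative
# assumptions, not part of this module):
#
#   storage = BufferingProxyStorage(
#       storage={"cls": "memory"},
#       min_batch_size={"content": 2},
#   )
#   storage.content_add([content_a])  # buffered only, returns {}
#   storage.content_add([content_b])  # threshold of 2 hit: flushed to backend
#   storage.flush()  # force any remaining buffered objects through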