swh.storage.buffer module¶
-
class
swh.storage.buffer.
BufferingProxyStorage
(storage: Mapping, min_batch_size: Mapping = {})[source]¶ Bases:
object
- Storage implementation in charge of accumulating objects prior to
discussing with the “main” storage.
Deduplicates values based on a tuple of keys depending on the object type.
Sample configuration use case for buffering storage:
storage: cls: buffer args: storage: cls: remote args: http://storage.internal.staging.swh.network:5002/ min_batch_size: content: 10000 content_bytes: 100000000 skipped_content: 10000 directory: 5000 revision: 1000 release: 10000
-
content_add
(contents: Sequence[swh.model.model.Content]) → Dict[source]¶ Push contents to write to the storage in the buffer.
- Following policies apply:
if the buffer’s threshold is hit, flush content to the storage.
otherwise, if the total size of buffered contents’s threshold is hit,
flush content to the storage.
-
skipped_content_add
(contents: Sequence[swh.model.model.SkippedContent]) → Dict[source]¶
-
object_add
(objects: Sequence[swh.model.model.BaseModel], *, object_type: typing_extensions.Literal[content, skipped_content, directory, revision, release], keys: Iterable[str]) → Dict[str, int][source]¶ Push objects to write to the storage in the buffer. Flushes the buffer to the storage if the threshold is hit.
-
flush
(object_types: Sequence[typing_extensions.Literal[content, skipped_content, directory, revision, release]] = ('content', 'skipped_content', 'directory', 'revision', 'release')) → Dict[str, int][source]¶
-
clear_buffers
(object_types: Sequence[typing_extensions.Literal[content, skipped_content, directory, revision, release]] = ('content', 'skipped_content', 'directory', 'revision', 'release')) → None[source]¶ Clear objects from current buffer.
Warning
data that has not been flushed to storage will be lost when this method is called. This should only be called when flush fails and you want to continue your processing.