swh.storage.proxies.buffer module#

swh.storage.proxies.buffer.estimate_revision_size(revision: Revision) → int[source]#

Estimate the size of a revision by summing the sizes of its variable-length fields.

swh.storage.proxies.buffer.estimate_release_size(release: Release) → int[source]#

Estimate the size of a release by summing the sizes of its variable-length fields.
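Conceptually, these estimators just add up the byte lengths of the variable-length fields of the object. A minimal, self-contained sketch of the idea — the `FakeRelease` dataclass and its fields are illustrative stand-ins, not the real swh.model types:

```python
from dataclasses import dataclass


@dataclass
class FakeRelease:
    """Hypothetical stand-in for swh.model's Release; field names are
    illustrative assumptions, not the real model."""
    name: bytes
    message: bytes


def estimate_release_size(release: FakeRelease) -> int:
    # Sum the byte lengths of the variable-length fields only;
    # fixed-size fields (hashes, timestamps, ...) are ignored.
    return len(release.name) + len(release.message)


rel = FakeRelease(name=b"v1.0.0", message=b"First stable release")
print(estimate_release_size(rel))  # 6 + 20 = 26
```

These estimates feed the `*_bytes` thresholds of the buffering proxy, so they only need to be proportional to the real serialized size, not exact.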

class swh.storage.proxies.buffer.BufferingProxyStorage(storage: Mapping, min_batch_size: Mapping = {})[source]#

Bases: object

Storage implementation in charge of accumulating objects prior to sending them to the “main” storage. When the number of objects of any given type exceeds the configured threshold, objects of that type (and of other types, see the ‘Flush order’ below) are flushed to the backend.

Deduplicates values based on a tuple of keys depending on the object type.

Sample configuration use case for buffering storage:

storage:
  cls: buffer
  args:
    storage:
      cls: remote
      args: http://storage.internal.staging.swh.network:5002/
    min_batch_size:
      content: 10000
      content_bytes: 100000000
      skipped_content: 10000
      directory: 5000
      directory_entries: 100000
      raw_extrinsic_metadata: 1000
      revision: 1000
      revision_parents: 2000
      revision_bytes: 100000000
      release: 10000
      release_bytes: 100000000
      snapshot: 5000
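For illustration, here is a minimal sketch (not the actual `BufferingProxyStorage` code) of how two related thresholds, such as `revision` and `revision_parents` above, can each independently trigger a flush. The class name and the dict-based revision shape are assumptions made for the example:

```python
class RevisionBufferSketch:
    """Simplified model of per-type threshold accounting: buffers
    revisions and reports when either threshold is reached."""

    def __init__(self, min_batch_size):
        self.min_batch_size = min_batch_size
        self.revisions = []
        self.parent_count = 0

    def add(self, revisions):
        for rev in revisions:
            self.revisions.append(rev)
            # revision_parents counts the cumulative number of parents,
            # since a revision's size grows with its parent list.
            self.parent_count += len(rev.get("parents", ()))
        # Flush when either the object count or the cumulative parent
        # count reaches its configured threshold.
        return (len(self.revisions) >= self.min_batch_size["revision"]
                or self.parent_count >= self.min_batch_size["revision_parents"])


buf = RevisionBufferSketch({"revision": 3, "revision_parents": 5})
print(buf.add([{"id": 1, "parents": [0]}]))   # False: 1 revision, 1 parent
print(buf.add([{"id": 2, "parents": [1]},
               {"id": 3, "parents": [2]}]))   # True: 3 revisions buffered
```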

Flush order#

In order not to create holes when the process crashes (e.g. by adding a revision but crashing before adding its root directory), objects are always flushed in reverse topological order of their types:

  1. raw_extrinsic_metadata

  2. content

  3. content_metadata

  4. skipped_content

  5. directory

  6. revision

  7. release

  8. snapshot

  9. extid

This guarantees not to create holes between objects of different types.

However, holes may still be created between objects of the same type when using a backend storage that inserts neither sequentially nor transactionally, such as the Cassandra backend or the tenacious proxy; so this is mostly best effort, and relies on both of these inserting objects mostly in sequential order.
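The ordering guarantee can be sketched as follows (a simplified model, not the actual proxy code): buffers are drained in the fixed order above, so a referencing object is only sent once the object types it can reference have been sent.

```python
# Reverse topological order of object types, as documented above:
# e.g. snapshots reference revisions, which reference directories.
FLUSH_ORDER = (
    "raw_extrinsic_metadata", "content", "content_metadata",
    "skipped_content", "directory", "revision", "release",
    "snapshot", "extid",
)


def flush_sketch(buffers, backend_add):
    """Drain each non-empty buffer in FLUSH_ORDER, returning the
    number of objects written per type (the hypothetical backend_add
    callback stands in for the backend storage)."""
    summary = {}
    for object_type in FLUSH_ORDER:
        batch = buffers.get(object_type, [])
        if batch:
            backend_add(object_type, batch)
            summary[object_type] = len(batch)
            buffers[object_type] = []
    return summary


written = []
buffers = {"snapshot": ["snp1"], "directory": ["dir1", "dir2"], "revision": ["rev1"]}
flush_sketch(buffers, lambda object_type, batch: written.append(object_type))
print(written)  # ['directory', 'revision', 'snapshot']
```

Even though the snapshot was buffered first, its directories and revision reach the backend before it does.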

content_add(contents: Sequence[Content]) → Dict[str, int][source]#

Push contents to write to the storage in the buffer.

The following policies apply:

  • if the buffer’s object-count threshold is hit, flush contents to the storage;

  • otherwise, if the total size of buffered contents hits its threshold, flush contents to the storage.
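A minimal sketch of this double trigger — an object-count threshold (`content` in the configuration) and a cumulative byte-size threshold (`content_bytes`). The class and the dict-based content shape are assumptions for illustration, not the real implementation:

```python
class ContentBufferSketch:
    """Simplified model of content_add's two flush triggers."""

    def __init__(self, max_count, max_bytes):
        self.max_count = max_count
        self.max_bytes = max_bytes
        self.contents = []
        self.total_bytes = 0

    def add(self, contents):
        flushed = False
        for content in contents:
            self.contents.append(content)
            self.total_bytes += len(content["data"])
            # Flush on either trigger: too many objects, or too many
            # cumulative bytes of content data.
            if (len(self.contents) >= self.max_count
                    or self.total_bytes >= self.max_bytes):
                self.contents.clear()
                self.total_bytes = 0
                flushed = True
        return flushed


buf = ContentBufferSketch(max_count=100, max_bytes=10)
print(buf.add([{"data": b"abc"}]))         # False: 1 object, 3 bytes
print(buf.add([{"data": b"0123456789"}]))  # True: 13 bytes >= 10, flushed
```

The byte-size trigger is what keeps memory bounded when a few very large contents arrive long before the object-count threshold is reached.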

content_add_metadata(contents: Sequence[Content]) → Dict[str, int][source]#

Push content metadata (content without the payload) to write to the storage in the buffer.

skipped_content_add(contents: Sequence[SkippedContent]) → Dict[str, int][source]#
directory_add(directories: Sequence[Directory]) → Dict[str, int][source]#
revision_add(revisions: Sequence[Revision]) → Dict[str, int][source]#
release_add(releases: Sequence[Release]) → Dict[str, int][source]#
object_add(objects: Sequence[BaseModel], *, object_type: Literal['raw_extrinsic_metadata', 'content', 'content_metadata', 'skipped_content', 'directory', 'revision', 'release', 'snapshot', 'extid'], keys: Iterable[str]) → Dict[str, int][source]#

Push objects to write to the storage in the buffer. Flushes the buffer to the storage if the threshold is hit.
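The deduplication mentioned above keys each buffered object on a tuple of its key fields, so re-adding the same object is a no-op. A sketch of that mechanism (plain dicts stand in for swh.model objects; the function name is hypothetical):

```python
def object_add_sketch(buffer, objects, keys):
    """Insert each object into the buffer under the tuple of its key
    fields; an object with an already-buffered key tuple simply
    overwrites the earlier copy instead of being buffered twice."""
    for obj in objects:
        buffer[tuple(obj[k] for k in keys)] = obj
    return len(buffer)


buffer = {}
object_add_sketch(buffer, [{"sha1": "a", "length": 1}], keys=["sha1"])
n = object_add_sketch(buffer,
                      [{"sha1": "a", "length": 1}, {"sha1": "b", "length": 2}],
                      keys=["sha1"])
print(n)  # 2: the repeated "a" object was deduplicated
```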

flush(object_types: Sequence[Literal['raw_extrinsic_metadata', 'content', 'content_metadata', 'skipped_content', 'directory', 'revision', 'release', 'snapshot', 'extid']] = ('raw_extrinsic_metadata', 'content', 'content_metadata', 'skipped_content', 'directory', 'revision', 'release', 'snapshot', 'extid')) → Dict[str, int][source]#
clear_buffers(object_types: Sequence[Literal['raw_extrinsic_metadata', 'content', 'content_metadata', 'skipped_content', 'directory', 'revision', 'release', 'snapshot', 'extid']] = ('raw_extrinsic_metadata', 'content', 'content_metadata', 'skipped_content', 'directory', 'revision', 'release', 'snapshot', 'extid')) → None[source]#

Clear objects from current buffer.

Warning

Data that has not been flushed to storage will be lost when this method is called. This should only be called when flush fails and you want to continue your processing.