swh.storage.buffer module

class swh.storage.buffer.BufferingProxyStorage(storage, min_batch_size=None)

Bases: object

Storage implementation in charge of accumulating objects before sending them to the “main” storage.

Sample configuration for a buffering storage:

storage:
  cls: buffer
  args:
    storage:
      cls: remote
      args: http://storage.internal.staging.swh.network:5002/
    min_batch_size:
      content: 10000
      content_bytes: 100000000
      skipped_content: 10000
      directory: 5000
      revision: 1000
      release: 10000
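
The YAML configuration above corresponds to a nested mapping. As a minimal sketch, assuming that object types omitted from min_batch_size fall back to built-in defaults (the min_batch_size=None default in the signature suggests some fallback exists, but its values are not documented here), the thresholds could be resolved as below. HYPOTHETICAL_DEFAULTS and effective_thresholds are invented for illustration and are not part of swh.storage.

```python
from typing import Dict, Optional

# HYPOTHETICAL fallback thresholds, for illustration only; the real
# swh.storage defaults (if any) may be different.
HYPOTHETICAL_DEFAULTS: Dict[str, int] = {
    "content": 10_000,
    "content_bytes": 100_000_000,  # a size in bytes, not an object count
    "skipped_content": 10_000,
    "directory": 25_000,
    "revision": 100_000,
    "release": 100_000,
}

# The sample YAML configuration, written as the equivalent Python dict.
config = {
    "storage": {
        "cls": "buffer",
        "args": {
            "storage": {
                "cls": "remote",
                "args": "http://storage.internal.staging.swh.network:5002/",
            },
            "min_batch_size": {
                "content": 10000,
                "content_bytes": 100000000,
                "skipped_content": 10000,
                "directory": 5000,
                "revision": 1000,
                "release": 10000,
            },
        },
    }
}


def effective_thresholds(min_batch_size: Optional[Dict[str, int]]) -> Dict[str, int]:
    """Overlay the configured thresholds on the hypothetical defaults."""
    merged = dict(HYPOTHETICAL_DEFAULTS)
    merged.update(min_batch_size or {})
    return merged


thresholds = effective_thresholds(config["storage"]["args"]["min_batch_size"])
print(thresholds["directory"])  # 5000: the configured value wins
print(effective_thresholds({"content": 500})["release"])  # 100000: falls back
print(effective_thresholds(None)["content"])  # 10000
```

Merging onto a full set of defaults means a partial min_batch_size section still yields a threshold for every object type.
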

content_add(content: Iterable[swh.model.model.Content]) → Dict

Enqueue contents to write to the storage.

The following policies apply:

  • First, check whether the content queue has reached its threshold. If it has, flush the contents to the storage.

  • Otherwise, check whether the total size of the enqueued contents has reached its threshold. If it has, flush the contents to the storage.
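
The two policies can be illustrated with a self-contained toy buffer. This is a sketch under stated assumptions, not the swh.storage code: ToyContent stands in for swh.model.model.Content, max_count and max_bytes play the roles of the content and content_bytes thresholds from the sample configuration, and the summary keys returned by flush are illustrative.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass(frozen=True)
class ToyContent:
    """Hypothetical stand-in for swh.model.model.Content."""
    sha1: bytes
    length: int


class ToyContentBuffer:
    """Sketch of the count threshold and the total-size threshold."""

    def __init__(self, max_count: int, max_bytes: int) -> None:
        self.max_count = max_count  # plays the role of "content"
        self.max_bytes = max_bytes  # plays the role of "content_bytes"
        self.queue: List[ToyContent] = []
        self.queued_bytes = 0
        self.flushed: List[List[ToyContent]] = []  # batches "sent" to storage

    def content_add(self, contents: Iterable[ToyContent]) -> Dict[str, int]:
        for c in contents:
            self.queue.append(c)
            self.queued_bytes += c.length
        # Policy 1: the queue's count threshold is hit.
        if len(self.queue) >= self.max_count:
            return self.flush()
        # Policy 2: otherwise, the total-size threshold is hit.
        if self.queued_bytes >= self.max_bytes:
            return self.flush()
        return {}  # nothing written yet

    def flush(self) -> Dict[str, int]:
        batch, self.queue = self.queue, []
        added_bytes, self.queued_bytes = self.queued_bytes, 0
        self.flushed.append(batch)
        return {"content:add": len(batch), "content:add:bytes": added_bytes}


buf = ToyContentBuffer(max_count=3, max_bytes=1000)
print(buf.content_add([ToyContent(b"a", 10)]))    # {}: below both thresholds
print(buf.content_add([ToyContent(b"b", 2000)]))  # size threshold triggers a flush
```

Tracking the byte total alongside the count lets a few very large contents trigger a write long before the count threshold is reached.
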

skipped_content_add(content: Iterable[swh.model.model.Content]) → Dict
flush(object_types: Optional[Iterable[str]] = None) → Dict
object_add(objects: Iterable[swh.model.model.BaseModel], *, object_type: str, keys: List[str]) → Dict

Enqueue objects to write to the storage, then check whether the queue has reached its threshold. If it has, write the queued objects to the storage.
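
A minimal sketch of a per-type queue with a count threshold follows. It assumes, without confirmation from this page, that keys names the attributes identifying an object, so that re-adding the same object replaces the queued copy; ToyDirectory, ToyObjectBuffer and the written list standing in for the backend are all hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class ToyDirectory:
    """Hypothetical stand-in for an swh.model.model object."""
    id: bytes
    entries: Tuple[str, ...] = ()


class ToyObjectBuffer:
    """Sketch: one keyed queue per object type, flushed at a count threshold.

    ASSUMPTION: `keys` identifies an object; duplicates overwrite each other.
    """

    def __init__(self, min_batch_size: Dict[str, int]) -> None:
        self.min_batch_size = min_batch_size
        self.buffers: Dict[str, Dict[Tuple[Any, ...], Any]] = {}
        self.written: List[Any] = []  # stands in for the backend storage

    def object_add(
        self, objects: Iterable[Any], *, object_type: str, keys: List[str]
    ) -> Dict[str, int]:
        buffer_ = self.buffers.setdefault(object_type, {})
        for obj in objects:
            obj_key = tuple(getattr(obj, k) for k in keys)
            buffer_[obj_key] = obj  # same key: replace, do not grow the queue
        # Only write to the backend once the queue reaches its threshold.
        if len(buffer_) >= self.min_batch_size[object_type]:
            return self.flush([object_type])
        return {}

    def flush(self, object_types: Iterable[str]) -> Dict[str, int]:
        summary: Dict[str, int] = {}
        for object_type in object_types:
            buffer_ = self.buffers.get(object_type, {})
            self.written.extend(buffer_.values())
            summary[f"{object_type}:add"] = len(buffer_)
            buffer_.clear()
        return summary


store = ToyObjectBuffer({"directory": 2})
d1 = ToyDirectory(id=b"\x01")
print(store.object_add([d1, d1], object_type="directory", keys=["id"]))  # {}
print(store.object_add([ToyDirectory(id=b"\x02")], object_type="directory", keys=["id"]))
# {'directory:add': 2}
```

Keying the queue this way means duplicates submitted before a flush do not count twice toward the threshold.
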

clear_buffers(object_types: Optional[Iterable[str]] = None) → None

Clear objects from the current buffer.

Warning

Data that has not been flushed to the storage will be lost when this method is called. Only call it when flush fails and you want to continue processing.
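
The recovery pattern the warning describes can be sketched as follows: attempt a flush, and if it fails, clear the buffers so that processing can move on to new data instead of retrying the failed batch on every later flush. FlakyBackend, ToyBuffer and process_batch are hypothetical names for this illustration; they only mimic the flush/clear_buffers pair documented above.

```python
from typing import Dict, List


class FlakyBackend:
    """Hypothetical backend whose writes fail while `down` is True."""

    def __init__(self) -> None:
        self.down = True
        self.rows: List[str] = []

    def add(self, items: List[str]) -> int:
        if self.down:
            raise ConnectionError("backend unavailable")
        self.rows.extend(items)
        return len(items)


class ToyBuffer:
    """Sketch of a buffer exposing flush() and clear_buffers()."""

    def __init__(self, backend: FlakyBackend) -> None:
        self.backend = backend
        self.queue: List[str] = []

    def add(self, item: str) -> None:
        self.queue.append(item)

    def flush(self) -> Dict[str, int]:
        written = self.backend.add(self.queue)  # may raise; queue kept on failure
        self.queue = []
        return {"add": written}

    def clear_buffers(self) -> None:
        # Unflushed items are discarded: this is the data loss the warning describes.
        self.queue.clear()


def process_batch(buf: ToyBuffer, items: List[str]) -> bool:
    """Queue items, try to flush, and drop the batch if flushing fails."""
    for item in items:
        buf.add(item)
    try:
        buf.flush()
        return True
    except ConnectionError:
        buf.clear_buffers()  # give up on this batch so later ones can proceed
        return False


backend = FlakyBackend()
buf = ToyBuffer(backend)
print(process_batch(buf, ["a", "b"]))  # False: flush failed, buffer cleared
backend.down = False
print(process_batch(buf, ["c"]))       # True
print(backend.rows)                    # ['c']: "a" and "b" were lost
```

The last line shows the cost named in the warning: the items queued before the failed flush never reach the backend.
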