swh.objstorage.multiplexer module

class swh.objstorage.multiplexer.ObjStorageThread(storage)

Bases: Thread

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

queue_command(command, *args, mailbox=None, **kwargs)

Enqueue a new command to be processed by the thread.

Parameters:
  • command (str) – one of the method names for the underlying storage.

  • mailbox (queue.Queue) – explicit mailbox if the calling thread wants to override it.

  • args – positional arguments for the command.

  • kwargs – keyword arguments for the command.

Returns:

The mailbox you can read the response from

Return type:

queue.Queue
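The command/mailbox exchange described above can be illustrated with a minimal, self-contained sketch. It does not reproduce the actual ObjStorageThread implementation: the class name `WorkerThread`, the `(result_type, payload)` reply shape, and the `None` stop sentinel are all assumptions made for illustration, and a plain dict stands in for the underlying storage.

```python
import queue
import threading


class WorkerThread(threading.Thread):
    """Hypothetical worker that runs named methods of a wrapped object."""

    def __init__(self, target_obj):
        super().__init__(daemon=True)
        self.target_obj = target_obj
        self.commands = queue.Queue()

    def queue_command(self, command, *args, mailbox=None, **kwargs):
        # Create a fresh mailbox unless the caller supplies one to share.
        if mailbox is None:
            mailbox = queue.Queue()
        self.commands.put((mailbox, command, args, kwargs))
        return mailbox

    def run(self):
        while True:
            item = self.commands.get()
            if item is None:  # stop sentinel, specific to this sketch
                break
            mailbox, command, args, kwargs = item
            try:
                value = getattr(self.target_obj, command)(*args, **kwargs)
            except Exception as exc:
                mailbox.put(("exception", exc))
            else:
                mailbox.put(("result", value))


# A dict stands in for the storage; "get" and "pop" are its method names.
worker = WorkerThread({"a": 1})
worker.start()
reply = worker.queue_command("get", "a").get(timeout=5)
failed = worker.queue_command("pop", "missing").get(timeout=5)
worker.commands.put(None)
worker.join(timeout=5)
```

The caller never touches the wrapped object directly: it only posts a command and reads the reply from the returned mailbox, so all access to the object happens on the worker thread.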

queue_result(mailbox, result_type, result)

Enqueue a new result in the mailbox.

This also provides a reference to the storage, which can be useful when an exceptional condition arises.

Parameters:
  • mailbox (queue.Queue) – the mailbox to which we need to enqueue the result

  • result_type (str) – one of ‘result’, ‘exception’

  • result – the result to pass back to the calling thread

static get_result_from_mailbox(mailbox, *args, **kwargs)

Unpack the result from the mailbox.

Parameters:
  • mailbox (queue.Queue) – A mailbox to unpack a result from

  • args – positional arguments to mailbox.get()

  • kwargs – keyword arguments to mailbox.get()

Returns:

the next result unpacked from the queue

Raises:

either the exception we got back from the underlying storage, or queue.Empty if mailbox.get() raises that.
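As a rough sketch of this unpacking step, the helper below re-raises a stored exception in the calling thread and otherwise returns the payload. The name `unpack_reply` and the two-element `(result_type, payload)` message shape are assumptions for illustration; the actual messages may carry more fields, such as the storage reference mentioned under queue_result.

```python
import queue


def unpack_reply(mailbox, *args, **kwargs):
    # Assumed message shape for this sketch: (result_type, payload).
    # Extra arguments are forwarded to mailbox.get(), e.g. block / timeout.
    result_type, payload = mailbox.get(*args, **kwargs)
    if result_type == "exception":
        raise payload
    return payload


mailbox = queue.Queue()
mailbox.put(("result", b"content"))
mailbox.put(("exception", KeyError("missing")))

first = unpack_reply(mailbox)

try:
    unpack_reply(mailbox)
except KeyError as exc:
    second = exc

try:
    # The mailbox is now empty, so a non-blocking get raises queue.Empty.
    unpack_reply(mailbox, block=False)
except queue.Empty:
    drained = True
```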

static collect_results(mailbox, num_results)

Collect num_results from the mailbox
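Collecting a fixed number of results is mostly useful when several threads post to one shared mailbox. The sketch below shows that fan-in under stated assumptions: `backend_worker`, the three-element reply tuples, and the `timeout` parameter are hypothetical and not taken from the module.

```python
import queue
import threading


def collect_results(mailbox, num_results, timeout=5.0):
    # Block until num_results replies have arrived.
    # Raises queue.Empty if one of them does not arrive within the timeout.
    return [mailbox.get(timeout=timeout) for _ in range(num_results)]


def backend_worker(name, mailbox):
    # Stand-in for a storage thread answering a single command.
    mailbox.put((name, "result", f"reply from {name}"))


shared_mailbox = queue.Queue()
names = ["backend-a", "backend-b", "backend-c"]
threads = [
    threading.Thread(target=backend_worker, args=(name, shared_mailbox))
    for name in names
]
for thread in threads:
    thread.start()

# Replies arrive in completion order, not in submission order.
replies = collect_results(shared_mailbox, len(threads))

for thread in threads:
    thread.join()
```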

class swh.objstorage.multiplexer.MultiplexerObjStorage(*, objstorages: Iterable[ObjStorageInterface | Dict], read_exception_cooldown: float = 5, transient_read_exceptions: List[str] | None = None, readonly: bool = False, **kwargs)

Bases: ObjStorage

Implementation of ObjStorage that distributes between multiple storages.

The multiplexer object storage allows an input to be demultiplexed among multiple storages, each of which may or may not accept it.

As the ids can differ between storages, no pre-computed ids should be submitted. Also, there is no guarantee that the returned ids can be used directly with the storages that the multiplexer manages.

Use case examples follow.

Example:

storage_v1 = ReadOnlyProxyObjStorage(
    PathSlicingObjStorage('/dir1', '0:2/2:4/4:6')
)
storage_v2 = PathSlicingObjStorage('/dir2', '0:1/0:5')
# objstorages is keyword-only in the constructor signature above
storage = MultiplexerObjStorage(objstorages=[storage_v1, storage_v2])

When using ‘storage’, all new contents will be added only to the v2 storage, while contents remain retrievable from both.
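The read/write split in this use case can be mimicked with in-memory stand-ins. This is only a toy model of the behaviour described above: `ToyStore` and `ToyMultiplexer` are hypothetical classes, not part of swh.objstorage, and they skip threading, id computation, and error handling.

```python
class ToyStore:
    """Hypothetical in-memory backend; not part of swh.objstorage."""

    def __init__(self, readonly=False):
        self.readonly = readonly
        self.data = {}


class ToyMultiplexer:
    """Toy model: write to writable backends, read from any backend."""

    def __init__(self, stores):
        self.stores = list(stores)

    def add(self, content, obj_id):
        # Writes go only to the backends that accept them.
        for store in self.stores:
            if not store.readonly:
                store.data[obj_id] = content

    def get(self, obj_id):
        # Reads try every backend in order and return the first hit.
        for store in self.stores:
            if obj_id in store.data:
                return store.data[obj_id]
        raise KeyError(obj_id)


old = ToyStore(readonly=True)
old.data[b"old-id"] = b"legacy"  # content that predates the multiplexer
new = ToyStore()

mux = ToyMultiplexer([old, new])
mux.add(b"fresh", b"new-id")
```

Here `mux.get()` finds both the legacy and the fresh object, while the read-only store never receives the new one.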

name: str = 'multiplexer'

Default objstorage name; can be overridden at instantiation time by passing a ‘name’ argument to the constructor.

reset_active_readers()

Reset the active readers set to all storages, and cancel all reset_failed_threads

disable_backend(endpoint: str, name: str, i: int) → None

Mark the read backend name, at index i, as failed; endpoint is the endpoint from which the failure originated.

enable_backend(name: str, i: int)

Mark a reader as available again
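One way to picture the disable/enable cycle together with the read_exception_cooldown constructor parameter is a pool that hides a failed reader until its cooldown has elapsed. The sketch below is an assumption-laden simplification: `ReaderPool` is hypothetical, and it compares timestamps from an injectable clock instead of scheduling background threads, which may differ from what the module actually does.

```python
import time


class ReaderPool:
    """Hypothetical tracker of which read backends are currently usable."""

    def __init__(self, names, cooldown=5.0, clock=time.monotonic):
        self.names = list(names)
        self.cooldown = cooldown
        self.clock = clock
        self.disabled_until = {}  # index -> time at which it is usable again

    def disable_backend(self, i):
        self.disabled_until[i] = self.clock() + self.cooldown

    def enable_backend(self, i):
        self.disabled_until.pop(i, None)

    def active_readers(self):
        now = self.clock()
        return [
            name
            for i, name in enumerate(self.names)
            if i not in self.disabled_until or self.disabled_until[i] <= now
        ]


# A fake clock keeps the demonstration deterministic.
fake_now = [100.0]
pool = ReaderPool(["a", "b"], cooldown=5.0, clock=lambda: fake_now[0])

pool.disable_backend(0)
during_cooldown = pool.active_readers()

fake_now[0] += 5.0
after_cooldown = pool.active_readers()
```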

wrap_call(threads, call, *args, **kwargs)
get_read_threads(obj_id=None)
get_write_threads(obj_id=None)
check_config(*, check_write)

Check whether the object storage is properly configured.

If check_write is True, return True if at least one object storage returned True.

Parameters:
  • check_write (bool) – if True, check if writes to the object storage can succeed.

Returns:

True if the configuration check succeeded; an exception is raised if it did not.

add(content: bytes, obj_id: bytes | CompositeObjId, check_presence: bool = True) → None

Add a new object to the object storage.

If the adding step works in all the storages that accept this content, this is a success. Otherwise, the whole adding step is an error, even if it succeeds in some of the storages.

Parameters:
  • content – content of the object to be added to the storage.

  • obj_id – checksum of [bytes] using [ID_HASH_ALGO] algorithm. When given, obj_id will be trusted to match the bytes. If missing, obj_id will be computed on the fly.

  • check_presence – indicate if the presence of the content should be verified before adding the file.

Returns:

an id of the object in the storage. As the write-storages are always readable as well, any id will be valid to retrieve a content.
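The all-or-error rule for add can be sketched as follows. The backends here are hypothetical stand-ins, the loop is sequential rather than threaded, and `RuntimeError` is an arbitrary choice for the sketch; the module may raise a different exception.

```python
class MemoryStore:
    """Hypothetical backend that accepts every write."""

    def __init__(self):
        self.data = {}

    def add(self, content, obj_id):
        self.data[obj_id] = content


class FullDisk:
    """Hypothetical backend whose writes always fail."""

    def add(self, content, obj_id):
        raise OSError("no space left")


def add_to_all(backends, content, obj_id):
    # Try every backend, remember which ones failed, and report the
    # operation as an error if any of them did.
    failures = {}
    for name, backend in backends.items():
        try:
            backend.add(content, obj_id)
        except Exception as exc:
            failures[name] = exc
    if failures:
        raise RuntimeError(f"add failed on backends: {sorted(failures)}")


good = MemoryStore()
try:
    add_to_all({"good": good, "bad": FullDisk()}, b"data", b"id-1")
except RuntimeError as exc:
    outcome = str(exc)
else:
    outcome = "ok"
```

Note that the working backend still holds the object after the failure, which matches the statement that the step counts as an error even if it succeeds in some of the storages.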

add_batch(contents: Mapping[bytes, bytes] | Iterable[Tuple[bytes | CompositeObjId, bytes]], check_presence: bool = True) → Dict

Add a batch of new objects to the object storage.

restore(content: bytes, obj_id: bytes | CompositeObjId) → None
get(obj_id: bytes | CompositeObjId) → bytes
check(obj_id: bytes | CompositeObjId) → None

Check if a content is found and recompute its hash to check integrity.

delete(obj_id: bytes | CompositeObjId)