swh.vault.backend module

swh.vault.backend.batch_to_bytes(batch: List[Tuple[str, str]]) → List[Tuple[str, bytes]]
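
The signature suggests that each object identifier enters as text and leaves as raw bytes. The following is a minimal sketch of such a conversion, assuming the identifiers are hexadecimal strings. The hex encoding, the name `batch_to_bytes_sketch`, and the sample object types are assumptions for illustration, not the module's confirmed implementation.

```python
from typing import List, Tuple


def batch_to_bytes_sketch(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]:
    """Hypothetical stand-in: decode each hex identifier into raw bytes."""
    # Assumption: identifiers are hex-encoded; bytes.fromhex raises
    # ValueError for any string that is not valid hexadecimal.
    return [(obj_type, bytes.fromhex(hex_id)) for obj_type, hex_id in batch]


# Sample input; the object type names are placeholders.
converted = batch_to_bytes_sketch([("directory", "00ff"), ("revision", "a1b2c3")])
```

The object type is passed through unchanged; only the second element of each pair is decoded.
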
class swh.vault.backend.VaultBackend(**config)

Bases: object

Backend for the Software Heritage Vault.

progress(obj_type: str, obj_id: Union[str, bytes], raise_notfound: bool = True) → Optional[Dict[str, Any]]

create_task(obj_type: str, obj_id: bytes, sticky: bool = False)

Create and send a cooking task

add_notif_email(obj_type: str, obj_id: bytes, email: str)

Add an e-mail address to notify when a given bundle is ready

put_bundle(obj_type: str, obj_id: Union[str, bytes], bundle) → bool

cook(obj_type: str, obj_id: Union[str, bytes], *, sticky: bool = False, email: Optional[str] = None) → Dict[str, Any]

batch_cook(batch: List[Tuple[str, str]]) → Dict[str, int]

batch_progress(batch_id: int) → Dict[str, Any]

is_available(obj_type: str, obj_id: Union[str, bytes])

Check whether a bundle is available for retrieval

fetch(obj_type: str, obj_id: Union[str, bytes], raise_notfound=True)

Retrieve a bundle from the cache
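
Since `is_available` reports whether a bundle can be retrieved and `fetch` retrieves it, a caller might combine the two. The sketch below shows one way to do so. The helper `fetch_if_available` and the `StubBackend` class are hypothetical, and the assumption that `fetch` returns the bundle as bytes is not stated by the signature above. The stub exists only so the example runs without a configured vault.

```python
from typing import Optional, Union


def fetch_if_available(backend, obj_type: str, obj_id: Union[str, bytes]) -> Optional[bytes]:
    """Hypothetical helper: return the bundle if it is cached, else None."""
    if not backend.is_available(obj_type, obj_id):
        return None
    return backend.fetch(obj_type, obj_id)


class StubBackend:
    """In-memory stand-in for illustration; not the real VaultBackend."""

    def __init__(self, bundles):
        # Keys are (obj_type, obj_id) pairs, values are bundle contents.
        self._bundles = dict(bundles)

    def is_available(self, obj_type, obj_id):
        return (obj_type, obj_id) in self._bundles

    def fetch(self, obj_type, obj_id, raise_notfound=True):
        # Assumption: the bundle is returned as bytes.
        return self._bundles[(obj_type, obj_id)]


stub = StubBackend({("directory", "00ff"): b"bundle-bytes"})
hit = fetch_if_available(stub, "directory", "00ff")    # cached bundle
miss = fetch_if_available(stub, "directory", "abcd")   # nothing cached
```

Checking availability first lets the caller handle the missing case without relying on how `raise_notfound` behaves.
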

update_access_ts(obj_type: str, obj_id: bytes)

Update the last access timestamp of a bundle

set_status(obj_type: str, obj_id: Union[str, bytes], status: str) → bool

set_progress(obj_type: str, obj_id: Union[str, bytes], progress: str) → bool

send_notif(obj_type: str, obj_id: Union[str, bytes]) → bool

send_notification(n_id: Optional[int], email: str, obj_type: str, hex_id: str, status: str, progress_msg: Optional[str] = None) → None

Send the notification of a bundle to a specific e-mail

cache_expire_oldest(n=1, by='last_access') → None

Expire the n oldest bundles

cache_expire_until(date, by='last_access') → None

Expire all the bundles until a certain date
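
The two expiry policies above can be illustrated with a toy in-memory cache. This is only a sketch of the idea: `ToyBundleCache` and its methods are hypothetical, the real backend presumably works against its own storage, and whether the cutoff date is inclusive is an assumption made here.

```python
from datetime import datetime
from typing import Dict


class ToyBundleCache:
    """Illustrative in-memory cache; not the vault's actual storage."""

    def __init__(self) -> None:
        # Maps a bundle key to its timestamps, e.g. {"last_access": datetime}.
        self.entries: Dict[str, Dict[str, datetime]] = {}

    def add(self, key: str, last_access: datetime) -> None:
        self.entries[key] = {"last_access": last_access}

    def expire_oldest(self, n: int = 1, by: str = "last_access") -> None:
        # Sort keys by the chosen timestamp and drop the n earliest.
        oldest = sorted(self.entries, key=lambda k: self.entries[k][by])[:n]
        for key in oldest:
            del self.entries[key]

    def expire_until(self, date: datetime, by: str = "last_access") -> None:
        # Drop every entry whose timestamp is not later than the cutoff.
        # Assumption: the cutoff is inclusive.
        for key in [k for k, ts in self.entries.items() if ts[by] <= date]:
            del self.entries[key]


cache = ToyBundleCache()
cache.add("a", datetime(2020, 1, 1))
cache.add("b", datetime(2021, 1, 1))
cache.add("c", datetime(2022, 1, 1))
cache.expire_oldest(n=1)                   # removes "a", the least recently accessed
cache.expire_until(datetime(2021, 6, 1))   # removes "b", accessed before the cutoff
remaining = sorted(cache.entries)
```

The `by` argument selects which timestamp orders the entries; this toy cache stores only `last_access`, matching the default shown in the signatures.
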