swh.vault.backend module

class swh.vault.backend.VaultBackend(**config)[source]

Bases: object

Backend for the Software Heritage Vault.

get_db()[source]
put_db(db)[source]
progress(bundle_type: str, swhid: swh.model.swhids.CoreSWHID, raise_notfound: bool = True) Optional[Dict[str, Any]][source]
create_task(bundle_type: str, swhid: swh.model.swhids.CoreSWHID, sticky: bool = False)[source]

Create and send a cooking task

add_notif_email(bundle_type: str, swhid: swh.model.swhids.CoreSWHID, email: str)[source]

Add an e-mail address to notify when a given bundle is ready

put_bundle(bundle_type: str, swhid: swh.model.swhids.CoreSWHID, bundle) bool[source]
cook(bundle_type: str, swhid: swh.model.swhids.CoreSWHID, *, sticky: bool = False, email: Optional[str] = None) Dict[str, Any][source]
batch_cook(batch: List[Tuple[str, str]]) Dict[str, int][source]
batch_progress(batch_id: int) Dict[str, Any][source]
is_available(bundle_type: str, swhid: swh.model.swhids.CoreSWHID)[source]

Check whether a bundle is available for retrieval

fetch(bundle_type: str, swhid: swh.model.swhids.CoreSWHID, raise_notfound=True) Optional[bytes][source]

Retrieve a bundle from the cache

update_access_ts(bundle_type: str, swhid: swh.model.swhids.CoreSWHID)[source]

Update the last access timestamp of a bundle

set_status(bundle_type: str, swhid: swh.model.swhids.CoreSWHID, status: str) bool[source]
set_progress(bundle_type: str, swhid: swh.model.swhids.CoreSWHID, progress: str) bool[source]
send_notif(bundle_type: str, swhid: swh.model.swhids.CoreSWHID) bool[source]
send_notification(n_id: Optional[int], email: str, bundle_type: str, swhid: swh.model.swhids.CoreSWHID, status: str, progress_msg: Optional[str] = None) None[source]

Send the notification of a bundle to a specific e-mail

cache_expire_oldest(n=1, by='last_access') None[source]

Expire the n oldest bundles

cache_expire_until(date, by='last_access') None[source]

Expire all the bundles until a certain date