swh.vault.backend module#

class swh.vault.backend.VaultDB(**config)[source]#

Bases: object

PostgreSQL backend for the Software Heritage Vault.

current_version = 4#
get_db()[source]#
put_db(db)[source]#
class swh.vault.backend.VaultBackend(**config)[source]#

Bases: VaultDB

Backend for the Software Heritage Vault.

progress(bundle_type: str, swhid: CoreSWHID, raise_notfound: bool = True) Dict[str, Any] | None[source]#
create_task(bundle_type: str, swhid: CoreSWHID, sticky: bool = False)[source]#

Create and send a cooking task

add_notif_email(bundle_type: str, swhid: CoreSWHID, email: str)[source]#

Add an e-mail address to notify when a given bundle is ready

put_bundle(bundle_type: str, swhid: CoreSWHID, bundle) bool[source]#
cook(bundle_type: str, swhid: CoreSWHID, *, sticky: bool = False, email: str | None = None) Dict[str, Any][source]#
batch_cook(batch: List[Tuple[str, str]]) Dict[str, int][source]#
batch_progress(batch_id: int) Dict[str, Any][source]#
is_available(bundle_type: str, swhid: CoreSWHID)[source]#

Check whether a bundle is available for retrieval

fetch(bundle_type: str, swhid: CoreSWHID, raise_notfound=True) bytes | None[source]#

Retrieve a bundle from the cache

download_url(bundle_type: str, swhid: CoreSWHID, content_disposition: str | None = None, expiry: timedelta | None = None, raise_notfound=True) str | None[source]#

Obtain a bundle direct download link from the cache if supported

update_access_ts(bundle_type: str, swhid: CoreSWHID)[source]#

Update the last access timestamp of a bundle

set_status(bundle_type: str, swhid: CoreSWHID, status: str) bool[source]#
set_progress(bundle_type: str, swhid: CoreSWHID, progress: str) bool[source]#
send_notif(bundle_type: str, swhid: CoreSWHID) bool[source]#
send_notification(n_id: int | None, email: str, bundle_type: str, swhid: CoreSWHID, status: str, progress_msg: str | None = None) None[source]#

Send the notification of a bundle to a specific e-mail

cache_expire_oldest(n=1, by='last_access') None[source]#

Expire the n oldest bundles

cache_expire_until(date, by='last_access') None[source]#

Expire all the bundles until a certain date