swh.vault.backend module#

class swh.vault.backend.VaultDB(**config)[source]#

Bases: object

PostgreSQL backend for the Software Heritage Vault.

current_version = 4#
get_db()[source]#
put_db(db)[source]#
class swh.vault.backend.VaultBackend(**config)[source]#

Bases: VaultDB

Backend for the Software Heritage Vault.

progress(bundle_type: str, swhid: CoreSWHID, raise_notfound: bool = True) Optional[Dict[str, Any]][source]#
create_task(bundle_type: str, swhid: CoreSWHID, sticky: bool = False)[source]#

Create and send a cooking task

add_notif_email(bundle_type: str, swhid: CoreSWHID, email: str)[source]#

Add an e-mail address to notify when a given bundle is ready

put_bundle(bundle_type: str, swhid: CoreSWHID, bundle) bool[source]#
cook(bundle_type: str, swhid: CoreSWHID, *, sticky: bool = False, email: Optional[str] = None) Dict[str, Any][source]#
batch_cook(batch: List[Tuple[str, str]]) Dict[str, int][source]#
batch_progress(batch_id: int) Dict[str, Any][source]#
is_available(bundle_type: str, swhid: CoreSWHID)[source]#

Check whether a bundle is available for retrieval

fetch(bundle_type: str, swhid: CoreSWHID, raise_notfound=True) Optional[bytes][source]#

Retrieve a bundle from the cache

download_url(bundle_type: str, swhid: CoreSWHID, content_disposition: Optional[str] = None, expiry: Optional[timedelta] = None, raise_notfound=True) Optional[str][source]#

Obtain a bundle direct download link from the cache if supported

update_access_ts(bundle_type: str, swhid: CoreSWHID)[source]#

Update the last access timestamp of a bundle

set_status(bundle_type: str, swhid: CoreSWHID, status: str) bool[source]#
set_progress(bundle_type: str, swhid: CoreSWHID, progress: str) bool[source]#
send_notif(bundle_type: str, swhid: CoreSWHID) bool[source]#
send_notification(n_id: Optional[int], email: str, bundle_type: str, swhid: CoreSWHID, status: str, progress_msg: Optional[str] = None) None[source]#

Send the notification of a bundle to a specific e-mail

cache_expire_oldest(n=1, by='last_access') None[source]#

Expire the n oldest bundles

cache_expire_until(date, by='last_access') None[source]#

Expire all the bundles until a certain date