swh.storage.postgresql.storage module#

swh.storage.postgresql.storage.EMPTY_SNAPSHOT_ID = b'\x1a\x88\x93\xe6\xa8oDN\x8b\xe8\xe7\xbd\xa6\xcb4\xfb\x175\xa0\x0e'#

Identifier for the empty snapshot

swh.storage.postgresql.storage.VALIDATION_EXCEPTIONS = (<class 'KeyError'>, <class 'TypeError'>, <class 'ValueError'>, <class 'psycopg.errors.CheckViolation'>, <class 'psycopg.IntegrityError'>, <class 'psycopg.errors.InvalidTextRepresentation'>, <class 'psycopg.errors.NotNullViolation'>, <class 'psycopg.errors.NumericValueOutOfRange'>, <class 'psycopg.errors.UndefinedFunction'>, <class 'psycopg.errors.ProgramLimitExceeded'>)#

Exceptions raised by postgresql when validation of the arguments failed.


Catches postgresql errors related to invalid arguments, and re-raises a StorageArgumentException.

swh.storage.postgresql.storage.db_transaction_generator(*args, **kwargs)[source]#
swh.storage.postgresql.storage.db_transaction(*args, **kwargs)[source]#
class swh.storage.postgresql.storage.Storage(db: str | Connection[Any], objstorage: Dict | None = None, min_pool_conns: int = 1, max_pool_conns: int = 10, journal_writer: Dict[str, Any] | None = None, query_options: Dict[str, Dict[str, Any]] | None = None)[source]#

Bases: object

SWH storage datastore proxy, encompassing DB and object storage

Instantiate a storage instance backed by a PostgreSQL database and an objstorage.

When db is passed as a connection string, then this module automatically manages a connection pool between min_pool_conns and max_pool_conns. When db is an explicit psycopg connection, then min_pool_conns and max_pool_conns are ignored and the connection is used directly.

  • db – either a libpq connection string, or a psycopg connection

  • objstorage – configuration for the backend ObjStorage; if unset, use a NoopObjStorage

  • min_pool_conns – min number of connections in the psycopg pool

  • max_pool_conns – max number of connections in the psycopg pool

  • journal_writer – configuration for the JournalWriter

  • query_options

    configuration for the sql connections; keys of the dict are the method names decorated with db_transaction() or db_transaction_generator() (eg. content_find()), and values are dicts (config_name, config_value) used to configure the sql connection for the method_name. For example, using:

    {"content_get": {"statement_timeout": 5000}}

    will override the default statement timeout for the content_get() endpoint from 500ms to 5000ms.

    See swh.core.db.common for more details.

current_version: int = 193#
get_flavor() str[source]#
property flavor: str#
check_config(*, check_write: bool) bool[source]#
content_add(content: List[Content]) Dict[str, int][source]#
content_update(contents: List[Dict[str, Any]], keys: List[str] = []) None[source]#
content_add_metadata(content: List[Content]) Dict[str, int][source]#
content_get_data(content: HashDict | bytes) bytes | None[source]#
content_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Content, str][source]#
content_get(contents: List[bytes], algo: str = 'sha1') List[Content | None][source]#
content_missing(contents: List[HashDict], key_hash: str = 'sha1') Iterable[bytes][source]#
content_missing_per_sha1(contents: List[bytes]) Iterable[bytes][source]#
content_missing_per_sha1_git(contents: List[bytes]) Iterable[bytes][source]#
content_find(content: HashDict) List[Content][source]#
content_get_random() bytes[source]#
skipped_content_add(content: List[SkippedContent]) Dict[str, int][source]#
skipped_content_find(content: HashDict) List[SkippedContent][source]#
skipped_content_missing(contents: List[Dict[str, Any]]) Iterable[Dict[str, Any]][source]#
directory_add(directories: List[Directory]) Dict[str, int][source]#
directory_missing(directories: List[bytes]) Iterable[bytes][source]#
directory_ls(directory: bytes, recursive: bool = False) Iterable[Dict[str, Any]][source]#
directory_entry_get_by_path(directory: bytes, paths: List[bytes]) Dict[str, Any] | None[source]#
directory_get_random() bytes[source]#
directory_get_entries(directory_id: bytes, page_token: bytes | None = None, limit: int = 1000) PagedResult[DirectoryEntry, str] | None[source]#
directory_get_raw_manifest(directory_ids: List[bytes]) Dict[bytes, bytes | None][source]#
directory_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[bytes, str][source]#
revision_add(revisions: List[Revision]) Dict[str, int][source]#
revision_missing(revisions: List[bytes]) Iterable[bytes][source]#
revision_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Revision, str][source]#
revision_get(revision_ids: List[bytes], ignore_displayname: bool = False) List[Revision | None][source]#
revision_log(revisions: List[bytes], ignore_displayname: bool = False, limit: int | None = None) Iterable[Dict[str, Any] | None][source]#
revision_shortlog(revisions: List[bytes], limit: int | None = None) Iterable[Tuple[bytes, Tuple[bytes, ...]] | None][source]#
revision_get_random() bytes[source]#
extid_get_from_extid(id_type: str, ids: List[bytes], version: int | None = None) List[ExtID][source]#
extid_get_from_target(target_type: ObjectType, ids: List[bytes], extid_type: str | None = None, extid_version: int | None = None) List[ExtID][source]#
extid_add(ids: List[ExtID]) Dict[str, int][source]#
release_add(releases: List[Release]) Dict[str, int][source]#
release_missing(releases: List[bytes]) Iterable[bytes][source]#
release_get(releases: List[bytes], ignore_displayname: bool = False) List[Release | None][source]#
release_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Release, str][source]#
release_get_random() bytes[source]#
snapshot_add(snapshots: List[Snapshot]) Dict[str, int][source]#
snapshot_missing(snapshots: List[bytes]) Iterable[bytes][source]#
snapshot_get(snapshot_id: bytes) Dict[str, Any] | None[source]#
snapshot_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[bytes, str][source]#
snapshot_count_branches(snapshot_id: bytes, branch_name_exclude_prefix: bytes | None = None) Dict[str | None, int] | None[source]#
snapshot_get_branches(snapshot_id: bytes, branches_from: bytes = b'', branches_count: int = 1000, target_types: List[str] | None = None, branch_name_include_substring: bytes | None = None, branch_name_exclude_prefix: bytes | None = None) PartialBranches | None[source]#
snapshot_get_random() bytes[source]#
snapshot_branch_get_by_name(snapshot_id: bytes, branch_name: bytes, follow_alias_chain: bool = True, max_alias_chain_length: int = 100) SnapshotBranchByNameResponse | None[source]#
origin_visit_add(visits: List[OriginVisit]) Iterable[OriginVisit][source]#
origin_visit_status_add(visit_statuses: List[OriginVisitStatus]) Dict[str, int][source]#
origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) OriginVisitStatus | None[source]#
origin_visit_get(origin: str, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisit, str][source]#
origin_visit_get_with_statuses(origin: str, allowed_statuses: List[str] | None = None, require_snapshot: bool = False, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitWithStatuses, str][source]#
origin_visit_find_by_date(origin: str, visit_date: datetime, type: str | None = None) OriginVisit | None[source]#
origin_visit_get_by(origin: str, visit: int) OriginVisit | None[source]#
origin_visit_get_latest(origin: str, type: str | None = None, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) OriginVisit | None[source]#
origin_visit_status_get(origin: str, visit: int, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitStatus, str][source]#
origin_visit_status_get_random(type: str) OriginVisitStatus | None[source]#
origin_get(origins: List[str]) List[Origin | None][source]#
origin_get_by_sha1(sha1s: List[bytes]) List[Dict[str, Any] | None][source]#
origin_get_range(origin_from=1, origin_count=100)[source]#
origin_list(page_token: str | None = None, limit: int = 100) PagedResult[Origin, str][source]#
origin_count(url_pattern: str, regexp: bool = False, with_visit: bool = False) int[source]#
origin_snapshot_get_all(origin_url: str) List[bytes][source]#
origin_add(origins: List[Origin]) Dict[str, int][source]#
object_find_by_sha1_git(ids: List[bytes]) Dict[bytes, List[Dict]][source]#
raw_extrinsic_metadata_add(metadata: List[RawExtrinsicMetadata]) Dict[str, int][source]#
raw_extrinsic_metadata_get(target: ExtendedSWHID, authority: MetadataAuthority, after: datetime | None = None, page_token: bytes | None = None, limit: int = 1000) PagedResult[RawExtrinsicMetadata, str][source]#
raw_extrinsic_metadata_get_by_ids(ids: List[bytes]) List[RawExtrinsicMetadata][source]#
metadata_fetcher_add(fetchers: List[MetadataFetcher]) Dict[str, int][source]#
metadata_fetcher_get(name: str, version: str) MetadataFetcher | None[source]#
raw_extrinsic_metadata_get_authorities(target: ExtendedSWHID) List[MetadataAuthority][source]#
metadata_authority_add(authorities: List[MetadataAuthority]) Dict[str, int][source]#
metadata_authority_get(type: MetadataAuthorityType, url: str) MetadataAuthority | None[source]#
object_find_recent_references(target_swhid: ExtendedSWHID, limit: int) List[ExtendedSWHID][source]#
object_references_add(references: List[ObjectReference]) Dict[str, int][source]#
clear_buffers(object_types: Sequence[str] = ()) None[source]#

Do nothing

flush(object_types: Sequence[str] = ()) Dict[str, int][source]#
object_delete(swhids: List[ExtendedSWHID]) Dict[str, int][source]#

Delete objects from the storage

All skipped content objects matching the given SWHID will be removed, including those who have the same SWHID due to hash collisions.

Origin objects are removed alongside their associated origin visit and origin visit status objects.


swhids – list of SWHID of the objects to remove


number of objects removed. Details of each key:


Number of content objects removed


Sum of the removed contents’ data length


Number of skipped content objects removed


Number of directory objects removed


Number of revision objects removed


Number of release objects removed


Number of snapshot objects removed


Number of origin objects removed


Number of origin visit objects removed


Number of origin visit status objects removed


Number of raw extrinsic metadata objects targeting an origin that have been removed


Number of raw extrinsic metadata objects targeting a snapshot that have been removed


Number of raw extrinsic metadata objects targeting a revision that have been removed


Number of raw extrinsic metadata objects targeting a release that have been removed


Number ef raw extrinsic metadata objects targeting a directory that have been removed


Number of raw extrinsic metadata objects targeting a content that have been removed


Number of raw extrinsic metadata objects targeting a raw extrinsic metadata object that have been removed

Return type:


extid_delete_for_target(target_swhids: List[CoreSWHID]) Dict[str, int][source]#

Delete ExtID objects from the storage


target_swhids – list of SWHIDs targeted by the ExtID objects to remove


extid:delete: Number of ExtID objects removed

Return type:

Summary dict with the following keys and associated values

object_references_create_partition(year: int, week: int) Tuple[date, date][source]#

Create the partition of the object_references table for the given ISO year and week.

object_references_drop_partition(partition: ObjectReferencesPartition) None[source]#

Delete the partition of the object_references table for the given partition.

object_references_list_partitions() List[ObjectReferencesPartition][source]#

List existing partitions of the object_references table, ordered from oldest to the most recent.