swh.storage.postgresql.storage module#

swh.storage.postgresql.storage.EMPTY_SNAPSHOT_ID = b'\x1a\x88\x93\xe6\xa8oDN\x8b\xe8\xe7\xbd\xa6\xcb4\xfb\x175\xa0\x0e'#

Identifier for the empty snapshot
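A minimal check sketch, assuming swh.model is available: the constant is the sha1_git identifier of a snapshot with no branches.

from swh.model.model import Snapshot
from swh.storage.postgresql.storage import EMPTY_SNAPSHOT_ID

# The intrinsic identifier of an empty snapshot matches the module constant.
assert Snapshot(branches={}).id == EMPTY_SNAPSHOT_ID
assert EMPTY_SNAPSHOT_ID.hex() == "1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"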

swh.storage.postgresql.storage.VALIDATION_EXCEPTIONS = (<class 'KeyError'>, <class 'TypeError'>, <class 'ValueError'>, <class 'psycopg2.errors.CheckViolation'>, <class 'psycopg2.IntegrityError'>, <class 'psycopg2.errors.InvalidTextRepresentation'>, <class 'psycopg2.errors.NotNullViolation'>, <class 'psycopg2.errors.NumericValueOutOfRange'>, <class 'psycopg2.errors.UndefinedFunction'>, <class 'psycopg2.errors.ProgramLimitExceeded'>)#

Exceptions raised by PostgreSQL when validation of the arguments fails.

swh.storage.postgresql.storage.convert_validation_exceptions()[source]#

Catches PostgreSQL errors related to invalid arguments and re-raises them as StorageArgumentException.
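A hedged usage sketch, assuming convert_validation_exceptions() is used as a context manager wrapped around query-issuing code:

from swh.storage.exc import StorageArgumentException
from swh.storage.postgresql.storage import convert_validation_exceptions

try:
    with convert_validation_exceptions():
        # Stand-in for a query failing with one of VALIDATION_EXCEPTIONS
        raise ValueError("invalid argument")
except StorageArgumentException:
    print("validation error surfaced as StorageArgumentException")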

swh.storage.postgresql.storage.db_transaction_generator(*args, **kwargs)[source]#
swh.storage.postgresql.storage.db_transaction(*args, **kwargs)[source]#
class swh.storage.postgresql.storage.Storage(db, objstorage=None, min_pool_conns=1, max_pool_conns=10, journal_writer=None, query_options=None)[source]#

Bases: object

SWH storage datastore proxy, encompassing DB and object storage

Instantiate a storage instance backed by a PostgreSQL database and an objstorage.

When db is passed as a connection string, then this module automatically manages a connection pool between min_pool_conns and max_pool_conns. When db is an explicit psycopg2 connection, then min_pool_conns and max_pool_conns are ignored and the connection is used directly.

Parameters:
  • db – either a libpq connection string, or a psycopg2 connection

  • objstorage – configuration for the backend ObjStorage; if unset, use a NoopObjStorage

  • min_pool_conns – min number of connections in the psycopg2 pool

  • max_pool_conns – max number of connections in the psycopg2 pool

  • journal_writer – configuration for the JournalWriter

  • query_options

    configuration for the SQL connections; keys of the dict are the names of methods decorated with db_transaction() or db_transaction_generator() (e.g. content_find()), and values are dicts mapping configuration names to configuration values for that method's SQL connection. For example, using:

    {"content_get": {"statement_timeout": 5000}}
    

    will override the default statement timeout for the content_get() endpoint from 500ms to 5000ms.

    See swh.core.db.common for more details.
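A minimal instantiation sketch; the connection string and objstorage configuration below are illustrative assumptions, while the pool bounds and query_options mirror the parameters documented above.

from swh.storage.postgresql.storage import Storage

storage = Storage(
    db="dbname=softwareheritage host=localhost",  # libpq connection string (hypothetical)
    objstorage={"cls": "memory"},                 # assumed objstorage configuration
    min_pool_conns=1,
    max_pool_conns=10,
    query_options={"content_get": {"statement_timeout": 5000}},
)
storage.check_config(check_write=True)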

current_version: int = 192#
get_db()[source]#
put_db(db)[source]#
db()[source]#
get_flavor() str[source]#
property flavor: str#
check_config(*, check_write: bool) bool[source]#
content_add(content: List[Content]) Dict[str, int][source]#
content_update(contents: List[Dict[str, Any]], keys: List[str] = []) None[source]#
content_add_metadata(content: List[Content]) Dict[str, int][source]#
content_get_data(content: HashDict | bytes) bytes | None[source]#
content_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Content, str][source]#
content_get(contents: List[bytes], algo: str = 'sha1') List[Content | None][source]#
content_missing(contents: List[HashDict], key_hash: str = 'sha1') Iterable[bytes][source]#
content_missing_per_sha1(contents: List[bytes]) Iterable[bytes][source]#
content_missing_per_sha1_git(contents: List[bytes]) Iterable[bytes][source]#
content_find(content: HashDict) List[Content][source]#
content_get_random() bytes[source]#
skipped_content_add(content: List[SkippedContent]) Dict[str, int][source]#
skipped_content_find(content: HashDict) List[SkippedContent][source]#
skipped_content_missing(contents: List[Dict[str, Any]]) Iterable[Dict[str, Any]][source]#
directory_add(directories: List[Directory]) Dict[str, int][source]#
directory_missing(directories: List[bytes]) Iterable[bytes][source]#
directory_ls(directory: bytes, recursive: bool = False) Iterable[Dict[str, Any]][source]#
directory_entry_get_by_path(directory: bytes, paths: List[bytes]) Dict[str, Any] | None[source]#
directory_get_random() bytes[source]#
directory_get_entries(directory_id: bytes, page_token: bytes | None = None, limit: int = 1000) PagedResult[DirectoryEntry, str] | None[source]#
directory_get_raw_manifest(directory_ids: List[bytes]) Dict[bytes, bytes | None][source]#
directory_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[bytes, str][source]#
revision_add(revisions: List[Revision]) Dict[str, int][source]#
revision_missing(revisions: List[bytes]) Iterable[bytes][source]#
revision_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Revision, str][source]#
revision_get(revision_ids: List[bytes], ignore_displayname: bool = False) List[Revision | None][source]#
revision_log(revisions: List[bytes], ignore_displayname: bool = False, limit: int | None = None) Iterable[Dict[str, Any] | None][source]#
revision_shortlog(revisions: List[bytes], limit: int | None = None) Iterable[Tuple[bytes, Tuple[bytes, ...]] | None][source]#
revision_get_random() bytes[source]#
extid_get_from_extid(id_type: str, ids: List[bytes], version: int | None = None) List[ExtID][source]#
extid_get_from_target(target_type: ObjectType, ids: List[bytes], extid_type: str | None = None, extid_version: int | None = None) List[ExtID][source]#
extid_add(ids: List[ExtID]) Dict[str, int][source]#
release_add(releases: List[Release]) Dict[str, int][source]#
release_missing(releases: List[bytes]) Iterable[bytes][source]#
release_get(releases: List[bytes], ignore_displayname: bool = False) List[Release | None][source]#
release_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Release, str][source]#
release_get_random() bytes[source]#
snapshot_add(snapshots: List[Snapshot]) Dict[str, int][source]#
snapshot_missing(snapshots: List[bytes]) Iterable[bytes][source]#
snapshot_get(snapshot_id: bytes) Dict[str, Any] | None[source]#
snapshot_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[bytes, str][source]#
snapshot_count_branches(snapshot_id: bytes, branch_name_exclude_prefix: bytes | None = None) Dict[str | None, int] | None[source]#
snapshot_get_branches(snapshot_id: bytes, branches_from: bytes = b'', branches_count: int = 1000, target_types: List[str] | None = None, branch_name_include_substring: bytes | None = None, branch_name_exclude_prefix: bytes | None = None) PartialBranches | None[source]#
snapshot_get_random() bytes[source]#
snapshot_branch_get_by_name(snapshot_id: bytes, branch_name: bytes, follow_alias_chain: bool = True, max_alias_chain_length: int = 100) SnapshotBranchByNameResponse | None[source]#
origin_visit_add(visits: List[OriginVisit]) Iterable[OriginVisit][source]#
origin_visit_status_add(visit_statuses: List[OriginVisitStatus]) Dict[str, int][source]#
origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) OriginVisitStatus | None[source]#
origin_visit_get(origin: str, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisit, str][source]#
origin_visit_get_with_statuses(origin: str, allowed_statuses: List[str] | None = None, require_snapshot: bool = False, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitWithStatuses, str][source]#
origin_visit_find_by_date(origin: str, visit_date: datetime, type: str | None = None) OriginVisit | None[source]#
origin_visit_get_by(origin: str, visit: int) OriginVisit | None[source]#
origin_visit_get_latest(origin: str, type: str | None = None, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) OriginVisit | None[source]#
origin_visit_status_get(origin: str, visit: int, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitStatus, str][source]#
origin_visit_status_get_random(type: str) OriginVisitStatus | None[source]#
origin_get(origins: List[str]) Iterable[Origin | None][source]#
origin_get_by_sha1(sha1s: List[bytes]) List[Dict[str, Any] | None][source]#
origin_get_range(origin_from=1, origin_count=100)[source]#
origin_list(page_token: str | None = None, limit: int = 100) PagedResult[Origin, str][source]#
origin_count(url_pattern: str, regexp: bool = False, with_visit: bool = False) int[source]#
origin_snapshot_get_all(origin_url: str) List[bytes][source]#
origin_add(origins: List[Origin]) Dict[str, int][source]#
object_find_by_sha1_git(ids: List[bytes]) Dict[bytes, List[Dict]][source]#
stat_counters()[source]#
refresh_stat_counters()[source]#
raw_extrinsic_metadata_add(metadata: List[RawExtrinsicMetadata]) Dict[str, int][source]#
raw_extrinsic_metadata_get(target: ExtendedSWHID, authority: MetadataAuthority, after: datetime | None = None, page_token: bytes | None = None, limit: int = 1000) PagedResult[RawExtrinsicMetadata, str][source]#
raw_extrinsic_metadata_get_by_ids(ids: List[bytes]) List[RawExtrinsicMetadata][source]#
metadata_fetcher_add(fetchers: List[MetadataFetcher]) Dict[str, int][source]#
metadata_fetcher_get(name: str, version: str) MetadataFetcher | None[source]#
raw_extrinsic_metadata_get_authorities(target: ExtendedSWHID) List[MetadataAuthority][source]#
metadata_authority_add(authorities: List[MetadataAuthority]) Dict[str, int][source]#
metadata_authority_get(type: MetadataAuthorityType, url: str) MetadataAuthority | None[source]#
object_find_recent_references(target_swhid: ExtendedSWHID, limit: int) List[ExtendedSWHID][source]#
object_references_add(references: List[ObjectReference]) Dict[str, int][source]#
clear_buffers(object_types: Sequence[str] = ()) None[source]#

Do nothing

flush(object_types: Sequence[str] = ()) Dict[str, int][source]#
object_delete(swhids: List[ExtendedSWHID]) Dict[str, int][source]#

Delete objects from the storage

All skipped content objects matching the given SWHID will be removed, including those that share the same SWHID due to hash collisions.

Origin objects are removed alongside their associated origin visit and origin visit status objects.

Parameters:

swhids – list of SWHIDs of the objects to remove

Returns:

Summary dict with the following keys and associated values:

  • content:delete: Number of content objects removed

  • content:delete:bytes: Sum of the removed contents’ data length

  • skipped_content:delete: Number of skipped content objects removed

  • directory:delete: Number of directory objects removed

  • revision:delete: Number of revision objects removed

  • release:delete: Number of release objects removed

  • snapshot:delete: Number of snapshot objects removed

  • origin:delete: Number of origin objects removed

  • origin_visit:delete: Number of origin visit objects removed

  • origin_visit_status:delete: Number of origin visit status objects removed

Return type:

Dict[str, int]
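A hedged usage sketch of object_delete(), assuming a storage instance as constructed above; the SWHID below is the empty snapshot, used purely as an illustration.

from swh.model.swhids import ExtendedSWHID

swhid = ExtendedSWHID.from_string(
    "swh:1:snp:1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"
)
summary = storage.object_delete([swhid])
print(summary.get("snapshot:delete", 0))  # number of snapshot objects removed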