swh.storage.postgresql.storage module

swh.storage.postgresql.storage.EMPTY_SNAPSHOT_ID = b'\x1a\x88\x93\xe6\xa8oDN\x8b\xe8\xe7\xbd\xa6\xcb4\xfb\x175\xa0\x0e'

Identifier for the empty snapshot
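The constant is a raw 20-byte identifier. As a minimal sketch, the snippet below renders it in hexadecimal and as a snapshot SWHID string; the helper name `snapshot_swhid` is hypothetical and not part of this module.

```python
# Value copied from the constant documented above.
EMPTY_SNAPSHOT_ID = b'\x1a\x88\x93\xe6\xa8oDN\x8b\xe8\xe7\xbd\xa6\xcb4\xfb\x175\xa0\x0e'


def snapshot_swhid(snapshot_id: bytes) -> str:
    """Hypothetical helper: format a raw snapshot id as a SWHID string."""
    return f"swh:1:snp:{snapshot_id.hex()}"


empty_swhid = snapshot_swhid(EMPTY_SNAPSHOT_ID)
print(empty_swhid)
```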

swh.storage.postgresql.storage.VALIDATION_EXCEPTIONS = (<class 'KeyError'>, <class 'TypeError'>, <class 'ValueError'>, <class 'psycopg2.errors.CheckViolation'>, <class 'psycopg2.IntegrityError'>, <class 'psycopg2.errors.InvalidTextRepresentation'>, <class 'psycopg2.errors.NotNullViolation'>, <class 'psycopg2.errors.NumericValueOutOfRange'>, <class 'psycopg2.errors.UndefinedFunction'>, <class 'psycopg2.errors.ProgramLimitExceeded'>)

Exceptions raised by PostgreSQL when validation of the arguments fails.

swh.storage.postgresql.storage.convert_validation_exceptions()

Catches PostgreSQL errors related to invalid arguments and re-raises them as StorageArgumentException.
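The catch-and-re-raise pattern described here can be sketched as a context manager. This is an illustration only, not the module's implementation: `StorageArgumentException` is redefined as a local stand-in, the exception tuple is reduced to built-in types so the sketch runs without psycopg2, and `convert_validation_exceptions_sketch` and `parse_limit` are hypothetical names.

```python
from contextlib import contextmanager


class StorageArgumentException(Exception):
    """Local stand-in for the real exception class (assumption)."""


# Reduced to built-ins; the documented tuple also lists psycopg2 errors.
VALIDATION_EXCEPTIONS = (KeyError, TypeError, ValueError)


@contextmanager
def convert_validation_exceptions_sketch():
    """Re-raise argument validation errors as StorageArgumentException."""
    try:
        yield
    except VALIDATION_EXCEPTIONS as exc:
        # Chain the original error so the root cause stays visible.
        raise StorageArgumentException(str(exc)) from exc


def parse_limit(raw):
    """Hypothetical caller: convert a user-supplied limit to an int."""
    with convert_validation_exceptions_sketch():
        return int(raw)
```

Callers then only need to handle one exception type, whatever the underlying validation failure was.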

swh.storage.postgresql.storage.db_transaction_generator(*args, **kwargs)
swh.storage.postgresql.storage.db_transaction(*args, **kwargs)
class swh.storage.postgresql.storage.Storage(db: str | Db, objstorage: Dict | None = None, min_pool_conns: int = 1, max_pool_conns: int = 10, journal_writer: Dict[str, Any] | None = None, query_options: Dict[str, Dict[str, Any]] | None = None)

Bases: object

SWH storage datastore proxy, encompassing DB and object storage

Instantiate a storage instance backed by a PostgreSQL database and an objstorage.

When db is passed as a connection string, this module automatically manages a connection pool sized between min_pool_conns and max_pool_conns. When db is an explicit psycopg2 connection, min_pool_conns and max_pool_conns are ignored and the connection is used directly.

Parameters:
  • db – either a libpq connection string, or a psycopg2 connection

  • objstorage – configuration for the backend ObjStorage; if unset, use a NoopObjStorage

  • min_pool_conns – min number of connections in the psycopg2 pool

  • max_pool_conns – max number of connections in the psycopg2 pool

  • journal_writer – configuration for the JournalWriter

  • query_options

    configuration for the SQL connections; keys of the dict are the names of methods decorated with db_transaction() or db_transaction_generator() (e.g. content_find()), and values are dicts (config_name, config_value) used to configure the SQL connection for that method. For example, using:

    {"content_get": {"statement_timeout": 5000}}
    

    will override the default statement timeout for the content_get() endpoint from 500ms to 5000ms.

    See swh.core.db.common for more details.
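To illustrate the shape of query_options, the sketch below merges per-method overrides over per-method defaults. It does not reproduce the actual lookup in swh.core.db.common: `DEFAULT_OPTIONS` and `options_for` are hypothetical, and the 500 ms default is taken only from the content_get() example above.

```python
from typing import Any, Dict

# Hypothetical defaults; only the content_get value comes from the text above.
DEFAULT_OPTIONS: Dict[str, Dict[str, Any]] = {
    "content_get": {"statement_timeout": 500},
}

# Same structure as the query_options argument of Storage.
query_options: Dict[str, Dict[str, Any]] = {
    "content_get": {"statement_timeout": 5000},
}


def options_for(method_name: str, overrides: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Return the connection settings for one method, overrides winning."""
    merged = dict(DEFAULT_OPTIONS.get(method_name, {}))
    merged.update(overrides.get(method_name, {}))
    return merged


print(options_for("content_get", query_options))
```

Methods that are absent from both dicts get an empty settings dict in this sketch, meaning no per-method configuration is applied.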

current_version: int = 193
get_db()
put_db(db)
db()
get_flavor() → str
property flavor: str
check_config(*, check_write: bool) → bool
content_add(content: List[Content]) → Dict[str, int]
content_update(contents: List[Dict[str, Any]], keys: List[str] = []) → None
content_add_metadata(content: List[Content]) → Dict[str, int]
content_get_data(content: HashDict | bytes) → bytes | None
content_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) → PagedResult[Content, str]
content_get(contents: List[bytes], algo: str = 'sha1') → List[Content | None]
content_missing(contents: List[HashDict], key_hash: str = 'sha1') → Iterable[bytes]
content_missing_per_sha1(contents: List[bytes]) → Iterable[bytes]
content_missing_per_sha1_git(contents: List[bytes]) → Iterable[bytes]
content_find(content: HashDict) → List[Content]
content_get_random() → bytes
skipped_content_add(content: List[SkippedContent]) → Dict[str, int]
skipped_content_find(content: HashDict) → List[SkippedContent]
skipped_content_missing(contents: List[Dict[str, Any]]) → Iterable[Dict[str, Any]]
directory_add(directories: List[Directory]) → Dict[str, int]
directory_missing(directories: List[bytes]) → Iterable[bytes]
directory_ls(directory: bytes, recursive: bool = False) → Iterable[Dict[str, Any]]
directory_entry_get_by_path(directory: bytes, paths: List[bytes]) → Dict[str, Any] | None
directory_get_random() → bytes
directory_get_entries(directory_id: bytes, page_token: bytes | None = None, limit: int = 1000) → PagedResult[DirectoryEntry, str] | None
directory_get_raw_manifest(directory_ids: List[bytes]) → Dict[bytes, bytes | None]
directory_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) → PagedResult[bytes, str]
revision_add(revisions: List[Revision]) → Dict[str, int]
revision_missing(revisions: List[bytes]) → Iterable[bytes]
revision_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) → PagedResult[Revision, str]
revision_get(revision_ids: List[bytes], ignore_displayname: bool = False) → List[Revision | None]
revision_log(revisions: List[bytes], ignore_displayname: bool = False, limit: int | None = None) → Iterable[Dict[str, Any] | None]
revision_shortlog(revisions: List[bytes], limit: int | None = None) → Iterable[Tuple[bytes, Tuple[bytes, ...]] | None]
revision_get_random() → bytes
extid_get_from_extid(id_type: str, ids: List[bytes], version: int | None = None) → List[ExtID]
extid_get_from_target(target_type: ObjectType, ids: List[bytes], extid_type: str | None = None, extid_version: int | None = None) → List[ExtID]
extid_add(ids: List[ExtID]) → Dict[str, int]
release_add(releases: List[Release]) → Dict[str, int]
release_missing(releases: List[bytes]) → Iterable[bytes]
release_get(releases: List[bytes], ignore_displayname: bool = False) → List[Release | None]
release_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) → PagedResult[Release, str]
release_get_random() → bytes
snapshot_add(snapshots: List[Snapshot]) → Dict[str, int]
snapshot_missing(snapshots: List[bytes]) → Iterable[bytes]
snapshot_get(snapshot_id: bytes) → Dict[str, Any] | None
snapshot_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) → PagedResult[bytes, str]
snapshot_count_branches(snapshot_id: bytes, branch_name_exclude_prefix: bytes | None = None) → Dict[str | None, int] | None
snapshot_get_branches(snapshot_id: bytes, branches_from: bytes = b'', branches_count: int = 1000, target_types: List[str] | None = None, branch_name_include_substring: bytes | None = None, branch_name_exclude_prefix: bytes | None = None) → PartialBranches | None
snapshot_get_random() → bytes
snapshot_branch_get_by_name(snapshot_id: bytes, branch_name: bytes, follow_alias_chain: bool = True, max_alias_chain_length: int = 100) → SnapshotBranchByNameResponse | None
origin_visit_add(visits: List[OriginVisit]) → Iterable[OriginVisit]
origin_visit_status_add(visit_statuses: List[OriginVisitStatus]) → Dict[str, int]
origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) → OriginVisitStatus | None
origin_visit_get(origin: str, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) → PagedResult[OriginVisit, str]
origin_visit_get_with_statuses(origin: str, allowed_statuses: List[str] | None = None, require_snapshot: bool = False, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) → PagedResult[OriginVisitWithStatuses, str]
origin_visit_find_by_date(origin: str, visit_date: datetime, type: str | None = None) → OriginVisit | None
origin_visit_get_by(origin: str, visit: int) → OriginVisit | None
origin_visit_get_latest(origin: str, type: str | None = None, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) → OriginVisit | None
origin_visit_status_get(origin: str, visit: int, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) → PagedResult[OriginVisitStatus, str]
origin_visit_status_get_random(type: str) → OriginVisitStatus | None
origin_get(origins: List[str]) → List[Origin | None]
origin_get_by_sha1(sha1s: List[bytes]) → List[Dict[str, Any] | None]
origin_get_range(origin_from=1, origin_count=100)
origin_list(page_token: str | None = None, limit: int = 100) → PagedResult[Origin, str]
origin_count(url_pattern: str, regexp: bool = False, with_visit: bool = False) → int
origin_snapshot_get_all(origin_url: str) → List[bytes]
origin_add(origins: List[Origin]) → Dict[str, int]
object_find_by_sha1_git(ids: List[bytes]) → Dict[bytes, List[Dict]]
stat_counters()
refresh_stat_counters()
raw_extrinsic_metadata_add(metadata: List[RawExtrinsicMetadata]) → Dict[str, int]
raw_extrinsic_metadata_get(target: ExtendedSWHID, authority: MetadataAuthority, after: datetime | None = None, page_token: bytes | None = None, limit: int = 1000) → PagedResult[RawExtrinsicMetadata, str]
raw_extrinsic_metadata_get_by_ids(ids: List[bytes]) → List[RawExtrinsicMetadata]
metadata_fetcher_add(fetchers: List[MetadataFetcher]) → Dict[str, int]
metadata_fetcher_get(name: str, version: str) → MetadataFetcher | None
raw_extrinsic_metadata_get_authorities(target: ExtendedSWHID) → List[MetadataAuthority]
metadata_authority_add(authorities: List[MetadataAuthority]) → Dict[str, int]
metadata_authority_get(type: MetadataAuthorityType, url: str) → MetadataAuthority | None
object_find_recent_references(target_swhid: ExtendedSWHID, limit: int) → List[ExtendedSWHID]
object_references_add(references: List[ObjectReference]) → Dict[str, int]
clear_buffers(object_types: Sequence[str] = ()) → None

Do nothing

flush(object_types: Sequence[str] = ()) → Dict[str, int]
object_delete(swhids: List[ExtendedSWHID]) → Dict[str, int]

Delete objects from the storage

All skipped content objects matching the given SWHID will be removed, including those that share the same SWHID due to hash collisions.

Origin objects are removed alongside their associated origin visit and origin visit status objects.

Parameters:

swhids – list of SWHIDs of the objects to remove

Returns:

number of objects removed. Details of each key:

content:delete

Number of content objects removed

content:delete:bytes

Sum of the removed contents’ data length

skipped_content:delete

Number of skipped content objects removed

directory:delete

Number of directory objects removed

revision:delete

Number of revision objects removed

release:delete

Number of release objects removed

snapshot:delete

Number of snapshot objects removed

origin:delete

Number of origin objects removed

origin_visit:delete

Number of origin visit objects removed

origin_visit_status:delete

Number of origin visit status objects removed

ori_metadata:delete

Number of raw extrinsic metadata objects targeting an origin that have been removed

snp_metadata:delete

Number of raw extrinsic metadata objects targeting a snapshot that have been removed

rev_metadata:delete

Number of raw extrinsic metadata objects targeting a revision that have been removed

rel_metadata:delete

Number of raw extrinsic metadata objects targeting a release that have been removed

dir_metadata:delete

Number of raw extrinsic metadata objects targeting a directory that have been removed

cnt_metadata:delete

Number of raw extrinsic metadata objects targeting a content that have been removed

emd_metadata:delete

Number of raw extrinsic metadata objects targeting a raw extrinsic metadata object that have been removed

Return type:

dict
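Since the summary mixes object counts with a byte total, a caller that wants a single "objects removed" figure has to skip content:delete:bytes. A minimal sketch follows; `count_removed_objects` and the sample values are hypothetical, and only the key names come from the list above.

```python
from typing import Dict


def count_removed_objects(summary: Dict[str, int]) -> int:
    """Sum every '<type>:delete' counter, ignoring the byte total."""
    # 'content:delete:bytes' ends with ':bytes', so it is not counted.
    return sum(value for key, value in summary.items() if key.endswith(":delete"))


# Made-up summary shaped like the return value described above.
sample_summary = {
    "content:delete": 2,
    "content:delete:bytes": 2048,
    "directory:delete": 1,
    "ori_metadata:delete": 3,
}
print(count_removed_objects(sample_summary))
```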

extid_delete_for_target(target_swhids: List[CoreSWHID]) → Dict[str, int]

Delete ExtID objects from the storage

Parameters:

target_swhids – list of SWHIDs targeted by the ExtID objects to remove

Returns:

Summary dict with the following keys and associated values:

extid:delete: Number of ExtID objects removed

Return type:

dict

object_references_create_partition(year: int, week: int) → Tuple[date, date]

Create the partition of the object_references table for the given ISO year and week.
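For orientation, the date range covered by an ISO year and week can be computed with the standard library. The sketch below assumes the returned pair is the Monday that starts the week and the following Monday as an exclusive end; the actual bounds returned by this method are not specified here, and `iso_week_bounds` is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import Tuple


def iso_week_bounds(year: int, week: int) -> Tuple[date, date]:
    """Return (start, end) for an ISO week, with an exclusive end (assumption)."""
    start = date.fromisocalendar(year, week, 1)  # Monday of the ISO week
    return start, start + timedelta(days=7)


start, end = iso_week_bounds(2024, 1)
print(start, end)
```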

object_references_drop_partition(partition: ObjectReferencesPartition) → None

Drop the given partition of the object_references table.

object_references_list_partitions() → List[ObjectReferencesPartition]

List existing partitions of the object_references table, ordered from the oldest to the most recent.