swh.storage.postgresql.storage module#

swh.storage.postgresql.storage.EMPTY_SNAPSHOT_ID = b'\x1a\x88\x93\xe6\xa8oDN\x8b\xe8\xe7\xbd\xa6\xcb4\xfb\x175\xa0\x0e'#

Identifier for the empty snapshot

swh.storage.postgresql.storage.VALIDATION_EXCEPTIONS = (<class 'KeyError'>, <class 'TypeError'>, <class 'ValueError'>, <class 'psycopg2.errors.CheckViolation'>, <class 'psycopg2.IntegrityError'>, <class 'psycopg2.errors.InvalidTextRepresentation'>, <class 'psycopg2.errors.NotNullViolation'>, <class 'psycopg2.errors.NumericValueOutOfRange'>, <class 'psycopg2.errors.UndefinedFunction'>, <class 'psycopg2.errors.ProgramLimitExceeded'>)#

Exceptions raised by PostgreSQL when validation of the arguments fails.

swh.storage.postgresql.storage.convert_validation_exceptions()[source]#

Catches PostgreSQL errors related to invalid arguments and re-raises them as a StorageArgumentException.

class swh.storage.postgresql.storage.Storage(db, objstorage=None, min_pool_conns=1, max_pool_conns=10, journal_writer=None, query_options=None)[source]#

Bases: object

SWH storage datastore proxy, encompassing DB and object storage

Instantiate a storage instance backed by a PostgreSQL database and an objstorage.

When db is passed as a connection string, then this module automatically manages a connection pool between min_pool_conns and max_pool_conns. When db is an explicit psycopg2 connection, then min_pool_conns and max_pool_conns are ignored and the connection is used directly.

Parameters:
  • db – either a libpq connection string, or a psycopg2 connection

  • objstorage – configuration for the backend ObjStorage; if unset, use a NoopObjStorage

  • min_pool_conns – min number of connections in the psycopg2 pool

  • max_pool_conns – max number of connections in the psycopg2 pool

  • journal_writer – configuration for the JournalWriter

  • query_options

    configuration for the SQL connections; keys of the dict are the names of methods decorated with db_transaction() or db_transaction_generator() (e.g. content_find()), and values are dicts mapping a config_name to a config_value, used to configure the SQL connection for that method. For example, using:

    {"content_get": {"statement_timeout": 5000}}
    

    will override the default statement timeout for the content_get() endpoint from 500ms to 5000ms.

    See swh.core.db.common for more details.

current_version: int = 190#
get_db()[source]#
put_db(db)[source]#
db()[source]#
get_flavor() str[source]#
property flavor: str#
check_config(*, check_write: bool) bool[source]#
content_add(content: List[Content]) Dict[str, int][source]#
content_update(contents: List[Dict[str, Any]], keys: List[str] = []) None[source]#
content_add_metadata(content: List[Content]) Dict[str, int][source]#
content_get_data(content: Union[HashDict, bytes]) Optional[bytes][source]#
content_get_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) PagedResult[Content, str][source]#
content_get(contents: List[bytes], algo: str = 'sha1') List[Optional[Content]][source]#
content_missing(contents: List[HashDict], key_hash: str = 'sha1') Iterable[bytes][source]#
content_missing_per_sha1(contents: List[bytes]) Iterable[bytes][source]#
content_missing_per_sha1_git(contents: List[bytes]) Iterable[bytes][source]#
content_find(content: HashDict) List[Content][source]#
content_get_random() bytes[source]#
skipped_content_add(content: List[SkippedContent]) Dict[str, int][source]#
skipped_content_find(content: HashDict) List[SkippedContent][source]#
skipped_content_missing(contents: List[Dict[str, Any]]) Iterable[Dict[str, Any]][source]#
directory_add(directories: List[Directory]) Dict[str, int][source]#
directory_missing(directories: List[bytes]) Iterable[bytes][source]#
directory_ls(directory: bytes, recursive: bool = False) Iterable[Dict[str, Any]][source]#
directory_entry_get_by_path(directory: bytes, paths: List[bytes]) Optional[Dict[str, Any]][source]#
directory_get_random() bytes[source]#
directory_get_entries(directory_id: bytes, page_token: Optional[bytes] = None, limit: int = 1000) Optional[PagedResult[DirectoryEntry, str]][source]#
directory_get_raw_manifest(directory_ids: List[bytes]) Dict[bytes, Optional[bytes]][source]#
directory_get_id_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) PagedResult[bytes, str][source]#
revision_add(revisions: List[Revision]) Dict[str, int][source]#
revision_missing(revisions: List[bytes]) Iterable[bytes][source]#
revision_get_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) PagedResult[Revision, str][source]#
revision_get(revision_ids: List[bytes], ignore_displayname: bool = False) List[Optional[Revision]][source]#
revision_log(revisions: List[bytes], ignore_displayname: bool = False, limit: Optional[int] = None) Iterable[Optional[Dict[str, Any]]][source]#
revision_shortlog(revisions: List[bytes], limit: Optional[int] = None) Iterable[Optional[Tuple[bytes, Tuple[bytes, ...]]]][source]#
revision_get_random() bytes[source]#
extid_get_from_extid(id_type: str, ids: List[bytes], version: Optional[int] = None) List[ExtID][source]#
extid_get_from_target(target_type: ObjectType, ids: List[bytes], extid_type: Optional[str] = None, extid_version: Optional[int] = None) List[ExtID][source]#
extid_add(ids: List[ExtID]) Dict[str, int][source]#
release_add(releases: List[Release]) Dict[str, int][source]#
release_missing(releases: List[bytes]) Iterable[bytes][source]#
release_get(releases: List[bytes], ignore_displayname: bool = False) List[Optional[Release]][source]#
release_get_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) PagedResult[Release, str][source]#
release_get_random() bytes[source]#
snapshot_add(snapshots: List[Snapshot]) Dict[str, int][source]#
snapshot_missing(snapshots: List[bytes]) Iterable[bytes][source]#
snapshot_get(snapshot_id: bytes) Optional[Dict[str, Any]][source]#
snapshot_get_id_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) PagedResult[bytes, str][source]#
snapshot_count_branches(snapshot_id: bytes, branch_name_exclude_prefix: Optional[bytes] = None) Optional[Dict[Optional[str], int]][source]#
snapshot_get_branches(snapshot_id: bytes, branches_from: bytes = b'', branches_count: int = 1000, target_types: Optional[List[str]] = None, branch_name_include_substring: Optional[bytes] = None, branch_name_exclude_prefix: Optional[bytes] = None) Optional[PartialBranches][source]#
snapshot_get_random() bytes[source]#
snapshot_branch_get_by_name(snapshot_id: bytes, branch_name: bytes, follow_alias_chain: bool = True, max_alias_chain_length: int = 100) Optional[SnapshotBranchByNameResponse][source]#
origin_visit_add(visits: List[OriginVisit]) Iterable[OriginVisit][source]#
origin_visit_status_add(visit_statuses: List[OriginVisitStatus]) Dict[str, int][source]#
origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False) Optional[OriginVisitStatus][source]#
origin_visit_get(origin: str, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisit, str][source]#
origin_visit_get_with_statuses(origin: str, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitWithStatuses, str][source]#
origin_visit_find_by_date(origin: str, visit_date: datetime) Optional[OriginVisit][source]#
origin_visit_get_by(origin: str, visit: int) Optional[OriginVisit][source]#
origin_visit_get_latest(origin: str, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False) Optional[OriginVisit][source]#
origin_visit_status_get(origin: str, visit: int, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitStatus, str][source]#
origin_visit_status_get_random(type: str) Optional[OriginVisitStatus][source]#
origin_get(origins: List[str]) Iterable[Optional[Origin]][source]#
origin_get_by_sha1(sha1s: List[bytes]) List[Optional[Dict[str, Any]]][source]#
origin_get_range(origin_from=1, origin_count=100)[source]#
origin_list(page_token: Optional[str] = None, limit: int = 100) PagedResult[Origin, str][source]#
origin_count(url_pattern: str, regexp: bool = False, with_visit: bool = False) int[source]#
origin_snapshot_get_all(origin_url: str) List[bytes][source]#
origin_add(origins: List[Origin]) Dict[str, int][source]#
object_find_by_sha1_git(ids: List[bytes]) Dict[bytes, List[Dict]][source]#
stat_counters()[source]#
refresh_stat_counters()[source]#
raw_extrinsic_metadata_add(metadata: List[RawExtrinsicMetadata]) Dict[str, int][source]#
raw_extrinsic_metadata_get(target: ExtendedSWHID, authority: MetadataAuthority, after: Optional[datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000) PagedResult[RawExtrinsicMetadata, str][source]#
raw_extrinsic_metadata_get_by_ids(ids: List[bytes]) List[RawExtrinsicMetadata][source]#
metadata_fetcher_add(fetchers: List[MetadataFetcher]) Dict[str, int][source]#
metadata_fetcher_get(name: str, version: str) Optional[MetadataFetcher][source]#
raw_extrinsic_metadata_get_authorities(target: ExtendedSWHID) List[MetadataAuthority][source]#
metadata_authority_add(authorities: List[MetadataAuthority]) Dict[str, int][source]#
metadata_authority_get(type: MetadataAuthorityType, url: str) Optional[MetadataAuthority][source]#
object_find_recent_references(target_swhid: ExtendedSWHID, limit: int) List[ExtendedSWHID][source]#
object_references_add(references: List[ObjectReference]) Dict[str, int][source]#
clear_buffers(object_types: Sequence[str] = ()) None[source]#

Do nothing

flush(object_types: Sequence[str] = ()) Dict[str, int][source]#