swh.storage.postgresql.storage module

swh.storage.postgresql.storage.EMPTY_SNAPSHOT_ID = b'\x1a\x88\x93\xe6\xa8oDN\x8b\xe8\xe7\xbd\xa6\xcb4\xfb\x175\xa0\x0e'

Identifier for the empty snapshot

swh.storage.postgresql.storage.VALIDATION_EXCEPTIONS = (<class 'KeyError'>, <class 'TypeError'>, <class 'ValueError'>, <class 'psycopg2.errors.CheckViolation'>, <class 'psycopg2.IntegrityError'>, <class 'psycopg2.errors.InvalidTextRepresentation'>, <class 'psycopg2.errors.NotNullViolation'>, <class 'psycopg2.errors.NumericValueOutOfRange'>, <class 'psycopg2.errors.UndefinedFunction'>)

Exceptions (Python built-in errors and psycopg2/PostgreSQL errors) that indicate validation of the arguments failed.

swh.storage.postgresql.storage.convert_validation_exceptions()[source]

Catches PostgreSQL errors related to invalid arguments and re-raises them as a StorageArgumentException.

class swh.storage.postgresql.storage.Storage(db, objstorage, min_pool_conns=1, max_pool_conns=10, journal_writer=None)[source]

Bases: object

SWH storage proxy, encompassing DB and object storage

Parameters
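  • db – either a libpq connection string, or a psycopg2 connection (called db_conn in the original docstring)

  • objstorage – the object storage (the original docstring calls this obj_root: "path to the root of the object storage")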

get_db()[source]
put_db(db)[source]
db()[source]
check_config(*, check_write: bool) bool[source]
content_add(content: List[swh.model.model.Content]) Dict[str, int][source]
content_update(contents: List[Dict[str, Any]], keys: List[str] = []) None[source]
content_add_metadata(content: List[swh.model.model.Content]) Dict[str, int][source]
content_get_data(content: bytes) Optional[bytes][source]
content_get_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) swh.core.api.classes.PagedResult[swh.model.model.Content, str][source]
content_get(contents: List[bytes], algo: str = 'sha1') List[Optional[swh.model.model.Content]][source]
content_missing(contents: List[Dict[str, Any]], key_hash: str = 'sha1') Iterable[bytes][source]
content_missing_per_sha1(contents: List[bytes]) Iterable[bytes][source]
content_missing_per_sha1_git(contents: List[bytes]) Iterable[bytes][source]
content_find(content: Dict[str, Any]) List[swh.model.model.Content][source]
content_get_random() bytes[source]
skipped_content_add(content: List[swh.model.model.SkippedContent]) Dict[str, int][source]
skipped_content_missing(contents: List[Dict[str, Any]]) Iterable[Dict[str, Any]][source]
directory_add(directories: List[swh.model.model.Directory]) Dict[str, int][source]
directory_missing(directories: List[bytes]) Iterable[bytes][source]
directory_ls(directory: bytes, recursive: bool = False) Iterable[Dict[str, Any]][source]
directory_entry_get_by_path(directory: bytes, paths: List[bytes]) Optional[Dict[str, Any]][source]
directory_get_random() bytes[source]
directory_get_entries(directory_id: bytes, page_token: Optional[bytes] = None, limit: int = 1000) Optional[swh.core.api.classes.PagedResult[swh.model.model.DirectoryEntry, str]][source]
revision_add(revisions: List[swh.model.model.Revision]) Dict[str, int][source]
revision_missing(revisions: List[bytes]) Iterable[bytes][source]
revision_get(revision_ids: List[bytes]) List[Optional[swh.model.model.Revision]][source]
revision_log(revisions: List[bytes], limit: Optional[int] = None) Iterable[Optional[Dict[str, Any]]][source]
revision_shortlog(revisions: List[bytes], limit: Optional[int] = None) Iterable[Optional[Tuple[bytes, Tuple[bytes, ...]]]][source]
revision_get_random() bytes[source]
extid_get_from_extid(id_type: str, ids: List[bytes], version: Optional[int] = None) List[swh.model.model.ExtID][source]
extid_get_from_target(target_type: swh.model.swhids.ObjectType, ids: List[bytes], extid_type: Optional[str] = None, extid_version: Optional[int] = None) List[swh.model.model.ExtID][source]
extid_add(ids: List[swh.model.model.ExtID]) Dict[str, int][source]
release_add(releases: List[swh.model.model.Release]) Dict[str, int][source]
release_missing(releases: List[bytes]) Iterable[bytes][source]
release_get(releases: List[bytes]) List[Optional[swh.model.model.Release]][source]
release_get_random() bytes[source]
snapshot_add(snapshots: List[swh.model.model.Snapshot]) Dict[str, int][source]
snapshot_missing(snapshots: List[bytes]) Iterable[bytes][source]
snapshot_get(snapshot_id: bytes) Optional[Dict[str, Any]][source]
snapshot_count_branches(snapshot_id: bytes, branch_name_exclude_prefix: Optional[bytes] = None) Optional[Dict[Optional[str], int]][source]
snapshot_get_branches(snapshot_id: bytes, branches_from: bytes = b'', branches_count: int = 1000, target_types: Optional[List[str]] = None, branch_name_include_substring: Optional[bytes] = None, branch_name_exclude_prefix: Optional[bytes] = None) Optional[swh.storage.interface.PartialBranches][source]
snapshot_get_random() bytes[source]
origin_visit_add(visits: List[swh.model.model.OriginVisit]) Iterable[swh.model.model.OriginVisit][source]
origin_visit_status_add(visit_statuses: List[swh.model.model.OriginVisitStatus]) Dict[str, int][source]
origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False) Optional[swh.model.model.OriginVisitStatus][source]
origin_visit_get(origin: str, page_token: Optional[str] = None, order: swh.storage.interface.ListOrder = ListOrder.ASC, limit: int = 10) swh.core.api.classes.PagedResult[swh.model.model.OriginVisit, str][source]
origin_visit_find_by_date(origin: str, visit_date: datetime.datetime) Optional[swh.model.model.OriginVisit][source]
origin_visit_get_by(origin: str, visit: int) Optional[swh.model.model.OriginVisit][source]
origin_visit_get_latest(origin: str, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False) Optional[swh.model.model.OriginVisit][source]
origin_visit_status_get(origin: str, visit: int, page_token: Optional[str] = None, order: swh.storage.interface.ListOrder = ListOrder.ASC, limit: int = 10) swh.core.api.classes.PagedResult[swh.model.model.OriginVisitStatus, str][source]
origin_visit_status_get_random(type: str) Optional[swh.model.model.OriginVisitStatus][source]
object_find_by_sha1_git(ids: List[bytes]) Dict[bytes, List[Dict]][source]
origin_get(origins: List[str]) Iterable[Optional[swh.model.model.Origin]][source]
origin_get_by_sha1(sha1s: List[bytes]) List[Optional[Dict[str, Any]]][source]
origin_get_range(origin_from=1, origin_count=100)[source]
origin_list(page_token: Optional[str] = None, limit: int = 100) swh.core.api.classes.PagedResult[swh.model.model.Origin, str][source]
origin_count(url_pattern: str, regexp: bool = False, with_visit: bool = False) int[source]
origin_snapshot_get_all(origin_url: str) List[bytes][source]
origin_add(origins: List[swh.model.model.Origin]) Dict[str, int][source]
stat_counters()[source]
refresh_stat_counters()[source]
raw_extrinsic_metadata_add(metadata: List[swh.model.model.RawExtrinsicMetadata]) Dict[str, int][source]
raw_extrinsic_metadata_get(target: swh.model.swhids.ExtendedSWHID, authority: swh.model.model.MetadataAuthority, after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000) swh.core.api.classes.PagedResult[swh.model.model.RawExtrinsicMetadata, str][source]
raw_extrinsic_metadata_get_by_ids(ids: List[bytes]) List[swh.model.model.RawExtrinsicMetadata][source]
raw_extrinsic_metadata_get_authorities(target: swh.model.swhids.ExtendedSWHID) List[swh.model.model.MetadataAuthority][source]
metadata_fetcher_add(fetchers: List[swh.model.model.MetadataFetcher]) Dict[str, int][source]
metadata_fetcher_get(name: str, version: str) Optional[swh.model.model.MetadataFetcher][source]
metadata_authority_add(authorities: List[swh.model.model.MetadataAuthority]) Dict[str, int][source]
metadata_authority_get(type: swh.model.model.MetadataAuthorityType, url: str) Optional[swh.model.model.MetadataAuthority][source]
clear_buffers(object_types: Sequence[str] = ()) None[source]

Do nothing (clear_buffers is a no-op for this storage).

flush(object_types: Sequence[str] = ()) Dict[str, int][source]