swh.storage.storage module

swh.storage.storage.EMPTY_SNAPSHOT_ID = b'\x1a\x88\x93\xe6\xa8oDN\x8b\xe8\xe7\xbd\xa6\xcb4\xfb\x175\xa0\x0e'

Identifier for the empty snapshot

swh.storage.storage.VALIDATION_EXCEPTIONS = [<class 'KeyError'>, <class 'TypeError'>, <class 'ValueError'>, <class 'psycopg2.errors.CheckViolation'>, <class 'psycopg2.IntegrityError'>, <class 'psycopg2.errors.InvalidTextRepresentation'>, <class 'psycopg2.errors.NotNullViolation'>, <class 'psycopg2.errors.NumericValueOutOfRange'>, <class 'psycopg2.errors.UndefinedFunction'>]

Exceptions raised by PostgreSQL when validation of the arguments fails.

swh.storage.storage.convert_validation_exceptions()[source]

Catches PostgreSQL errors related to invalid arguments and re-raises them as a StorageArgumentException.
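
This helper is used as a context manager around low-level database calls. A minimal sketch assuming that usage, with a ValueError standing in for one of the VALIDATION_EXCEPTIONS listed above:

    from swh.storage.exc import StorageArgumentException
    from swh.storage.storage import convert_validation_exceptions

    try:
        with convert_validation_exceptions():
            # stand-in for a db call whose arguments PostgreSQL rejects
            raise ValueError("bad argument")
    except StorageArgumentException as exc:
        print("converted:", exc)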

class swh.storage.storage.Storage(db, objstorage, min_pool_conns=1, max_pool_conns=10, journal_writer=None)[source]

Bases: object

SWH storage proxy, encompassing DB and object storage
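
A hedged instantiation sketch going through the swh.storage.get_storage factory rather than the constructor; the "local" class name, the libpq connection string and the objstorage configuration are assumptions to adapt to your deployment:

    from swh.storage import get_storage

    storage = get_storage(
        cls="local",
        db="dbname=softwareheritage-dev",
        objstorage={"cls": "memory", "args": {}},
    )
    # check_write is keyword-only; True also verifies write access
    storage.check_config(check_write=True)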

get_db()[source]
put_db(db)[source]
db()[source]
check_config(*, check_write)[source]
content_add(content: Iterable[swh.model.model.Content]) → Dict[source]
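
A usage sketch for content_add, run against the in-memory backend, which exposes the same interface; the exact counter keys of the returned summary are an assumption:

    from swh.model.model import Content
    from swh.storage import get_storage

    storage = get_storage(cls="memory")  # same interface, no database needed

    content = Content.from_data(b"hello world\n")
    summary = storage.content_add([content])
    print(summary)  # dict of counters such as "content:add"
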
content_update(content, keys=[])[source]
content_add_metadata(content: Iterable[swh.model.model.Content]) → Dict[source]
content_get(content)[source]
content_get_range(start, end, limit=1000)[source]
content_get_partition(partition_id: int, nb_partitions: int, limit: int = 1000, page_token: str = None)[source]
content_get_metadata(contents: List[bytes]) → Dict[bytes, List[Dict]][source]
content_missing(content, key_hash='sha1')[source]
content_missing_per_sha1(contents)[source]
content_missing_per_sha1_git(contents)[source]
content_find(content)[source]
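
A sketch of content lookups against the in-memory backend: content_find accepts any subset of the supported checksums, and content_missing_per_sha1 yields only the hashes unknown to the storage:

    from swh.model.hashutil import hash_to_bytes
    from swh.model.model import Content
    from swh.storage import get_storage

    storage = get_storage(cls="memory")
    blob = Content.from_data(b"hello world\n")
    storage.content_add([blob])

    # Look the content up by one of its checksums.
    hits = storage.content_find({"sha1_git": blob.sha1_git})

    # Yields only the sha1s that are not in the storage.
    unknown = hash_to_bytes("0000000000000000000000000000000000000000")
    missing = list(storage.content_missing_per_sha1([blob.sha1, unknown]))
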
content_get_random()[source]
skipped_content_add(content: Iterable[swh.model.model.SkippedContent]) → Dict[source]
skipped_content_missing(contents)[source]
directory_add(directories: Iterable[swh.model.model.Directory]) → Dict[source]
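
A sketch of adding a one-entry directory with the swh.model constructors; the entry targets a content added beforehand:

    from swh.model.model import Content, Directory, DirectoryEntry
    from swh.storage import get_storage

    storage = get_storage(cls="memory")

    blob = Content.from_data(b"print('hello')\n")
    storage.content_add([blob])

    directory = Directory(
        entries=(
            DirectoryEntry(
                name=b"hello.py",
                type="file",
                target=blob.sha1_git,
                perms=0o100644,
            ),
        )
    )
    storage.directory_add([directory])
    print(list(storage.directory_ls(directory.id)))
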
directory_missing(directories)[source]
directory_ls(directory, recursive=False)[source]
directory_entry_get_by_path(directory, paths)[source]
directory_get_random()[source]
revision_add(revisions: Iterable[swh.model.model.Revision]) → Dict[source]
revision_missing(revisions)[source]
revision_get(revisions)[source]
revision_log(revisions, limit=None)[source]
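
A sketch of walking a revision's history with revision_log; the id below is a placeholder to replace with the id of a revision already present in the storage:

    from swh.model.hashutil import hash_to_bytes
    from swh.storage import get_storage

    storage = get_storage(cls="memory")

    head = hash_to_bytes("aafb16d69fd30ff58afdd69036a26047f3aebdc6")  # placeholder
    for revision in storage.revision_log([head], limit=10):
        if revision is not None:
            print(revision["id"].hex(), revision["message"])
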
revision_shortlog(revisions, limit=None)[source]
revision_get_random()[source]
release_add(releases: Iterable[swh.model.model.Release]) → Dict[source]
release_missing(releases)[source]
release_get(releases)[source]
release_get_random()[source]
snapshot_add(snapshots: Iterable[swh.model.model.Snapshot]) → Dict[source]
snapshot_missing(snapshots)[source]
snapshot_get(snapshot_id)[source]
snapshot_get_by_origin_visit(origin, visit)[source]
snapshot_count_branches(snapshot_id)[source]
snapshot_get_branches(snapshot_id, branches_from=b'', branches_count=1000, target_types=None)[source]
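
A sketch of snapshot_add and snapshot_get_branches using the swh.model Snapshot, SnapshotBranch and TargetType constructors; the revision id is a placeholder:

    from swh.model.hashutil import hash_to_bytes
    from swh.model.model import Snapshot, SnapshotBranch, TargetType
    from swh.storage import get_storage

    storage = get_storage(cls="memory")

    revision_id = hash_to_bytes("aafb16d69fd30ff58afdd69036a26047f3aebdc6")
    snapshot = Snapshot(
        branches={
            b"refs/heads/master": SnapshotBranch(
                target=revision_id, target_type=TargetType.REVISION
            ),
            b"HEAD": SnapshotBranch(
                target=b"refs/heads/master", target_type=TargetType.ALIAS
            ),
        }
    )
    storage.snapshot_add([snapshot])

    # Branches are returned in pages, starting from an arbitrary branch name.
    page = storage.snapshot_get_branches(
        snapshot.id, branches_from=b"", branches_count=10
    )
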
snapshot_get_random()[source]
origin_visit_add(visits: Iterable[swh.model.model.OriginVisit]) → Iterable[swh.model.model.OriginVisit][source]
origin_visit_status_add(visit_statuses: Iterable[swh.model.model.OriginVisitStatus]) → None[source]
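
A sketch of recording a visit and its status; the OriginVisit and OriginVisitStatus field names follow the swh.model version matching this API and may need adjusting:

    import datetime

    from swh.model.model import Origin, OriginVisit, OriginVisitStatus
    from swh.storage import get_storage

    storage = get_storage(cls="memory")

    origin = Origin(url="https://example.org/repo.git")
    storage.origin_add([origin])

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    visits = list(
        storage.origin_visit_add(
            [OriginVisit(origin=origin.url, date=now, type="git")]
        )
    )
    visit = visits[0]  # the storage assigns the visit id

    storage.origin_visit_status_add(
        [
            OriginVisitStatus(
                origin=origin.url,
                visit=visit.visit,
                date=now,
                status="full",
                snapshot=None,
                metadata=None,
            )
        ]
    )
    print(storage.origin_visit_get_latest(origin.url))
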
origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False) → Optional[swh.model.model.OriginVisitStatus][source]
origin_visit_get(origin: str, last_visit: Optional[int] = None, limit: Optional[int] = None, order: str = 'asc') → Iterable[Dict[str, Any]][source]
origin_visit_find_by_date(origin: str, visit_date: datetime.datetime) → Optional[Dict[str, Any]][source]
origin_visit_get_by(origin: str, visit: int) → Optional[Dict[str, Any]][source]
origin_visit_get_latest(origin: str, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False) → Optional[Dict[str, Any]][source]
origin_visit_get_random(type: str) → Optional[Dict[str, Any]][source]
object_find_by_sha1_git(ids)[source]
origin_get(origins)[source]
origin_get_by_sha1(sha1s)[source]
origin_get_range(origin_from=1, origin_count=100)[source]
origin_list(page_token: Optional[str] = None, limit: int = 100) → dict[source]
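
A pagination sketch over origin_list; the "origins" and "next_page_token" keys of the returned dict are assumptions based on the paged-result convention used elsewhere in the API:

    from swh.storage import get_storage

    storage = get_storage(cls="memory")

    page_token = None
    while True:
        page = storage.origin_list(page_token=page_token, limit=100)
        for origin in page["origins"]:
            print(origin["url"])
        page_token = page.get("next_page_token")
        if page_token is None:
            break
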
origin_count(url_pattern, regexp=False, with_visit=False)[source]
origin_add(origins: Iterable[swh.model.model.Origin]) → Dict[str, int][source]
origin_add_one(origin: swh.model.model.Origin) → str[source]
stat_counters()[source]
refresh_stat_counters()[source]
content_metadata_add(id: str, context: Dict[str, Union[str, bytes, int]], discovery_date: datetime.datetime, authority: Dict[str, Any], fetcher: Dict[str, Any], format: str, metadata: bytes) → None[source]
content_metadata_get(id: str, authority: Dict[str, str], after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000) → Dict[str, Any][source]
origin_metadata_add(origin_url: str, discovery_date: datetime.datetime, authority: Dict[str, Any], fetcher: Dict[str, Any], format: str, metadata: bytes) → None[source]
origin_metadata_get(origin_url: str, authority: Dict[str, str], after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000) → Dict[str, Any][source]
metadata_fetcher_add(name: str, version: str, metadata: Dict[str, Any]) → None[source]
metadata_fetcher_get(name: str, version: str) → Optional[Dict[str, Any]][source]
metadata_authority_add(type: str, url: str, metadata: Dict[str, Any]) → None[source]
metadata_authority_get(type: str, url: str) → Optional[Dict[str, Any]][source]
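
A sketch tying the metadata endpoints together: the authority and fetcher must be registered before metadata referencing them can be stored; the dict keys used to identify them ("type"/"url", "name"/"version") mirror the add signatures but are assumptions:

    import datetime

    from swh.model.model import Origin
    from swh.storage import get_storage

    storage = get_storage(cls="memory")
    storage.origin_add([Origin(url="https://example.org/repo.git")])

    storage.metadata_authority_add(
        type="forge", url="https://example.org", metadata={}
    )
    storage.metadata_fetcher_add(
        name="swh.loader.git", version="1.0.0", metadata={}
    )

    authority = {"type": "forge", "url": "https://example.org"}
    fetcher = {"name": "swh.loader.git", "version": "1.0.0"}

    storage.origin_metadata_add(
        origin_url="https://example.org/repo.git",
        discovery_date=datetime.datetime.now(tz=datetime.timezone.utc),
        authority=authority,
        fetcher=fetcher,
        format="json",
        metadata=b'{"license": "GPL-3.0"}',
    )
    result = storage.origin_metadata_get(
        origin_url="https://example.org/repo.git", authority=authority
    )
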
diff_directories(from_dir, to_dir, track_renaming=False)[source]
diff_revisions(from_rev, to_rev, track_renaming=False)[source]
diff_revision(revision, track_renaming=False)[source]
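
The diff_* methods delegate to swh.storage.algos.diff and return lists of per-entry changes. A minimal sketch, reusing a storage handle from the sketches above and assuming revision_id is the id of a stored revision:

    # Changes between the revision and its parent(s), with rename detection.
    changes = storage.diff_revision(revision_id, track_renaming=True)
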
clear_buffers(object_types: Optional[Iterable[str]] = None) → None[source]

Do nothing: this backend writes objects directly and keeps no buffers to clear.

flush(object_types: Optional[Iterable[str]] = None) → Dict[source]
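
clear_buffers is a no-op here (see above) and flush presumably behaves similarly; both are kept so that client code written against buffering proxy storages stays portable:

    from swh.storage import get_storage

    storage = get_storage(cls="memory")
    storage.flush(object_types=["content", "directory"])
    storage.clear_buffers(object_types=["content", "directory"])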