swh.storage.postgresql.db module#

class swh.storage.postgresql.db.QueryBuilder[source]#

Bases: object

add_query_part(query_part, params: List[Any] = []) None[source]#
add_pagination_clause(pagination_key: List[str], cursor: Any | None, direction: ListOrder | None, limit: int | None, separator: str = 'AND') None[source]#

Create and add a pagination clause to the query

  • pagination_key – Pagination key to be used. Use list of strings to support alias fields

  • cursor – Pagination cursor as a query parameter

  • direction – Sort order

  • limit – Limit as a query parameter

  • separator – Separator to be used as the prefix for the clause

get_query(db_cursor, separator: str = ' ') str[source]#
execute(db_cursor, separator: str = ' ') None[source]#
class swh.storage.postgresql.db.ObjectReferencesPartition(table_name: str, year: int, week: int, start: datetime.datetime, end: datetime.datetime)[source]#

Bases: object

table_name: str#
year: int#
week: int#
start: datetime#
end: datetime#
class swh.storage.postgresql.db.Db(conn: connection, pool: AbstractConnectionPool | None = None)[source]#

Bases: BaseDb

Proxy to the SWH DB, with wrappers around stored procedures

create a DB proxy

  • conn – psycopg2 connection to the SWH DB

  • pool – psycopg2 pool of connections

mktemp_dir_entry(entry_type, cur=None)[source]#
content_update_from_temp(keys_to_update, cur=None)[source]#
content_get_metadata_keys = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status']#
content_add_keys = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status', 'ctime']#
skipped_content_keys = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'reason', 'status', 'origin']#
content_get_metadata_from_hashes(hashes: List[bytes], algo: str, cur=None)[source]#
content_get_range(start, end, limit=None, cur=None) Iterator[Tuple][source]#

Retrieve contents within range [start, end].

content_hash_keys = ['sha1', 'sha1_git', 'sha256', 'blake2s256']#
content_missing_from_list(contents, cur=None)[source]#
content_missing_per_sha1(sha1s, cur=None)[source]#
content_missing_per_sha1_git(contents, cur=None)[source]#
content_find_cols = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', 'status']#
content_find(sha1: bytes | None = None, sha1_git: bytes | None = None, sha256: bytes | None = None, blake2s256: bytes | None = None, cur=None)[source]#

Find the content optionally on a combination of the following checksums sha1, sha1_git, sha256 or blake2s256.

  • sha1 – sha1 content

  • git_sha1 – the sha1 computed a la git sha1 of the content

  • sha256 – sha256 content

  • blake2s256 – blake2s256 content


The tuple (sha1, sha1_git, sha256, blake2s256) if found or None.

skipped_content_missing(contents, cur=None)[source]#
skipped_content_find_cols = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status', 'reason', 'ctime']#
skipped_content_find(sha1: bytes | None = None, sha1_git: bytes | None = None, sha256: bytes | None = None, blake2s256: bytes | None = None, cur=None) List[Tuple[Any]][source]#
directory_missing_from_list(directories, cur=None)[source]#
directory_ls_cols = ['dir_id', 'type', 'target', 'name', 'perms', 'status', 'sha1', 'sha1_git', 'sha256', 'length']#
directory_walk_one(directory, cur=None)[source]#
directory_walk(directory, cur=None)[source]#
directory_entry_get_by_path(directory, paths, cur=None)[source]#

Retrieve a directory entry by path.

directory_get_entries_cols = ['type', 'target', 'name', 'perms']#
directory_get_entries(directory: bytes, cur=None) List[Tuple][source]#
directory_get_raw_manifest(directory_ids: List[bytes], cur=None) Iterable[Tuple[bytes, bytes]][source]#
directory_get_id_range(start, end, limit=None, cur=None) Iterator[Tuple[bytes]][source]#
revision_missing_from_list(revisions, cur=None)[source]#
revision_add_cols = ['id', 'date', 'date_offset', 'date_neg_utc_offset', 'date_offset_bytes', 'committer_date', 'committer_date_offset', 'committer_date_neg_utc_offset', 'committer_date_offset_bytes', 'type', 'directory', 'message', 'author_fullname', 'author_name', 'author_email', 'committer_fullname', 'committer_name', 'committer_email', 'metadata', 'synthetic', 'extra_headers', 'raw_manifest']#
revision_get_cols = ['id', 'date', 'date_offset', 'date_neg_utc_offset', 'date_offset_bytes', 'committer_date', 'committer_date_offset', 'committer_date_neg_utc_offset', 'committer_date_offset_bytes', 'type', 'directory', 'message', 'author_fullname', 'author_name', 'author_email', 'committer_fullname', 'committer_name', 'committer_email', 'metadata', 'synthetic', 'extra_headers', 'raw_manifest', 'parents']#
static mangle_query_key(key, main_table, id_col, ignore_displayname=False)[source]#
revision_get_from_list(revisions, ignore_displayname=False, cur=None)[source]#
revision_get_range(start, end, limit, cur=None) Iterator[Tuple][source]#
revision_log(root_revisions, ignore_displayname=False, limit=None, cur=None)[source]#
revision_shortlog_cols = ['id', 'parents']#
revision_shortlog(root_revisions, limit=None, cur=None)[source]#
extid_cols = ['extid', 'extid_version', 'extid_type', 'target', 'target_type']#
extid_get_from_extid_list(extid_type: str, ids: List[bytes], version: int | None = None, cur=None)[source]#
extid_get_from_swhid_list(target_type: str, ids: List[bytes], extid_version: int | None = None, extid_type: str | None = None, cur=None)[source]#
extid_delete_for_target(target_rows: List[Tuple[str, bytes]], cur=None) Dict[str, int][source]#
release_missing_from_list(releases, cur=None)[source]#
release_add_cols = ['id', 'target', 'target_type', 'date', 'date_offset', 'date_neg_utc_offset', 'date_offset_bytes', 'name', 'comment', 'synthetic', 'raw_manifest', 'author_fullname', 'author_name', 'author_email']#
release_get_cols = ['id', 'target', 'target_type', 'date', 'date_offset', 'date_neg_utc_offset', 'date_offset_bytes', 'name', 'comment', 'synthetic', 'raw_manifest', 'author_fullname', 'author_name', 'author_email']#
release_get_from_list(releases, ignore_displayname=False, cur=None)[source]#
release_get_range(start, end, limit, cur=None) Iterator[Tuple][source]#
snapshot_exists(snapshot_id, cur=None)[source]#

Check whether a snapshot with the given id exists

snapshot_missing_from_list(snapshots, cur=None)[source]#
snapshot_add(snapshot_id, cur=None)[source]#

Add a snapshot from the temporary table

snapshot_count_cols = ['target_type', 'count']#
snapshot_count_branches(snapshot_id, branch_name_exclude_prefix=None, cur=None)[source]#
snapshot_get_cols = ['snapshot_id', 'name', 'target', 'target_type']#
snapshot_get_by_id(snapshot_id, branches_from=b'', branches_count=None, target_types=None, branch_name_include_substring=None, branch_name_exclude_prefix=None, cur=None)[source]#
snapshot_branch_get_by_name(cols_to_fetch, snapshot_id, branch_name, cur=None)[source]#
snapshot_get_id_range(start, end, limit=None, cur=None) Iterator[Tuple[bytes]][source]#
origin_visit_add(origin, ts, type, cur=None)[source]#

Add a new origin_visit for origin origin at timestamp ts.

  • origin – origin concerned by the visit

  • ts – the date of the visit

  • type – type of loader for the visit


The new visit index step for that origin

origin_visit_status_cols = ['origin', 'visit', 'date', 'type', 'status', 'snapshot', 'metadata']#
origin_visit_status_add(visit_status: OriginVisitStatus, cur=None) None[source]#

Add new origin visit status

origin_visit_cols = ['origin', 'visit', 'date', 'type']#
origin_visit_add_with_id(origin_visit: OriginVisit, cur=None) None[source]#

Insert origin visit when id are already set

origin_visit_get_cols = ['origin', 'visit', 'date', 'type', 'status', 'metadata', 'snapshot']#
origin_visit_select_cols = ['o.url AS origin', 'ov.visit', 'ov.date', 'ov.type AS type', 'ovs.status', 'ovs.snapshot', 'ovs.metadata']#
origin_visit_status_select_cols = ['o.url AS origin', 'ovs.visit', 'ovs.date', 'ovs.type AS type', 'ovs.status', 'ovs.snapshot', 'ovs.metadata']#
origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: List[str] | None = None, require_snapshot: bool = False, cur=None) Dict[str, Any] | None[source]#

Given an origin visit id, return its latest origin_visit_status

origin_visit_status_get_range(origin: str, visit: int, date_from: datetime | None, order: ListOrder, limit: int, cur=None)[source]#

Retrieve visit_status rows for visit (origin, visit) in a paginated way.

origin_visit_get_range(origin: str, visit_from: int, order: ListOrder, limit: int, cur=None)[source]#
origin_visit_status_get_all_in_range(origin: str, allowed_statuses: List[str] | None, require_snapshot: bool, visit_from: int, visit_to: int, cur=None)[source]#
origin_visit_get(origin_id, visit_id, cur=None)[source]#

Retrieve information on visit visit_id of origin origin_id.

  • origin_id – the origin concerned

  • visit_id – The visit step for that origin


The origin_visit information

origin_visit_find_by_date(origin, visit_date, type=None, cur=None)[source]#
origin_visit_exists(origin_id, visit_id, cur=None)[source]#

Check whether an origin visit with the given ids exists

origin_visit_get_latest(origin_id: str, type: str | None, allowed_statuses: Iterable[str] | None, require_snapshot: bool, cur=None)[source]#

Retrieve the most recent origin_visit of the given origin, with optional filters.

  • origin_id – the origin concerned

  • type – Optional visit type to filter on

  • allowed_statuses – the visit statuses allowed for the returned visit

  • require_snapshot (bool) – If True, only a visit with a known snapshot will be returned.


The origin_visit information, or None if no visit matches.

origin_visit_get_random(type, cur=None)[source]#

Randomly select one origin visit that was full and in the last 3 months

origin_add(url, cur=None)[source]#

Insert a new origin and return the new identifier.

origin_cols = ['url']#
origin_get_by_url(origins, cur=None)[source]#

Retrieve origin (type, url) from urls if found.

origin_get_by_sha1(sha1s, cur=None)[source]#

Retrieve origin urls from sha1s if found.

origin_id_get_by_url(origins, cur=None)[source]#

Retrieve origin (type, url) from urls if found.

origin_get_range_cols = ['id', 'url']#
origin_get_range(origin_from: int = 1, origin_count: int = 100, cur=None)[source]#

Retrieve origin_count origins whose ids are greater or equal than origin_from.

Origins are sorted by id before retrieving them.

  • origin_from – the minimum id of origins to retrieve

  • origin_count – the maximum number of origins to retrieve

Search for origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way.

  • url_pattern – the string pattern to search for in origin urls

  • offset – number of found origins to skip before returning results

  • limit – the maximum number of found origins to return

  • regexp – if True, consider the provided pattern as a regular expression and returns origins whose urls match it

  • with_visit – if True, filter out origins with no visit

origin_count(url_pattern, regexp=False, with_visit=False, cur=None)[source]#

Count origins whose urls contain a provided string pattern or match a provided regular expression. The pattern search in origin urls is performed in a case insensitive way.

  • url_pattern (str) – the string pattern to search for in origin urls

  • regexp (bool) – if True, consider the provided pattern as a regular expression and returns origins whose urls match it

  • with_visit (bool) – if True, filter out origins with no visit

origin_snapshot_get_all(origin_url: str, cur=None) Iterable[bytes][source]#
object_find_by_sha1_git_cols = ['sha1_git', 'type']#
object_find_by_sha1_git(ids, cur=None)[source]#
raw_extrinsic_metadata_get_cols = ['raw_extrinsic_metadata.target', 'raw_extrinsic_metadata.type', 'discovery_date', 'metadata_authority.type', 'metadata_authority.url', 'metadata_fetcher.id', 'metadata_fetcher.name', 'metadata_fetcher.version', 'origin', 'visit', 'snapshot', 'release', 'revision', 'path', 'directory', 'format', 'raw_extrinsic_metadata.metadata']#

List of columns of the raw_extrinsic_metadata, metadata_authority, and metadata_fetcher tables, used when reading object metadata.

raw_extrinsic_metadata_add(id: bytes, type: str, target: str, discovery_date: datetime, authority_id: int, fetcher_id: int, format: str, metadata: bytes, origin: str | None, visit: int | None, snapshot: str | None, release: str | None, revision: str | None, path: bytes | None, directory: str | None, cur)[source]#
raw_extrinsic_metadata_get(target: str, authority_id: int, after_time: datetime | None, after_fetcher: int | None, limit: int, cur)[source]#
raw_extrinsic_metadata_get_by_ids(ids: List[bytes], cur=None)[source]#
raw_extrinsic_metadata_get_authorities(id: str, cur=None)[source]#
metadata_fetcher_cols = ['name', 'version']#
metadata_fetcher_add(name: str, version: str, cur=None) None[source]#
metadata_fetcher_get(name: str, version: str, cur=None)[source]#
metadata_fetcher_get_id(name: str, version: str, cur=None) int | None[source]#
metadata_authority_cols = ['type', 'url']#
metadata_authority_add(type: str, url: str, cur=None) None[source]#
metadata_authority_get(type: str, url: str, cur=None)[source]#
metadata_authority_get_id(type: str, url: str, cur=None) int | None[source]#
object_references_get(target_type: str, target: bytes, limit: int, cur=None)[source]#
object_references_add(reference_rows, cur=None) None[source]#
object_references_create_partition(year: int, week: int, cur=None) Tuple[date, date][source]#

Create the partition of the object_references table for the given ISO year and week.

object_references_drop_partition(year: int, week: int, cur=None) None[source]#

Delete the partition of the object_references table for the given ISO year and week.

object_references_list_partitions(cur=None) List[ObjectReferencesPartition][source]#

List existing partitions of the object_references table, ordered from oldest to the most recent.

object_delete(object_rows: List[Tuple[str, bytes]], cur=None) Dict[str, int][source]#