swh.storage.interface module#

class swh.storage.interface.ListOrder(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: Enum

Specifies the order for paginated endpoints returning sorted results.

ASC = 'asc'#
DESC = 'desc'#
class swh.storage.interface.PartialBranches[source]#

Bases: TypedDict

Type of the dictionary returned by snapshot_get_branches

id: bytes#

Identifier of the snapshot

branches: Dict[bytes, SnapshotBranch | None]#

A dict of branches contained in the snapshot whose keys are the branches’ names

next_branch: bytes | None#

The name of the first branch not returned, or None if the snapshot has fewer than the requested number of branches.

class swh.storage.interface.SnapshotBranchByNameResponse(branch_found: bool, target: SnapshotBranch | None, aliases_followed: List[bytes])[source]#

Bases: object

Object returned by snapshot_branch_get_by_name

Method generated by attrs for class SnapshotBranchByNameResponse.

branch_found#

Whether a branch with the given name exists, with or without a target.

target#

Branch target; will be None in case of a dangling branch.

aliases_followed#

List of alias names followed, up to and including the target. This will have length one for all non-alias branches.

class swh.storage.interface.HashDict[source]#

Bases: TypedDict

sha1: bytes#
sha1_git: bytes#
sha256: bytes#
blake2s256: bytes#
class swh.storage.interface.TotalHashDict[source]#

Bases: HashDict

sha1: bytes#
sha1_git: bytes#
sha256: bytes#
blake2s256: bytes#
class swh.storage.interface.OriginVisitWithStatuses(visit: OriginVisit, statuses: List[OriginVisitStatus])[source]#

Bases: object

Method generated by attrs for class OriginVisitWithStatuses.

class swh.storage.interface.ObjectReference(source: ExtendedSWHID, target: ExtendedSWHID)[source]#

Bases: object

Record that the object with SWHID source references the object with SWHID target, meaning that the target needs to exist for the source object to be consistent within the archive.

Method generated by attrs for class ObjectReference.

swh.storage.interface.deprecated(f)[source]#
class swh.storage.interface.StorageInterface(*args, **kwargs)[source]#

Bases: Protocol

check_config(*, check_write: bool) bool[source]#

Check that the storage is configured and ready to go.

content_add(content: List[Content]) Dict[str, int][source]#

Add content blobs to the storage

Parameters:

contents (iterable) –

iterable of dictionaries representing individual pieces of content to add. Each dictionary has the following keys:

  • data (bytes): the actual content

  • length (int): content length

  • one key for each checksum algorithm in swh.model.hashutil.ALGORITHMS, mapped to the corresponding checksum

  • status (str): one of visible, hidden

Raises:

The following exceptions can occur:

  • HashCollision in case of collision

  • Any other exceptions raised by the db

In case of errors, some of the content may have been stored in the DB and in the objstorage. Since additions to both are idempotent, that should not be a problem.

Returns:

content:add: New contents added

content:add:bytes: Sum of the contents’ length data

Return type:

Summary dict with the following keys and associated values
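
For example, a minimal sketch of adding one content, using the volatile in-memory backend (the memory storage class is used here purely for illustration; production deployments configure a real backend):

    from swh.model.model import Content
    from swh.storage import get_storage

    storage = get_storage("memory")  # volatile test backend; a sketch only

    # Content.from_data computes all checksums and sets status="visible".
    content = Content.from_data(b"hello world")
    summary = storage.content_add([content])
    # summary == {"content:add": 1, "content:add:bytes": 11}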

content_update(contents: List[Dict[str, Any]], keys: List[str] = []) None[source]#

Update content blobs in the storage. Does nothing for unknown or skipped contents.

Parameters:
  • content

    iterable of dictionaries representing individual pieces of content to update. Each dictionary has the following keys:

    • data (bytes): the actual content

    • length (int): content length (default: -1)

    • one key for each checksum algorithm in swh.model.hashutil.ALGORITHMS, mapped to the corresponding checksum

    • status (str): one of visible, hidden, absent

  • keys (list) – List of keys (str) whose values need an update, e.g., new hash column

content_add_metadata(content: List[Content]) Dict[str, int][source]#

Add content metadata to the storage (like content_add, but without inserting into the objstorage).

Parameters:

content (iterable) –

iterable of dictionaries representing individual pieces of content to add. Each dictionary has the following keys:

  • length (int): content length (default: -1)

  • one key for each checksum algorithm in swh.model.hashutil.ALGORITHMS, mapped to the corresponding checksum

  • status (str): one of visible, hidden, absent

  • reason (str): if status = absent, the reason why

  • origin (int): if status = absent, the origin we saw the content in

  • ctime (datetime): time of insertion in the archive

Returns:

content:add: New contents added

skipped_content:add: New skipped contents (no data) added

Return type:

Summary dict with the following keys and associated values

content_get_data(content: HashDict | bytes) bytes | None[source]#

Given a content identifier, returns its associated data if any.

Parameters:

content – dict of hashes (or just sha1 identifier)

Returns:

raw content data (bytes)

content_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Content, str][source]#

Splits contents into nb_partitions, and returns one of these based on partition_id (which must be in [0, nb_partitions-1])

There is no guarantee on how the partitioning is done, or the result order.

Parameters:
  • partition_id – index of the partition to fetch

  • nb_partitions – total number of partitions to split into

  • page_token – opaque token used for pagination.

  • limit – Limit result (default to 1000)

Returns:

PagedResult of Content model objects within the partition. If next_page_token is None, there is no more data to retrieve.
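
A sketch of the typical client loop, visiting every partition and every page within each partition (storage is assumed to implement StorageInterface):

    def iter_all_contents(storage, nb_partitions: int = 16):
        """Yield every Content object, partition by partition, page by page."""
        for partition_id in range(nb_partitions):
            page_token = None
            while True:
                page = storage.content_get_partition(
                    partition_id, nb_partitions, page_token=page_token
                )
                yield from page.results
                if page.next_page_token is None:
                    break
                page_token = page.next_page_token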

content_get(contents: List[bytes], algo: str = 'sha1') List[Content | None][source]#

Retrieve content metadata in bulk

Parameters:
  • contents – content identifiers (checksum values for the algorithm named in algo)

  • algo – name of the checksum algorithm the identifiers belong to (default: ‘sha1’)

Returns:

List of Content model objects when they exist, None otherwise.

content_missing(contents: List[HashDict], key_hash: str = 'sha1') Iterable[bytes][source]#

List content missing from storage

Parameters:
  • contents – iterable of dictionaries whose keys are either ‘length’ or an item of swh.model.hashutil.ALGORITHMS; mapped to the corresponding checksum (or length).

  • key_hash – name of the column to use as hash id result (default: ‘sha1’)

Raises:
  • StorageArgumentException when key_hash is unknown.

  • TODO – an exception when we get a hash collision.

Returns:

iterable of missing content ids (as per the key_hash column)
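
This endpoint is typically used to filter out already-archived contents before an add; a sketch, reusing the in-memory backend assumption from the content_add example:

    from swh.model.model import Content
    from swh.storage import get_storage

    storage = get_storage("memory")  # sketch backend
    contents = [Content.from_data(b"one"), Content.from_data(b"two")]

    # Content.hashes() returns the dict of all checksums of a content.
    missing = set(storage.content_missing([c.hashes() for c in contents]))
    storage.content_add([c for c in contents if c.sha1 in missing])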

content_missing_per_sha1(contents: List[bytes]) Iterable[bytes][source]#

List content missing from storage based only on sha1.

Parameters:

contents – List of sha1 to check for absence.

Raises:

TODO – an exception when we get a hash collision.

Returns:

Iterable of missing content ids (sha1)

content_missing_per_sha1_git(contents: List[bytes]) Iterable[bytes][source]#

List content missing from storage based only on sha1_git.

Parameters:

contents (List) – An iterable of content id (sha1_git)

Yields:

missing contents sha1_git

content_find(content: HashDict) List[Content][source]#

Find a content hash in db.

Parameters:

content – a dictionary representing one content hash, mapping checksum algorithm names (see swh.model.hashutil.ALGORITHMS) to checksum values

Raises:

ValueError – in case the key of the dictionary is not one of sha1, sha1_git or sha256.

Returns:

a list of Content objects matching the search criteria if the content exists, an empty list otherwise.
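
For example, looking a content up by a single hash and then fetching its raw data (a sketch; the in-memory backend and sample content are assumptions):

    import hashlib

    from swh.model.model import Content
    from swh.storage import get_storage

    storage = get_storage("memory")  # sketch backend
    storage.content_add([Content.from_data(b"hello world")])

    matches = storage.content_find({"sha1": hashlib.sha1(b"hello world").digest()})
    if matches:
        data = storage.content_get_data(matches[0].sha1)  # b"hello world"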

content_get_random() bytes[source]#

Finds a random content id.

Returns:

a sha1_git

skipped_content_add(content: List[SkippedContent]) Dict[str, int][source]#

Add contents to the skipped_content list, which contains (partial) information about content missing from the archive.

Parameters:

contents (iterable) –

iterable of dictionaries representing individual pieces of content to add. Each dictionary has the following keys:

  • length (Optional[int]): content length (default: -1)

  • one key for each checksum algorithm in swh.model.hashutil.ALGORITHMS, mapped to the corresponding checksum; each is optional

  • status (str): must be “absent”

  • reason (str): the reason why the content is absent

  • origin (int): if status = absent, the origin we saw the content in

Raises:

The following exceptions can occur:

  • HashCollision in case of collision

  • Any other exceptions raised by the backend

In case of errors, some content may have been stored in the DB and in the objstorage. Since additions to both are idempotent, that should not be a problem.

Returns:

skipped_content:add: New skipped contents (no data) added

Return type:

Summary dict with the following key and associated values

skipped_content_find(content: HashDict) List[SkippedContent][source]#

Find skipped content for the given hashes

Parameters:

content – a dictionary representing one content hash, mapping checksum algorithm names (see swh.model.hashutil.ALGORITHMS) to checksum values

Raises:

ValueError – in case the key of the dictionary is not one of sha1, sha1_git or sha256.

Returns:

a list of SkippedContent objects matching the search criteria if the skipped content exists. Empty list otherwise.

skipped_content_missing(contents: List[Dict[str, Any]]) Iterable[Dict[str, Any]][source]#

List skipped contents missing from storage.

Parameters:

contents – iterable of dictionaries containing the data for each checksum algorithm.

Returns:

Iterable of missing skipped contents as dict

directory_add(directories: List[Directory]) Dict[str, int][source]#

Add directories to the storage

Parameters:

directories (iterable) –

iterable of dictionaries representing the individual directories to add. Each dict has the following keys:

  • id (sha1_git): the id of the directory to add

  • entries (list): list of dicts for each entry in the directory. Each dict has the following keys:

    • name (bytes)

    • type (one of ‘file’, ‘dir’, ‘rev’): type of the directory entry (file, directory, revision)

    • target (sha1_git): id of the object pointed at by the directory entry

    • perms (int): entry permissions

Returns:

directory:add: Number of directories actually added

Return type:

Summary dict of keys with associated count as values
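
A sketch of building and adding a one-entry directory with swh.model objects (in-memory backend assumed):

    from swh.model.model import Content, Directory, DirectoryEntry
    from swh.storage import get_storage

    storage = get_storage("memory")  # sketch backend
    content = Content.from_data(b"print('hello')\n")
    storage.content_add([content])

    directory = Directory(
        entries=(
            DirectoryEntry(
                name=b"hello.py",
                type="file",
                target=content.sha1_git,  # entries target sha1_git ids
                perms=0o100644,
            ),
        )
    )
    storage.directory_add([directory])  # {"directory:add": 1}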

directory_missing(directories: List[bytes]) Iterable[bytes][source]#

List directories missing from storage.

Parameters:

directories – list of directory ids

Yields:

missing directory ids

directory_ls(directory: bytes, recursive: bool = False) Iterable[Dict[str, Any]][source]#

List entries for one directory.

If recursive=True, names in the path of a dir/file not at the root are concatenated with a slash (/).

Parameters:
  • directory – the directory to list entries from.

  • recursive – if set, list entries recursively from this directory.

Yields:

directory entries for the given directory.
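
Continuing the directory_add sketch above, listing the entries recursively:

    # `storage` and `directory` come from the directory_add sketch above.
    for entry in storage.directory_ls(directory.id, recursive=True):
        print(entry["name"], entry["type"], entry["target"])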

directory_entry_get_by_path(directory: bytes, paths: List[bytes]) Dict[str, Any] | None[source]#

Get the directory entry (either file or dir) from directory with path.

Parameters:
  • directory – directory id

  • paths – path to lookup from the top level directory. From left (top) to right (bottom).

Returns:

The corresponding directory entry as dict if found, None otherwise.

directory_get_entries(directory_id: bytes, page_token: bytes | None = None, limit: int = 1000) PagedResult[DirectoryEntry, str] | None[source]#

Get the content, possibly partial, of a directory with the given id

The entries of the directory are not guaranteed to be returned in any particular order.

The number of results is not guaranteed to be lower than the limit.

Parameters:
  • directory_id – identifier of the directory

  • page_token – opaque string used to get the next results of a search

  • limit – Number of entries to return

Returns:

None if the directory does not exist; a page of DirectoryEntry objects otherwise.

See also

swh.storage.algos.directories.directory_get() will get all entries for a given directory. swh.storage.algos.directories.directory_get_many() will do the same for a set of directories.

directory_get_raw_manifest(directory_ids: List[bytes]) Dict[bytes, bytes | None][source]#

Returns the raw manifest of directories that do not fit the SWH data model, or None if they do. Directories missing from the archive are not returned at all.

Parameters:

directory_ids – List of directory ids to query

directory_get_random() bytes[source]#

Finds a random directory id.

Returns:

a sha1_git

directory_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[bytes, str][source]#

Splits directories into nb_partitions, and returns all the ids and raw manifests in one of these based on partition_id (which must be in [0, nb_partitions-1]). This does not return directory entries themselves; they should be retrieved using directory_get_entries() and directory_get_raw_manifest() instead.

There is no guarantee on how the partitioning is done, or the result order.

Parameters:
  • partition_id – index of the partition to fetch

  • nb_partitions – total number of partitions to split into

Returns:

Page of the directories’ sha1_git hashes.

revision_add(revisions: List[Revision]) Dict[str, int][source]#

Add revisions to the storage

Parameters:

revisions (List[dict]) –

iterable of dictionaries representing the individual revisions to add. Each dict has the following keys:

  • id (sha1_git): id of the revision to add

  • date (dict): date the revision was written

  • committer_date (dict): date the revision got added to the origin

  • type (one of ‘git’, ‘tar’): type of the revision added

  • directory (sha1_git): the directory the revision points at

  • message (bytes): the message associated with the revision

  • author (Dict[str, bytes]): dictionary with keys: name, fullname, email

  • committer (Dict[str, bytes]): dictionary with keys: name, fullname, email

  • metadata (jsonb): extra information as dictionary

  • synthetic (bool): whether the revision is synthetic (e.g., a revision created from a tarball or a bare directory)

  • parents (list[sha1_git]): the parents of this revision

date dictionaries have the form defined in swh.model.

Returns:

Summary dict of keys with associated count as values

revision:add: New objects actually stored in db
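
A sketch of adding a minimal synthetic revision on top of the directory_add example (all field values are illustrative):

    from datetime import datetime, timezone

    from swh.model.model import Person, Revision, RevisionType, TimestampWithTimezone

    # `storage` and `directory` come from the directory_add sketch above.
    author = Person.from_fullname(b"Jane Doe <jane@example.org>")
    now = TimestampWithTimezone.from_datetime(datetime.now(tz=timezone.utc))
    revision = Revision(
        message=b"Initial import",
        author=author,
        committer=author,
        date=now,
        committer_date=now,
        type=RevisionType.GIT,
        directory=directory.id,
        synthetic=True,
        parents=(),
    )
    storage.revision_add([revision])  # {"revision:add": 1}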

revision_missing(revisions: List[bytes]) Iterable[bytes][source]#

List revisions missing from storage

Parameters:

revisions – revision ids

Yields:

missing revision ids

revision_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Revision, str][source]#

Splits revisions into nb_partitions, and returns one of these based on partition_id (which must be in [0, nb_partitions-1])

There is no guarantee on how the partitioning is done, or the result order.

Parameters:
  • partition_id – index of the partition to fetch

  • nb_partitions – total number of partitions to split into

Returns:

Page of Revision model objects within the partition.

revision_get(revision_ids: List[bytes], ignore_displayname: bool = False) List[Revision | None][source]#

Get revisions from storage

Parameters:
  • revisions – revision ids

  • ignore_displayname – return the original author/committer’s full name even if it’s masked by a displayname.

Returns:

list of Revision objects, with None at the index of revisions that do not exist

revision_log(revisions: List[bytes], ignore_displayname: bool = False, limit: int | None = None) Iterable[Dict[str, Any] | None][source]#

Fetch revision entries from the given root revisions.

Parameters:
  • revisions – array of root revisions to lookup

  • ignore_displayname – return the original author/committer’s full name even if it’s masked by a displayname.

  • limit – limit on the number of results. Defaults to None.

Yields:

revision log entries from the given root revisions

revision_shortlog(revisions: List[bytes], limit: int | None = None) Iterable[Tuple[bytes, Tuple[bytes, ...]] | None][source]#

Fetch the shortlog for the given revisions

Parameters:
  • revisions – list of root revisions to lookup

  • limit – depth limitation for the output

Yields:

a list of (id, parents) tuples

revision_get_random() bytes[source]#

Finds a random revision id.

Returns:

a sha1_git

extid_get_from_extid(id_type: str, ids: List[bytes], version: int | None = None) List[ExtID][source]#

Get ExtID objects from external IDs

Parameters:
  • id_type – type of the given external identifiers (e.g. ‘mercurial’)

  • ids – list of external IDs

  • version – (Optional) version to use as filter

Returns:

list of ExtID objects

extid_get_from_target(target_type: ObjectType, ids: List[bytes], extid_type: str | None = None, extid_version: int | None = None) List[ExtID][source]#

Get ExtID objects from target IDs and target_type

Parameters:
  • target_type – type of the SWH object

  • ids – list of target IDs

  • extid_type – (Optional) extid_type to use as filter. This cannot be empty if extid_version is provided.

  • extid_version – (Optional) version to use as filter. This cannot be empty if extid_type is provided.

Raises:

ValueError – if extid_version is provided without extid_type, and vice versa.

Returns:

list of ExtID objects

extid_add(ids: List[ExtID]) Dict[str, int][source]#

Add a series of ExtID objects

Parameters:

ids – list of ExtID objects

Returns:

Summary dict of keys with associated count as values

extid:add: New ExtID objects actually stored in db
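
A sketch tying an external identifier to an archived revision, continuing the revision_add example (for git, a revision’s external id equals its sha1_git):

    from swh.model.model import ExtID
    from swh.model.swhids import CoreSWHID, ObjectType

    # `storage` and `revision` come from the revision_add sketch above.
    extid = ExtID(
        extid_type="git",
        extid=revision.id,
        target=CoreSWHID(object_type=ObjectType.REVISION, object_id=revision.id),
    )
    storage.extid_add([extid])  # {"extid:add": 1}
    found = storage.extid_get_from_extid("git", [revision.id])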

release_add(releases: List[Release]) Dict[str, int][source]#

Add releases to the storage

Parameters:

releases (List[dict]) –

iterable of dictionaries representing the individual releases to add. Each dict has the following keys:

  • id (sha1_git): id of the release to add

  • revision (sha1_git): id of the revision the release points to

  • date (dict): the date the release was made

  • name (bytes): the name of the release

  • comment (bytes): the comment associated with the release

  • author (Dict[str, bytes]): dictionary with keys: name, fullname, email

the date dictionary has the form defined in swh.model.

Returns:

Summary dict of keys with associated count as values

release:add: New objects actually stored in db

release_missing(releases: List[bytes]) Iterable[bytes][source]#

List missing release ids from storage

Parameters:

releases – release ids

Yields:

a list of missing release ids

release_get(releases: List[bytes], ignore_displayname: bool = False) List[Release | None][source]#

Given a list of sha1, return the releases’ information

Parameters:
  • releases – list of sha1s

  • ignore_displayname – return the original author’s full name even if it’s masked by a displayname.

Returns:

List of releases matching the identifiers, with None for releases that do not exist.

release_get_random() bytes[source]#

Finds a random release id.

Returns:

a sha1_git

release_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Release, str][source]#

Splits releases into nb_partitions, and returns one of these based on partition_id (which must be in [0, nb_partitions-1])

There is no guarantee on how the partitioning is done, or the result order.

Parameters:
  • partition_id – index of the partition to fetch

  • nb_partitions – total number of partitions to split into

Returns:

Page of Release model objects within the partition.

snapshot_add(snapshots: List[Snapshot]) Dict[str, int][source]#

Add snapshots to the storage.

Parameters:

snapshots ([dict]) –

the snapshots to add, containing the following keys:

  • id (bytes): id of the snapshot

  • branches (dict): branches the snapshot contains, mapping the branch name (bytes) to the branch target, itself a dict (or None if the branch points to an unknown object)

    • target_type (str): one of content, directory, revision, release, snapshot, alias

    • target (bytes): identifier of the target (currently a sha1_git for all object kinds, or the name of the target branch for aliases)

Raises:

ValueError – if the origin or visit id does not exist.

Returns:

Summary dict of keys with associated count as values

snapshot:add: Count of objects actually stored in db
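
A sketch of adding a snapshot with one regular branch and one alias, continuing the revision_add example (note: the branch target type enum is named TargetType in older swh.model releases and SnapshotTargetType in newer ones):

    from swh.model.model import Snapshot, SnapshotBranch, TargetType

    # `storage` and `revision` come from the revision_add sketch above.
    snapshot = Snapshot(
        branches={
            b"refs/heads/main": SnapshotBranch(
                target=revision.id, target_type=TargetType.REVISION
            ),
            # An alias targets another branch's *name*, not an object id.
            b"HEAD": SnapshotBranch(
                target=b"refs/heads/main", target_type=TargetType.ALIAS
            ),
        }
    )
    storage.snapshot_add([snapshot])  # {"snapshot:add": 1}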

snapshot_missing(snapshots: List[bytes]) Iterable[bytes][source]#

List snapshots missing from storage

Parameters:

snapshots – snapshot ids

Yields:

missing snapshot ids

snapshot_get(snapshot_id: bytes) Dict[str, Any] | None[source]#

Get the content, possibly partial, of a snapshot with the given id

The branches of the snapshot are iterated in the lexicographical order of their names.

Warning

At most 1000 branches contained in the snapshot will be returned for performance reasons. In order to browse the whole set of branches, the method snapshot_get_branches() should be used instead.

Parameters:

snapshot_id – snapshot identifier

Returns:

a dict with three keys:
  • id: identifier of the snapshot

  • branches: a dict of branches contained in the snapshot whose keys are the branches’ names.

  • next_branch: the name of the first branch not returned, or None if the snapshot has fewer than 1000 branches.

Return type:

dict

snapshot_count_branches(snapshot_id: bytes, branch_name_exclude_prefix: bytes | None = None) Dict[str | None, int] | None[source]#

Count the number of branches in the snapshot with the given id

Parameters:
  • snapshot_id – snapshot identifier

  • branch_name_exclude_prefix – if provided, do not count branches whose name starts with given prefix

Returns:

A dict whose keys are the target types of branches and whose values are the corresponding branch counts

snapshot_get_branches(snapshot_id: bytes, branches_from: bytes = b'', branches_count: int = 1000, target_types: List[str] | None = None, branch_name_include_substring: bytes | None = None, branch_name_exclude_prefix: bytes | None = None) PartialBranches | None[source]#

Get the content, possibly partial, of a snapshot with the given id

The branches of the snapshot are iterated in the lexicographical order of their names.

Parameters:
  • snapshot_id – identifier of the snapshot

  • branches_from – optional parameter used to skip branches whose name is lexicographically less than it before returning them

  • branches_count – optional parameter used to limit the number of returned branches

  • target_types – optional parameter used to filter the target types of branch to return (possible values that can be contained in that list are ‘content’, ‘directory’, ‘revision’, ‘release’, ‘snapshot’, ‘alias’)

  • branch_name_include_substring – if provided, only return branches whose name contains given substring

  • branch_name_exclude_prefix – if provided, do not return branches whose name starts with given prefix

Returns:

a PartialBranches object listing a limited amount of branches matching the given criteria or None if the snapshot does not exist.

See also

swh.storage.algos.snapshot.snapshot_get_all_branches() will get all branches for a given snapshot.
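
A sketch of paging through all branches by hand; swh.storage.algos.snapshot.snapshot_get_all_branches() wraps essentially this loop:

    # `storage` and `snapshot` come from the snapshot_add sketch above.
    branches = {}
    partial = storage.snapshot_get_branches(snapshot.id)
    while partial is not None:
        branches.update(partial["branches"])
        if partial["next_branch"] is None:
            break  # all branches have been retrieved
        partial = storage.snapshot_get_branches(
            snapshot.id, branches_from=partial["next_branch"]
        )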

snapshot_get_random() bytes[source]#

Finds a random snapshot id.

Returns:

a sha1_git

snapshot_branch_get_by_name(snapshot_id: bytes, branch_name: bytes, follow_alias_chain: bool = True, max_alias_chain_length: int = 100) SnapshotBranchByNameResponse | None[source]#

Get a snapshot branch by its name

Parameters:
  • snapshot_id – Snapshot identifier

  • branch_name – Branch name to look for

  • follow_alias_chain – If True, find the first non alias branch. Return the first branch (alias or non alias) otherwise

  • max_alias_chain_length – Maximum length of the alias chain to follow before treating the branch as dangling. This has no significance when follow_alias_chain is False.

Returns:

A SnapshotBranchByNameResponse object
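
Continuing the snapshot_add sketch, resolving the HEAD alias down to its target branch:

    # `storage` and `snapshot` come from the snapshot_add sketch above.
    resp = storage.snapshot_branch_get_by_name(snapshot.id, b"HEAD")
    if resp is not None and resp.branch_found:
        print(resp.target)            # the resolved branch, or None if dangling
        print(resp.aliases_followed)  # e.g. [b"HEAD", b"refs/heads/main"]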

snapshot_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[bytes, str][source]#

Splits snapshots into nb_partitions, and returns all the ids in one of these based on partition_id (which must be in [0, nb_partitions-1]). This does not return the branches themselves; they should be retrieved using snapshot_get_branches() instead.

There is no guarantee on how the partitioning is done, or the result order.

Parameters:
  • partition_id – index of the partition to fetch

  • nb_partitions – total number of partitions to split into

Returns:

Page of the snapshots’ sha1_git hashes

origin_visit_add(visits: List[OriginVisit]) Iterable[OriginVisit][source]#

Add visits to storage. If the visits have no id, they will be created and assigned one. The returned visits have their visit id set.

Parameters:

visits – List of OriginVisit objects to add

Raises:

StorageArgumentException if some origin visits reference unknown origins

Returns:

List[OriginVisit] stored

origin_visit_status_add(visit_statuses: List[OriginVisitStatus]) Dict[str, int][source]#

Add origin visit statuses.

If there is already a status for the same origin and visit id at the same date, the new one will be either dropped or will replace the existing one (it is unspecified which one of these two behaviors happens).

Parameters:

visit_statuses – origin visit statuses to add

Raises: StorageArgumentException if the origin of the visit status is unknown
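
A sketch of the usual loader sequence, adding an origin, then a visit, then a final visit status (the in-memory backend and all field values are assumptions):

    from datetime import datetime, timezone

    from swh.model.model import Origin, OriginVisit, OriginVisitStatus
    from swh.storage import get_storage

    storage = get_storage("memory")  # sketch backend
    origin = Origin(url="https://example.org/repo.git")
    storage.origin_add([origin])

    visit = list(
        storage.origin_visit_add(
            [OriginVisit(origin=origin.url, date=datetime.now(tz=timezone.utc), type="git")]
        )
    )[0]
    storage.origin_visit_status_add(
        [
            OriginVisitStatus(
                origin=origin.url,
                visit=visit.visit,
                date=datetime.now(tz=timezone.utc),
                status="full",
                snapshot=None,  # or the id of a snapshot added with snapshot_add
            )
        ]
    )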

origin_visit_get(origin: str, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisit, str][source]#

Retrieve page of OriginVisit information.

Parameters:
  • origin – The visited origin

  • page_token – opaque string used to get the next results of a search

  • order – Order on visit id fields to list origin visits (default to asc)

  • limit – Number of visits to return

Raises:

StorageArgumentException if the order is wrong or the page_token type is mistyped.

Returns: Page of OriginVisit data model objects. If next_page_token is None, there is no more data to retrieve.

See also

swh.storage.algos.origin.iter_origin_visits() will iterate over all OriginVisits for a given origin.
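
A sketch of the page_token loop, continuing the origin_visit_add example; the same pattern applies to the other PagedResult-returning endpoints:

    # `storage` and `origin` come from the origin_visit_add sketch above.
    page_token = None
    while True:
        page = storage.origin_visit_get(origin.url, page_token=page_token)
        for visit in page.results:
            print(visit.visit, visit.date)
        if page.next_page_token is None:
            break
        page_token = page.next_page_token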

origin_visit_find_by_date(origin: str, visit_date: datetime, type: str | None = None) OriginVisit | None[source]#

Retrieves the origin visit whose date is closest to the provided timestamp. In case of a tie, the visit with largest id is selected.

Parameters:
  • origin – origin (URL)

  • visit_date – expected visit date

  • type – filter on a specific visit type if provided

Returns:

A visit if found, None otherwise

origin_visit_get_by(origin: str, visit: int) OriginVisit | None[source]#

Retrieve origin visit’s information.

Parameters:
  • origin – origin (URL)

  • visit – visit id

Returns:

The information on that particular OriginVisit or None if it does not exist

origin_visit_get_latest(origin: str, type: str | None = None, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) OriginVisit | None[source]#

Get the latest origin visit for the given origin, optionally looking only for those with one of the given allowed_statuses or for those with a snapshot.

Parameters:
  • origin – origin URL

  • type – Optional visit type to filter on (e.g. git, tar, dsc, svn, hg, npm, pypi, ...)

  • allowed_statuses – list of visit statuses considered to find the latest visit. For instance, allowed_statuses=['full'] will only consider visits that have successfully run to completion.

  • require_snapshot – If True, only a visit with a snapshot will be returned.

Raises:

StorageArgumentException if values for the allowed_statuses parameter are unknown

Returns:

OriginVisit matching the criteria if found, None otherwise. Note that, as OriginVisit objects no longer hold a reference to the visit status or snapshot, you may want to use origin_visit_status_get_latest for that information.
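
Continuing the origin_visit_add sketch, fetching the latest full visit and the snapshot recorded by its latest status:

    # `storage` and `origin` come from the origin_visit_add sketch above.
    latest = storage.origin_visit_get_latest(origin.url, allowed_statuses=["full"])
    if latest is not None:
        status = storage.origin_visit_status_get_latest(
            origin.url, latest.visit, require_snapshot=True
        )
        snapshot_id = status.snapshot if status is not None else None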

origin_visit_status_get(origin: str, visit: int, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitStatus, str][source]#

Retrieve page of OriginVisitStatus information.

Parameters:
  • origin – The visited origin

  • visit – The visit identifier

  • page_token – opaque string used to get the next results of a search

  • order – Order on visit status objects to list (default to asc)

  • limit – Number of visit statuses to return

Returns: Page of OriginVisitStatus data model objects. If next_page_token is None, there is no more data to retrieve.

See also

swh.storage.algos.origin.iter_origin_visit_statuses() will iterate over all OriginVisitStatus objects for a given origin and visit.

origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) OriginVisitStatus | None[source]#

Get the latest origin visit status for the given origin visit, optionally looking only for those with one of the given allowed_statuses or with a snapshot.

Parameters:
  • origin_url – origin URL

  • visit – visit id

  • allowed_statuses – list of visit statuses considered to find the latest visit. Possible values are {created, ongoing, partial, full}. For instance, allowed_statuses=['full'] will only consider visits that have successfully run to completion.

  • require_snapshot – If True, only a visit with a snapshot will be returned.

Raises:

StorageArgumentException if values for the allowed_statuses parameter are unknown

Returns:

The OriginVisitStatus matching the criteria if found, None otherwise.

origin_visit_get_with_statuses(origin: str, allowed_statuses: List[str] | None = None, require_snapshot: bool = False, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitWithStatuses, str][source]#

Retrieve page of origin visits and all their statuses.

Origin visit statuses are always sorted in ascending order of their dates.

Parameters:
  • origin – The visited origin URL

  • allowed_statuses – Only visit statuses matching that list will be returned. If empty, all visit statuses will be returned. Possible status values are created, not_found, ongoing, failed, partial and full.

  • require_snapshot – If True, only visit statuses with a snapshot will be returned.

  • page_token – opaque string used to get the next results

  • order – Order on visit objects to list (default to asc)

  • limit – Number of visits with their statuses to return

Returns: Page of OriginVisitWithStatuses objects. If next_page_token is None, there is no more data to retrieve.

origin_visit_status_get_random(type: str) OriginVisitStatus | None[source]#

Randomly select one successful origin visit with <type> made in the last 3 months.

Returns:

One random OriginVisitStatus matching the selection criteria

origin_get(origins: List[str]) List[Origin | None][source]#

Return origins.

Parameters:

origins – a list of urls to find

Returns:

the list of matching existing origin model objects; unknown origins are returned as None at the same index as in the input.

origin_get_by_sha1(sha1s: List[bytes]) List[Dict[str, Any] | None][source]#

Return origins, identified by the sha1 of their URLs.

Parameters:

sha1s – a list of sha1s

Returns:

List of origin dicts whose url sha1 matches; None is returned at the index of unknown sha1s.

origin_list(page_token: str | None = None, limit: int = 100) PagedResult[Origin, str][source]#

Returns the list of origins

Parameters:
  • page_token – opaque token used for pagination.

  • limit – the maximum number of results to return

Returns:

Page of Origin data model objects. If next_page_token is None, there is no more data to retrieve.

origin_search(url_pattern: str, page_token: str | None = None, limit: int = 50, regexp: bool = False, with_visit: bool = False, visit_types: List[str] | None = None) PagedResult[Origin, str][source]#

Search for origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case-insensitive way.

Parameters:
  • url_pattern – the string pattern to search for in origin urls

  • page_token – opaque token used for pagination

  • limit – the maximum number of found origins to return

  • regexp – if True, consider the provided pattern as a regular expression and return origins whose urls match it

  • with_visit – if True, filter out origins with no visit

  • visit_types – Only origins having any of the provided visit types (e.g. git, svn, pypi) will be returned

Yields:

PagedResult of Origin
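
Continuing the origin sketches above, a case-insensitive substring search restricted to visited git origins:

    # `storage` comes from the origin_visit_add sketch above.
    page = storage.origin_search("example.org", with_visit=True, visit_types=["git"])
    for origin in page.results:
        print(origin.url)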

origin_count(url_pattern: str, regexp: bool = False, with_visit: bool = False) int[source]#

Count origins whose urls contain a provided string pattern or match a provided regular expression. The pattern search in origin urls is performed in a case insensitive way.

Parameters:
  • url_pattern (str) – the string pattern to search for in origin urls

  • regexp (bool) – if True, consider the provided pattern as a regular expression and return origins whose urls match it

  • with_visit (bool) – if True, filter out origins with no visit

Returns:

The number of origins matching the search criterion.

Return type:

int

origin_snapshot_get_all(origin_url: str) List[bytes][source]#

Return all unique snapshot identifiers resulting from origin visits.

Parameters:

origin_url – origin URL

Returns:

list of sha1s

origin_add(origins: List[Origin]) Dict[str, int][source]#

Add origins to the storage

Parameters:

origins

list of dictionaries representing the individual origins, with the following keys:

  • type: the origin type (‘git’, ‘svn’, ‘deb’, …)

  • url (bytes): the url the origin points to

Returns:

Summary dict of keys with associated count as values

origin:add: Count of objects actually stored in db

object_find_recent_references(target_swhid: ExtendedSWHID, limit: int) List[ExtendedSWHID][source]#

Return the SWHIDs of objects that are known to reference the object target_swhid.

Parameters:
  • target_swhid – the SWHID of the object targeted by the returned objects

  • limit – the maximum number of SWHIDs to return

Note

The data returned by this function is by essence limited to objects that were recently added to the archive, and is pruned regularly. For completeness, one must also query swh.graph for backwards edges targeting the requested object.

object_references_add(references: List[ObjectReference]) Dict[str, int][source]#

For each object reference (source, target), record that the source object references the target object (meaning that the target needs to exist for the source object to be consistent within the archive).

This function will only be called internally by a reference recording proxy, through one of directory_add(), revision_add(), release_add(), snapshot_add(), or origin_visit_status_add(). External users of swh.storage should not need to use this function directly.

Note

these records are inserted in time-based partitions that can be pruned when the objects are known in an up-to-date swh.graph instance.

Parameters:

references – a list of (source, target) SWHID tuples

Returns:

object_reference:add: the number of object references added

Return type:

A summary dict with the following keys

object_find_by_sha1_git(ids: List[bytes]) Dict[bytes, List[Dict]][source]#

Return the objects found with the given ids.

Parameters:

ids – a generator of sha1_gits

Returns:

A dict from id to the list of objects found for that id. Each object found is itself a dict with keys:

  • sha1_git: the input id

  • type: the type of object found

stat_counters()[source]#

Compute statistics about the number of tuples in various tables.

Returns:

a dictionary mapping textual labels (e.g., content) to integer values (e.g., the number of tuples in table content)

Return type:

dict

refresh_stat_counters()[source]#

Recomputes the statistics for stat_counters.

raw_extrinsic_metadata_add(metadata: List[RawExtrinsicMetadata]) Dict[str, int][source]#

Add extrinsic metadata on objects (contents, directories, …).

The authority and fetcher must be known to the storage before using this endpoint.

If there is already metadata for the same object, authority, fetcher, and at the same date; the new one will be either dropped or will replace the existing one (it is unspecified which one of these two behaviors happens).

Parameters:

metadata – iterable of RawExtrinsicMetadata objects to be inserted.
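
A sketch of the full metadata pipeline: register the authority and fetcher, then attach a metadata blob to an origin (the in-memory backend, the sample values, and the use of Origin.swhid() to build the target ExtendedSWHID are assumptions of this sketch):

    from datetime import datetime, timezone

    from swh.model.model import (
        MetadataAuthority,
        MetadataAuthorityType,
        MetadataFetcher,
        Origin,
        RawExtrinsicMetadata,
    )
    from swh.storage import get_storage

    storage = get_storage("memory")  # sketch backend
    origin = Origin(url="https://example.org/repo.git")
    storage.origin_add([origin])

    authority = MetadataAuthority(
        type=MetadataAuthorityType.FORGE, url="https://example.org"
    )
    fetcher = MetadataFetcher(name="example-metadata-loader", version="1.0.0")
    storage.metadata_authority_add([authority])
    storage.metadata_fetcher_add([fetcher])

    storage.raw_extrinsic_metadata_add(
        [
            RawExtrinsicMetadata(
                target=origin.swhid(),  # ExtendedSWHID of type "ori"
                discovery_date=datetime.now(tz=timezone.utc),
                authority=authority,
                fetcher=fetcher,
                format="json",
                metadata=b'{"license": "MIT"}',
            )
        ]
    )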

raw_extrinsic_metadata_get(target: ExtendedSWHID, authority: MetadataAuthority, after: datetime | None = None, page_token: bytes | None = None, limit: int = 1000) PagedResult[RawExtrinsicMetadata, str][source]#

Retrieve list of all raw_extrinsic_metadata entries targeting the id

Parameters:
  • target – the SWHID of the objects to find metadata on

  • authority – a dict containing keys type and url.

  • after – minimum discovery_date for a result to be returned

  • page_token – opaque token, used to get the next page of results

  • limit – maximum number of results to be returned

Returns:

PagedResult of RawExtrinsicMetadata

Raises:

UnknownMetadataAuthority – if the metadata authority does not exist at all

raw_extrinsic_metadata_get_by_ids(ids: List[bytes]) List[RawExtrinsicMetadata][source]#

Retrieve list of raw_extrinsic_metadata entries of the given id (unlike raw_extrinsic_metadata_get, which returns metadata entries targeting the id)

Parameters:

ids – list of hashes of RawExtrinsicMetadata objects

raw_extrinsic_metadata_get_authorities(target: ExtendedSWHID) List[MetadataAuthority][source]#

Returns all authorities that provided metadata on the given object.

metadata_fetcher_add(fetchers: List[MetadataFetcher]) Dict[str, int][source]#

Add new metadata fetchers to the storage.

Their name and version together are unique identifiers of this fetcher; and metadata is an arbitrary dict of JSONable data with information about this fetcher, which must not be None (but may be empty).

Parameters:

fetchers – iterable of MetadataFetcher to be inserted

metadata_fetcher_get(name: str, version: str) MetadataFetcher | None[source]#

Retrieve information about a fetcher

Parameters:
  • name – the name of the fetcher

  • version – version of the fetcher

Returns:

a MetadataFetcher object (with a non-None metadata field) if it is known, else None.

metadata_authority_add(authorities: List[MetadataAuthority]) Dict[str, int][source]#

Add new metadata authorities to the storage.

Their type and url together are unique identifiers of this authority; and metadata is an arbitrary dict of JSONable data with information about this authority, which must not be None (but may be empty).

Parameters:

authorities – iterable of MetadataAuthority to be inserted

metadata_authority_get(type: MetadataAuthorityType, url: str) MetadataAuthority | None[source]#

Retrieve information about an authority

Parameters:
  • type – one of “deposit_client”, “forge”, or “registry”

  • url – unique URI identifying the authority

Returns:

a MetadataAuthority object (with a non-None metadata field) if it is known, else None.

clear_buffers(object_types: Sequence[str] = ()) None[source]#

For backend storages (pg, storage, in-memory), this is a noop operation. For proxy storages (especially filter, buffer), this is an operation which cleans internal state.

flush(object_types: Sequence[str] = ()) Dict[str, int][source]#

For backend storages (pg, storage, in-memory), this is expected to be a noop operation. For proxy storages (especially buffer), this is expected to trigger actual writes to the backend.

class swh.storage.interface.ObjectDeletionInterface(*args, **kwargs)[source]#

Bases: Protocol

object_delete(swhids: List[ExtendedSWHID]) Dict[str, int][source]#

Delete objects from the storage

All skipped content objects matching the given SWHID will be removed, including those that share the same SWHID due to hash collisions.

Origin objects are removed alongside their associated origin visit and origin visit status objects.

Only objects from this facility will be removed. The same method should be called on other storage, objstorage, or journal instances where the specified objects need to be removed.

Parameters:

swhids – list of SWHID of the objects to remove

Returns:

number of objects removed. Details of each key:

content:delete

Number of content objects removed

content:delete:bytes

Sum of the removed contents’ data length

skipped_content:delete

Number of skipped content objects removed

directory:delete

Number of directory objects removed

revision:delete

Number of revision objects removed

release:delete

Number of release objects removed

snapshot:delete

Number of snapshot objects removed

origin:delete

Number of origin objects removed

origin_visit:delete

Number of origin visit objects removed

origin_visit_status:delete

Number of origin visit status objects removed

ori_metadata:delete

Number of raw extrinsic metadata objects targeting an origin that have been removed

snp_metadata:delete

Number of raw extrinsic metadata objects targeting a snapshot that have been removed

rev_metadata:delete

Number of raw extrinsic metadata objects targeting a revision that have been removed

rel_metadata:delete

Number of raw extrinsic metadata objects targeting a release that have been removed

dir_metadata:delete

Number of raw extrinsic metadata objects targeting a directory that have been removed

cnt_metadata:delete

Number of raw extrinsic metadata objects targeting a content that have been removed

emd_metadata:delete

Number of raw extrinsic metadata objects targeting a raw extrinsic metadata object that have been removed

Return type:

dict

extid_delete_for_target(target_swhids: List[CoreSWHID]) Dict[str, int][source]#

Delete ExtID objects from the storage

Parameters:

target_swhids – list of SWHIDs targeted by the ExtID objects to remove

Returns:

extid:delete: Number of ExtID objects removed

Return type:

Summary dict with the following keys and associated values

class swh.storage.interface.ObjectReferencesPartition(table_name: str, year: int, week: int, start: date, end: date)[source]#

Bases: object

Represents a subset of ObjectReference rows inserted into the database within a certain time range

Method generated by attrs for class ObjectReferencesPartition.

year#

ISO year.

week#

ISO week.

class swh.storage.interface.PartitionsManagementInterface(*args, **kwargs)[source]#

Bases: Protocol

object_references_create_partition(year: int, week: int) Tuple[date, date][source]#

Create the partition of the object_references table for the given ISO year and week.

object_references_drop_partition(partition: ObjectReferencesPartition) None[source]#

Delete the partition of the object_references table for the given partition.

object_references_list_partitions() List[ObjectReferencesPartition][source]#

List existing partitions of the object_references table, ordered from oldest to most recent.