swh.storage.cassandra package#


Module contents#

swh.storage.cassandra.create_keyspace(cql_runner: CqlRunner, *, durable_writes=True)[source]#
class swh.storage.cassandra.CassandraStorage(hosts, keyspace, objstorage=None, port=9042, journal_writer=None, allow_overwrite=False, consistency_level='ONE', directory_entries_insert_algo='one-by-one', auth_provider: Dict | None = None, table_options: Dict[str, str] | None = None)[source]#

Bases: object

A backend of swh-storage backed by Cassandra

  • hosts – Seed Cassandra nodes, to start connecting to the cluster

  • keyspace – Name of the Cassandra database to use

  • objstorage – Passed as argument to ObjStorage; if unset, use a NoopObjStorage

  • port – Cassandra port

  • journal_writer – Passed as argument to JournalWriter

  • allow_overwrite – Whether *_add functions may overwrite existing objects. If False (the default), they check whether an object already exists in the database before sending it in an INSERT; this is more efficient when there is a moderately high probability the object is already known. True skips that check, which can be useful to overwrite existing objects (e.g. when applying a schema update), or when the database is known to be mostly empty. Note that a False value does not guarantee there won’t be any overwrite.

  • consistency_level – The default read/write consistency to use

  • directory_entries_insert_algo – Must be one of:

      - one-by-one: naive, one INSERT per directory entry, serialized
      - concurrent: one INSERT per directory entry, concurrent
      - batch: using UNLOGGED BATCH to insert many entries in a few statements

  • auth_provider

    An optional dict describing the authentication provider to use. Must contain at least a cls entry and the parameters to pass to the constructor. For example:

        cls: cassandra.auth.PlainTextAuthProvider
        username: myusername
        password: mypassword

  • table_options – An optional dict mapping each table name (or the literal object_references_*) to CQL table options

property hosts: List[str]#
property keyspace: str#
property port: int#
check_config(*, check_write: bool) bool[source]#
content_add(content: List[Content]) Dict[str, int][source]#
content_update(contents: List[Dict[str, Any]], keys: List[str] = []) None[source]#
content_add_metadata(content: List[Content]) Dict[str, int][source]#
content_get_data(content: bytes | HashDict) bytes | None[source]#
content_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Content, str][source]#
content_get(contents: List[bytes], algo: str = 'sha1') List[Content | None][source]#
content_find(content: HashDict) List[Content][source]#
content_missing(contents: List[HashDict], key_hash: str = 'sha1') Iterable[bytes][source]#
content_missing_per_sha1(contents: List[bytes]) Iterable[bytes][source]#
content_missing_per_sha1_git(contents: List[bytes]) Iterable[bytes][source]#
content_get_random() bytes[source]#
skipped_content_add(content: List[SkippedContent]) Dict[str, int][source]#
skipped_content_find(content: HashDict) List[SkippedContent][source]#
skipped_content_missing(contents: List[Dict[str, Any]]) Iterable[Dict[str, Any]][source]#
directory_add(directories: List[Directory]) Dict[str, int][source]#
directory_missing(directories: List[bytes]) Iterable[bytes][source]#
directory_entry_get_by_path(directory: bytes, paths: List[bytes]) Dict[str, Any] | None[source]#
directory_ls(directory: bytes, recursive: bool = False) Iterable[Dict[str, Any]][source]#
directory_get_entries(directory_id: bytes, page_token: bytes | None = None, limit: int = 1000) PagedResult[DirectoryEntry, str] | None[source]#
directory_get_raw_manifest(directory_ids: List[bytes]) Dict[bytes, bytes | None][source]#
directory_get_random() bytes[source]#
directory_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[bytes, str][source]#
revision_add(revisions: List[Revision]) Dict[str, int][source]#
revision_missing(revisions: List[bytes]) Iterable[bytes][source]#
revision_get(revision_ids: List[bytes], ignore_displayname: bool = False) List[Revision | None][source]#
revision_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Revision, str][source]#
revision_log(revisions: List[bytes], ignore_displayname: bool = False, limit: int | None = None) Iterable[Dict[str, Any] | None][source]#
revision_shortlog(revisions: List[bytes], limit: int | None = None) Iterable[Tuple[bytes, Tuple[bytes, ...]] | None][source]#
revision_get_random() bytes[source]#
release_add(releases: List[Release]) Dict[str, int][source]#
release_missing(releases: List[bytes]) Iterable[bytes][source]#
release_get(releases: List[bytes], ignore_displayname: bool = False) List[Release | None][source]#
release_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Release, str][source]#
release_get_random() bytes[source]#
snapshot_add(snapshots: List[Snapshot]) Dict[str, int][source]#
snapshot_missing(snapshots: List[bytes]) Iterable[bytes][source]#
snapshot_get(snapshot_id: bytes) Dict[str, Any] | None[source]#
snapshot_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[bytes, str][source]#
snapshot_count_branches(snapshot_id: bytes, branch_name_exclude_prefix: bytes | None = None) Dict[str | None, int] | None[source]#
snapshot_get_branches(snapshot_id: bytes, branches_from: bytes = b'', branches_count: int = 1000, target_types: List[str] | None = None, branch_name_include_substring: bytes | None = None, branch_name_exclude_prefix: bytes | None = None) PartialBranches | None[source]#
snapshot_get_random() bytes[source]#
snapshot_branch_get_by_name(snapshot_id: bytes, branch_name: bytes, follow_alias_chain: bool = True, max_alias_chain_length: int = 100) SnapshotBranchByNameResponse | None[source]#
origin_get(origins: List[str]) List[Origin | None][source]#
origin_get_one(origin_url: str) Origin | None[source]#

Given an origin URL, return the origin if it exists, None otherwise.

origin_get_by_sha1(sha1s: List[bytes]) List[Dict[str, Any] | None][source]#
origin_list(page_token: str | None = None, limit: int = 100) PagedResult[Origin, str][source]#
origin_count(url_pattern: str, regexp: bool = False, with_visit: bool = False) int[source]#
origin_snapshot_get_all(origin_url: str) List[bytes][source]#
origin_add(origins: List[Origin]) Dict[str, int][source]#
origin_visit_add(visits: List[OriginVisit]) Iterable[OriginVisit][source]#
origin_visit_status_add(visit_statuses: List[OriginVisitStatus]) Dict[str, int][source]#
origin_visit_get(origin: str, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisit, str][source]#
origin_visit_get_with_statuses(origin: str, allowed_statuses: List[str] | None = None, require_snapshot: bool = False, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitWithStatuses, str][source]#
origin_visit_status_get(origin: str, visit: int, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitStatus, str][source]#
origin_visit_find_by_date(origin: str, visit_date: datetime, type: str | None = None) OriginVisit | None[source]#
origin_visit_get_by(origin: str, visit: int) OriginVisit | None[source]#
origin_visit_get_latest(origin: str, type: str | None = None, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) OriginVisit | None[source]#
origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) OriginVisitStatus | None[source]#
origin_visit_status_get_random(type: str) OriginVisitStatus | None[source]#
object_find_by_sha1_git(ids: List[bytes]) Dict[bytes, List[Dict]][source]#
raw_extrinsic_metadata_add(metadata: List[RawExtrinsicMetadata]) Dict[str, int][source]#

Add extrinsic metadata on objects (contents, directories, …).

raw_extrinsic_metadata_get(target: ExtendedSWHID, authority: MetadataAuthority, after: datetime | None = None, page_token: bytes | None = None, limit: int = 1000) PagedResult[RawExtrinsicMetadata, str][source]#
raw_extrinsic_metadata_get_by_ids(ids: List[bytes]) List[RawExtrinsicMetadata][source]#
raw_extrinsic_metadata_get_authorities(target: ExtendedSWHID) List[MetadataAuthority][source]#
metadata_fetcher_add(fetchers: List[MetadataFetcher]) Dict[str, int][source]#
metadata_fetcher_get(name: str, version: str) MetadataFetcher | None[source]#
metadata_authority_add(authorities: List[MetadataAuthority]) Dict[str, int][source]#
metadata_authority_get(type: MetadataAuthorityType, url: str) MetadataAuthority | None[source]#
extid_add(ids: List[ExtID]) Dict[str, int][source]#
extid_get_from_extid(id_type: str, ids: List[bytes], version: int | None = None) List[ExtID][source]#
extid_get_from_target(target_type: ObjectType, ids: List[bytes], extid_type: str | None = None, extid_version: int | None = None) List[ExtID][source]#
object_find_recent_references(target_swhid: ExtendedSWHID, limit: int) List[ExtendedSWHID][source]#
object_references_add(references: List[ObjectReference]) Dict[str, int][source]#
object_references_create_partition(year: int, week: int) Tuple[date, date][source]#

Create the partition of the object_references table for the given ISO year and week.

object_references_drop_partition(partition: ObjectReferencesPartition) None[source]#

Delete the partition of the object_references table for the given partition.

object_references_list_partitions() List[ObjectReferencesPartition][source]#

List existing partitions of the object_references table, ordered from oldest to the most recent.

object_delete(swhids: List[ExtendedSWHID]) Dict[str, int][source]#

Delete objects from the storage

All skipped content objects matching the given SWHID will be removed, including those that have the same SWHID due to hash collisions.

Origin objects are removed alongside their associated origin visit and origin visit status objects.


swhids – list of SWHIDs of the objects to remove


number of objects removed. Details of each key:


Number of content objects removed


Sum of the removed contents’ data length


Number of skipped content objects removed


Number of directory objects removed


Number of revision objects removed


Number of release objects removed


Number of snapshot objects removed


Number of origin objects removed


Number of origin visit objects removed


Number of origin visit status objects removed


Number of raw extrinsic metadata objects targeting an origin that have been removed


Number of raw extrinsic metadata objects targeting a snapshot that have been removed


Number of raw extrinsic metadata objects targeting a revision that have been removed


Number of raw extrinsic metadata objects targeting a release that have been removed


Number of raw extrinsic metadata objects targeting a directory that have been removed


Number of raw extrinsic metadata objects targeting a content that have been removed


Number of raw extrinsic metadata objects targeting a raw extrinsic metadata object that have been removed

Return type:


extid_delete_for_target(target_swhids: List[CoreSWHID]) Dict[str, int][source]#

Delete ExtID objects from the storage


target_swhids – list of SWHIDs targeted by the ExtID objects to remove


extid:delete: Number of ExtID objects removed

Return type:

Summary dict with the following keys and associated values

clear_buffers(object_types: Sequence[str] = ()) None[source]#

Do nothing

flush(object_types: Sequence[str] = ()) Dict[str, int][source]#