swh.storage.cassandra.storage module#

class swh.storage.cassandra.storage.CassandraStorage(hosts, keyspace, objstorage=None, port=9042, journal_writer=None, allow_overwrite=False, consistency_level='ONE', directory_entries_insert_algo='one-by-one', auth_provider: Dict | None = None)[source]#

Bases: object

A backend of swh-storage backed by Cassandra

Parameters:
  • hosts – Seed Cassandra nodes, to start connecting to the cluster

  • keyspace – Name of the Cassandra database to use

  • objstorage – Passed as argument to ObjStorage; if unset, use a NoopObjStorage

  • port – Cassandra port

  • journal_writer – Passed as argument to JournalWriter

  • allow_overwrite – Whether *_add functions skip checking if an object already exists in the database before sending it in an INSERT. False (the default) performs the check, which is more efficient when there is a moderately high probability the object is already known; True can be useful to overwrite existing objects (e.g. when applying a schema update), or when the database is known to be mostly empty. Note that a False value does not guarantee there won’t be any overwrite.

  • consistency_level – The default read/write consistency to use

  • directory_entries_insert_algo – Must be one of:

    • one-by-one: naive, one INSERT per directory entry, serialized

    • concurrent: one INSERT per directory entry, concurrent

    • batch: using UNLOGGED BATCH to insert many entries in a few statements

  • auth_provider

    An optional dict describing the authentication provider to use. Must contain at least a cls entry and the parameters to pass to the constructor. For example:

    auth_provider:
        cls: cassandra.auth.PlainTextAuthProvider
        username: myusername
        password: mypassword
    

property hosts: List[str]#
property keyspace: str#
property port: int#
check_config(*, check_write: bool) bool[source]#
content_add(content: List[Content]) Dict[str, int][source]#
content_update(contents: List[Dict[str, Any]], keys: List[str] = []) None[source]#
content_add_metadata(content: List[Content]) Dict[str, int][source]#
content_get_data(content: bytes | HashDict) bytes | None[source]#
content_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Content, str][source]#
content_get(contents: List[bytes], algo: str = 'sha1') List[Content | None][source]#
content_find(content: HashDict) List[Content][source]#
content_missing(contents: List[HashDict], key_hash: str = 'sha1') Iterable[bytes][source]#
content_missing_per_sha1(contents: List[bytes]) Iterable[bytes][source]#
content_missing_per_sha1_git(contents: List[bytes]) Iterable[bytes][source]#
content_get_random() bytes[source]#
skipped_content_add(content: List[SkippedContent]) Dict[str, int][source]#
skipped_content_find(content: HashDict) List[SkippedContent][source]#
skipped_content_missing(contents: List[Dict[str, Any]]) Iterable[Dict[str, Any]][source]#
directory_add(directories: List[Directory]) Dict[str, int][source]#
directory_missing(directories: List[bytes]) Iterable[bytes][source]#
directory_entry_get_by_path(directory: bytes, paths: List[bytes]) Dict[str, Any] | None[source]#
directory_ls(directory: bytes, recursive: bool = False) Iterable[Dict[str, Any]][source]#
directory_get_entries(directory_id: bytes, page_token: bytes | None = None, limit: int = 1000) PagedResult[DirectoryEntry, str] | None[source]#
directory_get_raw_manifest(directory_ids: List[bytes]) Dict[bytes, bytes | None][source]#
directory_get_random() bytes[source]#
directory_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[bytes, str][source]#
revision_add(revisions: List[Revision]) Dict[str, int][source]#
revision_missing(revisions: List[bytes]) Iterable[bytes][source]#
revision_get(revision_ids: List[bytes], ignore_displayname: bool = False) List[Revision | None][source]#
revision_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Revision, str][source]#
revision_log(revisions: List[bytes], ignore_displayname: bool = False, limit: int | None = None) Iterable[Dict[str, Any] | None][source]#
revision_shortlog(revisions: List[bytes], limit: int | None = None) Iterable[Tuple[bytes, Tuple[bytes, ...]] | None][source]#
revision_get_random() bytes[source]#
release_add(releases: List[Release]) Dict[str, int][source]#
release_missing(releases: List[bytes]) Iterable[bytes][source]#
release_get(releases: List[bytes], ignore_displayname: bool = False) List[Release | None][source]#
release_get_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[Release, str][source]#
release_get_random() bytes[source]#
snapshot_add(snapshots: List[Snapshot]) Dict[str, int][source]#
snapshot_missing(snapshots: List[bytes]) Iterable[bytes][source]#
snapshot_get(snapshot_id: bytes) Dict[str, Any] | None[source]#
snapshot_get_id_partition(partition_id: int, nb_partitions: int, page_token: str | None = None, limit: int = 1000) PagedResult[bytes, str][source]#
snapshot_count_branches(snapshot_id: bytes, branch_name_exclude_prefix: bytes | None = None) Dict[str | None, int] | None[source]#
snapshot_get_branches(snapshot_id: bytes, branches_from: bytes = b'', branches_count: int = 1000, target_types: List[str] | None = None, branch_name_include_substring: bytes | None = None, branch_name_exclude_prefix: bytes | None = None) PartialBranches | None[source]#
snapshot_get_random() bytes[source]#
snapshot_branch_get_by_name(snapshot_id: bytes, branch_name: bytes, follow_alias_chain: bool = True, max_alias_chain_length: int = 100) SnapshotBranchByNameResponse | None[source]#
origin_get(origins: List[str]) List[Origin | None][source]#
origin_get_one(origin_url: str) Origin | None[source]#

Given an origin url, return the origin if it exists, None otherwise

origin_get_by_sha1(sha1s: List[bytes]) List[Dict[str, Any] | None][source]#
origin_list(page_token: str | None = None, limit: int = 100) PagedResult[Origin, str][source]#
origin_count(url_pattern: str, regexp: bool = False, with_visit: bool = False) int[source]#
origin_snapshot_get_all(origin_url: str) List[bytes][source]#
origin_add(origins: List[Origin]) Dict[str, int][source]#
origin_visit_add(visits: List[OriginVisit]) Iterable[OriginVisit][source]#
origin_visit_status_add(visit_statuses: List[OriginVisitStatus]) Dict[str, int][source]#
origin_visit_get(origin: str, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisit, str][source]#
origin_visit_get_with_statuses(origin: str, allowed_statuses: List[str] | None = None, require_snapshot: bool = False, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitWithStatuses, str][source]#
origin_visit_status_get(origin: str, visit: int, page_token: str | None = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitStatus, str][source]#
origin_visit_find_by_date(origin: str, visit_date: datetime, type: str | None = None) OriginVisit | None[source]#
origin_visit_get_by(origin: str, visit: int) OriginVisit | None[source]#
origin_visit_get_latest(origin: str, type: str | None = None, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) OriginVisit | None[source]#
origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: List[str] | None = None, require_snapshot: bool = False) OriginVisitStatus | None[source]#
origin_visit_status_get_random(type: str) OriginVisitStatus | None[source]#
object_find_by_sha1_git(ids: List[bytes]) Dict[bytes, List[Dict]][source]#
stat_counters()[source]#
refresh_stat_counters()[source]#
raw_extrinsic_metadata_add(metadata: List[RawExtrinsicMetadata]) Dict[str, int][source]#

Add extrinsic metadata on objects (contents, directories, …).

raw_extrinsic_metadata_get(target: ExtendedSWHID, authority: MetadataAuthority, after: datetime | None = None, page_token: bytes | None = None, limit: int = 1000) PagedResult[RawExtrinsicMetadata, str][source]#
raw_extrinsic_metadata_get_by_ids(ids: List[bytes]) List[RawExtrinsicMetadata][source]#
raw_extrinsic_metadata_get_authorities(target: ExtendedSWHID) List[MetadataAuthority][source]#
metadata_fetcher_add(fetchers: List[MetadataFetcher]) Dict[str, int][source]#
metadata_fetcher_get(name: str, version: str) MetadataFetcher | None[source]#
metadata_authority_add(authorities: List[MetadataAuthority]) Dict[str, int][source]#
metadata_authority_get(type: MetadataAuthorityType, url: str) MetadataAuthority | None[source]#
extid_add(ids: List[ExtID]) Dict[str, int][source]#
extid_get_from_extid(id_type: str, ids: List[bytes], version: int | None = None) List[ExtID][source]#
extid_get_from_target(target_type: ObjectType, ids: List[bytes], extid_type: str | None = None, extid_version: int | None = None) List[ExtID][source]#
object_find_recent_references(target_swhid: ExtendedSWHID, limit: int) List[ExtendedSWHID][source]#
object_references_add(references: List[ObjectReference]) Dict[str, int][source]#
object_delete(swhids: List[ExtendedSWHID]) Dict[str, int][source]#

Delete objects from the storage

All skipped content objects matching the given SWHID will be removed, including those that have the same SWHID due to hash collisions.

Origin objects are removed alongside their associated origin visit and origin visit status objects.

Parameters:

swhids – list of SWHIDs of the objects to remove

Returns:

number of objects removed. Details of each key:

content:delete

Number of content objects removed

content:delete:bytes

Sum of the removed contents’ data length

skipped_content:delete

Number of skipped content objects removed

directory:delete

Number of directory objects removed

revision:delete

Number of revision objects removed

release:delete

Number of release objects removed

snapshot:delete

Number of snapshot objects removed

origin:delete

Number of origin objects removed

origin_visit:delete

Number of origin visit objects removed

origin_visit_status:delete

Number of origin visit status objects removed

ori_metadata:delete

Number of raw extrinsic metadata objects targeting an origin that have been removed

snp_metadata:delete

Number of raw extrinsic metadata objects targeting a snapshot that have been removed

rev_metadata:delete

Number of raw extrinsic metadata objects targeting a revision that have been removed

rel_metadata:delete

Number of raw extrinsic metadata objects targeting a release that have been removed

dir_metadata:delete

Number of raw extrinsic metadata objects targeting a directory that have been removed

cnt_metadata:delete

Number of raw extrinsic metadata objects targeting a content that have been removed

emd_metadata:delete

Number of raw extrinsic metadata objects targeting a raw extrinsic metadata object that have been removed

Return type:

dict

extid_delete_for_target(target_swhids: List[CoreSWHID]) Dict[str, int][source]#

Delete ExtID objects from the storage

Parameters:

target_swhids – list of SWHIDs targeted by the ExtID objects to remove

Returns:

Summary dict with the following keys and associated values:

extid:delete

Number of ExtID objects removed

Return type:

Dict[str, int]

clear_buffers(object_types: Sequence[str] = ()) None[source]#

Do nothing

flush(object_types: Sequence[str] = ()) Dict[str, int][source]#