swh.storage.cassandra.storage module#

class swh.storage.cassandra.storage.CassandraStorage(hosts, keyspace, objstorage=None, port=9042, journal_writer=None, allow_overwrite=False, consistency_level='ONE', directory_entries_insert_algo='one-by-one', auth_provider: Optional[Dict] = None)[source]#

Bases: object

A backend of swh-storage backed by Cassandra

Parameters:
  • hosts – Seed Cassandra nodes used to start connecting to the cluster

  • keyspace – Name of the Cassandra keyspace to use

  • objstorage – Passed as argument to ObjStorage; if unset, a NoopObjStorage is used

  • port – Cassandra port

  • journal_writer – Passed as argument to JournalWriter

  • allow_overwrite – When False (the default), the *_add functions check whether an object already exists in the database before sending it in an INSERT; this is more efficient when there is a moderately high probability that the object is already known. When True, objects are inserted unconditionally, which can be useful to overwrite existing objects (e.g. when applying a schema update) or when the database is known to be mostly empty. Note that a False value does not guarantee there will not be any overwrite.

  • consistency_level – The default read/write consistency to use

  • directory_entries_insert_algo –

    Must be one of:

      • one-by-one: naive, one INSERT per directory entry, serialized

      • concurrent: one INSERT per directory entry, concurrent

      • batch: using UNLOGGED BATCH to insert many entries in a few statements

  • auth_provider

    An optional dict describing the authentication provider to use. Must contain at least a cls entry and the parameters to pass to the constructor. For example:

    auth_provider:
        cls: cassandra.auth.PlainTextAuthProvider
        username: myusername
        password: mypassword
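
For orientation, here is a minimal instantiation sketch based on the signature above. The host names, keyspace name, credentials, and consistency level are placeholders, not values prescribed by swh-storage; a real deployment would normally be configured through swh-storage's standard configuration rather than direct instantiation:

    from swh.storage.cassandra.storage import CassandraStorage

    # Hypothetical cluster coordinates and credentials; adjust to your deployment.
    storage = CassandraStorage(
        hosts=["cassandra1.example.org", "cassandra2.example.org"],  # seed nodes
        keyspace="swh",                          # name of the Cassandra keyspace
        port=9042,
        consistency_level="ONE",                 # default read/write consistency
        directory_entries_insert_algo="batch",   # or "one-by-one" / "concurrent"
        allow_overwrite=False,                   # default: check before INSERT
        auth_provider={
            "cls": "cassandra.auth.PlainTextAuthProvider",
            "username": "myusername",
            "password": "mypassword",
        },
    )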
    

property hosts: List[str]#
property keyspace: str#
property port: int#
check_config(*, check_write: bool) bool[source]#
content_add(content: List[Content]) Dict[str, int][source]#
content_update(contents: List[Dict[str, Any]], keys: List[str] = []) None[source]#
content_add_metadata(content: List[Content]) Dict[str, int][source]#
content_get_data(content: Union[bytes, HashDict]) Optional[bytes][source]#
content_get_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) PagedResult[Content, str][source]#
content_get(contents: List[bytes], algo: str = 'sha1') List[Optional[Content]][source]#
content_find(content: HashDict) List[Content][source]#
content_missing(contents: List[HashDict], key_hash: str = 'sha1') Iterable[bytes][source]#
content_missing_per_sha1(contents: List[bytes]) Iterable[bytes][source]#
content_missing_per_sha1_git(contents: List[bytes]) Iterable[bytes][source]#
content_get_random() bytes[source]#
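
As an illustration of the content methods above, the following sketch adds a blob and reads it back, reusing the storage instance from the constructor example. It assumes swh.model's Content.from_data helper; note that with the default NoopObjStorage the blob bytes are presumably not retrievable, so a real objstorage is assumed here:

    from swh.model.model import Content

    # Build a Content object from raw bytes (hashes are computed automatically).
    cnt = Content.from_data(b"Hello, Software Heritage!")

    # content_add returns counters, e.g. how many new contents were inserted.
    summary = storage.content_add([cnt])

    # Fetch the blob back by one of its hashes (here: sha1, as bytes).
    data = storage.content_get_data(cnt.sha1)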
skipped_content_add(content: List[SkippedContent]) Dict[str, int][source]#
skipped_content_find(content: HashDict) List[SkippedContent][source]#
skipped_content_missing(contents: List[Dict[str, Any]]) Iterable[Dict[str, Any]][source]#
directory_add(directories: List[Directory]) Dict[str, int][source]#
directory_missing(directories: List[bytes]) Iterable[bytes][source]#
directory_entry_get_by_path(directory: bytes, paths: List[bytes]) Optional[Dict[str, Any]][source]#
directory_ls(directory: bytes, recursive: bool = False) Iterable[Dict[str, Any]][source]#
directory_get_entries(directory_id: bytes, page_token: Optional[bytes] = None, limit: int = 1000) Optional[PagedResult[DirectoryEntry, str]][source]#
directory_get_raw_manifest(directory_ids: List[bytes]) Dict[bytes, Optional[bytes]][source]#
directory_get_random() bytes[source]#
directory_get_id_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) PagedResult[bytes, str][source]#
revision_add(revisions: List[Revision]) Dict[str, int][source]#
revision_missing(revisions: List[bytes]) Iterable[bytes][source]#
revision_get(revision_ids: List[bytes], ignore_displayname: bool = False) List[Optional[Revision]][source]#
revision_get_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) PagedResult[Revision, str][source]#
revision_log(revisions: List[bytes], ignore_displayname: bool = False, limit: Optional[int] = None) Iterable[Optional[Dict[str, Any]]][source]#
revision_shortlog(revisions: List[bytes], limit: Optional[int] = None) Iterable[Optional[Tuple[bytes, Tuple[bytes, ...]]]][source]#
revision_get_random() bytes[source]#
release_add(releases: List[Release]) Dict[str, int][source]#
release_missing(releases: List[bytes]) Iterable[bytes][source]#
release_get(releases: List[bytes], ignore_displayname: bool = False) List[Optional[Release]][source]#
release_get_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) PagedResult[Release, str][source]#
release_get_random() bytes[source]#
snapshot_add(snapshots: List[Snapshot]) Dict[str, int][source]#
snapshot_missing(snapshots: List[bytes]) Iterable[bytes][source]#
snapshot_get(snapshot_id: bytes) Optional[Dict[str, Any]][source]#
snapshot_get_id_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000) PagedResult[bytes, str][source]#
snapshot_count_branches(snapshot_id: bytes, branch_name_exclude_prefix: Optional[bytes] = None) Optional[Dict[Optional[str], int]][source]#
snapshot_get_branches(snapshot_id: bytes, branches_from: bytes = b'', branches_count: int = 1000, target_types: Optional[List[str]] = None, branch_name_include_substring: Optional[bytes] = None, branch_name_exclude_prefix: Optional[bytes] = None) Optional[PartialBranches][source]#
snapshot_get_random() bytes[source]#
snapshot_branch_get_by_name(snapshot_id: bytes, branch_name: bytes, follow_alias_chain: bool = True, max_alias_chain_length: int = 100) Optional[SnapshotBranchByNameResponse][source]#
origin_get(origins: List[str]) Iterable[Optional[Origin]][source]#
origin_get_one(origin_url: str) Optional[Origin][source]#

Given an origin URL, return the corresponding origin if it exists, None otherwise
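
A short usage sketch, reusing the storage instance from the constructor example (the URL is only an example):

    origin = storage.origin_get_one("https://github.com/python/cpython")
    if origin is not None:
        print(origin.url)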

origin_get_by_sha1(sha1s: List[bytes]) List[Optional[Dict[str, Any]]][source]#
origin_list(page_token: Optional[str] = None, limit: int = 100) PagedResult[Origin, str][source]#
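
Methods returning a PagedResult, such as origin_list above or the *_get_partition methods, are paged via page_token. A minimal iteration sketch, assuming PagedResult exposes results and next_page_token attributes as in swh.core, and reusing the storage instance from the constructor example:

    page_token = None
    while True:
        page = storage.origin_list(page_token=page_token, limit=100)
        for origin in page.results:
            print(origin.url)
        page_token = page.next_page_token
        if page_token is None:
            break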
origin_count(url_pattern: str, regexp: bool = False, with_visit: bool = False) int[source]#
origin_snapshot_get_all(origin_url: str) List[bytes][source]#
origin_add(origins: List[Origin]) Dict[str, int][source]#
origin_visit_add(visits: List[OriginVisit]) Iterable[OriginVisit][source]#
origin_visit_status_add(visit_statuses: List[OriginVisitStatus]) Dict[str, int][source]#
origin_visit_get(origin: str, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisit, str][source]#
origin_visit_get_with_statuses(origin: str, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitWithStatuses, str][source]#
origin_visit_status_get(origin: str, visit: int, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10) PagedResult[OriginVisitStatus, str][source]#
origin_visit_find_by_date(origin: str, visit_date: datetime) Optional[OriginVisit][source]#
origin_visit_get_by(origin: str, visit: int) Optional[OriginVisit][source]#
origin_visit_get_latest(origin: str, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False) Optional[OriginVisit][source]#
origin_visit_status_get_latest(origin_url: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False) Optional[OriginVisitStatus][source]#
origin_visit_status_get_random(type: str) Optional[OriginVisitStatus][source]#
object_find_by_sha1_git(ids: List[bytes]) Dict[bytes, List[Dict]][source]#
stat_counters()[source]#
refresh_stat_counters()[source]#
raw_extrinsic_metadata_add(metadata: List[RawExtrinsicMetadata]) Dict[str, int][source]#
raw_extrinsic_metadata_get(target: ExtendedSWHID, authority: MetadataAuthority, after: Optional[datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000) PagedResult[RawExtrinsicMetadata, str][source]#
raw_extrinsic_metadata_get_by_ids(ids: List[bytes]) List[RawExtrinsicMetadata][source]#
raw_extrinsic_metadata_get_authorities(target: ExtendedSWHID) List[MetadataAuthority][source]#
metadata_fetcher_add(fetchers: List[MetadataFetcher]) Dict[str, int][source]#
metadata_fetcher_get(name: str, version: str) Optional[MetadataFetcher][source]#
metadata_authority_add(authorities: List[MetadataAuthority]) Dict[str, int][source]#
metadata_authority_get(type: MetadataAuthorityType, url: str) Optional[MetadataAuthority][source]#
extid_add(ids: List[ExtID]) Dict[str, int][source]#
extid_get_from_extid(id_type: str, ids: List[bytes], version: Optional[int] = None) List[ExtID][source]#
extid_get_from_target(target_type: ObjectType, ids: List[bytes], extid_type: Optional[str] = None, extid_version: Optional[int] = None) List[ExtID][source]#
object_find_recent_references(target_swhid: ExtendedSWHID, limit: int) List[ExtendedSWHID][source]#
object_references_add(references: List[ObjectReference]) Dict[str, int][source]#
clear_buffers(object_types: Sequence[str] = ()) None[source]#

Do nothing

flush(object_types: Sequence[str] = ()) Dict[str, int][source]#