swh.storage.in_memory module#

class swh.storage.in_memory.Table(row_class: Type[TRow])[source]#

Bases: Generic[TRow]

partition_key(row: TRow | Dict[str, Any]) Tuple[source]#

Returns the partition key of a row (i.e. the cells which get hashed into the token).

clustering_key(row: TRow | Dict[str, Any]) Tuple[source]#

Returns the clustering key of a row (i.e. the cells used for sorting rows within a partition).

primary_key(row)[source]#
primary_key_from_dict(d: Dict[str, Any]) Tuple[source]#

Returns the primary key (i.e. the concatenation of the partition key and the clustering key) of the given dictionary interpreted as a row.

token(key: Tuple)[source]#

Returns the token of a row (i.e. the hash of its partition key); see the usage sketch after this class.

get_partition(token: int) Dict[Tuple, TRow][source]#

Returns the partition that contains this token.

insert(row: TRow) None[source]#
delete(predicate: Callable[[TRow], bool]) None[source]#
split_primary_key(key: Tuple) Tuple[Tuple, Tuple][source]#

Returns (partition_key, clustering_key) from a primary key.

get_from_partition_key(partition_key: Tuple) Iterable[TRow][source]#

Returns all rows matching the given partition key.

get_from_primary_key(primary_key: Tuple) TRow | None[source]#

Returns at most one row, from its primary key.

get_from_token(token: int) Iterable[TRow][source]#

Returns all rows whose token (i.e. the non-cryptographic hash of the partition key) is the one passed as argument.

iter_all() Iterator[Tuple[Tuple, TRow]][source]#
get_random() TRow | None[source]#
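
The Table class above can be driven directly; here is a minimal sketch. Only Table's own methods come from this page: ExampleRow and its PARTITION_KEY/CLUSTERING_KEY class attributes are hypothetical, modeled on the row classes in swh.storage.cassandra.model:

    import dataclasses

    from swh.storage.in_memory import Table

    @dataclasses.dataclass
    class ExampleRow:
        # Hypothetical row class; Table is assumed to read the
        # PARTITION_KEY/CLUSTERING_KEY class attributes, as the row
        # classes in swh.storage.cassandra.model declare them.
        PARTITION_KEY = ("id",)
        CLUSTERING_KEY = ("name",)

        id: bytes
        name: bytes

    table = Table(ExampleRow)
    row = ExampleRow(id=b"\x01", name=b"hello")
    table.insert(row)

    pk = table.primary_key(row)  # partition key cells + clustering key cells
    partition_key, clustering_key = table.split_primary_key(pk)
    assert table.get_from_primary_key(pk) == row
    assert row in table.get_from_token(table.token(partition_key))
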
class swh.storage.in_memory.InMemoryCqlRunner[source]#

Bases: object

increment_counter(object_type: str, nb: int)[source]#
stat_counters() Iterable[ObjectCountRow][source]#
content_add_prepare(content: ContentRow)[source]#
content_get_from_pk(content_hashes: Dict[str, bytes]) ContentRow | None[source]#
content_get_from_tokens(tokens: List[int]) Iterable[ContentRow][source]#
content_get_random() ContentRow | None[source]#
content_get_token_range(start: int, end: int, limit: int) Iterable[Tuple[int, ContentRow]][source]#
content_missing_from_all_hashes(contents_hashes: List[Dict[str, bytes]]) Iterator[Dict[str, bytes]][source]#
content_delete(content_hashes: TotalHashDict) None[source]#
content_missing_by_sha1_git(ids: List[bytes]) List[bytes][source]#
content_index_add_one(algo: str, content: Content, token: int) None[source]#
content_get_tokens_from_single_algo(algo: str, hashes: List[bytes]) Iterable[int][source]#
skipped_content_add_prepare(content: SkippedContentRow)[source]#
skipped_content_get_from_pk(content_hashes: Dict[str, bytes]) SkippedContentRow | None[source]#
skipped_content_get_from_token(token: int) Iterable[SkippedContentRow][source]#
skipped_content_delete(content_hashes: TotalHashDict) None[source]#
skipped_content_index_add_one(algo: str, content: SkippedContent, token: int) None[source]#
skipped_content_get_tokens_from_single_hash(algo: str, hash_: bytes) Iterable[int][source]#
directory_missing(ids: List[bytes]) List[bytes][source]#
directory_add_one(directory: DirectoryRow) None[source]#
directory_get_random() DirectoryRow | None[source]#
directory_get(directory_ids: List[bytes]) Iterable[DirectoryRow][source]#
directory_get_token_range(start: int, end: int, limit: int) Iterable[Tuple[int, DirectoryRow]][source]#
directory_delete(directory_id: bytes) None[source]#
directory_entry_add_one(entry: DirectoryEntryRow) None[source]#
directory_entry_get(directory_ids: List[bytes]) Iterable[DirectoryEntryRow][source]#
directory_entry_get_from_name(directory_id: bytes, from_: bytes, limit: int) Iterable[DirectoryEntryRow][source]#
directory_entry_delete(directory_id: bytes) None[source]#
revision_missing(ids: List[bytes]) Iterable[bytes][source]#
revision_add_one(revision: RevisionRow) None[source]#
revision_get_ids(revision_ids) Iterable[int][source]#
revision_get(revision_ids: List[bytes], ignore_displayname: bool = False) Iterable[RevisionRow][source]#
revision_get_token_range(start: int, end: int, limit: int) Iterable[Tuple[int, RevisionRow]][source]#
revision_get_random() RevisionRow | None[source]#
revision_delete(revision_id: bytes) None[source]#
revision_parent_add_one(revision_parent: RevisionParentRow) None[source]#
revision_parent_get(revision_id: bytes) Iterable[bytes][source]#
revision_parent_delete(revision_id: bytes) None[source]#
release_missing(ids: List[bytes]) List[bytes][source]#
release_add_one(release: ReleaseRow) None[source]#
release_get(release_ids: List[str], ignore_displayname: bool = False) Iterable[ReleaseRow][source]#
release_get_token_range(start: int, end: int, limit: int) Iterable[Tuple[int, ReleaseRow]][source]#
release_get_random() ReleaseRow | None[source]#
release_delete(release_id: bytes) None[source]#
snapshot_missing(ids: List[bytes]) List[bytes][source]#
snapshot_add_one(snapshot: SnapshotRow) None[source]#
snapshot_get_token_range(start: int, end: int, limit: int) Iterable[Tuple[int, SnapshotRow]][source]#
snapshot_get_random() SnapshotRow | None[source]#
snapshot_branch_get_from_name(snapshot_id: bytes, from_: bytes, limit: int) Iterable[SnapshotBranchRow][source]#
snapshot_delete(snapshot_id: bytes) None[source]#
snapshot_branch_add_one(branch: SnapshotBranchRow) None[source]#
snapshot_count_branches(snapshot_id: bytes, branch_name_exclude_prefix: bytes | None = None) Dict[str | None, int][source]#

Returns a dictionary mapping branch target type names (or None for dangling branches) to the number of branches of that type.

snapshot_branch_get(snapshot_id: bytes, from_: bytes, limit: int, branch_name_exclude_prefix: bytes | None = None) Iterable[SnapshotBranchRow][source]#
snapshot_branch_delete(snapshot_id: bytes) None[source]#
origin_add_one(origin: OriginRow) None[source]#
origin_get_by_sha1(sha1: bytes) Iterable[OriginRow][source]#
origin_get_by_url(url: str) Iterable[OriginRow][source]#
origin_list(start_token: int, limit: int) Iterable[Tuple[int, OriginRow]][source]#

Returns an iterable of (token, origin) pairs.

origin_iter_all() Iterable[OriginRow][source]#
origin_bump_next_visit_id(origin_url: str, visit_id: int) None[source]#
origin_generate_unique_visit_id(origin_url: str) int[source]#
origin_delete(sha1: bytes) None[source]#
origin_visit_get(origin_url: str, last_visit: int | None, limit: int, order: ListOrder) Iterable[OriginVisitRow][source]#
origin_visit_add_one(visit: OriginVisitRow) None[source]#
origin_visit_get_one(origin_url: str, visit_id: int) OriginVisitRow | None[source]#
origin_visit_iter_all(origin_url: str) Iterable[OriginVisitRow][source]#
origin_visit_iter(start_token: int) Iterator[OriginVisitRow][source]#

Returns all origin visits in token order, starting from the given token and wrapping around the token space.

origin_visit_delete(origin_url: str) None[source]#
origin_visit_status_get_range(origin: str, visit: int, date_from: datetime | None, limit: int, order: ListOrder) Iterable[OriginVisitStatusRow][source]#
origin_visit_status_get_all_range(origin: str, first_visit: int, last_visit: int) Iterable[OriginVisitStatusRow][source]#
origin_visit_status_add_one(visit_update: OriginVisitStatusRow) None[source]#
origin_visit_status_get_latest(origin: str, visit: int) OriginVisitStatusRow | None[source]#

Given an origin URL and a visit id, return the latest origin_visit_status for that visit.

origin_visit_status_get(origin: str, visit: int) Iterator[OriginVisitStatusRow][source]#

Return all origin visit statuses for a given visit.

origin_snapshot_get_all(origin: str) Iterator[bytes][source]#

Return all snapshot ids for a given origin.

origin_visit_status_delete(origin_url: str) None[source]#
metadata_authority_add(authority: MetadataAuthorityRow)[source]#
metadata_authority_get(type, url) MetadataAuthorityRow | None[source]#
metadata_fetcher_add(fetcher: MetadataFetcherRow)[source]#
metadata_fetcher_get(name, version) MetadataFetcherRow | None[source]#
raw_extrinsic_metadata_by_id_add(row: RawExtrinsicMetadataByIdRow) None[source]#
raw_extrinsic_metadata_get_by_ids(ids) List[RawExtrinsicMetadataByIdRow][source]#
raw_extrinsic_metadata_by_id_delete(emd_id)[source]#
raw_extrinsic_metadata_add(raw_extrinsic_metadata)[source]#
raw_extrinsic_metadata_get_after_date(target: str, authority_type: str, authority_url: str, after: datetime) Iterable[RawExtrinsicMetadataRow][source]#
raw_extrinsic_metadata_get_after_date_and_id(target: str, authority_type: str, authority_url: str, after_date: datetime, after_id: bytes) Iterable[RawExtrinsicMetadataRow][source]#
raw_extrinsic_metadata_get(target: str, authority_type: str, authority_url: str) Iterable[RawExtrinsicMetadataRow][source]#
raw_extrinsic_metadata_get_authorities(target: str) Iterable[Tuple[str, str]][source]#
raw_extrinsic_metadata_delete(target, authority_type, authority_url, discovery_date, emd_id)[source]#
extid_add_prepare(extid: ExtIDRow)[source]#
extid_index_add_one(row: ExtIDByTargetRow) None[source]#
extid_delete(extid_type: str, extid: bytes, extid_version: int, target_type: str, target: bytes) None[source]#
extid_delete_from_by_target_table(target_type: str, target: bytes) None[source]#
extid_get_from_pk(extid_type: str, extid: bytes, extid_version: int, target: ExtendedSWHID) ExtIDRow | None[source]#
extid_get_from_extid(extid_type: str, extid: bytes) Iterable[ExtIDRow][source]#
extid_get_from_extid_and_version(extid_type: str, extid: bytes, extid_version: int) Iterable[ExtIDRow][source]#
extid_get_from_target(target_type: str, target: bytes, extid_type: str | None = None, extid_version: int | None = None) Iterable[ExtIDRow][source]#
object_reference_add_concurrent(entries: List[ObjectReferenceRow]) None[source]#
object_reference_get(target: bytes, target_type: str, limit: int) Iterable[ObjectReferenceRow][source]#
object_references_list_tables() List[ObjectReferencesTableRow][source]#
object_references_create_table(date: Tuple[int, int]) Tuple[date, date][source]#
object_references_drop_table(year: int, week: int) None[source]#
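
The runner can also be exercised on its own; a minimal sketch using the counter methods above (InMemoryCqlRunner and its methods come from this page, while the object_type and count attributes of ObjectCountRow are assumptions modeled on swh.storage.cassandra.model):

    from swh.storage.in_memory import InMemoryCqlRunner

    runner = InMemoryCqlRunner()
    runner.increment_counter("content", 3)
    runner.increment_counter("content", 2)

    for row in runner.stat_counters():
        # ``object_type`` and ``count`` are assumed attribute names;
        # expected output: "content 5"
        print(row.object_type, row.count)
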
class swh.storage.in_memory.InMemoryStorage(journal_writer=None)[source]#

Bases: CassandraStorage

An in-memory storage backend exposing the same interface as CassandraStorage, which it subclasses. The constructor parameters documented below are inherited from CassandraStorage; only journal_writer applies to the in-memory backend.

Parameters:
  • hosts – Seed Cassandra nodes used to start connecting to the cluster

  • keyspace – Name of the Cassandra database to use

  • objstorage – Passed as argument to ObjStorage; if unset, use a NoopObjStorage

  • port – Cassandra port

  • journal_writer – Passed as argument to JournalWriter

  • allow_overwrite – When False (the default), *_add functions check whether an object already exists in the database before sending it in an INSERT; this is more efficient when there is a moderately high probability that the object is already known. When True, objects are inserted without this check, which can be useful to overwrite existing objects (e.g. when applying a schema update) or when the database is known to be mostly empty. Note that False does not guarantee there won’t be any overwrite.

  • consistency_level – The default read/write consistency to use

  • directory_entries_insert_algo –

    Must be one of (see the configuration sketch after this parameter list):

    • one-by-one: naive, one INSERT per directory entry, serialized

    • concurrent: one INSERT per directory entry, concurrent

    • batch: using UNLOGGED BATCH to insert many entries in a few statements

  • auth_provider

    An optional dict describing the authentication provider to use. Must contain at least a cls entry and the parameters to pass to the constructor. For example:

    auth_provider:
        cls: cassandra.auth.PlainTextAuthProvider
        username: myusername
        password: mypassword
    

  • table_options – An optional dict mapping each table name (or the literal object_references_*) to CQL table options
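
For reference, a hedged sketch of how the Cassandra-specific parameters above are typically supplied through swh.storage.get_storage; the backend name, seed host, keyspace, and option values are placeholders, and the keyword names simply mirror the constructor arguments documented above:

    from swh.storage import get_storage

    # Placeholder values; the keywords mirror the documented
    # constructor arguments of CassandraStorage.
    storage = get_storage(
        "cassandra",
        hosts=["cassandra-seed.example.org"],
        keyspace="swh",
        directory_entries_insert_algo="batch",
    )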

reset()[source]#
check_config(*, check_write: bool) bool[source]#
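
A short usage sketch of the in-memory backend itself, assuming "memory" is the name InMemoryStorage is registered under in swh.storage.get_storage (origin_add and origin_get come from the common storage interface, not this page):

    from swh.model.model import Origin
    from swh.storage import get_storage

    # "memory" is assumed to be the registered name of InMemoryStorage.
    storage = get_storage("memory")
    assert storage.check_config(check_write=True)

    storage.origin_add([Origin(url="https://example.org/repo.git")])
    assert storage.origin_get(["https://example.org/repo.git"])[0] is not None

    storage.reset()  # drops all in-memory state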