Source code for swh.storage.cassandra.cql

# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import Counter
import dataclasses
import datetime
import functools
import itertools
import logging
import random
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    Type,
    TypeVar,
    Union,
)

from cassandra import ConsistencyLevel, CoordinationFailure
from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra.query import BoundStatement, PreparedStatement, dict_factory
from mypy_extensions import NamedArg
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)

from swh.core.utils import grouper
from swh.model.identifiers import CoreSWHID
from swh.model.model import (
    Content,
    Person,
    Sha1Git,
    SkippedContent,
    Timestamp,
    TimestampWithTimezone,
)
from swh.storage.interface import ListOrder

from ..utils import remove_keys
from .common import TOKEN_BEGIN, TOKEN_END, hash_url
from .model import (
    MAGIC_NULL_PK,
    BaseRow,
    ContentRow,
    DirectoryEntryRow,
    DirectoryRow,
    ExtIDByTargetRow,
    ExtIDRow,
    MetadataAuthorityRow,
    MetadataFetcherRow,
    ObjectCountRow,
    OriginRow,
    OriginVisitRow,
    OriginVisitStatusRow,
    RawExtrinsicMetadataByIdRow,
    RawExtrinsicMetadataRow,
    ReleaseRow,
    RevisionParentRow,
    RevisionRow,
    SkippedContentRow,
    SnapshotBranchRow,
    SnapshotRow,
    content_index_table_name,
)
from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS

PARTITION_KEY_RESTRICTION_MAX_SIZE = 100
"""Maximum number of restrictions in a single query.
Usually this is a very low number (eg. SELECT ... FROM ... WHERE x=?),
but some queries can request arbitrarily many (eg. SELECT ... FROM ... WHERE x IN ?).

This can cause performance issues, as the node getting the query need to
coordinate with other nodes to get the complete results.
See <https://github.com/scylladb/scylla/pull/4797> for details and rationale.
"""

BATCH_INSERT_MAX_SIZE = 1000


logger = logging.getLogger(__name__)


[docs]def get_execution_profiles( consistency_level: str = "ONE", ) -> Dict[object, ExecutionProfile]: if consistency_level not in ConsistencyLevel.name_to_value: raise ValueError( f"Configuration error: Unknown consistency level '{consistency_level}'" ) return { EXEC_PROFILE_DEFAULT: ExecutionProfile( load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()), row_factory=dict_factory, consistency_level=ConsistencyLevel.name_to_value[consistency_level], ) }
# Configuration for cassandra-driver's access to servers: # * hit the right server directly when sending a query (TokenAwarePolicy), # * if there's more than one, then pick one at random that's in the same # datacenter as the client (DCAwareRoundRobinPolicy)
[docs]def create_keyspace( hosts: List[str], keyspace: str, port: int = 9042, *, durable_writes=True ): cluster = Cluster(hosts, port=port, execution_profiles=get_execution_profiles()) session = cluster.connect() extra_params = "" if not durable_writes: extra_params = "AND durable_writes = false" session.execute( """CREATE KEYSPACE IF NOT EXISTS "%s" WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } %s; """ % (keyspace, extra_params) ) session.execute('USE "%s"' % keyspace) for query in CREATE_TABLES_QUERIES: session.execute(query)
TRet = TypeVar("TRet") def _prepared_statement( query: str, ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: """Returns a decorator usable on methods of CqlRunner, to inject them with a 'statement' argument, that is a prepared statement corresponding to the query. This only works on methods of CqlRunner, as preparing a statement requires a connection to a Cassandra server.""" def decorator(f): @functools.wraps(f) def newf(self, *args, **kwargs) -> TRet: if f.__name__ not in self._prepared_statements: statement: PreparedStatement = self._session.prepare(query) self._prepared_statements[f.__name__] = statement return f( self, *args, **kwargs, statement=self._prepared_statements[f.__name__] ) return newf return decorator TArg = TypeVar("TArg") TSelf = TypeVar("TSelf") def _insert_query(row_class): columns = row_class.cols() return ( f"INSERT INTO {row_class.TABLE} ({', '.join(columns)}) " f"VALUES ({', '.join('?' for _ in columns)})" ) def _prepared_insert_statement( row_class: Type[BaseRow], ) -> Callable[ [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa Callable[[TSelf, TArg], TRet], ]: """Shorthand for using `_prepared_statement` for `INSERT INTO` statements.""" return _prepared_statement(_insert_query(row_class)) def _prepared_exists_statement( table_name: str, ) -> Callable[ [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa Callable[[TSelf, TArg], TRet], ]: """Shorthand for using `_prepared_statement` for queries that only check which ids in a list exist in the table.""" return _prepared_statement(f"SELECT id FROM {table_name} WHERE id IN ?") def _prepared_select_statement( row_class: Type[BaseRow], clauses: str = "", cols: Optional[List[str]] = None, ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: if cols is None: cols = row_class.cols() return _prepared_statement( f"SELECT {', '.join(cols)} FROM {row_class.TABLE} {clauses}" ) def _prepared_select_statements( row_class: Type[BaseRow], queries: Dict[Any, str], ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: """Like _prepared_statement, but supports multiple statements, passed a dict, and passes a dict of prepared statements to the decorated method""" cols = row_class.cols() statement_start = f"SELECT {', '.join(cols)} FROM {row_class.TABLE} " def decorator(f): @functools.wraps(f) def newf(self, *args, **kwargs) -> TRet: if f.__name__ not in self._prepared_statements: self._prepared_statements[f.__name__] = { key: self._session.prepare(statement_start + query) for (key, query) in queries.items() } return f( self, *args, **kwargs, statements=self._prepared_statements[f.__name__] ) return newf return decorator def _next_bytes_value(value: bytes) -> bytes: """Returns the next bytes value by incrementing the integer representation of the provided value and converting it back to bytes. For instance when prefix is b"abcd", it returns b"abce". """ next_value_int = int.from_bytes(value, byteorder="big") + 1 return next_value_int.to_bytes( (next_value_int.bit_length() + 7) // 8, byteorder="big" )
[docs]class CqlRunner: """Class managing prepared statements and building queries to be sent to Cassandra.""" def __init__( self, hosts: List[str], keyspace: str, port: int, consistency_level: str ): self._cluster = Cluster( hosts, port=port, execution_profiles=get_execution_profiles(consistency_level), ) self._session = self._cluster.connect(keyspace) self._cluster.register_user_type( keyspace, "microtimestamp_with_timezone", TimestampWithTimezone ) self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp) self._cluster.register_user_type(keyspace, "person", Person) # directly a PreparedStatement for methods decorated with # @_prepared_statements (and its wrappers, _prepared_insert_statement, # _prepared_exists_statement, and _prepared_select_statement); # and a dict of PreparedStatements with @_prepared_select_statements self._prepared_statements: Dict[ str, Union[PreparedStatement, Dict[Any, PreparedStatement]] ] = {} ########################## # Common utility functions ########################## MAX_RETRIES = 3 @retry( wait=wait_random_exponential(multiplier=1, max=10), stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_exception_type(CoordinationFailure), ) def _execute_with_retries(self, statement, args: Optional[Sequence]) -> ResultSet: return self._session.execute(statement, args, timeout=1000.0) @retry( wait=wait_random_exponential(multiplier=1, max=10), stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_exception_type(CoordinationFailure), ) def _execute_many_with_retries( self, statement, args_list: List[Tuple] ) -> Iterable[BaseRow]: for res in execute_concurrent_with_args(self._session, statement, args_list): yield from res.result_or_exc def _add_one(self, statement, obj: BaseRow) -> None: self._execute_with_retries(statement, dataclasses.astuple(obj)) def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: tables = {obj.TABLE for obj in objs} assert len(tables) == 1, f"Cannot insert to multiple tables: {tables}" rows = list(map(dataclasses.astuple, objs)) for _ in self._execute_many_with_retries(statement, rows): # Need to consume the generator to actually run the INSERTs pass _T = TypeVar("_T", bound=BaseRow) def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa """Takes a prepared statement of the form "SELECT * FROM <table> WHERE token(<keys>) > ? LIMIT 1" and uses it to return a random row""" token = random.randint(TOKEN_BEGIN, TOKEN_END) rows = self._execute_with_retries(statement, [token]) if not rows: # There are no row with a greater token; wrap around to get # the row with the smallest token rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) if rows: return row_class.from_dict(rows.one()) # type: ignore else: return None def _missing(self, statement, ids): found_ids = set() for id_group in grouper(ids, PARTITION_KEY_RESTRICTION_MAX_SIZE): rows = self._execute_with_retries(statement, [list(id_group)]) found_ids.update(row["id"] for row in rows) return [id_ for id_ in ids if id_ not in found_ids] ########################## # 'content' table ########################## def _content_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by content_add_prepare, to be called when the content row should be added to the primary table.""" self._execute_with_retries(statement, None)
[docs] @_prepared_insert_statement(ContentRow) def content_add_prepare( self, content: ContentRow, *, statement ) -> Tuple[int, Callable[[], None]]: """Prepares insertion of a Content to the main 'content' table. Returns a token (to be used in secondary tables), and a function to be called to perform the insertion in the main table.""" statement = statement.bind(dataclasses.astuple(content)) # Type used for hashing keys (usually, it will be # cassandra.metadata.Murmur3Token) token_class = self._cluster.metadata.token_map.token_class # Token of the row when it will be inserted. This is equivalent to # "SELECT token({', '.join(ContentRow.PARTITION_KEY)}) FROM content WHERE ..." # after the row is inserted; but we need the token to insert in the # index tables *before* inserting to the main 'content' table token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._content_add_finalize, statement) return (token, finalizer)
[docs] @_prepared_select_statement( ContentRow, f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}" ) def content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[ContentRow]: rows = list( self._execute_with_retries( statement, [content_hashes[algo] for algo in HASH_ALGORITHMS] ) ) assert len(rows) <= 1 if rows: return ContentRow(**rows[0]) else: return None
[docs] def content_missing_from_hashes( self, contents_hashes: List[Dict[str, bytes]] ) -> Iterator[Dict[str, bytes]]: for group in grouper(contents_hashes, PARTITION_KEY_RESTRICTION_MAX_SIZE): group = list(group) # Get all contents that share a sha256 with one of the contents in the group present = set( self._content_get_hashes_from_sha256( [content["sha256"] for content in group] ) ) for content in group: for algo in HASH_ALGORITHMS: assert content.get(algo) is not None, ( "content_missing_from_hashes must not be called with " "partial hashes." ) if tuple(content[algo] for algo in HASH_ALGORITHMS) not in present: yield content
@_prepared_statement( f"SELECT {', '.join(HASH_ALGORITHMS)} FROM content WHERE sha256 IN ?" ) def _content_get_hashes_from_sha256( self, ids: List[bytes], *, statement ) -> Iterator[Tuple[bytes, bytes, bytes, bytes]]: for row in self._execute_with_retries(statement, [ids]): yield tuple(row[algo] for algo in HASH_ALGORITHMS) # type: ignore
[docs] @_prepared_select_statement( ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" ) def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]: return map(ContentRow.from_dict, self._execute_with_retries(statement, [token]))
[docs] @_prepared_select_statement( ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) > ? LIMIT 1" ) def content_get_random(self, *, statement) -> Optional[ContentRow]: return self._get_random_row(ContentRow, statement)
[docs] @_prepared_statement( """ SELECT token({pk}) AS tok, {cols} FROM {table} WHERE token({pk}) >= ? AND token({pk}) <= ? LIMIT ? """.format( pk=", ".join(ContentRow.PARTITION_KEY), cols=", ".join(ContentRow.cols()), table=ContentRow.TABLE, ) ) def content_get_token_range( self, start: int, end: int, limit: int, *, statement ) -> Iterable[Tuple[int, ContentRow]]: """Returns an iterable of (token, row)""" return ( (row["tok"], ContentRow.from_dict(remove_keys(row, ("tok",)))) for row in self._execute_with_retries(statement, [start, end, limit]) )
########################## # 'content_by_*' tables ##########################
[docs] @_prepared_statement( f""" SELECT sha1_git AS id FROM {content_index_table_name("sha1_git", skipped_content=False)} WHERE sha1_git IN ? """ ) def content_missing_by_sha1_git( self, ids: List[bytes], *, statement ) -> List[bytes]: return self._missing(statement, ids)
[docs] def content_index_add_one(self, algo: str, content: Content, token: int) -> None: """Adds a row mapping content[algo] to the token of the Content in the main 'content' table.""" query = f""" INSERT INTO {content_index_table_name(algo, skipped_content=False)} ({algo}, target_token) VALUES (%s, %s) """ self._execute_with_retries(query, [content.get_hash(algo), token])
[docs] def content_get_tokens_from_single_algo( self, algo: str, hashes: List[bytes] ) -> Iterable[int]: assert algo in HASH_ALGORITHMS query = f""" SELECT target_token FROM {content_index_table_name(algo, skipped_content=False)} WHERE {algo} = %s """ return ( row["target_token"] # type: ignore for row in self._execute_many_with_retries( query, [(hash_,) for hash_ in hashes] ) )
########################## # 'skipped_content' table ########################## def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by skipped_content_add_prepare, to be called when the content row should be added to the primary table.""" self._execute_with_retries(statement, None)
[docs] @_prepared_insert_statement(SkippedContentRow) def skipped_content_add_prepare( self, content, *, statement ) -> Tuple[int, Callable[[], None]]: """Prepares insertion of a Content to the main 'skipped_content' table. Returns a token (to be used in secondary tables), and a function to be called to perform the insertion in the main table.""" # Replace NULLs (which are not allowed in the partition key) with # an empty byte string for key in SkippedContentRow.PARTITION_KEY: if getattr(content, key) is None: setattr(content, key, MAGIC_NULL_PK) statement = statement.bind(dataclasses.astuple(content)) # Type used for hashing keys (usually, it will be # cassandra.metadata.Murmur3Token) token_class = self._cluster.metadata.token_map.token_class # Token of the row when it will be inserted. This is equivalent to # "SELECT token({', '.join(SkippedContentRow.PARTITION_KEY)}) # FROM skipped_content WHERE ..." # after the row is inserted; but we need the token to insert in the # index tables *before* inserting to the main 'skipped_content' table token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._skipped_content_add_finalize, statement) return (token, finalizer)
[docs] @_prepared_select_statement( SkippedContentRow, f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}", ) def skipped_content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[SkippedContentRow]: rows = list( self._execute_with_retries( statement, [content_hashes[algo] or MAGIC_NULL_PK for algo in HASH_ALGORITHMS], ) ) assert len(rows) <= 1 if rows: return SkippedContentRow.from_dict(rows[0]) else: return None
[docs] @_prepared_select_statement( SkippedContentRow, f"WHERE token({', '.join(SkippedContentRow.PARTITION_KEY)}) = ?", ) def skipped_content_get_from_token( self, token, *, statement ) -> Iterable[SkippedContentRow]: return map( SkippedContentRow.from_dict, self._execute_with_retries(statement, [token]) )
########################## # 'skipped_content_by_*' tables ##########################
[docs] def skipped_content_index_add_one( self, algo: str, content: SkippedContent, token: int ) -> None: """Adds a row mapping content[algo] to the token of the SkippedContent in the main 'skipped_content' table.""" query = ( f"INSERT INTO skipped_content_by_{algo} ({algo}, target_token) " f"VALUES (%s, %s)" ) self._execute_with_retries( query, [content.get_hash(algo) or MAGIC_NULL_PK, token] )
[docs] def skipped_content_get_tokens_from_single_hash( self, algo: str, hash_: bytes ) -> Iterable[int]: assert algo in HASH_ALGORITHMS query = f""" SELECT target_token FROM {content_index_table_name(algo, skipped_content=True)} WHERE {algo} = %s """ return ( row["target_token"] for row in self._execute_with_retries(query, [hash_]) )
########################## # 'revision' table ##########################
[docs] @_prepared_exists_statement("revision") def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids)
[docs] @_prepared_insert_statement(RevisionRow) def revision_add_one(self, revision: RevisionRow, *, statement) -> None: self._add_one(statement, revision)
[docs] @_prepared_statement(f"SELECT id FROM {RevisionRow.TABLE} WHERE id IN ?") def revision_get_ids(self, revision_ids, *, statement) -> Iterable[int]: return ( row["id"] for row in self._execute_with_retries(statement, [revision_ids]) )
[docs] @_prepared_select_statement(RevisionRow, "WHERE id IN ?") def revision_get( self, revision_ids: List[Sha1Git], *, statement ) -> Iterable[RevisionRow]: return map( RevisionRow.from_dict, self._execute_with_retries(statement, [revision_ids]) )
[docs] @_prepared_select_statement(RevisionRow, "WHERE token(id) > ? LIMIT 1") def revision_get_random(self, *, statement) -> Optional[RevisionRow]: return self._get_random_row(RevisionRow, statement)
########################## # 'revision_parent' table ##########################
[docs] @_prepared_insert_statement(RevisionParentRow) def revision_parent_add_one( self, revision_parent: RevisionParentRow, *, statement ) -> None: self._add_one(statement, revision_parent)
[docs] @_prepared_statement( f"SELECT parent_id FROM {RevisionParentRow.TABLE} WHERE id = ?" ) def revision_parent_get( self, revision_id: Sha1Git, *, statement ) -> Iterable[bytes]: return ( row["parent_id"] for row in self._execute_with_retries(statement, [revision_id]) )
########################## # 'release' table ##########################
[docs] @_prepared_exists_statement("release") def release_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids)
[docs] @_prepared_insert_statement(ReleaseRow) def release_add_one(self, release: ReleaseRow, *, statement) -> None: self._add_one(statement, release)
[docs] @_prepared_select_statement(ReleaseRow, "WHERE id in ?") def release_get(self, release_ids: List[str], *, statement) -> Iterable[ReleaseRow]: return map( ReleaseRow.from_dict, self._execute_with_retries(statement, [release_ids]) )
[docs] @_prepared_select_statement(ReleaseRow, "WHERE token(id) > ? LIMIT 1") def release_get_random(self, *, statement) -> Optional[ReleaseRow]: return self._get_random_row(ReleaseRow, statement)
########################## # 'directory' table ##########################
[docs] @_prepared_exists_statement("directory") def directory_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids)
[docs] @_prepared_insert_statement(DirectoryRow) def directory_add_one(self, directory: DirectoryRow, *, statement) -> None: """Called after all calls to directory_entry_add_one, to commit/finalize the directory.""" self._add_one(statement, directory)
[docs] @_prepared_select_statement(DirectoryRow, "WHERE token(id) > ? LIMIT 1") def directory_get_random(self, *, statement) -> Optional[DirectoryRow]: return self._get_random_row(DirectoryRow, statement)
########################## # 'directory_entry' table ##########################
[docs] @_prepared_insert_statement(DirectoryEntryRow) def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None: self._add_one(statement, entry)
[docs] @_prepared_insert_statement(DirectoryEntryRow) def directory_entry_add_concurrent( self, entries: List[DirectoryEntryRow], *, statement ) -> None: if len(entries) == 0: # nothing to do return assert ( len({entry.directory_id for entry in entries}) == 1 ), "directory_entry_add_many must be called with entries for a single dir" self._add_many(statement, entries)
[docs] @_prepared_statement( "BEGIN UNLOGGED BATCH\n" + (_insert_query(DirectoryEntryRow) + ";\n") * BATCH_INSERT_MAX_SIZE + "APPLY BATCH" ) def directory_entry_add_batch( self, entries: List[DirectoryEntryRow], *, statement ) -> None: if len(entries) == 0: # nothing to do return assert ( len({entry.directory_id for entry in entries}) == 1 ), "directory_entry_add_many must be called with entries for a single dir" for entry_group in grouper(entries, BATCH_INSERT_MAX_SIZE): entry_group = list(entry_group) if len(entry_group) == BATCH_INSERT_MAX_SIZE: entry_group = list(map(dataclasses.astuple, entry_group)) self._execute_with_retries( statement, list(itertools.chain.from_iterable(entry_group)) ) else: # Last group, with a smaller size than the BATCH we prepared. # Creating a prepared BATCH just for this then discarding it would # create too much churn on the server side; and using unprepared # statements is annoying (we can't use _insert_query() as they have # a different format) # Fall back to inserting concurrently. self.directory_entry_add_concurrent(entry_group)
[docs] @_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?") def directory_entry_get( self, directory_ids, *, statement ) -> Iterable[DirectoryEntryRow]: return map( DirectoryEntryRow.from_dict, self._execute_with_retries(statement, [directory_ids]), )
[docs] @_prepared_select_statement( DirectoryEntryRow, "WHERE directory_id = ? AND name >= ? LIMIT ?" ) def directory_entry_get_from_name( self, directory_id: Sha1Git, from_: bytes, limit: int, *, statement ) -> Iterable[DirectoryEntryRow]: return map( DirectoryEntryRow.from_dict, self._execute_with_retries(statement, [directory_id, from_, limit]), )
########################## # 'snapshot' table ##########################
[docs] @_prepared_exists_statement("snapshot") def snapshot_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids)
[docs] @_prepared_insert_statement(SnapshotRow) def snapshot_add_one(self, snapshot: SnapshotRow, *, statement) -> None: self._add_one(statement, snapshot)
[docs] @_prepared_select_statement(SnapshotRow, "WHERE token(id) > ? LIMIT 1") def snapshot_get_random(self, *, statement) -> Optional[SnapshotRow]: return self._get_random_row(SnapshotRow, statement)
########################## # 'snapshot_branch' table ##########################
[docs] @_prepared_insert_statement(SnapshotBranchRow) def snapshot_branch_add_one(self, branch: SnapshotBranchRow, *, statement) -> None: self._add_one(statement, branch)
[docs] @_prepared_statement( f""" SELECT ascii_bins_count(target_type) AS counts FROM {SnapshotBranchRow.TABLE} WHERE snapshot_id = ? AND name >= ? """ ) def snapshot_count_branches_from_name( self, snapshot_id: Sha1Git, from_: bytes, *, statement ) -> Dict[Optional[str], int]: row = self._execute_with_retries(statement, [snapshot_id, from_]).one() (nb_none, counts) = row["counts"] return {None: nb_none, **counts}
[docs] @_prepared_statement( f""" SELECT ascii_bins_count(target_type) AS counts FROM {SnapshotBranchRow.TABLE} WHERE snapshot_id = ? AND name < ? """ ) def snapshot_count_branches_before_name( self, snapshot_id: Sha1Git, before: bytes, *, statement, ) -> Dict[Optional[str], int]: row = self._execute_with_retries(statement, [snapshot_id, before]).one() (nb_none, counts) = row["counts"] return {None: nb_none, **counts}
[docs] def snapshot_count_branches( self, snapshot_id: Sha1Git, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Dict[Optional[str], int]: """Returns a dictionary from type names to the number of branches of that type.""" prefix = branch_name_exclude_prefix if prefix is None: return self.snapshot_count_branches_from_name(snapshot_id, b"") else: # counts branches before exclude prefix counts = Counter( self.snapshot_count_branches_before_name(snapshot_id, prefix) ) # no need to execute that part if each bit of the prefix equals 1 if prefix.replace(b"\xff", b"") != b"": # counts branches after exclude prefix and update counters counts.update( self.snapshot_count_branches_from_name( snapshot_id, _next_bytes_value(prefix) ) ) return counts
[docs] @_prepared_select_statement( SnapshotBranchRow, "WHERE snapshot_id = ? AND name >= ? LIMIT ?" ) def snapshot_branch_get_from_name( self, snapshot_id: Sha1Git, from_: bytes, limit: int, *, statement ) -> Iterable[SnapshotBranchRow]: return map( SnapshotBranchRow.from_dict, self._execute_with_retries(statement, [snapshot_id, from_, limit]), )
[docs] @_prepared_select_statement( SnapshotBranchRow, "WHERE snapshot_id = ? AND name >= ? AND name < ? LIMIT ?" ) def snapshot_branch_get_range( self, snapshot_id: Sha1Git, from_: bytes, before: bytes, limit: int, *, statement, ) -> Iterable[SnapshotBranchRow]: return map( SnapshotBranchRow.from_dict, self._execute_with_retries(statement, [snapshot_id, from_, before, limit]), )
[docs] def snapshot_branch_get( self, snapshot_id: Sha1Git, from_: bytes, limit: int, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Iterable[SnapshotBranchRow]: prefix = branch_name_exclude_prefix if prefix is None: return self.snapshot_branch_get_from_name(snapshot_id, from_, limit) else: # get branches before the exclude prefix branches = list( self.snapshot_branch_get_range(snapshot_id, from_, prefix, limit) ) nb_branches = len(branches) # no need to execute that part if limit is reached # or if each bit of the prefix equals 1 if nb_branches < limit and prefix.replace(b"\xff", b"") != b"": # get branches after the exclude prefix and update list to return branches.extend( self.snapshot_branch_get_from_name( snapshot_id, _next_bytes_value(prefix), limit - nb_branches ) ) return branches
########################## # 'origin' table ##########################
[docs] @_prepared_insert_statement(OriginRow) def origin_add_one(self, origin: OriginRow, *, statement) -> None: self._add_one(statement, origin)
[docs] @_prepared_select_statement(OriginRow, "WHERE sha1 = ?") def origin_get_by_sha1(self, sha1: bytes, *, statement) -> Iterable[OriginRow]: return map(OriginRow.from_dict, self._execute_with_retries(statement, [sha1]))
[docs] def origin_get_by_url(self, url: str) -> Iterable[OriginRow]: return self.origin_get_by_sha1(hash_url(url))
[docs] @_prepared_statement( f""" SELECT token(sha1) AS tok, {", ".join(OriginRow.cols())} FROM {OriginRow.TABLE} WHERE token(sha1) >= ? LIMIT ? """ ) def origin_list( self, start_token: int, limit: int, *, statement ) -> Iterable[Tuple[int, OriginRow]]: """Returns an iterable of (token, origin)""" return ( (row["tok"], OriginRow.from_dict(remove_keys(row, ("tok",)))) for row in self._execute_with_retries(statement, [start_token, limit]) )
[docs] @_prepared_select_statement(OriginRow) def origin_iter_all(self, *, statement) -> Iterable[OriginRow]: return map(OriginRow.from_dict, self._execute_with_retries(statement, []))
[docs] @_prepared_statement( f""" UPDATE {OriginRow.TABLE} SET next_visit_id=? WHERE sha1 = ? IF next_visit_id<? """ ) def origin_bump_next_visit_id( self, origin_url: str, visit_id: int, *, statement ) -> None: origin_sha1 = hash_url(origin_url) next_id = visit_id + 1 self._execute_with_retries(statement, [next_id, origin_sha1, next_id])
@_prepared_statement(f"SELECT next_visit_id FROM {OriginRow.TABLE} WHERE sha1 = ?") def _origin_get_next_visit_id(self, origin_sha1: bytes, *, statement) -> int: rows = list(self._execute_with_retries(statement, [origin_sha1])) assert len(rows) == 1 # TODO: error handling return rows[0]["next_visit_id"]
[docs] @_prepared_statement( f""" UPDATE {OriginRow.TABLE} SET next_visit_id=? WHERE sha1 = ? IF next_visit_id=? """ ) def origin_generate_unique_visit_id(self, origin_url: str, *, statement) -> int: origin_sha1 = hash_url(origin_url) next_id = self._origin_get_next_visit_id(origin_sha1) while True: res = list( self._execute_with_retries( statement, [next_id + 1, origin_sha1, next_id] ) ) assert len(res) == 1 if res[0]["[applied]"]: # No data race return next_id else: # Someone else updated it before we did, let's try again next_id = res[0]["next_visit_id"] # TODO: abort after too many attempts return next_id
########################## # 'origin_visit' table ##########################
[docs] @_prepared_select_statements( OriginVisitRow, { (True, ListOrder.ASC): ( "WHERE origin = ? AND visit > ? ORDER BY visit ASC LIMIT ?" ), (True, ListOrder.DESC): ( "WHERE origin = ? AND visit < ? ORDER BY visit DESC LIMIT ?" ), (False, ListOrder.ASC): "WHERE origin = ? ORDER BY visit ASC LIMIT ?", (False, ListOrder.DESC): "WHERE origin = ? ORDER BY visit DESC LIMIT ?", }, ) def origin_visit_get( self, origin_url: str, last_visit: Optional[int], limit: int, order: ListOrder, *, statements, ) -> Iterable[OriginVisitRow]: args: List[Any] = [origin_url] if last_visit is not None: args.append(last_visit) args.append(limit) statement = statements[(last_visit is not None, order)] return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, args) )
[docs] @_prepared_insert_statement(OriginVisitRow) def origin_visit_add_one(self, visit: OriginVisitRow, *, statement) -> None: self._add_one(statement, visit)
[docs] @_prepared_select_statement(OriginVisitRow, "WHERE origin = ? AND visit = ?") def origin_visit_get_one( self, origin_url: str, visit_id: int, *, statement ) -> Optional[OriginVisitRow]: # TODO: error handling rows = list(self._execute_with_retries(statement, [origin_url, visit_id])) if rows: return OriginVisitRow.from_dict(rows[0]) else: return None
[docs] @_prepared_select_statement(OriginVisitRow, "WHERE origin = ?") def origin_visit_get_all( self, origin_url: str, *, statement ) -> Iterable[OriginVisitRow]: return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, [origin_url]), )
@_prepared_select_statement(OriginVisitRow, "WHERE token(origin) >= ?") def _origin_visit_iter_from( self, min_token: int, *, statement ) -> Iterable[OriginVisitRow]: return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, [min_token]) ) @_prepared_select_statement(OriginVisitRow, "WHERE token(origin) < ?") def _origin_visit_iter_to( self, max_token: int, *, statement ) -> Iterable[OriginVisitRow]: return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, [max_token]) )
[docs] def origin_visit_iter(self, start_token: int) -> Iterator[OriginVisitRow]: """Returns all origin visits in order from this token, and wraps around the token space.""" yield from self._origin_visit_iter_from(start_token) yield from self._origin_visit_iter_to(start_token)
########################## # 'origin_visit_status' table ##########################
[docs] @_prepared_select_statements( OriginVisitStatusRow, { (True, ListOrder.ASC): ( "WHERE origin = ? AND visit = ? AND date >= ? " "ORDER BY visit ASC LIMIT ?" ), (True, ListOrder.DESC): ( "WHERE origin = ? AND visit = ? AND date <= ? " "ORDER BY visit DESC LIMIT ?" ), (False, ListOrder.ASC): ( "WHERE origin = ? AND visit = ? ORDER BY visit ASC LIMIT ?" ), (False, ListOrder.DESC): ( "WHERE origin = ? AND visit = ? ORDER BY visit DESC LIMIT ?" ), }, ) def origin_visit_status_get_range( self, origin: str, visit: int, date_from: Optional[datetime.datetime], limit: int, order: ListOrder, *, statements, ) -> Iterable[OriginVisitStatusRow]: args: List[Any] = [origin, visit] if date_from is not None: args.append(date_from) args.append(limit) statement = statements[(date_from is not None, order)] return map( OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, args) )
[docs] @_prepared_insert_statement(OriginVisitStatusRow) def origin_visit_status_add_one( self, visit_update: OriginVisitStatusRow, *, statement ) -> None: self._add_one(statement, visit_update)
[docs] def origin_visit_status_get_latest( self, origin: str, visit: int, ) -> Optional[OriginVisitStatusRow]: """Given an origin visit id, return its latest origin_visit_status """ return next(self.origin_visit_status_get(origin, visit), None)
[docs] @_prepared_select_statement( OriginVisitStatusRow, # 'visit DESC,' is optional with Cassandra 4, but ScyllaDB needs it "WHERE origin = ? AND visit = ? ORDER BY visit DESC, date DESC", ) def origin_visit_status_get( self, origin: str, visit: int, *, statement, ) -> Iterator[OriginVisitStatusRow]: """Return all origin visit statuses for a given visit """ return map( OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, [origin, visit]), )
########################## # 'metadata_authority' table ##########################
[docs] @_prepared_insert_statement(MetadataAuthorityRow) def metadata_authority_add(self, authority: MetadataAuthorityRow, *, statement): self._add_one(statement, authority)
[docs] @_prepared_select_statement(MetadataAuthorityRow, "WHERE type = ? AND url = ?") def metadata_authority_get( self, type, url, *, statement ) -> Optional[MetadataAuthorityRow]: rows = list(self._execute_with_retries(statement, [type, url])) if rows: return MetadataAuthorityRow.from_dict(rows[0]) else: return None
########################## # 'metadata_fetcher' table ##########################
[docs] @_prepared_insert_statement(MetadataFetcherRow) def metadata_fetcher_add(self, fetcher, *, statement): self._add_one(statement, fetcher)
[docs] @_prepared_select_statement(MetadataFetcherRow, "WHERE name = ? AND version = ?") def metadata_fetcher_get( self, name, version, *, statement ) -> Optional[MetadataFetcherRow]: rows = list(self._execute_with_retries(statement, [name, version])) if rows: return MetadataFetcherRow.from_dict(rows[0]) else: return None
######################### # 'raw_extrinsic_metadata_by_id' table #########################
[docs] @_prepared_insert_statement(RawExtrinsicMetadataByIdRow) def raw_extrinsic_metadata_by_id_add(self, row, *, statement): self._add_one(statement, row)
[docs] @_prepared_select_statement(RawExtrinsicMetadataByIdRow, "WHERE id IN ?") def raw_extrinsic_metadata_get_by_ids( self, ids: List[Sha1Git], *, statement ) -> Iterable[RawExtrinsicMetadataByIdRow]: return map( RawExtrinsicMetadataByIdRow.from_dict, self._execute_with_retries(statement, [ids]), )
######################### # 'raw_extrinsic_metadata' table #########################
[docs] @_prepared_insert_statement(RawExtrinsicMetadataRow) def raw_extrinsic_metadata_add(self, raw_extrinsic_metadata, *, statement): self._add_one(statement, raw_extrinsic_metadata)
[docs] @_prepared_select_statement( RawExtrinsicMetadataRow, "WHERE target=? AND authority_url=? AND discovery_date>? AND authority_type=?", ) def raw_extrinsic_metadata_get_after_date( self, target: str, authority_type: str, authority_url: str, after: datetime.datetime, *, statement, ) -> Iterable[RawExtrinsicMetadataRow]: return map( RawExtrinsicMetadataRow.from_dict, self._execute_with_retries( statement, [target, authority_url, after, authority_type] ), )
[docs] @_prepared_select_statement( RawExtrinsicMetadataRow, # This is equivalent to: # WHERE target=? AND authority_type = ? AND authority_url = ? " # AND (discovery_date, id) > (?, ?)" # but it needs to be written this way to work with ScyllaDB. "WHERE target=? AND (authority_type, authority_url) <= (?, ?) " "AND (authority_type, authority_url, discovery_date, id) > (?, ?, ?, ?)", ) def raw_extrinsic_metadata_get_after_date_and_id( self, target: str, authority_type: str, authority_url: str, after_date: datetime.datetime, after_id: bytes, *, statement, ) -> Iterable[RawExtrinsicMetadataRow]: return map( RawExtrinsicMetadataRow.from_dict, self._execute_with_retries( statement, [ target, authority_type, authority_url, authority_type, authority_url, after_date, after_id, ], ), )
[docs] @_prepared_select_statement( RawExtrinsicMetadataRow, "WHERE target=? AND authority_url=? AND authority_type=?", ) def raw_extrinsic_metadata_get( self, target: str, authority_type: str, authority_url: str, *, statement ) -> Iterable[RawExtrinsicMetadataRow]: return map( RawExtrinsicMetadataRow.from_dict, self._execute_with_retries( statement, [target, authority_url, authority_type] ), )
[docs] @_prepared_statement( "SELECT authority_type, authority_url FROM raw_extrinsic_metadata " "WHERE target = ?" ) def raw_extrinsic_metadata_get_authorities( self, target: str, *, statement ) -> Iterable[Tuple[str, str]]: return ( (entry["authority_type"], entry["authority_url"]) for entry in self._execute_with_retries(statement, [target]) )
########################## # 'extid' table ########################## def _extid_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by extid_add_prepare, to be called when the extid row should be added to the primary table.""" self._execute_with_retries(statement, None)
[docs] @_prepared_insert_statement(ExtIDRow) def extid_add_prepare( self, extid: ExtIDRow, *, statement ) -> Tuple[int, Callable[[], None]]: statement = statement.bind(dataclasses.astuple(extid)) token_class = self._cluster.metadata.token_map.token_class token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._extid_add_finalize, statement) return (token, finalizer)
[docs] @_prepared_select_statement( ExtIDRow, "WHERE extid_type=? AND extid=? AND extid_version=? " "AND target_type=? AND target=?", ) def extid_get_from_pk( self, extid_type: str, extid: bytes, extid_version: int, target: CoreSWHID, *, statement, ) -> Optional[ExtIDRow]: rows = list( self._execute_with_retries( statement, [ extid_type, extid, extid_version, target.object_type.value, target.object_id, ], ), ) assert len(rows) <= 1 if rows: return ExtIDRow(**rows[0]) else: return None
[docs] @_prepared_select_statement( ExtIDRow, "WHERE token(extid_type, extid) = ?", ) def extid_get_from_token(self, token: int, *, statement) -> Iterable[ExtIDRow]: return map(ExtIDRow.from_dict, self._execute_with_retries(statement, [token]),)
# Rows are partitioned by token(extid_type, extid), then ordered (aka. "clustered") # by (extid_type, extid, extid_version, ...). This means that, without knowing the # exact extid_type and extid, we need to scan the whole partition; which should be # reasonably small. We can change the schema later if this becomes an issue
[docs] @_prepared_select_statement( ExtIDRow, "WHERE token(extid_type, extid) = ? AND extid_version = ? ALLOW FILTERING", ) def extid_get_from_token_and_extid_version( self, token: int, extid_version: int, *, statement ) -> Iterable[ExtIDRow]: return map( ExtIDRow.from_dict, self._execute_with_retries(statement, [token, extid_version]), )
[docs] @_prepared_select_statement( ExtIDRow, "WHERE extid_type=? AND extid=?", ) def extid_get_from_extid( self, extid_type: str, extid: bytes, *, statement ) -> Iterable[ExtIDRow]: return map( ExtIDRow.from_dict, self._execute_with_retries(statement, [extid_type, extid]), )
[docs] @_prepared_select_statement( ExtIDRow, "WHERE extid_type=? AND extid=? AND extid_version = ?", ) def extid_get_from_extid_and_version( self, extid_type: str, extid: bytes, extid_version: int, *, statement ) -> Iterable[ExtIDRow]: return map( ExtIDRow.from_dict, self._execute_with_retries(statement, [extid_type, extid, extid_version]), )
[docs] def extid_get_from_target( self, target_type: str, target: bytes, extid_type: Optional[str] = None, extid_version: Optional[int] = None, ) -> Iterable[ExtIDRow]: for token in self._extid_get_tokens_from_target(target_type, target): if token is not None: if extid_type is not None and extid_version is not None: extids = self.extid_get_from_token_and_extid_version( token, extid_version ) else: extids = self.extid_get_from_token(token) for extid in extids: # re-check the extid against target (in case of murmur3 collision) if ( extid is not None and extid.target_type == target_type and extid.target == target and ( (extid_version is None and extid_type is None) or ( ( extid_version is not None and extid.extid_version == extid_version and extid_type is not None and extid.extid_type == extid_type ) ) ) ): yield extid
########################## # 'extid_by_target' table ##########################
[docs] @_prepared_insert_statement(ExtIDByTargetRow) def extid_index_add_one(self, row: ExtIDByTargetRow, *, statement) -> None: """Adds a row mapping extid[target_type, target] to the token of the ExtID in the main 'extid' table.""" self._add_one(statement, row)
@_prepared_statement( f""" SELECT target_token FROM {ExtIDByTargetRow.TABLE} WHERE target_type = ? AND target = ? """ ) def _extid_get_tokens_from_target( self, target_type: str, target: bytes, *, statement ) -> Iterable[int]: return ( row["target_token"] for row in self._execute_with_retries(statement, [target_type, target]) ) ########################## # Miscellaneous ##########################
[docs] def stat_counters(self) -> Iterable[ObjectCountRow]: raise NotImplementedError( "stat_counters is not implemented by the Cassandra backend" )
[docs] @_prepared_statement("SELECT uuid() FROM revision LIMIT 1;") def check_read(self, *, statement): self._execute_with_retries(statement, [])