Source code for swh.storage.cassandra.model

# Copyright (C) 2020-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Classes representing tables in the Cassandra database.

They are very close to classes found in swh.model.model, but most of
them are subtly different:

* Large objects are split into other classes (eg. RevisionRow has no
  'parents' field, because parents are stored in a different table,
  represented by RevisionParentRow)
* They have a "cols" field, which returns the list of column names
  of the table
* They only use types that map directly to Cassandra's schema (ie. no enums)

Therefore, this model doesn't reuse swh.model.model, except for types
that can be mapped to UDTs (Person and TimestampWithTimezone).

Fields may have :func:`dataclasses metadata <dataclasses.field>` keys ``fk``
if the existence of a corresponding row in a different table is almost guaranteed
(up to loaders not crashing and eventual-consistency settling down) and
``points_to`` if they are a Merkle-DAG link to another object (which is more likely
to be missing).
This is used by :func:`swh.storage.cassandra.diagram.dot_diagram`.
"""

import dataclasses
import datetime
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Dict,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    cast,
)

if TYPE_CHECKING:
    from _typeshed import DataclassInstance

from cassandra.util import Date

from swh.model.model import Person, TimestampWithTimezone

# Placeholder stored in place of None in hash columns that are part of a primary
# key. In this module, SkippedContentRow.denullify_clustering_key() substitutes
# it for None values, and SkippedContentRow.from_dict() maps it back to None.
MAGIC_NULL_PK = b"<null>"
"""
NULLs (or all-empty blobs) are not allowed in primary keys; instead we use a
special value that can't possibly be a valid hash.
"""


# Type variable bound to BaseRow: lets classmethods such as ``from_dict`` be
# typed as returning an instance of the subclass they are called on.
T = TypeVar("T", bound="BaseRow")


def content_index_table_name(algo: str, skipped_content: bool) -> str:
    """Return the name of the index table for hashes of the given algorithm.

    The 'content_by_*' and 'skipped_content_by_*' tables serve as indexes for the
    'content' and 'skipped_content' tables, based on one algorithm's hashes.

    For now the name is a simple substitution, but future versions may append a
    version number to it, if needed for schema updates.

    Args:
        algo: name of the hash algorithm, used verbatim as the table suffix
        skipped_content: whether to name the index of 'skipped_content' rather
            than the index of 'content'

    Returns:
        ``skipped_content_by_<algo>`` if ``skipped_content`` is true, else
        ``content_by_<algo>``.
    """
    prefix = "skipped_content_by_" if skipped_content else "content_by_"
    return f"{prefix}{algo}"
class BaseRow:
    """Base class of all row classes in this module.

    Subclasses are dataclasses (``cols`` and ``to_dict`` rely on
    :mod:`dataclasses` introspection) and declare the table they map to through
    the class attributes below.
    """

    # Name of the table.
    TABLE: ClassVar[str]
    # Names of the columns making up the partition key.
    PARTITION_KEY: ClassVar[Tuple[str, ...]]
    # Names of the columns making up the clustering key; none by default.
    CLUSTERING_KEY: ClassVar[Tuple[str, ...]] = ()

    @classmethod
    def denullify_clustering_key(cls, ck: Tuple) -> Tuple:
        """If this class has Optional fields used as a clustering key, this
        replaces such values from the given clustering key so it is suitable
        for sorting purposes.

        The base implementation returns ``ck`` unchanged.
        """
        return ck

    @classmethod
    def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
        """Build a row by passing the items of ``d`` as keyword arguments to
        the class constructor."""
        return cls(**d)

    @classmethod
    def cols(cls) -> List[str]:
        """Return the names of the dataclass fields of this class, in
        declaration order."""
        row_fields = dataclasses.fields(cast("DataclassInstance", cls))
        return [row_field.name for row_field in row_fields]

    def to_dict(self) -> Dict[str, Any]:
        """Return this row as a dict, as computed by :func:`dataclasses.asdict`."""
        return dataclasses.asdict(cast("DataclassInstance", self))
@dataclasses.dataclass
class ContentRow(BaseRow):
    """Row of the ``content`` table, partitioned by ``sha256`` and clustered by
    the three other hashes."""

    TABLE = "content"
    PARTITION_KEY: ClassVar[Tuple[str, ...]] = ("sha256",)
    CLUSTERING_KEY = ("sha1", "sha1_git", "blake2s256")

    sha1: bytes
    sha1_git: bytes
    sha256: bytes
    blake2s256: bytes
    length: int
    ctime: Optional[datetime.datetime]
    """creation time, i.e. time of (first) injection into the storage"""
    status: str
@dataclasses.dataclass
class SkippedContentRow(BaseRow):
    """Row of the ``skipped_content`` table, whose partition key is made of the
    four hash columns.

    Hashes may be None on the Python side; :meth:`denullify_clustering_key` and
    :meth:`from_dict` convert between None and ``MAGIC_NULL_PK``.
    """

    TABLE = "skipped_content"
    PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256")

    sha1: Optional[bytes]
    sha1_git: Optional[bytes]
    sha256: Optional[bytes]
    blake2s256: Optional[bytes]
    length: Optional[int]
    ctime: Optional[datetime.datetime]
    """creation time, i.e. time of (first) injection into the storage"""
    status: str
    reason: str
    origin: str

    @classmethod
    def denullify_clustering_key(cls, ck: Tuple) -> Tuple:
        """Return ``ck`` with every None item replaced by ``MAGIC_NULL_PK``."""
        return tuple(MAGIC_NULL_PK if item is None else item for item in ck)

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "SkippedContentRow":
        """Build a row from ``d``, mapping hashes equal to ``MAGIC_NULL_PK`` back
        to None.

        ``d`` itself is left untouched: the substitution happens on a copy.
        All four hash keys must be present in ``d``.
        """
        row = d.copy()
        for hash_name in ("sha1", "sha1_git", "sha256", "blake2s256"):
            hash_value = row[hash_name]
            row[hash_name] = None if hash_value == MAGIC_NULL_PK else hash_value
        return super().from_dict(row)
@dataclasses.dataclass
class DirectoryRow(BaseRow):
    """Row of the ``directory`` table, partitioned by ``id``."""

    TABLE = "directory"
    PARTITION_KEY = ("id",)

    id: bytes
    raw_manifest: Optional[bytes]
    """NULL if the object can be rebuilt from (sorted) entries"""
@dataclasses.dataclass
class DirectoryEntryRow(BaseRow):
    """Row of the ``directory_entry`` table, partitioned by ``directory_id`` and
    clustered by ``name``."""

    TABLE = "directory_entry"
    PARTITION_KEY = ("directory_id",)
    CLUSTERING_KEY = ("name",)

    directory_id: bytes = dataclasses.field(metadata={"fk": ["directory.id"]})
    name: bytes
    """path name, relative to containing dir"""
    target: bytes = dataclasses.field(
        metadata={
            "points_to": [
                "content.sha1_git",
                "skipped_content.sha1_git",
                "directory.id",
                "revision.id",
            ]
        }
    )
    perms: int
    """unix-like permissions"""
    type: str
    """target type"""
@dataclasses.dataclass
class RevisionRow(BaseRow):
    """Row of the ``revision`` table, partitioned by ``id``.

    Parents are not stored here; see the ``revision_parent`` table.
    """

    TABLE = "revision"
    PARTITION_KEY = ("id",)

    id: bytes
    date: Optional[TimestampWithTimezone]
    committer_date: Optional[TimestampWithTimezone]
    type: str
    directory: bytes = dataclasses.field(metadata={"points_to": ["directory.id"]})
    """source code "root" directory"""
    message: bytes
    author: Person
    committer: Person
    synthetic: bool
    """true iff revision has been created by Software Heritage"""
    metadata: str
    """extra metadata as JSON(tarball checksums, etc...)"""
    # NOTE(review): annotated as ``dict`` but described below as a tuple of
    # (key, value) pairs -- confirm which one is actually stored.
    extra_headers: dict
    """extra commit information as (tuple(key, value), ...)"""
    raw_manifest: Optional[bytes]
    """NULL if the object can be rebuilt from other cells and revision_parent."""
@dataclasses.dataclass
class RevisionParentRow(BaseRow):
    """Row of the ``revision_parent`` table, partitioned by ``id`` (the child
    revision) and clustered by ``parent_rank``."""

    TABLE = "revision_parent"
    PARTITION_KEY = ("id",)
    CLUSTERING_KEY = ("parent_rank",)

    id: bytes = dataclasses.field(metadata={"fk": ["revision.id"]})
    parent_rank: int
    """parent position in merge commits, 0-based"""
    parent_id: bytes = dataclasses.field(metadata={"points_to": ["revision.id"]})
@dataclasses.dataclass
class ReleaseRow(BaseRow):
    """Row of the ``release`` table, partitioned by ``id``."""

    TABLE = "release"
    PARTITION_KEY = ("id",)

    id: bytes
    target_type: str
    target: bytes = dataclasses.field(
        metadata={
            "points_to": [
                "content.sha1_git",
                "skipped_content.sha1_git",
                "directory.id",
                "revision.id",
            ]
        }
    )
    date: TimestampWithTimezone
    name: bytes
    message: bytes
    author: Person
    synthetic: bool
    """true iff release has been created by Software Heritage"""
    raw_manifest: Optional[bytes]
    """NULL if the object can be rebuilt from other cells"""
@dataclasses.dataclass
class SnapshotRow(BaseRow):
    """Row of the ``snapshot`` table; its only column is the partition key
    ``id``."""

    TABLE = "snapshot"
    PARTITION_KEY = ("id",)

    id: bytes
@dataclasses.dataclass
class SnapshotBranchRow(BaseRow):
    """Row of the ``snapshot_branch`` table.

    For a given snapshot_id, branches are sorted by their name,
    allowing easy pagination.
    """

    TABLE = "snapshot_branch"
    PARTITION_KEY = ("snapshot_id",)
    CLUSTERING_KEY = ("name",)

    snapshot_id: bytes = dataclasses.field(metadata={"fk": ["snapshot.id"]})
    name: bytes
    target_type: Optional[str]
    target: Optional[bytes] = dataclasses.field(
        metadata={
            "points_to": [
                "content.sha1_git",
                "skipped_content.sha1_git",
                "revision.id",
                "release.id",
            ]
        }
    )
@dataclasses.dataclass
class OriginVisitRow(BaseRow):
    """Row of the ``origin_visit`` table, partitioned by ``origin`` and clustered
    by ``visit``."""

    TABLE = "origin_visit"
    PARTITION_KEY = ("origin",)
    CLUSTERING_KEY = ("visit",)

    origin: str = dataclasses.field(metadata={"fk": ["origin.url"]})
    visit: int
    date: datetime.datetime
    type: str
@dataclasses.dataclass
class OriginVisitStatusRow(BaseRow):
    """Row of the ``origin_visit_status`` table, partitioned by ``origin`` and
    clustered by ``(visit, date)``."""

    TABLE = "origin_visit_status"
    PARTITION_KEY = ("origin",)
    CLUSTERING_KEY = ("visit", "date")

    origin: str = dataclasses.field(metadata={"fk": ["origin_visit.origin"]})
    visit: int = dataclasses.field(metadata={"fk": ["origin_visit.visit"]})
    date: datetime.datetime
    type: str
    status: str
    metadata: str
    snapshot: bytes = dataclasses.field(metadata={"fk": ["snapshot.id"]})

    @classmethod
    def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
        """Build a row by passing the items of ``d`` as keyword arguments to
        the class constructor."""
        return cls(**d)
@dataclasses.dataclass
class OriginRow(BaseRow):
    """Row of the ``origin`` table, partitioned by ``sha1``."""

    TABLE = "origin"
    PARTITION_KEY = ("sha1",)

    sha1: bytes
    url: str
    next_visit_id: int
    """
    We need integer visit ids for compatibility with the pgsql
    storage, so we're using lightweight transactions with this trick:
    https://stackoverflow.com/a/29391877/539465
    """
@dataclasses.dataclass
class MetadataAuthorityRow(BaseRow):
    """Row of the ``metadata_authority`` table, partitioned by ``url`` and
    clustered by ``type``."""

    TABLE = "metadata_authority"
    PARTITION_KEY = ("url",)
    CLUSTERING_KEY = ("type",)

    url: str
    type: str
@dataclasses.dataclass
class MetadataFetcherRow(BaseRow):
    """Row of the ``metadata_fetcher`` table, partitioned by ``name`` and
    clustered by ``version``."""

    TABLE = "metadata_fetcher"
    PARTITION_KEY = ("name",)
    CLUSTERING_KEY = ("version",)

    name: str
    version: str
@dataclasses.dataclass
class RawExtrinsicMetadataRow(BaseRow):
    """
    An explanation is in order for the primary key:

    Intuitively, the primary key should only be 'id', because two metadata
    entries are the same iff the id is the same; and 'id' is used for
    deduplication.

    However, we also want to query by
    (target, authority_type, authority_url, discovery_date)
    The naive solution to this would be an extra table, to use as index;
    but it means 1. extra code to keep them in sync 2. overhead when writing
    3. overhead + random reads (instead of linear) when reading.

    Therefore, we use a single table for both, by adding the column
    we want to query with before the id.
    It solves both a) the query/order issues and b) the uniqueness issue because:

    a) adding the id at the end of the primary key does not change the rows' order:
       for two different rows, id1 != id2, so
       (target1, ..., date1) < (target2, ..., date2)
       <=> (target1, ..., date1, id1) < (target2, ..., date2, id2)

    b) the id is a hash of all the columns, so:
       rows are the same
       <=> id1 == id2
       <=> (target1, ..., date1, id1) == (target2, ..., date2, id2)
    """

    TABLE = "raw_extrinsic_metadata"
    PARTITION_KEY = ("target",)
    CLUSTERING_KEY = (
        "authority_type",
        "authority_url",
        "discovery_date",
        "id",
    )

    id: bytes

    type: str
    target: str

    # metadata source:
    authority_type: str = dataclasses.field(
        metadata={"fk": ["metadata_authority.type"]}
    )
    authority_url: str = dataclasses.field(metadata={"fk": ["metadata_authority.url"]})
    discovery_date: datetime.datetime
    fetcher_name: str = dataclasses.field(metadata={"fk": ["metadata_fetcher.name"]})
    fetcher_version: str = dataclasses.field(
        metadata={"fk": ["metadata_fetcher.version"]}
    )

    # metadata itself:
    format: str
    metadata: bytes

    # context:

    # The following keys are kept optional but extra effort is made to avoid setting
    # those None values to null in cassandra. Otherwise, that would end up churning
    # on cleaning up (all the time)
    origin: Optional[str]
    visit: Optional[int]
    snapshot: Optional[str]
    release: Optional[str]
    revision: Optional[str]
    path: Optional[bytes]
    directory: Optional[str]
@dataclasses.dataclass
class RawExtrinsicMetadataByIdRow(BaseRow):
    """Row of the ``raw_extrinsic_metadata_by_id`` table, partitioned by ``id``
    with no clustering key."""

    TABLE = "raw_extrinsic_metadata_by_id"
    PARTITION_KEY = ("id",)
    CLUSTERING_KEY = ()

    id: bytes = dataclasses.field(metadata={"fk": ["raw_extrinsic_metadata.id"]})
    target: str = dataclasses.field(metadata={"fk": ["raw_extrinsic_metadata.target"]})
    authority_type: str
    authority_url: str
@dataclasses.dataclass
class ObjectCountRow(BaseRow):
    """Row of the ``object_count`` table, partitioned by ``partition_key`` and
    clustered by ``object_type``."""

    TABLE = "object_count"
    PARTITION_KEY = ("partition_key",)
    CLUSTERING_KEY = ("object_type",)

    partition_key: int
    object_type: str
    count: int
@dataclasses.dataclass
class ExtIDRow(BaseRow):
    """Row of the ``extid`` table, partitioned by ``(extid_type, extid)`` and
    clustered by ``(extid_version, target_type, target)``."""

    TABLE = "extid"
    PARTITION_KEY = ("extid_type", "extid")
    CLUSTERING_KEY = ("extid_version", "target_type", "target")

    extid_type: str
    extid: bytes
    extid_version: int
    target_type: str
    target: bytes
@dataclasses.dataclass
class ExtIDByTargetRow(BaseRow):
    """Row of the ``extid_by_target`` table, partitioned by
    ``(target_type, target)`` and clustered by ``target_token``."""

    TABLE = "extid_by_target"
    PARTITION_KEY = ("target_type", "target")
    CLUSTERING_KEY = ("target_token",)

    target_type: str
    target: bytes = dataclasses.field(metadata={"fk": ["extid.target"]})
    target_token: int
    """value of token(pk) on the "primary" table"""
@dataclasses.dataclass(frozen=True)
class ObjectReferenceRow(BaseRow):
    """Immutable row of the ``object_references`` table, partitioned by
    ``(target_type, target)`` and clustered by ``(source_type, source)``."""

    TABLE = "object_references"
    PARTITION_KEY = ("target_type", "target")
    CLUSTERING_KEY = ("source_type", "source")

    target_type: str
    target: bytes
    source_type: str
    source: bytes
@dataclasses.dataclass(frozen=True)
class ObjectReferencesTableRow(BaseRow):
    """Immutable row of the ``object_references_table`` table, partitioned by
    ``pk`` and clustered by ``name``."""

    TABLE = "object_references_table"
    PARTITION_KEY = ("pk",)
    CLUSTERING_KEY = ("name",)

    pk: int
    """always zero, puts everything in the same Cassandra partition for faster
    querying"""
    name: str
    year: int
    """ISO year."""
    week: int
    """ISO week."""
    start: Date
    end: Date