# Copyright (C) 2020-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Classes representing tables in the Cassandra database.
They are very close to classes found in swh.model.model, but most of
them are subtly different:
* Large objects are split into other classes (eg. RevisionRow has no
'parents' field, because parents are stored in a different table,
represented by RevisionParentRow)
* They have a "cols" field, which returns the list of column names
of the table
* They only use types that map directly to Cassandra's schema (ie. no enums)
Therefore, this model doesn't reuse swh.model.model, except for types
that can be mapped to UDTs (Person and TimestampWithTimezone).
Fields may have :func:`dataclasses metadata <dataclasses.field>` keys ``fk``
if the existence of a corresponding row in a different table is almost guaranteed
(up to loaders not crashing and eventual-consistency settling down) and
``points_to`` if they are a Merkle-DAG link to another object (which is more likely
to be missing).
This is used by :func:`swh.storage.cassandra.diagram.dot_diagram`.
"""
import dataclasses
import datetime
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Dict,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    cast,
)

if TYPE_CHECKING:
    from _typeshed import DataclassInstance

from cassandra.util import Date

from swh.model.model import Person, TimestampWithTimezone

MAGIC_NULL_PK = b"<null>"
"""
NULLs (or all-empty blobs) are not allowed in primary keys; instead we use a
special value that can't possibly be a valid hash.
"""
T = TypeVar("T", bound="BaseRow")

def content_index_table_name(algo: str, skipped_content: bool) -> str:
    """Given an algorithm name, returns the name of one of the 'content_by_*'
    and 'skipped_content_by_*' tables that serve as index for the 'content'
    and 'skipped_content' tables, based on this algorithm's hashes.

    For now it is a simple substitution, but future versions may append a version
    number to it, if needed for schema updates."""
    if skipped_content:
        return f"skipped_content_by_{algo}"
    else:
        return f"content_by_{algo}"

class BaseRow:
    TABLE: ClassVar[str]
    PARTITION_KEY: ClassVar[Tuple[str, ...]]
    CLUSTERING_KEY: ClassVar[Tuple[str, ...]] = ()

    @classmethod
    def denullify_clustering_key(cls, ck: Tuple) -> Tuple:
        """If this class has Optional fields used in its clustering key, this
        replaces None values in the given clustering key, so it is suitable
        for sorting purposes."""
        return ck

    @classmethod
    def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
        return cls(**d)

    @classmethod
    def cols(cls) -> List[str]:
        return [
            field.name for field in dataclasses.fields(cast("DataclassInstance", cls))
        ]

    def to_dict(self) -> Dict[str, Any]:
        return dataclasses.asdict(cast("DataclassInstance", self))
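
# A sketch of the round-trip every row class supports (illustrative values,
# not executed at import time):
#
#     >>> row = ContentRow(sha1=b"\x01" * 20, sha1_git=b"\x02" * 20,
#     ...                  sha256=b"\x03" * 32, blake2s256=b"\x04" * 32,
#     ...                  length=42, ctime=None, status="visible")
#     >>> ContentRow.cols()
#     ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', 'status']
#     >>> ContentRow.from_dict(row.to_dict()) == row
#     True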

@dataclasses.dataclass
class ContentRow(BaseRow):
    TABLE = "content"
    PARTITION_KEY: ClassVar[Tuple[str, ...]] = ("sha256",)
    CLUSTERING_KEY = (
        "sha1",
        "sha1_git",
        "blake2s256",
    )

    sha1: bytes
    sha1_git: bytes
    sha256: bytes
    blake2s256: bytes
    length: int
    ctime: Optional[datetime.datetime]
    """creation time, i.e. time of (first) injection into the storage"""
    status: str

@dataclasses.dataclass
class SkippedContentRow(BaseRow):
    TABLE = "skipped_content"
    PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256")

    sha1: Optional[bytes]
    sha1_git: Optional[bytes]
    sha256: Optional[bytes]
    blake2s256: Optional[bytes]
    length: Optional[int]
    ctime: Optional[datetime.datetime]
    """creation time, i.e. time of (first) injection into the storage"""
    status: str
    reason: str
    origin: str

    @classmethod
    def denullify_clustering_key(cls, ck: Tuple) -> Tuple:
        return tuple(MAGIC_NULL_PK if v is None else v for v in ck)

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "SkippedContentRow":
        d = d.copy()
        for k in ("sha1", "sha1_git", "sha256", "blake2s256"):
            if d[k] == MAGIC_NULL_PK:
                d[k] = None
        return super().from_dict(d)
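
# How the MAGIC_NULL_PK sentinel flows through this class, sketched
# (illustrative, not executed at import time): denullify_clustering_key()
# substitutes the sentinel for None so keys remain sortable, and from_dict()
# reverses the substitution when rows are read back:
#
#     >>> SkippedContentRow.denullify_clustering_key((None, b"\x01"))
#     (b'<null>', b'\x01')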

@dataclasses.dataclass
class DirectoryRow(BaseRow):
    TABLE = "directory"
    PARTITION_KEY = ("id",)

    id: bytes
    raw_manifest: Optional[bytes]
    """NULL if the object can be rebuilt from (sorted) entries"""

@dataclasses.dataclass
class DirectoryEntryRow(BaseRow):
    TABLE = "directory_entry"
    PARTITION_KEY = ("directory_id",)
    CLUSTERING_KEY = ("name",)

    directory_id: bytes = dataclasses.field(metadata={"fk": ["directory.id"]})
    name: bytes
    """path name, relative to containing dir"""
    target: bytes = dataclasses.field(
        metadata={
            "points_to": [
                "content.sha1_git",
                "skipped_content.sha1_git",
                "directory.id",
                "revision.id",
            ]
        }
    )
    perms: int
    """unix-like permissions"""
    type: str
    """target type"""

@dataclasses.dataclass
class RevisionRow(BaseRow):
    TABLE = "revision"
    PARTITION_KEY = ("id",)

    id: bytes
    date: Optional[TimestampWithTimezone]
    committer_date: Optional[TimestampWithTimezone]
    type: str
    directory: bytes = dataclasses.field(metadata={"points_to": ["directory.id"]})
    """source code "root" directory"""
    message: bytes
    author: Person
    committer: Person
    synthetic: bool
    """true iff revision has been created by Software Heritage"""
    metadata: str
    """extra metadata as JSON (tarball checksums, etc.)"""
    extra_headers: dict
    """extra commit information as (tuple(key, value), ...)"""
    raw_manifest: Optional[bytes]
    """NULL if the object can be rebuilt from other cells and revision_parent."""

@dataclasses.dataclass
class RevisionParentRow(BaseRow):
    TABLE = "revision_parent"
    PARTITION_KEY = ("id",)
    CLUSTERING_KEY = ("parent_rank",)

    id: bytes = dataclasses.field(metadata={"fk": ["revision.id"]})
    parent_rank: int
    """parent position in merge commits, 0-based"""
    parent_id: bytes = dataclasses.field(metadata={"points_to": ["revision.id"]})

@dataclasses.dataclass
class ReleaseRow(BaseRow):
    TABLE = "release"
    PARTITION_KEY = ("id",)

    id: bytes
    target_type: str
    target: bytes = dataclasses.field(
        metadata={
            "points_to": [
                "content.sha1_git",
                "skipped_content.sha1_git",
                "directory.id",
                "revision.id",
            ]
        }
    )
    date: TimestampWithTimezone
    name: bytes
    message: bytes
    author: Person
    synthetic: bool
    """true iff release has been created by Software Heritage"""
    raw_manifest: Optional[bytes]
    """NULL if the object can be rebuilt from other cells"""

@dataclasses.dataclass
class SnapshotRow(BaseRow):
    TABLE = "snapshot"
    PARTITION_KEY = ("id",)

    id: bytes

@dataclasses.dataclass
class SnapshotBranchRow(BaseRow):
    """For a given snapshot_id, branches are sorted by their name,
    allowing easy pagination."""

    TABLE = "snapshot_branch"
    PARTITION_KEY = ("snapshot_id",)
    CLUSTERING_KEY = ("name",)

    snapshot_id: bytes = dataclasses.field(metadata={"fk": ["snapshot.id"]})
    name: bytes
    target_type: Optional[str]
    target: Optional[bytes] = dataclasses.field(
        metadata={
            "points_to": [
                "content.sha1_git",
                "skipped_content.sha1_git",
                "revision.id",
                "release.id",
            ]
        }
    )
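
# Because ``name`` is the clustering key, branch pagination reduces to a range
# query within one partition. A sketch of the kind of CQL this layout enables
# (illustrative only; the backend's actual statements live elsewhere):
#
#     SELECT name, target_type, target
#     FROM snapshot_branch
#     WHERE snapshot_id = :snapshot_id AND name >= :branch_name_from
#     LIMIT :limit;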

@dataclasses.dataclass
class OriginVisitRow(BaseRow):
    TABLE = "origin_visit"
    PARTITION_KEY = ("origin",)
    CLUSTERING_KEY = ("visit",)

    origin: str = dataclasses.field(metadata={"fk": ["origin.url"]})
    visit: int
    date: datetime.datetime
    type: str

@dataclasses.dataclass
class OriginVisitStatusRow(BaseRow):
    TABLE = "origin_visit_status"
    PARTITION_KEY = ("origin",)
    CLUSTERING_KEY = ("visit", "date")

    origin: str = dataclasses.field(metadata={"fk": ["origin_visit.origin"]})
    visit: int = dataclasses.field(metadata={"fk": ["origin_visit.visit"]})
    date: datetime.datetime
    type: str
    status: str
    metadata: str
    snapshot: bytes = dataclasses.field(metadata={"fk": ["snapshot.id"]})

    @classmethod
    def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
        return cls(**d)

@dataclasses.dataclass
class OriginRow(BaseRow):
    TABLE = "origin"
    PARTITION_KEY = ("sha1",)

    sha1: bytes
    url: str
    next_visit_id: int
    """
    We need integer visit ids for compatibility with the pgsql
    storage, so we're using lightweight transactions with this trick:
    https://stackoverflow.com/a/29391877/539465
    """

@dataclasses.dataclass
class ObjectCountRow(BaseRow):
    TABLE = "object_count"
    PARTITION_KEY = ("partition_key",)
    CLUSTERING_KEY = ("object_type",)

    partition_key: int
    object_type: str
    count: int

@dataclasses.dataclass
class ExtIDRow(BaseRow):
    TABLE = "extid"
    PARTITION_KEY = ("extid_type", "extid")
    CLUSTERING_KEY = ("extid_version", "target_type", "target")

    extid_type: str
    extid: bytes
    extid_version: int
    target_type: str
    target: bytes

@dataclasses.dataclass
class ExtIDByTargetRow(BaseRow):
    TABLE = "extid_by_target"
    PARTITION_KEY = ("target_type", "target")
    CLUSTERING_KEY = ("target_token",)

    target_type: str
    target: bytes = dataclasses.field(metadata={"fk": ["extid.target"]})
    target_token: int
    """value of token(pk) on the "primary" table"""

@dataclasses.dataclass(frozen=True)
class ObjectReferenceRow(BaseRow):
    TABLE = "object_references"
    PARTITION_KEY = ("target_type", "target")
    CLUSTERING_KEY = ("source_type", "source")

    target_type: str
    target: bytes
    source_type: str
    source: bytes

@dataclasses.dataclass(frozen=True)
class ObjectReferencesTableRow(BaseRow):
    TABLE = "object_references_table"
    PARTITION_KEY = ("pk",)
    CLUSTERING_KEY = ("name",)

    pk: int
    """always zero; puts everything in the same Cassandra partition for faster querying"""
    name: str
    year: int
    """ISO year."""
    week: int
    """ISO week."""
    start: Date
    end: Date
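
# How the (year, week) pair relates to calendar dates, sketched with the
# standard library (illustrative; the real start/end values are filled in by
# the storage layer):
#
#     >>> import datetime
#     >>> datetime.date(2024, 1, 1).isocalendar()[:2]  # ISO year and week
#     (2024, 1)
#     >>> datetime.date.fromisocalendar(2024, 1, 1)    # Monday opening that week
#     datetime.date(2024, 1, 1)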