Source code for swh.scrubber.storage_checker

# Copyright (C) 2021-2023  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Reads all objects in a swh-storage instance and recomputes their checksums."""

import collections
import contextlib
from itertools import count, islice
import json
import logging
from typing import Iterable, Optional, Tuple, Union

import psycopg2
import tenacity

from swh.core.statsd import Statsd
from swh.journal.serializers import value_to_kafka
from swh.model import swhids
from swh.model.model import (
    Content,
    Directory,
    ObjectType,
    Release,
    Revision,
    Snapshot,
    TargetType,
)
from swh.storage.algos.directory import (
    directory_get_many_with_possibly_duplicated_entries,
)
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.storage.cassandra.storage import CassandraStorage
from swh.storage.interface import StorageInterface
from swh.storage.postgresql.storage import Storage as PostgresqlStorage

from .db import ConfigEntry, Datastore, ScrubberDb

logger = logging.getLogger(__name__)

ScrubbableObject = Union[Revision, Release, Snapshot, Directory, Content]


@contextlib.contextmanager
def postgresql_storage_db(storage):
    db = storage.get_db()
    try:
        yield db
    finally:
        storage.put_db(db)


def _get_inclusive_range_swhids(
    inclusive_range_start: Optional[bytes],
    exclusive_range_end: Optional[bytes],
    object_type: swhids.ObjectType,
) -> Tuple[swhids.CoreSWHID, swhids.CoreSWHID]:
    r"""
    Given a ``[range_start, range_end)`` right-open interval of id prefixes
    and an object type (as returned by :const:`swh.storage.backfill.RANGE_GENERATORS`),
    returns a ``[range_start_swhid, range_end_swhid]`` closed interval of SWHIDs
    suitable for the scrubber database.

    >>> _get_inclusive_range_swhids(b"\x42", None, swhids.ObjectType.SNAPSHOT)
    (CoreSWHID.from_string('swh:1:snp:4200000000000000000000000000000000000000'), CoreSWHID.from_string('swh:1:snp:ffffffffffffffffffffffffffffffffffffffff'))

    >>> _get_inclusive_range_swhids(b"\x00", b"\x12\x34", swhids.ObjectType.REVISION)
    (CoreSWHID.from_string('swh:1:rev:0000000000000000000000000000000000000000'), CoreSWHID.from_string('swh:1:rev:1233ffffffffffffffffffffffffffffffffffff'))

    """  # noqa
    range_start_swhid = swhids.CoreSWHID(
        object_type=object_type,
        object_id=(inclusive_range_start or b"").ljust(20, b"\00"),
    )
    if exclusive_range_end is None:
        inclusive_range_end = b"\xff" * 20
    else:
        # convert "1230000000..." to "122fffffff..."
        inclusive_range_end = (
            int.from_bytes(exclusive_range_end.ljust(20, b"\x00"), "big") - 1
        ).to_bytes(20, "big")
    range_end_swhid = swhids.CoreSWHID(
        object_type=object_type,
        object_id=inclusive_range_end,
    )

    return (range_start_swhid, range_end_swhid)


def get_datastore(storage) -> Datastore:
    if isinstance(storage, PostgresqlStorage):
        with postgresql_storage_db(storage) as db:
            datastore = Datastore(
                package="storage",
                cls="postgresql",
                instance=db.conn.dsn,
            )
    elif isinstance(storage, CassandraStorage):
        datastore = Datastore(
            package="storage",
            cls="cassandra",
            instance=json.dumps(
                {
                    "keyspace": storage.keyspace,
                    "hosts": storage.hosts,
                    "port": storage.port,
                }
            ),
        )
    else:
        raise NotImplementedError(f"StorageChecker(storage={storage!r}).datastore()")

    return datastore
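
# Illustrative sketch (not part of the original module; the keyspace, host and
# port values below are assumptions): the returned ``Datastore`` is a plain
# descriptor of the backend being scrubbed. For a Cassandra storage it would
# look roughly like:
#
#     Datastore(
#         package="storage",
#         cls="cassandra",
#         instance='{"keyspace": "swh", "hosts": ["cassandra1"], "port": 9042}',
#     )

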
class StorageChecker:
    """Reads a chunk of a swh-storage database, recomputes checksums, and
    reports errors in a separate database."""

    def __init__(
        self, db: ScrubberDb, config_id: int, storage: StorageInterface, limit: int = 0
    ):
        self.db = db
        self.storage = storage
        self.config_id = config_id
        self.limit = limit

        self._config: Optional[ConfigEntry] = None
        self._statsd: Optional[Statsd] = None

    @property
    def config(self) -> ConfigEntry:
        if self._config is None:
            self._config = self.db.config_get(self.config_id)

        assert self._config is not None
        return self._config

    @property
    def nb_partitions(self) -> int:
        return self.config.nb_partitions

    @property
    def object_type(self) -> swhids.ObjectType:
        return self.config.object_type

    @property
    def check_hashes(self) -> bool:
        return self.config.check_hashes

    @property
    def check_references(self) -> bool:
        return self.config.check_references

    @property
    def datastore(self) -> Datastore:
        """Returns a :class:`Datastore` instance representing the swh-storage
        instance being checked."""
        return self.config.datastore

    @property
    def statsd(self) -> Statsd:
        if self._statsd is None:
            self._statsd = Statsd(
                namespace="swh_scrubber",
                constant_tags={
                    "object_type": self.object_type,
                    "nb_partitions": self.nb_partitions,
                    "datastore_package": self.datastore.package,
                    "datastore_cls": self.datastore.cls,
                },
            )
        return self._statsd

    def run(self) -> None:
        """Checks all objects of ``object_type``, processing the partitions
        returned by the scrubber database one at a time, up to ``limit``
        partitions when a limit is set."""
        counter: Iterable[int] = count()
        if self.limit:
            counter = islice(counter, 0, self.limit)
        for i, partition_id in zip(
            counter, self.db.checked_partition_iter_next(self.config_id)
        ):
            logger.debug(
                "Processing %s partition %d/%d",
                self.object_type,
                partition_id,
                self.nb_partitions,
            )

            self._check_partition(self.object_type, partition_id)

            self.db.checked_partition_upsert(
                self.config_id,
                partition_id,
            )

    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(psycopg2.OperationalError),
        wait=tenacity.wait_random_exponential(min=10, max=180),
    )
    def _check_partition(
        self, object_type: swhids.ObjectType, partition_id: int
    ) -> None:
        page_token = None
        while True:
            if object_type in (swhids.ObjectType.RELEASE, swhids.ObjectType.REVISION):
                method = getattr(
                    self.storage, f"{self.object_type.name.lower()}_get_partition"
                )
                page = method(partition_id, self.nb_partitions, page_token=page_token)
                objects = page.results
            elif object_type == swhids.ObjectType.DIRECTORY:
                page = self.storage.directory_get_id_partition(
                    partition_id, self.nb_partitions, page_token=page_token
                )
                directory_ids = page.results
                objects = []
                for dir_id, item in zip(
                    directory_ids,
                    directory_get_many_with_possibly_duplicated_entries(
                        self.storage, directory_ids
                    ),
                ):
                    assert item is not None, f"Directory {dir_id.hex()} disappeared"
                    (has_duplicate_entries, object_) = item
                    if has_duplicate_entries:
                        self.statsd.increment("duplicate_directory_entries_total")
                        self.db.corrupt_object_add(
                            object_.swhid(),
                            self.config,
                            value_to_kafka(object_.to_dict()),
                        )
                    objects.append(object_)
            elif object_type == swhids.ObjectType.SNAPSHOT:
                page = self.storage.snapshot_get_id_partition(
                    partition_id, self.nb_partitions, page_token=page_token
                )
                snapshot_ids = page.results
                objects = [
                    snapshot_get_all_branches(self.storage, snapshot_id)
                    for snapshot_id in snapshot_ids
                ]
            else:
                assert False, f"Unexpected object type: {object_type}"

            if self.check_hashes:
                with self.statsd.timed(
                    "batch_duration_seconds", tags={"operation": "check_hashes"}
                ):
                    logger.debug(
                        "Checking %s %s object hashes", len(objects), object_type
                    )
                    self.check_object_hashes(objects)
            if self.check_references:
                with self.statsd.timed(
                    "batch_duration_seconds", tags={"operation": "check_references"}
                ):
                    logger.debug(
                        "Checking %s %s object references", len(objects), object_type
                    )
                    self.check_object_references(objects)

            page_token = page.next_page_token
            if page_token is None:
                break

    def check_object_hashes(self, objects: Iterable[ScrubbableObject]):
        """Recomputes hashes, and reports mismatches."""
        count = 0
        for object_ in objects:
            if isinstance(object_, Content):
                # TODO
                continue
            real_id = object_.compute_hash()
            count += 1
            if object_.id != real_id:
                self.statsd.increment("hash_mismatch_total")
                self.db.corrupt_object_add(
                    object_.swhid(),
                    self.config,
                    value_to_kafka(object_.to_dict()),
                )
        if count:
            self.statsd.increment("objects_hashed_total", count)

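    # Hedged sketch of what ``compute_hash`` verifies (assumed example, not part
    # of the original module): the intrinsic identifier recomputed from the
    # object's fields must match the stored one, e.g.:
    #
    #     from swh.model.model import Directory
    #     d = Directory(entries=())
    #     assert d.compute_hash() == d.id  # holds for an uncorrupted object
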
    def check_object_references(self, objects: Iterable[ScrubbableObject]):
        """Checks that all objects referenced by these objects exist."""
        cnt_references = collections.defaultdict(set)
        dir_references = collections.defaultdict(set)
        rev_references = collections.defaultdict(set)
        rel_references = collections.defaultdict(set)
        snp_references = collections.defaultdict(set)

        for object_ in objects:
            swhid = object_.swhid()

            if isinstance(object_, Content):
                pass
            elif isinstance(object_, Directory):
                for entry in object_.entries:
                    if entry.type == "file":
                        cnt_references[entry.target].add(swhid)
                    elif entry.type == "dir":
                        dir_references[entry.target].add(swhid)
                    elif entry.type == "rev":
                        # dir->rev holes are not considered a problem because they
                        # happen whenever git submodules point to repositories that
                        # were not loaded yet; ignore them
                        pass
                    else:
                        assert False, entry
            elif isinstance(object_, Revision):
                dir_references[object_.directory].add(swhid)
                for parent in object_.parents:
                    rev_references[parent].add(swhid)
            elif isinstance(object_, Release):
                if object_.target is None:
                    pass
                elif object_.target_type == ObjectType.CONTENT:
                    cnt_references[object_.target].add(swhid)
                elif object_.target_type == ObjectType.DIRECTORY:
                    dir_references[object_.target].add(swhid)
                elif object_.target_type == ObjectType.REVISION:
                    rev_references[object_.target].add(swhid)
                elif object_.target_type == ObjectType.RELEASE:
                    rel_references[object_.target].add(swhid)
                else:
                    assert False, object_
            elif isinstance(object_, Snapshot):
                for branch in object_.branches.values():
                    if branch is None:
                        pass
                    elif branch.target_type == TargetType.ALIAS:
                        pass
                    elif branch.target_type == TargetType.CONTENT:
                        cnt_references[branch.target].add(swhid)
                    elif branch.target_type == TargetType.DIRECTORY:
                        dir_references[branch.target].add(swhid)
                    elif branch.target_type == TargetType.REVISION:
                        rev_references[branch.target].add(swhid)
                    elif branch.target_type == TargetType.RELEASE:
                        rel_references[branch.target].add(swhid)
                    elif branch.target_type == TargetType.SNAPSHOT:
                        snp_references[branch.target].add(swhid)
                    else:
                        assert False, (str(object_.swhid()), branch)
            else:
                assert False, object_.swhid()

        missing_cnts = set(
            self.storage.content_missing_per_sha1_git(list(cnt_references))
        )
        missing_dirs = set(self.storage.directory_missing(list(dir_references)))
        missing_revs = set(self.storage.revision_missing(list(rev_references)))
        missing_rels = set(self.storage.release_missing(list(rel_references)))
        missing_snps = set(self.storage.snapshot_missing(list(snp_references)))

        self.statsd.increment(
            "missing_object_total",
            len(missing_cnts),
            tags={"target_object_type": "content"},
        )
        self.statsd.increment(
            "missing_object_total",
            len(missing_dirs),
            tags={"target_object_type": "directory"},
        )
        self.statsd.increment(
            "missing_object_total",
            len(missing_revs),
            tags={"target_object_type": "revision"},
        )
        self.statsd.increment(
            "missing_object_total",
            len(missing_rels),
            tags={"target_object_type": "release"},
        )
        self.statsd.increment(
            "missing_object_total",
            len(missing_snps),
            tags={"target_object_type": "snapshot"},
        )

        for missing_id in missing_cnts:
            missing_swhid = swhids.CoreSWHID(
                object_type=swhids.ObjectType.CONTENT, object_id=missing_id
            )
            self.db.missing_object_add(
                missing_swhid, cnt_references[missing_id], self.config
            )
        for missing_id in missing_dirs:
            missing_swhid = swhids.CoreSWHID(
                object_type=swhids.ObjectType.DIRECTORY, object_id=missing_id
            )
            self.db.missing_object_add(
                missing_swhid, dir_references[missing_id], self.config
            )
        for missing_id in missing_revs:
            missing_swhid = swhids.CoreSWHID(
                object_type=swhids.ObjectType.REVISION, object_id=missing_id
            )
            self.db.missing_object_add(
                missing_swhid, rev_references[missing_id], self.config
            )
        for missing_id in missing_rels:
            missing_swhid = swhids.CoreSWHID(
                object_type=swhids.ObjectType.RELEASE, object_id=missing_id
            )
            self.db.missing_object_add(
                missing_swhid, rel_references[missing_id], self.config
            )
        for missing_id in missing_snps:
            missing_swhid = swhids.CoreSWHID(
                object_type=swhids.ObjectType.SNAPSHOT, object_id=missing_id
            )
            self.db.missing_object_add(
                missing_swhid, snp_references[missing_id], self.config
            )
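

# Hedged usage sketch (not part of the original module; the storage URL and
# scrubber DSN below are placeholders): a checker is typically built from a
# scrubber database, a configuration id registered in it, and a swh-storage
# client, then run over the configured partitions:
#
#     from swh.scrubber.db import ScrubberDb
#     from swh.storage import get_storage
#
#     storage = get_storage("remote", url="http://localhost:5002/")
#     scrubber_db = ScrubberDb.connect("dbname=swh-scrubber")
#     StorageChecker(db=scrubber_db, config_id=1, storage=storage, limit=0).run()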