Source code for swh.scrubber.objstorage_checker

# Copyright (C) 2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json
import logging
from typing import Any, Dict, List, Optional, Protocol

from swh.core.statsd import Statsd
from swh.journal.client import get_journal_client
from swh.journal.serializers import kafka_to_value, value_to_kafka
from swh.model.model import Content
from swh.model.swhids import ObjectType
from swh.objstorage.exc import ObjCorruptedError, ObjNotFoundError
from swh.objstorage.interface import ObjStorageInterface, objid_from_dict
from swh.storage.interface import StorageInterface

from .base_checker import BaseChecker, BasePartitionChecker
from .db import ConfigEntry, Datastore, ScrubberDb

logger = logging.getLogger(__name__)


[docs] def get_objstorage_datastore(objstorage_config): objstorage_config = dict(objstorage_config) return Datastore( package="objstorage", cls=objstorage_config.pop("cls"), instance=json.dumps(objstorage_config), )
[docs] class ObjectStorageCheckerProtocol(Protocol): db: ScrubberDb objstorage: ObjStorageInterface @property def config(self) -> ConfigEntry: ... @property def statsd(self) -> Statsd: ...
[docs] class ContentCheckerMixin(ObjectStorageCheckerProtocol): """Mixin class implementing content checks used by object storage checkers."""
[docs] def check_content(self, content: Content) -> None: """Checks if a content exists in an object storage (if ``check_references`` is set to :const:`True` in checker config) or if a content is corrupted in an object storage (if ``check_hashes`` is set to :const:`True` in checker config). """ content_hashes = objid_from_dict(content.hashes()) try: self.objstorage.check(content_hashes) except ObjNotFoundError: if self.config.check_references: self.statsd.increment("missing_object_total") self.db.missing_object_add( id=content.swhid(), reference_ids={}, config=self.config ) except ObjCorruptedError: if self.config.check_hashes: self.statsd.increment("hash_mismatch_total") self.db.corrupt_object_add( id=content.swhid(), config=self.config, serialized_object=value_to_kafka(content.to_dict()), )
[docs] class ObjectStorageCheckerFromStoragePartition( BasePartitionChecker, ContentCheckerMixin ): """A partition based checker to detect missing and corrupted contents in an object storage. It iterates on content objects referenced in a storage instance, check they are available in a given object storage instance (if ``check_references`` is set to :const:`True` in checker config) then retrieve their bytes from it in order to recompute checksums and detect corruptions (if ``check_hashes`` is set to :const:`True` in checker config).""" def __init__( self, db: ScrubberDb, config_id: int, storage: StorageInterface, objstorage: Optional[ObjStorageInterface] = None, limit: int = 0, ): super().__init__(db=db, config_id=config_id, limit=limit) self.storage = storage self.objstorage = ( objstorage if objstorage is not None else getattr(storage, "objstorage") ) object_type = self.object_type.name.lower() if object_type != "content": raise ValueError( "ObjectStorageCheckerFromStoragePartition can only check objects of type " f"content, checking objects of type {object_type} is not supported." ) if self.objstorage is None: raise ValueError( "An object storage must be provided to ObjectStorageCheckerFromStoragePartition." # noqa )
[docs] def check_partition(self, object_type: ObjectType, partition_id: int) -> None: page_token = None while True: page = self.storage.content_get_partition( partition_id=partition_id, nb_partitions=self.nb_partitions, page_token=page_token, ) contents = page.results with self.statsd.timed( "batch_duration_seconds", tags={"operation": "check_hashes"} ): logger.debug("Checking %s content object hashes", len(contents)) for content in contents: self.check_content(content) page_token = page.next_page_token if page_token is None: break
[docs] class ObjectStorageCheckerFromJournal(BaseChecker, ContentCheckerMixin): """A journal based checker to detect missing and corrupted contents in an object storage. It iterates on content objects referenced in a kafka topic, check they are available in a given object storage instance then retrieve their bytes from it in order to recompute checksums and detect corruptions.""" def __init__( self, db: ScrubberDb, config_id: int, journal_client_config: Dict[str, Any], objstorage: ObjStorageInterface, ): super().__init__(db=db, config_id=config_id) self.objstorage = objstorage object_type = self.object_type.name.lower() if object_type != "content": raise ValueError( "ObjectStorageCheckerFromJournal can only check objects of type content," f"checking objects of type {object_type} is not supported." ) self.journal_client_config = journal_client_config.copy() if "object_types" in self.journal_client_config: raise ValueError( "The journal_client configuration entry should not define the " "object_types field; this is handled by the scrubber configuration entry" ) self.journal_client_config["object_types"] = [object_type] self.journal_client = get_journal_client( **self.journal_client_config, # Remove default deserializer; so process_kafka_values() gets the message # verbatim so it can archive it with as few modifications a possible. value_deserializer=lambda obj_type, msg: msg, )
[docs] def run(self) -> None: self.journal_client.process(self.process_kafka_messages)
[docs] def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]): for message in all_messages["content"]: self.check_content(Content.from_dict(kafka_to_value(message)))