Source code for swh.scrubber.journal_checker
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Reads all objects in a swh-storage instance and recomputes their checksums."""
import logging
from typing import Any, Dict, List
from swh.journal.client import get_journal_client
from swh.journal.serializers import kafka_to_value
from swh.model import model
from .db import Datastore, ScrubberDb
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class JournalChecker:
    """Reads objects from a journal, recomputes their checksums, and reports
    mismatches in a separate database.

    Args:
        db: scrubber database in which corrupt objects are recorded.
        journal: journal client configuration; it is forwarded verbatim as
            keyword arguments to ``get_journal_client``. ``datastore_info()``
            additionally reads its ``cls``, ``brokers`` and ``prefix`` keys.
    """

    # Cache for datastore_info(); stays None until the first successful call.
    _datastore = None

    def __init__(self, db: ScrubberDb, journal: Dict[str, Any]):
        self.db = db
        self.journal_client_config = journal
        self.journal_client = get_journal_client(
            **journal,
            # Remove default deserializer; so process_kafka_messages() gets the
            # message verbatim so it can archive it with as few modifications
            # as possible.
            value_deserializer=lambda obj_type, msg: msg,
        )

    def datastore_info(self) -> Datastore:
        """Returns a :class:`Datastore` instance representing the journal instance
        being checked.

        The instance is built on first use and cached on ``self``.

        Raises:
            NotImplementedError: if the journal configuration's ``cls`` is
                anything other than ``"kafka"``.
        """
        if self._datastore is None:
            config = self.journal_client_config
            if config["cls"] == "kafka":
                self._datastore = Datastore(
                    package="journal",
                    cls="kafka",
                    instance=(
                        f"brokers={config['brokers']!r} prefix={config['prefix']!r}"
                    ),
                )
            else:
                # Name the actual class and method so the failure is traceable.
                raise NotImplementedError(
                    f"JournalChecker(journal_client={self.journal_client_config!r})"
                    f".datastore_info()"
                )
        return self._datastore

    def run(self):
        """Runs a journal client with the given configuration.

        This method does not return, unless otherwise configured (with
        ``stop_on_eof``).
        """
        self.journal_client.process(self.process_kafka_messages)

    def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]):
        """Recomputes the hash of every received object and records mismatches.

        Args:
            all_messages: maps an object type name to the raw (still
                serialized) messages received for that type.
        """
        for object_type, messages in all_messages.items():
            # str.capitalize() only upper-cases the first character (e.g.
            # "revision" -> "Revision").
            # NOTE(review): multi-word type names such as "origin_visit" would
            # not map to a CamelCase class name -- confirm which object types
            # this checker is configured with.
            cls = getattr(model, object_type.capitalize())
            for message in messages:
                object_ = cls.from_dict(kafka_to_value(message))
                real_id = object_.compute_hash()
                if object_.id != real_id:
                    # Store the untouched message alongside the report;
                    # datastore_info() is only resolved when needed.
                    self.db.corrupt_object_add(
                        object_.swhid(), self.datastore_info(), message
                    )