Source code for swh.scrubber.journal_checker
# Copyright (C) 2021-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Reads all objects in a swh-storage instance and recomputes their checksums."""
import json
import logging
from typing import Any, Dict, List

import attr

from swh.journal.client import get_journal_client
from swh.journal.serializers import kafka_to_value
from swh.model import model

from .base_checker import BaseChecker
from .db import Datastore, ScrubberDb
logger = logging.getLogger(__name__)
def get_datastore(journal_cfg: Dict[str, Any]) -> Datastore:
    """Returns a Datastore representation of the Kafka journal described by
    ``journal_cfg``; only ``cls: kafka`` is supported."""
if journal_cfg.get("cls") == "kafka":
datastore = Datastore(
package="journal",
cls="kafka",
instance=json.dumps(
{
"brokers": journal_cfg["brokers"],
"group_id": journal_cfg["group_id"],
"prefix": journal_cfg["prefix"],
}
),
)
else:
raise NotImplementedError(
f"JournalChecker(journal_client_config={journal_cfg!r}).datastore()"
)
return datastore
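# A hedged usage sketch for get_datastore(); the broker address, group id, and
# prefix below are hypothetical values, not defaults shipped with swh-scrubber:
#
#     get_datastore(
#         {
#             "cls": "kafka",
#             "brokers": ["broker1.example.org:9092"],
#             "group_id": "swh-scrubber-journal-checker",
#             "prefix": "swh.journal.objects",
#         }
#     )
#
# Any other "cls" value raises NotImplementedError.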
class JournalChecker(BaseChecker):
"""Reads a chunk of a swh-storage database, recomputes checksums, and
reports errors in a separate database."""
def __init__(
self, db: ScrubberDb, config_id: int, journal_client_config: Dict[str, Any]
):
super().__init__(db=db, config_id=config_id)
        if self.config.check_references:
            raise ValueError(
                "The journal checker cannot check for references; please set "
                f"'check_references' to False in config entry {self.config_id}."
            )
self.journal_client_config = journal_client_config.copy()
if "object_types" in self.journal_client_config:
raise ValueError(
"The journal_client configuration entry should not define the "
"object_types field; this is handled by the scrubber configuration entry"
)
self.journal_client_config["object_types"] = [self.object_type.name.lower()]
self.journal_client = get_journal_client(
**self.journal_client_config,
            # Remove the default deserializer so that process_kafka_messages()
            # gets each message verbatim and can archive it with as few
            # modifications as possible.
value_deserializer=lambda obj_type, msg: msg,
)
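
    # A hedged instantiation sketch; the connection string and config id are
    # hypothetical, and "object_types" is deliberately absent because
    # __init__ fills it in from the scrubber configuration entry:
    #
    #     checker = JournalChecker(
    #         db=ScrubberDb.connect("dbname=scrubber"),
    #         config_id=1,
    #         journal_client_config={
    #             "cls": "kafka",
    #             "brokers": ["broker1.example.org:9092"],
    #             "group_id": "swh-scrubber-journal-checker",
    #             "prefix": "swh.journal.objects",
    #         },
    #     )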
def run(self) -> None:
"""Runs a journal client with the given configuration.
This method does not return, unless the client is configured with ``on_eof``
parameter equals to ``EofBehavior.STOP`` (``stop`` in YAML).
"""
self.journal_client.process(self.process_kafka_messages)
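
    # A hedged configuration sketch: adding ``on_eof`` to the journal client
    # config makes process() return once the journal is exhausted, so run()
    # terminates; the other values are hypothetical:
    #
    #     journal_client_config = {
    #         "cls": "kafka",
    #         "brokers": ["broker1.example.org:9092"],
    #         "group_id": "swh-scrubber-journal-checker",
    #         "prefix": "swh.journal.objects",
    #         "on_eof": "stop",
    #     }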
    def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]) -> None:
        """Deserializes a batch of journal messages, recomputes each object's
        checksum, and records any mismatch as a corrupt object."""
for object_type, messages in all_messages.items():
logger.debug("Processing %s %s", len(messages), object_type)
            # Resolve the model class (e.g. "directory" -> model.Directory).
            cls = getattr(model, object_type.capitalize())
for message in messages:
if object_type == "directory":
d = kafka_to_value(message)
(
has_duplicate_dir_entries,
object_,
) = cls.from_possibly_duplicated_entries(
entries=tuple(
map(model.DirectoryEntry.from_dict, d["entries"])
),
raw_manifest=d.get("raw_manifest"),
)
object_ = attr.evolve(object_, id=d["id"])
if has_duplicate_dir_entries:
self.statsd.increment(
"duplicate_directory_entries_total",
tags={"object_type": "directory"},
)
else:
object_ = cls.from_dict(kafka_to_value(message))
has_duplicate_dir_entries = False
                # Recompute the hash from the object's content and compare it
                # to the id carried by the journal message.
                real_id = object_.compute_hash()
                if object_.id != real_id or has_duplicate_dir_entries:
                    self.db.corrupt_object_add(object_.swhid(), self.config, message)
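

if __name__ == "__main__":
    # Hedged, self-contained demo of the per-message check performed by
    # process_kafka_messages(): round-trip a hand-built Directory through the
    # journal serializer and verify its checksum. Illustration only; it is
    # not part of the checker and opens no Kafka connection.
    from swh.journal.serializers import value_to_kafka

    directory = model.Directory(entries=())
    message = value_to_kafka(directory.to_dict())
    decoded = model.Directory.from_dict(kafka_to_value(message))
    assert decoded.compute_hash() == decoded.id, "directory checksum mismatch"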