Source code for swh.scrubber.base_checker

# Copyright (C) 2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from abc import ABC, abstractmethod
from itertools import count, islice
import logging
from typing import Any, Dict, Iterable, Optional

import psycopg2
import tenacity

from swh.core.statsd import Statsd
from swh.model import swhids

from .db import ConfigEntry, Datastore, ScrubberDb

logger = logging.getLogger(__name__)


class BaseChecker(ABC):
    """Common foundation shared by all scrubber checkers.

    The checker keeps a handle on the scrubber database and the identifier
    of its configuration entry. The configuration itself is fetched from the
    database on first access and cached; the statsd client is likewise
    created on first use.
    """

    def __init__(
        self,
        db: ScrubberDb,
        config_id: int,
    ):
        self.db = db
        self.config_id = config_id
        # Both are populated lazily by the ``config`` and ``statsd`` properties.
        self._config: Optional[ConfigEntry] = None
        self._statsd: Optional[Statsd] = None

        # Reading ``object_type`` / ``datastore`` goes through ``config``, so
        # the configuration entry is loaded from the database right here.
        object_type_tag = self.object_type.name.lower()
        source = self.datastore
        self.statsd_constant_tags: Dict[str, Any] = {
            "object_type": object_type_tag,
            "datastore_package": source.package,
            "datastore_cls": source.cls,
            "datastore_instance": source.instance,
        }

    @property
    def config(self) -> ConfigEntry:
        """The :class:`ConfigEntry` holding this checker's configuration.

        Retrieved with ``db.config_get(config_id)`` the first time it is
        needed, then served from the cached value.
        """
        entry = self._config
        if entry is None:
            entry = self._config = self.db.config_get(self.config_id)
            assert entry is not None
        return entry

    @property
    def datastore(self) -> Datastore:
        """The :class:`Datastore` recorded in the configuration, i.e. the
        source of the data being checked."""
        return self.config.datastore

    @property
    def statsd(self) -> Statsd:
        """The :class:`Statsd` client used to send metrics.

        Created on first access under the ``swh_scrubber`` namespace, with
        ``statsd_constant_tags`` as constant tags.
        """
        if self._statsd is not None:
            return self._statsd
        self._statsd = Statsd(
            namespace="swh_scrubber",
            constant_tags=self.statsd_constant_tags,
        )
        return self._statsd

    @property
    def object_type(self) -> swhids.ObjectType:
        """The type of object being checked, as set in the configuration."""
        return self.config.object_type

    @property
    def check_hashes(self) -> bool:
        """The ``check_hashes`` flag of the checker configuration."""
        return self.config.check_hashes

    @property
    def check_references(self) -> bool:
        """The ``check_references`` flag of the checker configuration."""
        return self.config.check_references

    @abstractmethod
    def run(self) -> None:
        """Execute the checker; every concrete subclass has to implement it."""
class BasePartitionChecker(BaseChecker):
    """Base class for checkers that process objects partition by partition.

    ``limit`` caps the number of partitions handled by a single :meth:`run`
    call; the default ``0`` means no cap.
    """

    def __init__(
        self,
        db: ScrubberDb,
        config_id: int,
        limit: int = 0,
    ):
        super().__init__(db=db, config_id=config_id)
        self.limit = limit
        # Extend the tags prepared by the parent class with the partition count.
        self.statsd_constant_tags["nb_partitions"] = self.nb_partitions

    @property
    def nb_partitions(self) -> int:
        """The number of partitions set in the checker configuration."""
        return self.config.nb_partitions

    def run(self) -> None:
        """Check partitions of ``object_type`` objects one after another.

        Partition ids are taken from ``db.checked_partition_iter_next``.
        Each partition is checked through the retrying
        :meth:`_check_partition` wrapper, then recorded with
        ``db.checked_partition_upsert``. When ``limit`` is non-zero, at most
        ``limit`` partitions are processed.
        """
        # ``islice(..., None)`` is unbounded, so a zero limit means "no cap".
        budget: Iterable[int] = islice(count(), 0, self.limit or None)
        # The budget is the first ``zip`` argument on purpose: once it is
        # exhausted, ``zip`` stops before asking the database iterator for
        # one more partition id.
        scheduled = zip(budget, self.db.checked_partition_iter_next(self.config_id))
        for _, partition_id in scheduled:
            logger.debug(
                "Processing %s partition %d/%d",
                self.object_type,
                partition_id,
                self.nb_partitions,
            )
            self._check_partition(self.object_type, partition_id)
            self.db.checked_partition_upsert(self.config_id, partition_id)

    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(psycopg2.OperationalError),
        wait=tenacity.wait_random_exponential(min=10, max=180),
    )
    def _check_partition(
        self, object_type: swhids.ObjectType, partition_id: int
    ) -> None:
        """Call :meth:`check_partition`, retrying on database errors.

        The call is retried whenever ``psycopg2.OperationalError`` is
        raised, waiting a random exponential delay (``min=10``, ``max=180``)
        between attempts. No ``stop`` argument is passed to
        ``tenacity.retry``, so tenacity's default stop policy applies.
        """
        return self.check_partition(object_type, partition_id)

    @abstractmethod
    def check_partition(
        self, object_type: swhids.ObjectType, partition_id: int
    ) -> None:
        """Check the objects of one partition; every concrete subclass has to
        implement it."""