swh.scrubber.db module#

swh.scrubber.db.now()[source]#
class swh.scrubber.db.Datastore(package: str, cls: str, instance: str)[source]#

Bases: object

Represents a datastore being scrubbed, e.g. swh-storage or swh-journal.

package: str#

‘storage’, ‘journal’, or ‘objstorage’.

cls: str#

‘postgresql’/‘cassandra’ for storage, ‘kafka’ for journal, ‘pathslicer’/‘winery’/… for objstorage.

instance: str#

Human readable string.
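The three fields can be illustrated with a small stand-in dataclass. This is only a sketch that mirrors the documented attributes; DatastoreSketch and the instance string are hypothetical, and the real class lives in swh.scrubber.db.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DatastoreSketch:
    """Hypothetical stand-in mirroring the documented fields of Datastore."""

    package: str  # 'storage', 'journal', or 'objstorage'
    cls: str  # e.g. 'postgresql' or 'cassandra' for storage
    instance: str  # human-readable string identifying the instance


# Illustrative values only; the instance string is made up.
storage = DatastoreSketch(
    package="storage", cls="postgresql", instance="example-instance"
)
```

Freezing the stand-in is a choice made here so that values compare by content and can serve as dictionary keys; it is not a statement about the real class.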

class swh.scrubber.db.ConfigEntry(name: str, datastore: Datastore, object_type: ObjectType, nb_partitions: int, check_hashes: bool, check_references: bool)[source]#

Bases: object

Represents a scrubbing configuration: which object type of which datastore is checked, in how many partitions, and whether hashes and references are checked.

name: str#
datastore: Datastore#
object_type: ObjectType#
nb_partitions: int#
check_hashes: bool#
check_references: bool#
class swh.scrubber.db.CorruptObject(id: swh.model.swhids.CoreSWHID, config: swh.scrubber.db.ConfigEntry, first_occurrence: datetime.datetime, object_: bytes)[source]#

Bases: object

id: CoreSWHID#
config: ConfigEntry#
first_occurrence: datetime#
object_: bytes#
class swh.scrubber.db.MissingObject(id: swh.model.swhids.CoreSWHID, config: swh.scrubber.db.ConfigEntry, first_occurrence: datetime.datetime)[source]#

Bases: object

id: CoreSWHID#
config: ConfigEntry#
first_occurrence: datetime#
class swh.scrubber.db.MissingObjectReference(missing_id: swh.model.swhids.CoreSWHID, reference_id: swh.model.swhids.CoreSWHID, config: swh.scrubber.db.ConfigEntry, first_occurrence: datetime.datetime)[source]#

Bases: object

missing_id: CoreSWHID#
reference_id: CoreSWHID#
config: ConfigEntry#
first_occurrence: datetime#
class swh.scrubber.db.FixedObject(id: swh.model.swhids.CoreSWHID, object_: bytes, method: str, recovery_date: datetime.datetime | None = None)[source]#

Bases: object

id: CoreSWHID#
object_: bytes#
method: str#
recovery_date: datetime | None = None#
class swh.scrubber.db.ScrubberDb(conn: connection, pool: AbstractConnectionPool | None = None)[source]#

Bases: BaseDb

Creates a DB proxy.

Parameters:
  • conn – psycopg2 connection to the SWH DB

  • pool – psycopg2 pool of connections

current_version = 7#
datastore_get_or_add(datastore: Datastore) int[source]#

Creates a datastore if it does not exist, and returns its id.

datastore_get(datastore_id: int) Datastore[source]#

Returns the datastore with the given id. Raises ValueError if it does not exist.
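The get-or-add and lookup-by-id behaviour described for these two methods can be sketched in memory. This is not the ScrubberDb implementation (which is backed by a database); DatastoreRegistrySketch, its method names, and the id numbering are hypothetical.

```python
from typing import Dict, Tuple

# (package, cls, instance), standing in for a Datastore value.
DatastoreKey = Tuple[str, str, str]


class DatastoreRegistrySketch:
    """Hypothetical in-memory analogue of the get-or-add / get pair."""

    def __init__(self) -> None:
        self._ids: Dict[DatastoreKey, int] = {}
        self._by_id: Dict[int, DatastoreKey] = {}

    def get_or_add(self, key: DatastoreKey) -> int:
        # Create the entry only if it is not known yet, then return its id.
        if key not in self._ids:
            new_id = len(self._ids) + 1  # arbitrary numbering for the sketch
            self._ids[key] = new_id
            self._by_id[new_id] = key
        return self._ids[key]

    def get(self, datastore_id: int) -> DatastoreKey:
        # Unknown ids are reported with ValueError, as documented above.
        try:
            return self._by_id[datastore_id]
        except KeyError:
            raise ValueError(f"no datastore with id {datastore_id}") from None
```

Calling get_or_add twice with the same value returns the same id, which is what makes it safe to call before every insertion.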

config_add(name: str | None, datastore: Datastore, object_type: ObjectType, nb_partitions: int, check_hashes: bool = True, check_references: bool = True) int[source]#

Creates a configuration entry (and potentially a datastore).

Fails if a config with the same (datastore, object_type, nb_partitions) already exists.

config_get(config_id: int) ConfigEntry[source]#
config_get_by_name(name: str, datastore: int | None = None) int | None[source]#

Returns the id of the configuration entry with the given name, if any.

config_iter() Iterator[Tuple[int, ConfigEntry]][source]#
config_get_stats(config_id: int) Dict[str, Any][source]#

Returns statistics for the check configuration config_id.

checked_partition_iter_next(config_id: int) Iterator[int][source]#

Generates partitions to be checked for the given configuration.

At each iteration, looks for the next “free” partition in the checked_partition table for the given config_id, reserves it, and returns its id.

Reserving the partition means making sure there is a row in the table for this partition id with the start_date column set.

A “free” partition is either the smallest partition id for which start_date is NULL, or the first partition id not yet in the table.

Stops the iteration when the number of partitions for the config id is reached.
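The selection rule above can be sketched with a dictionary standing in for the checked_partition table. This is a simplified in-memory model, not the actual SQL; it assumes partition ids start at 0 and that, when both kinds of free partition exist, the smaller id is taken. The function name iter_next_partition is hypothetical.

```python
from datetime import datetime, timezone
from typing import Dict, Iterator, Optional


def iter_next_partition(
    table: Dict[int, Optional[datetime]], nb_partitions: int
) -> Iterator[int]:
    """Yield free partition ids, reserving each one as it is yielded.

    `table` maps partition id -> start_date (None means "row exists but
    the partition has not been started").
    """
    while True:
        # Rows already in the table whose start_date is unset.
        unstarted = [pid for pid, start in table.items() if start is None]
        # First partition id (assumed 0-based) with no row at all.
        absent = next(
            (pid for pid in range(nb_partitions) if pid not in table), None
        )
        candidates = unstarted + ([absent] if absent is not None else [])
        if not candidates:
            return  # every partition of this config has been reserved
        partition_id = min(candidates)
        # Reserve: make sure a row exists with start_date set.
        table[partition_id] = datetime.now(tz=timezone.utc)
        yield partition_id
```

With a table where partition 0 is already started and partition 1 has an empty start_date, a configuration of 4 partitions would hand out 1, 2 and 3 and then stop.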

checked_partition_reset(config_id: int, partition_id: int) bool[source]#

Resets the partition, i.e. clears its start_date and end_date.

checked_partition_upsert(config_id: int, partition_id: int, date: datetime | None = None) None[source]#

Records in the database that the given partition was last checked at the given date.

checked_partition_get_last_date(config_id: int, partition_id: int) datetime | None[source]#

Returns the last date the given partition was checked for the given configuration, or None if it was never checked.

Currently, this matches partition id and number exactly, with no regard for partitions that contain or are contained by it.

checked_partition_get_running(config_id: int) Iterator[Tuple[int, datetime]][source]#

Yields the partitions which are currently being checked; i.e. which have a start_date but no end_date.

checked_partition_get_stuck(config_id: int, since: timedelta | None = None) Iterator[Tuple[int, datetime]][source]#

Yields the partitions which have been running for longer than since. If since is not set, a reasonable delay is guessed from completed partitions; if no such delay can be extracted, it falls back to 1 hour.

The heuristic for the automatic delay is 2x max(end_date-start_date) for the last 10 partitions checked.
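The heuristic can be sketched over plain tuples. This is an illustration under assumptions, not the actual query: "last 10 partitions checked" is interpreted here as the 10 completed partitions with the most recent end_date, and the names Row, guess_stuck_delay and stuck_partitions are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

# (partition_id, start_date, end_date); end_date is None while running.
Row = Tuple[int, datetime, Optional[datetime]]


def guess_stuck_delay(rows: List[Row]) -> timedelta:
    """2x the longest duration among the last 10 completed partitions."""
    completed = [(start, end) for _, start, end in rows if end is not None]
    # Assumption: "last 10" means the 10 most recent end dates.
    completed.sort(key=lambda pair: pair[1])
    durations = [end - start for start, end in completed[-10:]]
    if not durations:
        return timedelta(hours=1)  # documented fallback
    return 2 * max(durations)


def stuck_partitions(
    rows: List[Row], now: datetime, since: Optional[timedelta] = None
) -> List[Tuple[int, datetime]]:
    """Running partitions (no end_date) started more than `since` ago."""
    delay = since if since is not None else guess_stuck_delay(rows)
    return [
        (pid, start)
        for pid, start, end in rows
        if end is None and now - start > delay
    ]
```

Doubling the longest recent duration leaves headroom for partitions that are merely slow, so only clear outliers are reported.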

checked_partition_iter(config_id: int) Iterator[Tuple[int, int, datetime, datetime | None]][source]#

Yields tuples of (partition_id, nb_partitions, start_date, end_date)

corrupt_object_add(id: CoreSWHID, config: ConfigEntry, serialized_object: bytes) None[source]#
corrupt_object_iter() Iterator[CorruptObject][source]#

Yields all records in the ‘corrupt_object’ table.

corrupt_object_get(start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100) Iterator[CorruptObject][source]#

Yields a page of records in the ‘corrupt_object’ table, ordered by id.

Parameters:
  • start_id – Only return objects after this id

  • end_id – Only return objects before this id

  • limit – Maximum number of records to return

corrupt_object_grab_by_id(cur: cursor, start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100) Iterator[CorruptObject][source]#

Returns a page of records in the ‘corrupt_object’ table for a fixer, ordered by id.

These records are not already fixed (i.e. they do not have a corresponding entry in the ‘fixed_object’ table), and they are selected with an exclusive update lock.

Parameters:
  • start_id – Only return objects after this id

  • end_id – Only return objects before this id

corrupt_object_grab_by_origin(cur: cursor, origin_url: str, start_id: CoreSWHID | None = None, end_id: CoreSWHID | None = None, limit: int = 100) Iterator[CorruptObject][source]#

Returns a page of records in the ‘corrupt_object’ table for a fixer, ordered by id.

These records are not already fixed (i.e. they do not have a corresponding entry in the ‘fixed_object’ table), and they are selected with an exclusive update lock.

Parameters:

origin_url – only returns objects that may be found in the given origin

missing_object_add(id: CoreSWHID, reference_ids: Iterable[CoreSWHID], config: ConfigEntry) None[source]#

Adds a “hole” to the inventory, i.e. an object missing from a datastore that is referenced by another object of the same datastore.

If the missing object is already known to be missing by the scrubber database, this only records the reference (which can be useful to locate an origin to recover the object from). If that reference is already known too, this is a no-op.

Parameters:
  • id – SWHID of the missing object (the hole)

  • reference_ids – SWHIDs of the objects referencing the missing object

  • config – configuration entry whose datastore (the swh-storage/swh-journal/… instance) contains this hole
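The idempotent behaviour described above (record the hole once, add only new references, do nothing when everything is already known) can be sketched with two sets. This is an in-memory simplification, not the database logic: SWHIDs are plain strings, the configuration is left out, and HoleInventorySketch is a hypothetical name.

```python
from typing import Iterable, Set, Tuple


class HoleInventorySketch:
    """Hypothetical in-memory model of the hole inventory."""

    def __init__(self) -> None:
        self.missing: Set[str] = set()
        # (missing_id, reference_id) pairs
        self.references: Set[Tuple[str, str]] = set()

    def add(self, missing_id: str, reference_ids: Iterable[str]) -> None:
        # Set semantics make each step a no-op when the entry is known.
        self.missing.add(missing_id)
        for reference_id in reference_ids:
            self.references.add((missing_id, reference_id))


# Placeholder SWHIDs, for illustration only.
hole = "swh:1:cnt:" + "0" * 40
ref_a = "swh:1:dir:" + "1" * 40
ref_b = "swh:1:dir:" + "2" * 40

inventory = HoleInventorySketch()
inventory.add(hole, [ref_a])
inventory.add(hole, [ref_a, ref_b])  # only ref_b is new
```

Keeping every reference, even for a hole that is already known, preserves more candidates for locating an origin to recover the object from.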

missing_object_iter() Iterator[MissingObject][source]#

Yields all records in the ‘missing_object’ table.

missing_object_reference_iter(missing_id: CoreSWHID) Iterator[MissingObjectReference][source]#

Yields all records in the ‘missing_object_reference’ table for the given missing_id.

object_origin_add(cur: cursor, swhid: CoreSWHID, origins: List[str]) None[source]#
object_origin_get(after: str = '', limit: int = 1000) List[str][source]#

Returns origins with non-fixed corrupt objects, ordered by URL.

Parameters:

after – if given, only returns origins with a URL after this value
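The after and limit parameters suggest paging by passing the last URL of one page as after for the next. A minimal in-memory sketch of that loop follows; origins_page is a hypothetical stand-in for the real method, it assumes "after" means strictly greater, and the example URLs are made up.

```python
from typing import Iterator, List


def origins_page(
    all_origins: List[str], after: str = "", limit: int = 1000
) -> List[str]:
    """Stand-in: URLs strictly greater than `after`, ordered, capped."""
    return sorted(url for url in all_origins if url > after)[:limit]


def iter_all_origins(
    all_origins: List[str], page_size: int = 1000
) -> Iterator[str]:
    """Walk every page by feeding the last URL back in as `after`."""
    after = ""
    while True:
        page = origins_page(all_origins, after=after, limit=page_size)
        if not page:
            return
        yield from page
        after = page[-1]


# Made-up origin URLs for illustration.
urls = [
    "https://example.org/b",
    "https://example.org/a",
    "https://example.org/c",
]
```

Paging on the URL itself rather than on an offset keeps the walk stable if rows are added or fixed between two calls.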

fixed_object_add(cur: cursor, fixed_objects: List[FixedObject]) None[source]#
fixed_object_iter() Iterator[FixedObject][source]#