# Source code for swh.scrubber.utils
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Callable, Optional
import psycopg2
from swh.model.swhids import CoreSWHID
from .db import CorruptObject, ScrubberDb
[docs]
def iter_corrupt_objects(
    db: ScrubberDb,
    start_object: CoreSWHID,
    end_object: CoreSWHID,
    origin_url: Optional[str],
    cb: Callable[[CorruptObject, psycopg2.extensions.cursor], None],
) -> None:
    """Fetches corrupt objects batch by batch and calls ``cb`` on each of them.

    Objects are fetched with an update lock, with the same transaction as ``cb``,
    which is automatically committed after ``cb`` runs.

    Args:
        db: scrubber database the corrupt objects are grabbed from
        start_object: lower bound of the range of object ids to process
        end_object: upper bound of the range of object ids to process
        origin_url: if set (and non-empty), objects are grabbed with
            ``db.corrupt_object_grab_by_origin`` for that origin; otherwise
            with ``db.corrupt_object_grab_by_id``
        cb: called as ``cb(corrupt_object, cursor)`` for every fetched object,
            with the cursor of the transaction that grabbed the object

    Each batch restarts from the id of the last object of the previous batch,
    so that object may be returned a second time; it is skipped instead of
    being passed to ``cb`` again. Iteration stops as soon as a batch contains
    no new object.
    """
    # Id of the last object already passed to ``cb``; stays None until a
    # first batch has been processed, so that an object whose id is exactly
    # the caller-supplied ``start_object`` is not mistaken for a duplicate.
    last_processed = None

    while True:
        with db.conn, db.cursor() as cur:
            if origin_url:
                corrupt_objects = list(
                    db.corrupt_object_grab_by_origin(
                        cur, origin_url, start_object, end_object
                    )
                )
            else:
                corrupt_objects = list(
                    db.corrupt_object_grab_by_id(cur, start_object, end_object)
                )

            if (
                corrupt_objects
                and last_processed is not None
                and corrupt_objects[0].id == last_processed
            ):
                # The batch starts at the last object of the previous batch,
                # which was already handled.
                # TODO: don't needlessly fetch duplicate objects
                del corrupt_objects[0]

            if not corrupt_objects:
                # Nothing more to do
                break

            for corrupt_object in corrupt_objects:
                cb(corrupt_object, cur)

            # XXX: probably redundant with db.conn.__exit__ (a psycopg2
            # connection commits when its ``with`` block exits cleanly);
            # kept until that is confirmed for ``db.conn``.
            db.conn.commit()

        # Resume the next batch from the last object handled in this one.
        last_processed = corrupt_objects[-1].id
        start_object = last_processed