Source code for swh.scrubber.origin_locator
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Lists corrupt objects in the scrubber database, and lists candidate origins
to recover them from."""
import dataclasses
import itertools
import logging
from typing import Iterable, Union
import psycopg2
from swh.core.utils import grouper
from swh.graph.http_client import GraphArgumentException, RemoteGraphClient
from swh.model.model import Directory, Release, Revision, Snapshot
from swh.model.swhids import CoreSWHID, ExtendedSWHID
from swh.storage.interface import StorageInterface
from .db import CorruptObject, ScrubberDb
from .utils import iter_corrupt_objects
logger = logging.getLogger(__name__)
ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
def get_origins(
graph: RemoteGraphClient, storage: StorageInterface, swhid: CoreSWHID
) -> Iterable[str]:
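    """Given the SWHID of a corrupt object, yield URLs of origins it may be
    recovered from: the origins which, according to the compressed graph, can
    reach this object.

    Origin SWHIDs found in the graph but missing from the storage are logged
    and skipped."""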
try:
origin_swhids = [
ExtendedSWHID.from_string(line)
for line in graph.leaves(str(swhid), direction="backward")
if line.startswith("swh:1:ori:")
]
except GraphArgumentException:
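        # The graph does not know about this SWHID (or rejected it), so there
        # are no candidate origins to suggest.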
return
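    # Resolve origin SWHIDs to origin URLs through the storage, in batches of 10.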
for origin_swhid_group in grouper(origin_swhids, 10):
origin_swhid_group = list(origin_swhid_group)
for origin, origin_swhid in zip(
storage.origin_get_by_sha1(
[origin_swhid.object_id for origin_swhid in origin_swhid_group]
),
origin_swhid_group,
):
if origin is None:
logger.error("%s found in graph but missing from storage", origin_swhid)
else:
yield origin["url"]
@dataclasses.dataclass
class OriginLocator:
"""Reads a chunk of corrupt objects in the swh-scrubber database, then writes
to the same database a list of origins they might be recovered from."""
db: ScrubberDb
"""Database to read from and write to."""
graph: RemoteGraphClient
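    """Used to find origins that may contain a given object, by traversing the
    compressed graph backward from it."""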
storage: StorageInterface
"""Used to resolve origin SHA1s to URLs."""
start_object: CoreSWHID
"""Minimum SWHID to check (in alphabetical order)"""
end_object: CoreSWHID
"""Maximum SWHID to check (in alphabetical order)"""
def run(self):
iter_corrupt_objects(
self.db,
self.start_object,
self.end_object,
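            # No origin filter (see iter_corrupt_objects in .utils).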
None,
self.handle_corrupt_object,
)
def handle_corrupt_object(
self, corrupt_object: CorruptObject, cur: psycopg2.extensions.cursor
) -> None:
origins = get_origins(self.graph, self.storage, corrupt_object.id)
        # Keep only 100 origins, to avoid flooding the DB.
        # It is very unlikely an object disappeared from 100 somewhat-randomly
        # sampled origins.
first_origins = list(itertools.islice(origins, 0, 100))
self.db.object_origin_add(cur, corrupt_object.id, first_origins)
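
# A minimal usage sketch, not part of the upstream module: it assumes a scrubber
# database reachable through the (placeholder) DSN below, a swh-graph HTTP API and
# a storage RPC server at the example URLs, and a SWHID range spanning all object
# types. Adjust these values to your own setup.
if __name__ == "__main__":
    from swh.storage import get_storage

    locator = OriginLocator(
        # ScrubberDb inherits connect() from swh.core.db.BaseDb; placeholder DSN.
        db=ScrubberDb.connect("dbname=swh-scrubber"),
        graph=RemoteGraphClient("http://localhost:5009/graph/"),  # example URL
        storage=get_storage("remote", url="http://localhost:5002/"),  # example URL
        start_object=CoreSWHID.from_string("swh:1:cnt:" + "0" * 40),
        end_object=CoreSWHID.from_string("swh:1:snp:" + "f" * 40),
    )
    locator.run()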