# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Reads all known corrupt objects from the swh-scrubber database,
and tries to recover them.
Currently, only recovery from Git origins is implemented."""
import dataclasses
import functools
import logging
import os
from pathlib import Path
import subprocess
import tempfile
from typing import Dict, Optional, Type, Union
import dulwich
import dulwich.objects
import dulwich.repo
import psycopg2
from swh.journal.serializers import kafka_to_value, value_to_kafka
from swh.loader.git import converters
from swh.model.hashutil import hash_to_bytehex, hash_to_hex
from swh.model.model import BaseModel, Directory, Release, Revision, Snapshot
from swh.model.swhids import CoreSWHID, ObjectType
from .db import CorruptObject, FixedObject, ScrubberDb
from .utils import iter_corrupt_objects
logger = logging.getLogger(__name__)
ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
[docs]
def get_object_from_clone(
    clone_path: Path, swhid: CoreSWHID
) -> Union[None, bytes, dulwich.objects.ShaFile]:
    """Looks up the Git object identified by ``swhid`` in the clone at
    ``clone_path``.

    Returns:
        * ``None`` if ``clone_path`` is not a Git repository, or if the
          repository has no object with that id;
        * the Dulwich object, when Dulwich is able to parse it;
        * otherwise the raw manifest as bytes: a ``<type> <length>`` header,
          a NUL byte, then the object content as printed by ``git cat-file``.
    """
    try:
        repository = dulwich.repo.Repo(str(clone_path))
    except dulwich.errors.NotGitRepository:
        return None

    def git_cat_file(selector: str) -> bytes:
        """Runs ``git cat-file <selector> <object id>`` inside the clone and
        returns its standard output."""
        return subprocess.check_output(
            [
                "git",
                "-C",
                clone_path,
                "cat-file",
                selector,
                hash_to_hex(swhid.object_id),
            ]
        )

    # Using the repository as a context manager is needed to avoid leaking
    # packfile file descriptors.
    with repository:
        try:
            return repository[hash_to_bytehex(swhid.object_id)]
        except KeyError:
            # No such object in this clone.
            return None
        except dulwich.errors.ObjectFormatException:
            # Dulwich cannot hand out an object without parsing it into a
            # ShaFile subclass, so when parsing fails the only way to get the
            # object is to shell out to git: first for its type, then for
            # its content.
            kind = git_cat_file("-t").decode().strip()
            payload = git_cat_file(kind)
            header = f"{kind} {len(payload)}\x00".encode()
            raw_manifest = header + payload
            logger.info("Dulwich failed to parse %r", raw_manifest)
            return raw_manifest
[docs]
def get_fixed_object_from_clone(
    clone_path: Path, corrupt_object: CorruptObject
) -> Optional[FixedObject]:
    """Tries to rebuild ``corrupt_object`` from the clone at ``clone_path``.

    The object is looked up in the clone by its SWHID using
    :func:`get_object_from_clone`, then:

    * if it is not found, ``None`` is returned;
    * if only a raw manifest could be read (Dulwich could not parse the
      object), the manifest is attached as ``raw_manifest`` to the stored
      corrupt object, and the result is returned with method
      ``manifest_from_origin``;
    * otherwise the Dulwich object is converted with the swh-loader-git
      converters, and the result is returned with method ``from_origin``.

    In both successful cases ``check()`` is called on the rebuilt model object
    before the :class:`FixedObject` is returned, so a failing check propagates
    as an exception instead of producing a result.
    """
    swhid = corrupt_object.id
    found = get_object_from_clone(clone_path, swhid)

    if found is None:
        # Origin still exists, but object disappeared
        logger.info("%s not found in origin", swhid)
        return None

    if isinstance(found, bytes):
        # Dulwich could not parse it: keep the fields of the stored (corrupt)
        # object and add the manifest we just recovered to them.
        stored = kafka_to_value(corrupt_object.object_)
        assert stored.get("raw_manifest") is None, "Corrupt object has a raw_manifest"
        stored["raw_manifest"] = found

        # Instantiate the model object only to run its checks on the
        # combination of stored fields + recovered manifest.
        model_classes: Dict[ObjectType, Type[BaseModel]] = {
            ObjectType.REVISION: Revision,
            ObjectType.DIRECTORY: Directory,
            ObjectType.RELEASE: Release,
        }
        model_cls = model_classes[swhid.object_type]
        model_cls.from_dict(stored).check()

        return FixedObject(
            id=swhid,
            object_=value_to_kafka(stored),
            method="manifest_from_origin",
        )

    # Dulwich parsed the object: convert it to the matching model class.
    dulwich_converters = {
        ObjectType.REVISION: converters.dulwich_commit_to_revision,
        ObjectType.DIRECTORY: converters.dulwich_tree_to_directory,
        ObjectType.RELEASE: converters.dulwich_tag_to_release,
    }
    to_model = dulwich_converters[swhid.object_type]
    converted = to_model(found)

    # Check checksum, among others
    converted.check()

    return FixedObject(
        id=swhid,
        object_=value_to_kafka(converted.to_dict()),
        method="from_origin",
    )
[docs]
@dataclasses.dataclass
class Fixer:
    """Reads corrupt objects from the swh-scrubber database, tries to recover
    them, and writes the recovered objects back to the swh-scrubber database
    if successful.

    The only recovery method implemented so far is re-downloading the object
    from one of its origins (cloned with ``git``); brute-forcing fields is not
    implemented yet, see the TODO in :meth:`run`.
    """

    db: ScrubberDb
    """Database to read from and write to."""

    start_object: CoreSWHID = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20)
    """Minimum SWHID to check (in alphabetical order)"""

    end_object: CoreSWHID = CoreSWHID.from_string("swh:1:snp:" + "ff" * 20)
    """Maximum SWHID to check (in alphabetical order)"""

    def run(self) -> None:
        """Iterates over the origins returned by ``db.object_origin_get``, one
        page at a time, and attempts a recovery from each of them."""
        # TODO: currently only support re-downloading from the origin:
        # we should try brute-forcing for objects with no known origin (or when
        # all origins fail)
        after = ""
        while True:
            new_origins = self.db.object_origin_get(after=after)
            if not new_origins:
                break
            for origin_url in new_origins:
                self.recover_objects_from_origin(origin_url)
            # Resume the listing after the last origin of this page.
            after = new_origins[-1]

    def recover_objects_from_origin(self, origin_url: str) -> None:
        """Clones an origin, and cherry-picks original objects that are known to be
        corrupt in the database.

        The clone lives in a temporary directory which is removed when this
        method returns. If cloning fails for any reason, the failure is logged
        and the origin is skipped.
        """
        with tempfile.TemporaryDirectory(prefix=__name__ + ".") as tempdir:
            clone_path = Path(tempdir) / "repository.git"
            try:
                subprocess.run(
                    # "--" marks the end of options, so that an origin URL
                    # starting with "-" can never be interpreted by git as a
                    # command-line option.
                    ["git", "clone", "--bare", "--", origin_url, clone_path],
                    # Minimal environment; GIT_TERMINAL_PROMPT=0 makes git fail
                    # instead of waiting for credentials on the terminal.
                    env={"PATH": os.environ["PATH"], "GIT_TERMINAL_PROMPT": "0"},
                    check=True,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    stdin=subprocess.DEVNULL,
                )
            except Exception:
                # Best effort: an origin that cannot be cloned must not abort
                # the whole run.
                logger.exception("Failed to clone %s", origin_url)
                return

            # Must stay inside the ``with`` block: the callback reads objects
            # from the clone, which is deleted on exit.
            iter_corrupt_objects(
                self.db,
                self.start_object,
                self.end_object,
                origin_url,
                functools.partial(self.recover_corrupt_object, clone_path=clone_path),
            )

    def recover_corrupt_object(
        self,
        corrupt_object: CorruptObject,
        cur: psycopg2.extensions.cursor,
        clone_path: Path,
    ) -> None:
        """Tries to recover ``corrupt_object`` from the clone at ``clone_path``
        and, on success, records the fixed object in the database through the
        cursor ``cur``. Does nothing if the object could not be recovered."""
        fixed_object = get_fixed_object_from_clone(clone_path, corrupt_object)
        if fixed_object is not None:
            self.db.fixed_object_add(cur, [fixed_object])