# Source code for swh.storage.proxies.validate

# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information


from typing import Dict, Iterable, List

from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex
from swh.model.model import Content, Directory, Release, Revision, Snapshot
from swh.storage import StorageSpec, get_storage
from swh.storage.exc import StorageArgumentException
from swh.storage.interface import StorageInterface


class ValidatingProxyStorage:
    """Proxy for storage classes, which checks inserted objects have a correct hash.

    Each ``*_add`` method defined here recomputes the identifier (or, for
    contents, the checksums) of every object it is given and raises
    :class:`StorageArgumentException` on the first mismatch. The wrapped
    storage is only called once the whole batch has passed the check.
    Any attribute not defined on this class is looked up on the wrapped
    storage.

    Sample configuration use case for validating storage:

    .. code-block:: yaml

        storage:
          cls: validate
          storage:
            cls: remote
            url: http://storage.internal.staging.swh.network:5002/
    """

    def __init__(self, storage: StorageSpec) -> None:
        """Instantiate the wrapped storage.

        Args:
            storage: configuration of the wrapped storage; it is expanded as
                keyword arguments to :func:`get_storage`.
        """
        self.storage: StorageInterface = get_storage(**storage)

    def __getattr__(self, key: str):
        """Delegate attributes not found on the proxy to the wrapped storage.

        Raises:
            AttributeError: if ``key`` is ``"storage"``, or if the wrapped
                storage has no such attribute.
        """
        # __getattr__ only runs when normal lookup failed. If "storage" itself
        # is missing (instance not initialised yet), evaluating self.storage
        # below would re-enter this method forever; fail cleanly instead.
        if key == "storage":
            raise AttributeError(key)
        return getattr(self.storage, key)

    def _check_hashes(self, objects: Iterable) -> None:
        """Check that each object's ``id`` matches its recomputed hash.

        Args:
            objects: objects exposing ``compute_hash()`` and an ``id``
                attribute.

        Raises:
            StorageArgumentException: on the first object whose ``id`` differs
                from the value derived from ``compute_hash()``.
        """
        for obj in objects:
            expected_id = hash_to_bytes(obj.compute_hash())
            if expected_id != obj.id:
                raise StorageArgumentException(
                    f"Object has id {hash_to_hex(obj.id)}, "
                    f"but it should be {hash_to_hex(expected_id)}: {obj}"
                )

    def content_add(self, content: List[Content]) -> Dict[str, int]:
        """Validate the checksums of each content, then add them to the storage.

        The checksums are recomputed from ``data`` with ``MultiHash`` and
        compared with what ``hashes()`` reports for the object.

        Raises:
            StorageArgumentException: on the first content whose declared
                hashes differ from the recomputed ones.

        Returns:
            Whatever the wrapped storage's ``content_add`` returns.
        """
        for cont in content:
            computed = MultiHash.from_data(cont.data).digest()
            declared = cont.hashes()
            if computed != declared:
                raise StorageArgumentException(
                    f"Object has hashes {declared}, but they should be {computed}"
                )
        return self.storage.content_add(content)

    def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
        """Validate the ids of ``directories``, then add them to the storage.

        Raises:
            StorageArgumentException: if a directory's id does not match its
                recomputed hash.
        """
        self._check_hashes(directories)
        return self.storage.directory_add(directories)

    def revision_add(self, revisions: List[Revision]) -> Dict[str, int]:
        """Validate the ids of ``revisions``, then add them to the storage.

        Raises:
            StorageArgumentException: if a revision's id does not match its
                recomputed hash.
        """
        self._check_hashes(revisions)
        return self.storage.revision_add(revisions)

    def release_add(self, releases: List[Release]) -> Dict[str, int]:
        """Validate the ids of ``releases``, then add them to the storage.

        Raises:
            StorageArgumentException: if a release's id does not match its
                recomputed hash.
        """
        self._check_hashes(releases)
        return self.storage.release_add(releases)

    def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]:
        """Validate the ids of ``snapshots``, then add them to the storage.

        Raises:
            StorageArgumentException: if a snapshot's id does not match its
                recomputed hash.
        """
        self._check_hashes(snapshots)
        return self.storage.snapshot_add(snapshots)