Source code for

# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from typing import Dict, Iterable, List

from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex
from swh.model.model import Content, Directory, Release, Revision, Snapshot
from swh.storage import StorageSpec, get_storage
from swh.storage.exc import StorageArgumentException
from swh.storage.interface import StorageInterface

class ValidatingProxyStorage:
    """Proxy for storage classes, which checks inserted objects have a correct hash.

    Every ``*_add`` method defined here recomputes the identifier (or, for
    contents, the set of hashes) of each object and compares it with the value
    carried by the object.  On the first mismatch a
    ``StorageArgumentException`` is raised and nothing is forwarded; otherwise
    the call is delegated unchanged to the wrapped storage.  Any attribute not
    defined on the proxy is looked up on the wrapped storage.

    Sample configuration use case for validating storage:

    .. code-block:: yaml

        storage:
          cls: validate
          storage:
            cls: remote
            url: <URL of the remote storage>
    """

    def __init__(self, storage: StorageSpec) -> None:
        """Instantiate the wrapped storage from its configuration.

        Args:
            storage: keyword arguments passed as-is to ``get_storage``.
        """
        self.storage: StorageInterface = get_storage(**storage)

    def __getattr__(self, key):
        """Delegate attributes the proxy does not define to the wrapped storage."""
        # __getattr__ only runs when normal lookup fails.  If "storage" itself
        # is missing (instance created without __init__), reading
        # self.storage below would re-enter this method forever.
        if key == "storage":
            raise AttributeError(key)
        return getattr(self.storage, key)

    def _check_hashes(self, objects: Iterable):
        """Check that each object's ``id`` equals its recomputed hash.

        Args:
            objects: objects exposing ``compute_hash()`` and an ``id`` attribute.

        Raises:
            StorageArgumentException: on the first object whose ``id`` differs
                from ``hash_to_bytes(obj.compute_hash())``.
        """
        for obj in objects:
            id_ = hash_to_bytes(obj.compute_hash())
            if id_ != obj.id:
                raise StorageArgumentException(
                    f"Object has id {hash_to_hex(obj.id)}, "
                    f"but it should be {hash_to_hex(id_)}: {obj}"
                )

    def content_add(self, content: List[Content]) -> Dict[str, int]:
        """Validate the hashes of each content, then add them to the storage.

        Raises:
            StorageArgumentException: if the hashes recomputed from a content's
                data differ from ``cont.hashes()``.
        """
        for cont in content:
            hashes = MultiHash.from_data(cont.data).digest()
            if hashes != cont.hashes():
                raise StorageArgumentException(
                    f"Object has hashes {cont.hashes()}, but they should be {hashes}"
                )
        return self.storage.content_add(content)

    def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
        """Validate directory ids, then add the directories to the storage."""
        self._check_hashes(directories)
        return self.storage.directory_add(directories)

    def revision_add(self, revisions: List[Revision]) -> Dict[str, int]:
        """Validate revision ids, then add the revisions to the storage."""
        self._check_hashes(revisions)
        return self.storage.revision_add(revisions)

    def release_add(self, releases: List[Release]) -> Dict[str, int]:
        """Validate release ids, then add the releases to the storage."""
        self._check_hashes(releases)
        return self.storage.release_add(releases)

    def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]:
        """Validate snapshot ids, then add the snapshots to the storage."""
        self._check_hashes(snapshots)
        return self.storage.snapshot_add(snapshots)