Source code for swh.storage.algos.discovery
from typing import Iterable, List
from swh.model import discovery, model
from swh.model.model import Sha1Git
from swh.storage.interface import StorageInterface
[docs]
class DiscoveryStorageConnection(discovery.ArchiveDiscoveryInterface):
    """Discovery backend that queries the archive through the storage API.

    Keeps the contents, skipped contents and directories handed to the
    constructor, together with the storage object that every ``*_missing``
    query is delegated to.
    """

    def __init__(
        self,
        contents: List[model.Content],
        skipped_contents: List[model.SkippedContent],
        directories: List[model.Directory],
        swh_storage: StorageInterface,
    ) -> None:
        """Remember the objects under discovery and the storage to query.

        Args:
            contents: content objects, stored as ``self.contents``.
            skipped_contents: skipped content objects, stored as
                ``self.skipped_contents``.
            directories: directory objects, stored as ``self.directories``.
            swh_storage: storage instance, stored as ``self.storage`` and
                used by all the ``*_missing`` methods.
        """
        self.storage = swh_storage
        self.contents = contents
        self.skipped_contents = skipped_contents
        self.directories = directories

    def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
        """List the contents missing from the archive, by ``sha1_git``.

        Args:
            contents: ``sha1_git`` identifiers of the contents to look up.

        Returns:
            The result of ``storage.content_missing_per_sha1_git`` for the
            given identifiers, passed through unchanged.
        """
        missing_ids = self.storage.content_missing_per_sha1_git(contents)
        return missing_ids

    def skipped_content_missing(
        self, skipped_contents: List[Sha1Git]
    ) -> Iterable[Sha1Git]:
        """List the skipped contents missing from the archive, by ``sha1_git``.

        Args:
            skipped_contents: ``sha1_git`` identifiers of the skipped
                contents to look up.

        Returns:
            A lazy iterable over the ``sha1_git`` value of every entry
            returned by ``storage.skipped_content_missing``.
        """
        # The storage call takes dicts of hashes; only sha1_git is known
        # here, so the other hash fields are explicitly set to None.
        queries = [
            {
                "sha1_git": sha1_git,
                "sha1": None,
                "sha256": None,
                "blake2s256": None,
            }
            for sha1_git in skipped_contents
        ]
        # The storage is queried right away; only the projection of each
        # returned entry down to its sha1_git is deferred.
        missing_entries = self.storage.skipped_content_missing(queries)
        return (entry["sha1_git"] for entry in missing_entries)

    def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
        """List the directories missing from the archive, by identifier.

        Args:
            directories: ``Sha1Git`` identifiers of the directories to look
                up.

        Returns:
            The result of ``storage.directory_missing`` for the given
            identifiers, passed through unchanged.
        """
        missing_ids = self.storage.directory_missing(directories)
        return missing_ids