Source code for swh.scanner.policy

# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import abc
import itertools
from typing import Any, Callable, Iterable, List, Optional, no_type_check

from swh.model import discovery, from_disk
from swh.model.from_disk import model
from swh.model.model import Sha1Git
from swh.web.client.client import WebAPIClient

from .data import MerkleNodeInfo


[docs] def source_size(source_tree: from_disk.Directory): """return the size of a source tree as the number of nodes it contains""" return sum(1 for n in source_tree.iter_tree(dedup=False))
[docs] class Policy(metaclass=abc.ABCMeta): data: MerkleNodeInfo """information about contents and directories of the merkle tree""" source_tree: from_disk.Directory """representation of a source code project directory in the merkle tree""" def __init__( self, source_tree: from_disk.Directory, data: MerkleNodeInfo, ): self.source_tree = source_tree self.data = data
[docs] @abc.abstractmethod def run( self, client: WebAPIClient, update_info: Optional[Callable[[Any], None]] = None ): """Scan a source code project""" raise NotImplementedError("Must implement run method")
def _set_info(self, obj, known): self.data[obj.swhid()]["known"] = known
[docs] class WebAPIConnection(discovery.ArchiveDiscoveryInterface): """Use the web APIs to query the archive""" def __init__( self, contents: List[model.Content], skipped_contents: List[model.SkippedContent], directories: List[model.Directory], client: WebAPIClient, ) -> None: self.contents = contents self.skipped_contents = skipped_contents self.directories = directories self.client = client self.sha_to_swhid = {} self.swhid_to_sha = {} for content in contents: swhid = str(content.swhid()) self.sha_to_swhid[content.sha1_git] = swhid self.swhid_to_sha[swhid] = content.sha1_git for directory in directories: swhid = str(directory.swhid()) self.sha_to_swhid[directory.id] = swhid self.swhid_to_sha[swhid] = directory.id
[docs] def content_missing(self, contents: List[Sha1Git]) -> List[Sha1Git]: """List content missing from the archive by sha1""" return self._missing(contents)
[docs] def skipped_content_missing( self, skipped_contents: List[Sha1Git] ) -> Iterable[Sha1Git]: """List skipped content missing from the archive by sha1""" # TODO what should we do about skipped contents? return skipped_contents
[docs] def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]: """List directories missing from the archive by sha1""" return self._missing(directories)
def _missing(self, shas: List[Sha1Git]) -> List[Sha1Git]: # Ignore mypy complaining about string being passed, since `known` # transforms them to string immediately. res = self.client.known([self.sha_to_swhid[o] for o in shas]) return [k.object_id for k, v in res.items() if not v["known"]]
[docs] class RandomDirSamplingPriority(Policy): """Check the Merkle tree querying random directories. Set all ancestors to unknown for unknown directories, otherwise set all descendants to known. Finally check all the remaining file contents. """
[docs] @no_type_check def run( self, client: WebAPIClient, update_info: Optional[Callable[[Any], None]] = None ): contents, skipped_contents, directories = from_disk.iter_directory( self.source_tree ) # `filter_known_objects` only does filtering by random directory # sampling for now. # In the future, it could/will grow a parameter to choose/pass in a # different discovery implementation. # From this call site, we are relying on this behavior in order to # *actually* be a random directory sampling policy, but any change away # from under us in `filter_known_objects` should trigger a test failure. connection = WebAPIConnection(contents, skipped_contents, directories, client) if update_info is None: callback = self._set_info else: def callback(*args, **kwargs): self._set_info(*args, **kwargs) update_info(*args, **kwargs) get_unknowns = discovery.filter_known_objects( connection, update_info_callback=callback, ) unknowns = set(itertools.chain(*get_unknowns)) # double check the result for obj in itertools.chain(contents, skipped_contents, directories): assert self.data[obj.swhid()]["known"] == (obj not in unknowns)