# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import itertools
from typing import Iterable, List, no_type_check
from swh.core.utils import grouper
from swh.model import discovery, from_disk
from swh.model.from_disk import model
from swh.model.model import Sha1Git
from .client import QUERY_LIMIT, Client
from .data import MerkleNodeInfo
[docs]
def source_size(source_tree: from_disk.Directory):
"""return the size of a source tree as the number of nodes it contains"""
return sum(1 for n in source_tree.iter_tree(dedup=False))
[docs]
class Policy(metaclass=abc.ABCMeta):
data: MerkleNodeInfo
"""information about contents and directories of the merkle tree"""
source_tree: from_disk.Directory
"""representation of a source code project directory in the merkle tree"""
def __init__(self, source_tree: from_disk.Directory, data: MerkleNodeInfo):
self.source_tree = source_tree
self.data = data
[docs]
@abc.abstractmethod
async def run(self, client: Client):
"""Scan a source code project"""
raise NotImplementedError("Must implement run method")
[docs]
class LazyBFS(Policy):
"""Read nodes in the merkle tree using the BFS algorithm.
Lookup only directories that are unknown otherwise set all the downstream
contents to known.
"""
[docs]
async def run(self, client: Client):
queue = []
queue.append(self.source_tree)
while queue:
swhids = [node.swhid() for node in queue]
swhids_res = await client.known(swhids)
for node in queue.copy():
queue.remove(node)
self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())][
"known"
]
if node.object_type == "directory":
if not self.data[node.swhid()]["known"]:
children = [n[1] for n in list(node.items())]
queue.extend(children)
else:
for sub_node in node.iter_tree():
if sub_node == node:
continue
self.data[sub_node.swhid()]["known"] = True # type: ignore
[docs]
class GreedyBFS(Policy):
"""Query graph nodes in chunks (to maximize the Web API rate limit use) and set the
downstream contents of known directories to known.
"""
[docs]
async def run(self, client: Client):
ssize = source_size(self.source_tree)
seen = []
async for nodes_chunk in self.get_nodes_chunks(client, ssize):
for node in nodes_chunk:
seen.append(node)
if len(seen) == ssize:
return
if node.object_type == "directory" and self.data[node.swhid()]["known"]:
sub_nodes = [n for n in node.iter_tree(dedup=False)]
sub_nodes.remove(node) # remove root node
for sub_node in sub_nodes:
seen.append(sub_node)
self.data[sub_node.swhid()]["known"] = True
[docs]
@no_type_check
async def get_nodes_chunks(self, client: Client, ssize: int):
"""Query chunks of QUERY_LIMIT nodes at once in order to fill the Web API
rate limit. It query all the nodes in the case the source code contains
less than QUERY_LIMIT nodes.
"""
nodes = self.source_tree.iter_tree(dedup=False)
for nodes_chunk in grouper(nodes, QUERY_LIMIT):
nodes_chunk = [n for n in nodes_chunk]
swhids = [node.swhid() for node in nodes_chunk]
swhids_res = await client.known(swhids)
for node in nodes_chunk:
swhid = node.swhid()
self.data[swhid]["known"] = swhids_res[str(swhid)]["known"]
yield nodes_chunk
[docs]
class FilePriority(Policy):
"""Check the Merkle tree querying all the file contents and set all the upstream
directories to unknown in the case a file content is unknown.
Finally check all the directories which status is still unknown and set all the
sub-directories of known directories to known.
"""
[docs]
@no_type_check
async def run(self, client: Client):
# get all the files
all_contents = list(
filter(
lambda node: node.object_type == "content", self.source_tree.iter_tree()
)
)
all_contents.reverse() # check deepest node first
# query the backend to get all file contents status
cnt_swhids = [node.swhid() for node in all_contents]
cnt_status_res = await client.known(cnt_swhids)
# set all the file contents status
for cnt in all_contents:
self.data[cnt.swhid()]["known"] = cnt_status_res[str(cnt.swhid())]["known"]
# set all the upstream directories of unknown file contents to unknown
if not self.data[cnt.swhid()]["known"]:
parent = cnt.parents[0]
while parent:
self.data[parent.swhid()]["known"] = False
parent = parent.parents[0] if parent.parents else None
# get all unset directories and check their status
# (update children directories accordingly)
unset_dirs = list(
filter(
lambda node: node.object_type == "directory"
and self.data[node.swhid()]["known"] is None,
self.source_tree.iter_tree(),
)
)
# check unset directories
for dir_ in unset_dirs:
if self.data[dir_.swhid()]["known"] is None:
# update directory status
dir_status = await client.known([dir_.swhid()])
dir_known = dir_status[str(dir_.swhid())]["known"]
self.data[dir_.swhid()]["known"] = dir_known
if dir_known:
sub_dirs = list(
filter(
lambda n: n.object_type == "directory"
and self.data[n.swhid()]["known"] is None,
dir_.iter_tree(),
)
)
for node in sub_dirs:
self.data[node.swhid()]["known"] = True
[docs]
class DirectoryPriority(Policy):
"""Check the Merkle tree querying all the directories that have at least one file
content and set all the upstream directories to unknown in the case a directory
is unknown otherwise set all the downstream contents to known.
Finally check the status of empty directories and all the remaining file
contents.
"""
[docs]
@no_type_check
async def run(self, client: Client):
# get all directory contents that have at least one file content
unknown_dirs = list(
filter(
lambda dir_: dir_.object_type == "directory"
and self.has_contents(dir_),
self.source_tree.iter_tree(),
)
)
unknown_dirs.reverse() # check deepest node first
for dir_ in unknown_dirs:
if self.data[dir_.swhid()]["known"] is None:
dir_status = await client.known([dir_.swhid()])
dir_known = dir_status[str(dir_.swhid())]["known"]
self.data[dir_.swhid()]["known"] = dir_known
# set all the downstream file contents to known
if dir_known:
for cnt in self.get_contents(dir_):
self.data[cnt.swhid()]["known"] = True
# otherwise set all the upstream directories to unknown
else:
parent = dir_.parents[0]
while parent:
self.data[parent.swhid()]["known"] = False
parent = parent.parents[0] if parent.parents else None
# get remaining directories that have no file contents
empty_dirs = list(
filter(
lambda n: n.object_type == "directory"
and not self.has_contents(n)
and self.data[n.swhid()]["known"] is None,
self.source_tree.iter_tree(),
)
)
empty_dirs_swhids = [n.swhid() for n in empty_dirs]
empty_dir_status = await client.known(empty_dirs_swhids)
# update status of directories that have no file contents
for dir_ in empty_dirs:
self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][
"known"
]
# check unknown file contents
unknown_cnts = list(
filter(
lambda n: n.object_type == "content"
and self.data[n.swhid()]["known"] is None,
self.source_tree.iter_tree(),
)
)
unknown_cnts_swhids = [n.swhid() for n in unknown_cnts]
unknown_cnts_status = await client.known(unknown_cnts_swhids)
for cnt in unknown_cnts:
self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][
"known"
]
[docs]
def has_contents(self, directory: from_disk.Directory):
"""Check if the directory given in input has contents"""
for entry in directory.entries:
if entry["type"] == "file":
return True
return False
[docs]
def get_contents(self, dir_: from_disk.Directory):
"""Get all the contents of a given directory"""
for _, node in list(dir_.items()):
if node.object_type == "content":
yield node
[docs]
class WebAPIConnection(discovery.ArchiveDiscoveryInterface):
"""Use the web APIs to query the archive"""
def __init__(
self,
contents: List[model.Content],
skipped_contents: List[model.SkippedContent],
directories: List[model.Directory],
client: Client,
) -> None:
self.contents = contents
self.skipped_contents = skipped_contents
self.directories = directories
self.client = client
self.sha_to_swhid = {}
self.swhid_to_sha = {}
for content in contents:
swhid = str(content.swhid())
self.sha_to_swhid[content.sha1_git] = swhid
self.swhid_to_sha[swhid] = content.sha1_git
for directory in directories:
swhid = str(directory.swhid())
self.sha_to_swhid[directory.id] = swhid
self.swhid_to_sha[swhid] = directory.id
[docs]
async def content_missing(self, contents: List[Sha1Git]) -> List[Sha1Git]:
"""List content missing from the archive by sha1"""
return await self._missing(contents)
[docs]
async def skipped_content_missing(
self, skipped_contents: List[Sha1Git]
) -> Iterable[Sha1Git]:
"""List skipped content missing from the archive by sha1"""
# TODO what should we do about skipped contents?
return skipped_contents
[docs]
async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
"""List directories missing from the archive by sha1"""
return await self._missing(directories)
async def _missing(self, shas: List[Sha1Git]) -> List[Sha1Git]:
# Ignore mypy complaining about string being passed, since `known`
# transforms them to string immediately.
res = await self.client.known([self.sha_to_swhid[o] for o in shas]) # type: ignore
return [self.swhid_to_sha[k] for k, v in res.items() if not v["known"]]
[docs]
class RandomDirSamplingPriority(Policy):
"""Check the Merkle tree querying random directories. Set all ancestors to
unknown for unknown directories, otherwise set all descendants to known.
Finally check all the remaining file contents.
"""
[docs]
@no_type_check
async def run(self, client: Client):
contents, skipped_contents, directories = from_disk.iter_directory(
self.source_tree
)
# `filter_known_objects` only does filtering by random directory
# sampling for now.
# In the future, it could/will grow a parameter to choose/pass in a
# different discovery implementation.
# From this call site, we are relying on this behavior in order to
# *actually* be a random directory sampling policy, but any change away
# from under us in `filter_known_objects` should trigger a test failure.
get_unknowns = discovery.filter_known_objects(
WebAPIConnection(contents, skipped_contents, directories, client),
)
unknowns = set(itertools.chain(*await get_unknowns))
for obj in itertools.chain(contents, skipped_contents, directories):
self.data[obj.swhid()]["known"] = obj not in unknowns
[docs]
class QueryAll(Policy):
"""Check the status of every node in the Merkle tree."""
[docs]
@no_type_check
async def run(self, client: Client):
all_nodes = [node for node in self.source_tree.iter_tree()]
all_swhids = [node.swhid() for node in all_nodes]
swhids_res = await client.known(all_swhids)
for node in all_nodes:
self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"]