Source code for swh.scanner.scanner

# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import asyncio
from typing import Any, Dict, Iterable

import aiohttp

from swh.model.cli import model_of_dir
from swh.model.from_disk import Directory

from .client import Client
from .data import (
    MerkleNodeInfo,
    add_origin,
    get_vcs_ignore_patterns,
    init_merkle_node_info,
)
from .output import get_output_class
from .policy import (
    QUERY_LIMIT,
    DirectoryPriority,
    FilePriority,
    GreedyBFS,
    LazyBFS,
    QueryAll,
    RandomDirSamplingPriority,
    source_size,
)


async def run(
    config: Dict[str, Any],
    policy,
    source_tree: Directory,
    nodes_data: MerkleNodeInfo,
    extra_info: set,
) -> None:
    """Gather the requested information about ``source_tree`` from the web API.

    A single HTTP session is opened for the whole run and wrapped in a
    ``Client`` bound to ``config["web-api"]["url"]``. When
    ``config["web-api"]["auth-token"]`` is set, it is sent as a bearer token
    in the ``Authorization`` header.

    Each entry of ``extra_info`` selects one kind of information:

    - ``"known"``: run ``policy`` with the client;
    - ``"origin"``: call ``add_origin`` on ``source_tree`` and ``nodes_data``.

    Raises:
        Exception: if ``extra_info`` contains any other value.
    """
    api_url = config["web-api"]["url"]

    # Only send an Authorization header when a token is actually configured.
    headers = (
        {"Authorization": f"Bearer {config['web-api']['auth-token']}"}
        if config["web-api"]["auth-token"]
        else {}
    )

    # trust_env=True is forwarded to aiohttp as in the original call.
    async with aiohttp.ClientSession(headers=headers, trust_env=True) as session:
        client = Client(api_url, session)

        for info in extra_info:
            if info == "origin":
                await add_origin(source_tree, nodes_data, client)
                continue

            if info != "known":
                raise Exception(f"The information '{info}' cannot be retrieved")

            await policy.run(client)
def get_policy_obj(source_tree: Directory, nodes_data: MerkleNodeInfo, policy: str):
    """Build the policy object designated by the name ``policy``.

    Recognized names are ``"auto"``, ``"bfs"``, ``"greedybfs"``,
    ``"filepriority"``, ``"dirpriority"`` and ``"randomdir"``. With
    ``"auto"``, ``QueryAll`` is used when ``source_size(source_tree)`` does
    not exceed ``QUERY_LIMIT``, and ``LazyBFS`` otherwise.

    Every policy is instantiated with ``source_tree`` and ``nodes_data``.

    Raises:
        Exception: if ``policy`` is not one of the recognized names.
    """
    if policy == "auto":
        # "auto" is the only name whose class depends on the source tree.
        within_limit = source_size(source_tree) <= QUERY_LIMIT
        policy_cls = QueryAll if within_limit else LazyBFS
        return policy_cls(source_tree, nodes_data)

    named_policies = {
        "bfs": LazyBFS,
        "greedybfs": GreedyBFS,
        "filepriority": FilePriority,
        "dirpriority": DirectoryPriority,
        "randomdir": RandomDirSamplingPriority,
    }
    policy_cls = named_policies.get(policy)
    if policy_cls is None:
        raise Exception(f"policy '{policy}' not found")
    return policy_cls(source_tree, nodes_data)
# Patterns of directories that we should disregard. Every pattern appears
# twice in the list: first as-is, then prefixed with ``*/``.
#
# TODO: make its usage configurable
# TODO: make it extensible through configuration
COMMON_EXCLUDE_PATTERNS = [
    prefix + name
    for prefix in (b"", b"*/")
    for name in (
        b".bzr",
        b".coverage",
        b"*.egg-info",
        b".eggs",
        b".git",
        b".hg",
        b".mypy_cache",
        b"__pycache__",
        b".svn",
        b".tox",
    )
]
def scan(
    config: Dict[str, Any],
    root_path: str,
    exclude_patterns: Iterable[str],
    out_fmt: str,
    interactive: bool,
    policy: str,
    extra_info: set,
):
    """Scan a source code project to discover files and directories already
    present in the archive.

    Args:
        config: scanner configuration, passed through to ``run``.
        root_path: path of the project to scan.
        exclude_patterns: extra patterns to exclude. They are encoded to
            bytes and combined with ``COMMON_EXCLUDE_PATTERNS`` and the
            patterns returned by ``get_vcs_ignore_patterns()``.
        out_fmt: name of the output format, resolved by ``get_output_class``.
        interactive: when true, the ``"interactive"`` output is used
            regardless of ``out_fmt``.
        policy: name of the scan policy, resolved by ``get_policy_obj``.
        extra_info: kinds of information to retrieve. ``"known"`` is always
            retrieved, whether listed or not. The caller's set is left
            unmodified.
    """
    converted_patterns = [pattern.encode() for pattern in exclude_patterns]
    converted_patterns.extend(COMMON_EXCLUDE_PATTERNS)
    converted_patterns.extend(get_vcs_ignore_patterns())

    source_tree = model_of_dir(root_path.encode(), converted_patterns)

    nodes_data = MerkleNodeInfo()
    # Work on a copy: forcing "known" must not leak into the caller's set.
    requested_info = set(extra_info)
    requested_info.add("known")
    init_merkle_node_info(source_tree, nodes_data, requested_info)

    # Keep the policy name and the policy object under distinct names rather
    # than rebinding the ``policy`` parameter to a value of another type.
    policy_obj = get_policy_obj(source_tree, nodes_data, policy)
    asyncio.run(run(config, policy_obj, source_tree, nodes_data, requested_info))

    if interactive:
        out_fmt = "interactive"
    get_output_class(out_fmt)(root_path, nodes_data, source_tree).show()