Source code for swh.scanner.scanner

# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Type

import requests.status_codes

from swh.model.cli import model_of_dir
from swh.model.from_disk import Directory
from swh.web.client.client import DEFAULT_RETRY_REASONS, WebAPIClient

from .data import (
    MerkleNodeInfo,
    add_provenance,
    get_ignore_patterns_templates,
    get_vcs_ignore_patterns,
    init_merkle_node_info,
    parse_ignore_patterns_template,
)
from .output import get_output_class
from .policy import RandomDirSamplingPriority


class Progress:
    """Default progress reporter that does nothing.

    Instances are created with the step being executed and are used as
    context managers around that step. Every method is a no-op, so this
    class can be passed wherever a ``progress_class`` is expected when no
    progress reporting is wanted.
    """

    class Step(enum.Enum):
        """Identifiers of the steps a progress reporter can be created for."""

        DISK_SCAN = enum.auto()
        KNOWN_DISCOVERY = enum.auto()
        PROVENANCE = enum.auto()

    def __init__(self, step: Step, total: Optional[int] = None, **kwargs):
        """Accept the step, an optional total and extra keywords; keep nothing."""
        return None

    def increment(self, count=1):
        """Report ``count`` more processed items (ignored here)."""
        return None

    def update(self, current_count, total=None):
        """Report an absolute count and optionally a new total (ignored here)."""
        return None

    def __enter__(self):
        """Enter the context and hand back this very instance."""
        return self

    def __exit__(self, *exc_info, **kwargs):
        """Leave the context; returns ``None`` so exceptions propagate."""
        return None
def get_webapi_client(config: Dict[str, Any]):
    """Build the Web API client described by ``config``.

    Args:
        config: scanner configuration. ``config["web-api"]["url"]`` is
            required. When a ``"keycloak"`` section provides both
            ``realm_name`` and ``client_id``, a token is looked up at
            ``config["keycloak_tokens"][realm_name][client_id]`` and, if
            non-empty, passed to the client as ``bearer_token``.

    Returns:
        A ``WebAPIClient`` whose ``retry_status`` is
        ``DEFAULT_RETRY_REASONS`` extended with the ``GATEWAY_TIMEOUT`` and
        ``SERVICE_UNAVAILABLE`` status codes.

    Raises:
        KeyError: if ``config`` has no ``"web-api"``/``"url"`` entry.
    """
    api_url = config["web-api"]["url"]
    kwargs = {}

    # TODO: Better retrieve realm and client id directly from the oidc client?
    if "keycloak" in config:
        realm_name = config["keycloak"].get("realm_name")
        client_id = config["keycloak"].get("client_id")
        if realm_name and client_id:
            # Look the token up with chained .get() calls: a token store that
            # is absent, empty, or has no entry for this realm/client simply
            # means "no authentication" instead of raising KeyError.
            realm_tokens = (config.get("keycloak_tokens") or {}).get(realm_name) or {}
            auth_token = realm_tokens.get(client_id)
            if auth_token:
                kwargs["bearer_token"] = auth_token

    retry_status = DEFAULT_RETRY_REASONS | {
        requests.status_codes.codes.GATEWAY_TIMEOUT,
        requests.status_codes.codes.SERVICE_UNAVAILABLE,
    }
    client = WebAPIClient(
        api_url=api_url,
        retry_status=retry_status,
        **kwargs,
    )
    return client
def run(
    config: Dict[str, Any],
    policy,
    source_tree: Directory,
    nodes_data: MerkleNodeInfo,
    provenance: bool,
    progress_class: Type[Progress] = Progress,
) -> WebAPIClient:
    """Scan the given source tree by running ``policy`` against the archive.

    Args:
        config: scanner configuration, used to build the Web API client.
        policy: object whose ``run(client, update_info=...)`` method performs
            the "known" discovery.
        source_tree: root directory of the scanned source code.
        nodes_data: per-node information; its length is reported as the
            total of each progress step.
        provenance: when true, ``add_provenance`` is run after the discovery.
        progress_class: progress reporter class instantiated for each step.

    Returns:
        The Web API client that was created for the scan.
    """
    client = get_webapi_client(config)

    # Finding out what is already known always comes first, since every other
    # option needs that information anyway. Fetching the "known" status is
    # efficient and relatively cheap. It is also "context free" [1], so nodes
    # can be queried in any order.
    #
    # [1] the best answer for "known" does not change depending on the status
    #     of the files and directories around a node. The original note says
    #     this does not hold for "oring" (presumably "origin") information.
    discovery_progress = progress_class(
        step=Progress.Step.KNOWN_DISCOVERY,
        total=len(nodes_data),
        web_client=client,
    )
    with discovery_progress as progress:

        def _tick(*_args, **_kwargs):
            # Whatever the policy reports, count exactly one more item.
            progress.increment()

        policy.run(client, update_info=_tick)

    if not provenance:
        return client

    provenance_progress = progress_class(
        step=Progress.Step.PROVENANCE,
        total=len(nodes_data),
        web_client=client,
    )
    with provenance_progress as progress:
        add_provenance(
            source_tree,
            nodes_data,
            client,
            update_progress=progress.update,
        )
    return client
# Patterns that `scan` adds to the exclusion list unless the configuration
# disables global patterns. Every name appears twice: first bare, then
# prefixed with ``*/`` (presumably so it also matches below the root
# directory -- confirm against the pattern matcher).
COMMON_EXCLUDE_PATTERNS: List[bytes] = [
    prefix + name
    for prefix in (b"", b"*/")
    for name in (
        b".bzr",
        b".coverage",
        b"*.egg-info",
        b".eggs",
        b".git",
        b".hg",
        b".mypy_cache",
        b"__pycache__",
        b".svn",
        b".tox",
    )
]
def scan(
    config: Dict[str, Any],
    root_path: str,
    out_fmt: str,
    interactive: bool,
    provenance: bool,
    debug_http: bool,
    progress_class: Type[Progress],
):
    """Scan a source code project to discover files and directories already
    present in the archive.

    Args:
        config: scanner configuration; the ``"scanner"`` section provides
            ``exclude``, ``exclude_templates``, ``disable_global_patterns``
            and ``disable_vcs_patterns``. ``config["debug_http"]`` is
            written by this function.
        root_path: path of the project to scan.
        out_fmt: name of the output class used to show the results.
        interactive: when true, the ``"interactive"`` output is shown as
            well, after the ``out_fmt`` one.
        provenance: forwarded to the node initialisation and to ``run``.
        debug_http: value stored in ``config["debug_http"]``.
        progress_class: progress reporter class used for every step.
    """
    scanner_config = config["scanner"]
    user_patterns = scanner_config["exclude"]
    template_names = scanner_config["exclude_templates"]
    skip_global_patterns = scanner_config["disable_global_patterns"]
    skip_vcs_patterns = scanner_config["disable_vcs_patterns"]

    # Exclusion patterns are handled as bytes from here on.
    patterns = [user_pattern.encode() for user_pattern in user_patterns]

    if template_names is not None:
        known_templates = get_ignore_patterns_templates()
        for template_name in template_names:
            patterns += parse_ignore_patterns_template(known_templates[template_name])

    if not skip_global_patterns:
        patterns += COMMON_EXCLUDE_PATTERNS

    if not skip_vcs_patterns:
        patterns += get_vcs_ignore_patterns(Path(root_path))

    # Build the in-memory model of the directory, reporting progress as
    # model_of_dir calls back.
    with progress_class(step=Progress.Step.DISK_SCAN) as progress:
        source_tree = model_of_dir(
            root_path.encode(),
            patterns,
            update_info=progress.increment,
        )

    nodes_data = MerkleNodeInfo()
    init_merkle_node_info(source_tree, nodes_data, provenance)

    policy = RandomDirSamplingPriority(source_tree, nodes_data)
    web_client = run(
        config,
        policy,
        source_tree,
        nodes_data,
        provenance,
        progress_class=progress_class,
    )

    # Both outputs are built from the same positional arguments.
    output_args = (root_path, nodes_data, source_tree, config, web_client)
    get_output_class(out_fmt)(*output_args).show()

    # The flag is stored only now, i.e. after the first output was shown and
    # before the optional interactive one is created.
    config["debug_http"] = debug_http
    if interactive:
        get_output_class("interactive")(*output_args).show()