Source code for swh.scanner.cli

# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
import os
import textwrap
from typing import Optional

import click
from importlib_metadata import version
import requests

from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.web.client.client import WebAPIClient

from .config import DEFAULT_CONFIG_PATH, SWH_API_ROOT, get_default_config
from .data import NoProvenanceAPIAccess, get_ignore_patterns_templates
from .setup_wizard import invoke_auth, run_setup, should_run_setup


def get_exclude_templates_list_repr(width=0):
    """Build the list of exclusion template names shown in the CLI help.

    The names are the keys returned by ``get_ignore_patterns_templates()``,
    sorted and joined with ``", "``.

    Args:
        width: when greater than zero, the joined string is wrapped to that
            many columns with ``textwrap.fill``; otherwise it is returned
            unwrapped on a single line.

    Returns:
        The formatted string of template names.
    """
    names = sorted(get_ignore_patterns_templates().keys())
    joined = ", ".join(str(name) for name in names)
    if not width > 0:
        return joined
    return textwrap.fill(joined, width=width)
# Help text passed as ``help=`` to the repeatable ``--exclude-template`` option
# of the ``scan`` command. The f-string is evaluated once, at import time, and
# embeds the template names wrapped to 40 columns.
# NOTE(review): the line breaks inside this literal appear to have been lost
# in the rendering this file was recovered from -- confirm against the
# upstream source before relying on the exact whitespace.
EXCLUDE_TEMPLATES_HELP = f"""Repeatable option to exclude files and directories using an exclusion template (e.g., ``Python`` for common exclusion patterns in a Python project). Valid values are: {get_exclude_templates_list_repr(40)} """
# Help text passed as ``help=`` to the ``scanner`` command group.
# NOTE(review): same caveat as above regarding lost line breaks.
SCANNER_HELP = """Software Heritage Scanner tools Scan a source code project to discover files and directories existing in the Software Heritage archive. """
def check_auth(ctx):
    """Check that some authentication is configured, and warn otherwise.

    Reads the loaded configuration (``ctx.obj["config"]``) and the OIDC client
    (``ctx.obj["oidc_client"]``). If a refresh token is stored under
    ``keycloak_tokens -> <realm_name> -> <client_id>``, it is exercised against
    the OIDC server to make sure it is still valid. If no token is stored for
    this realm/client, a warning explaining how to log in is printed instead.

    This function does not look at the configured API URL itself; deciding
    for which URL the check is relevant is left to the caller.

    Args:
        ctx: the click context; ``ctx.obj`` must already hold the ``config``
            and ``oidc_client`` entries.

    Raises:
        AssertionError: if ``config`` or ``oidc_client`` is missing from
            ``ctx.obj``.
        click.UsageError: through ``ctx.fail`` when the stored token is
            rejected by the OIDC server.
    """
    assert "config" in ctx.obj
    assert "oidc_client" in ctx.obj

    # Named ``cfg`` (not ``config``) to avoid shadowing the ``swh.core.config``
    # module imported at file level.
    cfg = ctx.obj["config"]
    oidc_client = ctx.obj["oidc_client"]
    realm_name = oidc_client.realm_name
    client_id = oidc_client.client_id

    # Walk the nested mapping defensively: a configuration that holds tokens
    # for another realm or client must be treated as "not authenticated"
    # rather than crash with a KeyError.
    realm_tokens = (cfg.get("keycloak_tokens") or {}).get(realm_name) or {}
    auth_token = realm_tokens.get(client_id)

    if auth_token:
        from swh.auth.keycloak import KeycloakError, keycloak_error_message

        # Ensure authentication token is valid
        try:
            oidc_client.refresh_token(refresh_token=auth_token)["access_token"]
        except KeycloakError as ke:
            msg = "Error while verifying your authentication configuration."
            click.echo(click.style(msg, fg="yellow"))
            msg = "Run `swh scanner login` to configure or verify authentication."
            click.echo(click.style(msg))
            ctx.fail(keycloak_error_message(ke))
        else:
            # TODO: Display more OIDC information (username, realm, client_id)?
            msg = f'Authenticated to "{ oidc_client.server_url }".'
            click.echo(click.style(msg, fg="green"))
    else:
        msg = "Warning: you are not authenticated with the Software Heritage API\n"
        msg += "Log in to get a higher rate-limit."
        click.echo(click.style(msg, fg="yellow"))
        msg = "Run `swh scanner login` to configure or verify authentication."
        click.echo(click.style(msg))
@swh_cli_group.group(
    name="scanner",
    context_settings=CONTEXT_SETTINGS,
    help=SCANNER_HELP,
)
@click.option(
    "-C",
    "--config-file",
    type=click.Path(dir_okay=False, path_type=str),
    help=f"Configuration file path. [default:{DEFAULT_CONFIG_PATH}]",
    envvar="SWH_CONFIG_FILENAME",
    show_default=False,
)
@click.version_option(
    version=version("swh.scanner"),
    prog_name="swh.scanner",
)
@click.pass_context
def scanner(ctx: click.Context, config_file: Optional[str]):
    # NOTE: the user-facing help of this group comes from ``help=SCANNER_HELP``
    # above, hence no docstring here.
    #
    # Prepares ``ctx.obj`` for the subcommands: records the configuration file
    # path and, unless the setup wizard is going to run, loads the
    # authentication/configuration and merges it over the scanner defaults.
    ctx.ensure_object(dict)

    config_file = config_file or DEFAULT_CONFIG_PATH
    ctx.obj["config_file"] = config_file

    # Get Scanner default config
    cfg = get_default_config()

    # Let the setup do its own auth and config setup
    if ctx.invoked_subcommand != "setup" and not should_run_setup():
        # Invoke auth CLI command to get an OIDC client
        # It will load configuration file if any and populate a ctx 'config' object
        invoke_auth(ctx, config_file=config_file)
        assert ctx.obj["config"]

        # Merge scanner defaults with config object
        ctx.obj["config"] = config.merge_configs(cfg, ctx.obj["config"])

        assert ctx.obj["oidc_client"]


@scanner.command(name="login")
@click.option(
    "--username",
    "username",
    default=None,
    help=("OpenID username"),
)
@click.option(
    "--token",
    "token",
    default=None,
    help=("A valid OpenId connect token to authenticate to"),
)
@click.pass_context
def login(ctx, username: str, token: str):
    """Authentication configuration guide for Swh Api services.
    Helps in verifying authentication credentials
    """
    # Imported lazily to keep CLI startup time under control.
    from swh.auth.cli import auth_config

    # ``username`` and ``token`` are not used directly: ``ctx.forward`` hands
    # the current command parameters over to the ``auth_config`` command.
    ctx.forward(auth_config)


@scanner.command(name="scan")
@click.argument("root_path", default=".", type=click.Path(exists=True))
@click.option(
    "-u",
    "--api-url",
    default=None,
    metavar="API_URL",
    show_default=True,
    help="URL for the api request",
)
@click.option(
    "--exclude-template",
    "-t",
    "exclude_templates",
    metavar="EXCLUDE_TEMPLATES",
    multiple=True,
    help=EXCLUDE_TEMPLATES_HELP,
)
@click.option(
    "--exclude",
    "-x",
    "patterns",
    metavar="PATTERNS",
    multiple=True,
    help="Exclude directories using glob patterns \
    (e.g., ``*.git`` to exclude all .git directories)",
)
@click.option(
    "-f",
    "--output-format",
    "out_fmt",
    default="summary",
    show_default=True,
    type=click.Choice(["summary", "text", "json", "ndjson"], case_sensitive=False),
    help="The output format",
)
@click.option(
    "--web-ui/--no-web-ui",
    "interactive",
    is_flag=True,
    default=True,
    help="Launch the default graphical web browser to explore the results in a dashboard.",
)
@click.option(
    "--provenance",
    "provenance",
    is_flag=True,
    help="Also fetch provenance data (requires special permission from SWH).",
)
@click.option(
    "--debug-http",
    "debug_http",
    is_flag=True,
    help="Show debug information about the http request",
)
@click.option(
    "--disable-global-patterns",
    "disable_global_patterns",
    is_flag=True,
    help="Disable common and global exclusion patterns.",
)
@click.option(
    "--disable-vcs-patterns",
    "disable_vcs_patterns",
    is_flag=True,
    help="Disable vcs ignore detection for exclusion patterns",
)
@click.option(
    "-c",
    "--project-config-file",
    type=click.Path(dir_okay=False, path_type=str),
    help="Project Configuration file path.",
    show_default=False,
)
@click.option(
    "--provenance-concurrency",
    default=5,
    help="Number of concurrent connections to the web API.",
)
@click.option(
    "--provenance-batch-size",
    default=100,
    help="Batch size when querying the provenance API.",
)
@click.pass_context
def scan(
    ctx,
    root_path,
    api_url,
    exclude_templates,
    patterns,
    out_fmt,
    interactive,
    provenance,
    debug_http,
    disable_global_patterns,
    disable_vcs_patterns,
    project_config_file: Optional[str],
    provenance_concurrency,
    provenance_batch_size,
):
    """Scan a source code project to discover files and directories already
    present in the archive.

    The command opens by default an interactive dashboard after scanning. Can be
    disabled by the --no-web-ui flag.

    The command can provide different output using the --output-format option:\n
    \b
      summary: display a general summary of what the scanner found
      text: display the scan result as a text based tree-like view of all the
            file, using color to indicate the file status.
      json: write all collected data on standard output as JSON
      ndjson: write all collected data on standard output as Newline Delimited JSON

    Exclusion patterns can be set with the repeatable -x/--exclude option:\n
    \b
      pattern: glob pattern (e.g., ``*.git`` to exclude all .git directories)

    Common default exclusion patterns and exclusion patterns defined in your global
    SWH configuration file can be disabled using the --disable-global-patterns option.\n

    Version control system ignore files detection for exclusion (e.g. .gitignore,
    .hgignore, svn ignore file) can be disabled using the --disable-vcs-patterns option.
    \n
    """
    # NOTE: the docstring above is the user-facing ``--help`` text (``\b`` and
    # ``\n`` are click formatting markers), so implementation notes live in
    # comments instead.
    #
    # Exit status: 2 when the archive answers with an HTTP error, 1 when the
    # account is not allowed to query the Provenance API.
    from pathlib import Path

    import swh.scanner.data as data

    # Shadows the module-level ``scanner`` click group inside this function only.
    import swh.scanner.scanner as scanner

    if should_run_setup():
        run_setup(ctx)
        click.echo("")  # Separate setup and command a little more

    root_path = os.path.abspath(root_path)

    # merge global config with per project one if any
    if project_config_file:
        project_cfg_path = Path(project_config_file)
    else:
        project_cfg_path = Path(root_path) / "swh.scanner.project.yml"

    if project_cfg_path.exists():
        ctx.obj["config"] = config.merge_configs(
            ctx.obj["config"], config.read_raw_config(str(project_cfg_path))
        )
        # Exclude from scan the per project configuration file if it is within
        # root path. Ancestry is tested on absolute paths: a plain substring
        # comparison of the two strings would miss a file nested deeper in the
        # tree, match unrelated siblings, and misbehave with relative paths.
        project_cfg_abspath = Path(os.path.abspath(project_cfg_path))
        if Path(root_path) in project_cfg_abspath.parents:
            ctx.obj["config"]["scanner"]["exclude"].append(str(project_cfg_path))

    # override config with command parameters if provided
    if disable_global_patterns:
        ctx.obj["config"]["scanner"][
            "disable_global_patterns"
        ] = disable_global_patterns
        ctx.obj["config"]["scanner"]["exclude"] = []
    if disable_vcs_patterns:
        ctx.obj["config"]["scanner"]["disable_vcs_patterns"] = disable_vcs_patterns
    if exclude_templates is not None:
        ctx.obj["config"]["scanner"]["exclude_templates"].extend(exclude_templates)

    # check that the exclude templates are valid
    if "exclude_templates" in ctx.obj["config"]["scanner"]:
        templates = get_ignore_patterns_templates()
        for template in ctx.obj["config"]["scanner"]["exclude_templates"]:
            if template not in templates:
                err_msg = f"Unknown exclusion template '{template}'. Use one of:\n"
                ctx.fail(
                    click.style(err_msg, fg="yellow")
                    + f"{get_exclude_templates_list_repr()}"
                )

    if patterns is not None:
        ctx.obj["config"]["scanner"]["exclude"].extend(patterns)

    assert "url" in ctx.obj["config"]["web-api"]
    if api_url is not None:
        ctx.obj["config"]["web-api"]["url"] = api_url

    if debug_http:
        http_logger = logging.getLogger("swh.web.client.client")
        http_logger.setLevel(logging.DEBUG)

    # Check authentication only for production URL
    # TODO why do we do this?
    # TODO Should we remove the `swh scanner login` command in favor of the setup?
    if ctx.obj["config"]["web-api"]["url"] == SWH_API_ROOT:
        check_auth(ctx)

    root_path_fmt = click.format_filename(root_path)
    msg = f"Ready to scan {root_path_fmt}"
    click.echo(click.style(msg, fg="green"), err=True)

    class CLIProgress(scanner.Progress):
        # Progress reporter writing a single, continuously rewritten line on
        # stderr (``\r`` returns to the start of the line at each refresh).

        def __init__(
            self,
            step: scanner.Progress.Step,
            total: Optional[int] = None,
            web_client: Optional[WebAPIClient] = None,
        ):
            self._count = 0
            self._total = total
            self._web_client = web_client
            if step == scanner.Progress.Step.DISK_SCAN:
                self._text = "local objects scanned"
            elif step == scanner.Progress.Step.KNOWN_DISCOVERY:
                self._text = "objects compared with the Software Heritage archive"
            elif step == scanner.Progress.Step.PROVENANCE:
                self._text = "provenance data fetched"

        def increment(self, count=1):
            """move the progress forward and refresh the output"""
            self._count += count
            self._display()

        def update(self, current_count, total=None):
            # Replaces both the counter and the total (the total becomes
            # ``None`` again when it is not passed).
            self._count = current_count
            self._total = total
            self._display()

        def _display(self):
            """refresh the output"""
            rate_limit = ""
            # ``getattr`` with a default also covers ``web_client`` being None.
            rate_limit_delay = getattr(self._web_client, "rate_limit_delay", 0)
            if rate_limit_delay > 0:
                requests_per_second = 1 / rate_limit_delay
                rate_limit = (
                    f" (rate limited: {requests_per_second:.2f} requests / seconds)"
                )
            if self._total is None:
                msg = f"\r{self._count} {self._text}{rate_limit}"
            else:
                msg = f"\r{self._count}/{self._total} {self._text}{rate_limit}"
            click.echo(msg, nl=False, err=True)

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwargs):
            # Terminate the progress line.
            click.echo("", err=True)

    # The batch size and concurrency are communicated to the data module
    # through its module-level settings.
    data.MAX_WHEREARE_BATCH = provenance_batch_size
    data.MAX_CONCURRENT_PROVENANCE_QUERIES = provenance_concurrency

    try:
        scanner.scan(
            ctx.obj["config"],
            root_path,
            out_fmt,
            interactive,
            provenance,
            debug_http,
            progress_class=CLIProgress,
        )
    except requests.HTTPError as exc:
        r = exc.response
        click.secho(
            "ERROR: Unexpected errors from the Software Heritage Archive:",
            fg="red",
        )
        click.secho(
            f"ERROR: {r.url}",
            fg="red",
        )
        click.secho(
            f"ERROR: {r.status_code} {r.reason}",
            fg="red",
        )
        # A value returned from a click callback is discarded when the CLI
        # runs in standalone mode, which made the process exit with status 0.
        # ``ctx.exit`` propagates the status to the shell (and is still
        # reported as the result when standalone mode is disabled).
        ctx.exit(2)
    except NoProvenanceAPIAccess:
        msg = (
            "ERROR: Your account does not have permission to query the Provenance API\n"
        )
        msg += "(Contact the Software Heritage team to get such permission)"
        click.echo(click.style(msg, fg="red"))
        ctx.exit(1)


@scanner.command("setup")
@click.pass_context
def setup_cmd(ctx: click.Context):
    """Get guided through setting up the scanner

    This interactive command gives a quick explanation of what the scanner is,
    and guides you through the optional authentication as well as the config
    options, then gives you a few examples for invocations.

    This setup will run the first time you run the `scan` command, but you
    may invoke it at anytime using `swh scanner setup`."""
    run_setup(ctx)
def main():
    """Run the ``scanner`` command group as the program entry point.

    The group is invoked with ``auto_envvar_prefix="SWH_SCANNER"`` and
    whatever it returns is handed back to the caller.
    """
    outcome = scanner(auto_envvar_prefix="SWH_SCANNER")
    return outcome
# Run the command line interface when this module is executed as a script.
if __name__ == "__main__":
    main()