# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
from pathlib import Path
import sys
from typing import Any, Dict, Optional
import click
from importlib_metadata import version
import yaml
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from .exceptions import DBError
# Config for the "serve" option
# Default value of the --port option of the `db serve` command.
BACKEND_DEFAULT_PORT = 5011
# All generic config code should reside in swh.core.config
# Name of the environment variable that may hold the configuration file path.
CONFIG_ENVVAR = "SWH_CONFIG_FILE"
# Configuration file used when neither the CLI option nor the environment
# variable provides one: "global.yml" inside click's application directory
# for "swh".
DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml")
# URL of the production Web API; check_auth() and login() compare the
# configured URL against this value.
SWH_API_ROOT = "https://archive.softwareheritage.org/api/1/"
# Baseline configuration. It is used as-is when no configuration file is found
# and is merged with the content of the configuration file otherwise.
DEFAULT_CONFIG: Dict[str, Any] = {
    "web-api": {
        "url": SWH_API_ROOT,
        "auth-token": None,
    }
}
# Help fragment describing how the configuration file is located. It is
# rendered once, at import time, and embeds the constants defined above.
# The "\b" lines are part of the text passed to click.
CONFIG_FILE_HELP = f"""Configuration file:
\b
The CLI option or the environment variable will fail if invalid.
CLI option is checked first.
Then, environment variable {CONFIG_ENVVAR} is checked.
Then, if cannot load the default path, a set of default values are used.
Default config path is {DEFAULT_CONFIG_PATH}.
Default config values are:
\b
{yaml.dump(DEFAULT_CONFIG)}"""
# Full help text of the `scanner` command group.
SCANNER_HELP = f"""Software Heritage Scanner tools.
{CONFIG_FILE_HELP}"""
[docs]
def setup_config(ctx, api_url):
    """Return the configuration held by the click context, optionally
    overriding the Web API URL.

    Args:
        ctx: click context whose ``obj["config"]`` entry holds the configuration.
        api_url: replacement value for ``config["web-api"]["url"]``. Ignored
            when falsy. A trailing ``/`` is appended when missing.

    Returns:
        The very mapping stored in ``ctx.obj["config"]``; it is updated in
        place when ``api_url`` is provided.
    """
    settings = ctx.obj["config"]
    if not api_url:
        return settings
    normalized_url = api_url if api_url.endswith("/") else api_url + "/"
    settings["web-api"]["url"] = normalized_url
    return settings
[docs]
def check_auth(config):
    """Warn on stderr when the production API is used without a token.

    Nothing is printed when the configured ``web-api`` URL differs from
    ``SWH_API_ROOT`` or when a truthy ``auth-token`` is configured.

    Args:
        config: configuration mapping with a ``"web-api"`` section containing
            at least a ``"url"`` key.
    """
    api_settings = config["web-api"]
    # Only warn for the production API
    #
    # XXX We should probably warn at the time of the creation of the HTTP
    # Client, after checking if the token is actually valid.
    if api_settings["url"] != SWH_API_ROOT or api_settings.get("auth-token"):
        return

    warning = (
        "Warning: you are not authenticated with the Software Heritage API\n"
        "login to get a higher rate-limit"
    )
    hint = "See `swh scanner login -h` for more information."
    click.echo(click.style(warning, fg="red"), file=sys.stderr)
    click.echo(click.style(hint, fg="yellow"), file=sys.stderr)
@swh_cli_group.group(
    name="scanner",
    context_settings=CONTEXT_SETTINGS,
    help=SCANNER_HELP,
)
@click.option(
    "-C",
    "--config-file",
    default=None,
    type=click.Path(exists=False, dir_okay=False, path_type=str),
    help="""YAML configuration file""",
)
@click.version_option(
    version=version("swh.scanner"),
    prog_name="swh.scanner",
)
@click.pass_context
def scanner(ctx, config_file: Optional[str]):
    # Entry point of the `scanner` command group: locate and load the
    # configuration, then publish it to subcommands through ``ctx.obj``:
    #
    # - ctx.obj["config_path"]: pathlib.Path of the selected configuration file
    #   (DEFAULT_CONFIG_PATH when no file was found);
    # - ctx.obj["config"]: the configuration mapping.
    #
    # Raises click.BadParameter when a file given with --config-file or through
    # the CONFIG_ENVVAR environment variable is not accessible.
    #
    # The help text is passed explicitly with ``help=SCANNER_HELP``, hence the
    # absence of a docstring here.
    from copy import deepcopy

    env_config_path = os.environ.get(CONFIG_ENVVAR)

    # read_raw_config do not fail if file does not exist, so check it beforehand
    # while enforcing loading priority
    if config_file:
        if not config.exists_accessible(config_file):
            raise click.BadParameter(
                f"File '{config_file}' cannot be opened.", param_hint="--config-file"
            )
    elif env_config_path:
        if not config.exists_accessible(env_config_path):
            raise click.BadParameter(
                f"File '{env_config_path}' cannot be opened.", param_hint=CONFIG_ENVVAR
            )
        config_file = env_config_path
    elif config.exists_accessible(DEFAULT_CONFIG_PATH):
        config_file = DEFAULT_CONFIG_PATH

    # Use the same truthiness test as above so that an empty --config-file
    # value is treated as "no file" instead of being loaded as path "".
    if config_file:
        conf = config.read_raw_config(config_file)
        conf = config.merge_configs(DEFAULT_CONFIG, conf)
    else:
        # Hand out a private copy: subcommands update the configuration in
        # place (API URL override, auth token) and must not alter the
        # module-level defaults.
        conf = deepcopy(DEFAULT_CONFIG)
        config_file = DEFAULT_CONFIG_PATH

    ctx.ensure_object(dict)
    ctx.obj["config_path"] = Path(config_file)
    ctx.obj["config"] = conf
@scanner.command(name="login")
@click.option(
    "-f",
    "--force/--no-force",
    default=False,
    help="Proceed even if a token is already present in the config",
)
@click.pass_context
def login(ctx, force):
    """Perform the necessary step to log yourself in the API

    You will need to first create an account before running this operation. To
    create an account, visit: https://archive.softwareheritage.org/
    """
    # NOTE: the docstring above is the help text shown by click; implementation
    # notes are kept in comments.
    #
    # Steps: refuse to run against anything but SWH_API_ROOT, stop if a token
    # is already configured (unless --force), ask for credentials, fetch a
    # refresh token from Keycloak, then write the whole configuration (with
    # the new token) to ctx.obj["config_path"].
    context = ctx.obj

    # Check we are actually talking to the Software Heritage itself.
    web_api_config = context["config"]["web-api"]
    current_url = web_api_config["url"]
    config_path = context["config_path"]
    if current_url != SWH_API_ROOT:
        msg = "`swh scanner login` only works with the Software Heritage API\n"
        click.echo(click.style(msg, fg="red"), file=sys.stderr)
        # Build the message in one step: the configured URL may contain "%",
        # which must not be interpreted as a formatting directive.
        shown_path = click.format_filename(bytes(config_path))
        msg = f"Configured in '{shown_path}' as web-api.url={current_url}\n"
        click.echo(click.style(msg, fg="red"), file=sys.stderr)
        ctx.exit(1)

    # Check for an existing value in the configuration
    if web_api_config.get("auth-token") is not None:
        click.echo(click.style("You appear to already be logged in.", fg="green"))
        if not force:
            click.echo("Hint: use `--force` to overwrite the current token")
            ctx.exit()
        click.echo(click.style("Continuing because of `--force`.", fg="yellow"))

    # Obtain a valid token through the API
    #
    # Coming from the swh auth generate-token code
    # (this command might eventually move there)
    # Imported here rather than at module level, in line with the file's
    # policy of keeping CLI start-up imports minimal.
    from getpass import getpass

    from swh.auth.keycloak import (
        KeycloakError,
        KeycloakOpenIDConnect,
        keycloak_error_message,
    )

    msg = "Please enter your SWH Archive credentials"
    click.echo(click.style(msg, fg="yellow"))
    msg = "If you do not already have an account, create one at:"
    click.echo(click.style(msg, fg="yellow"))
    msg = "    https://archive.softwareheritage.org/"
    click.echo(click.style(msg, fg="yellow"))
    username = click.prompt("username")
    password = getpass()
    try:
        url = "https://auth.softwareheritage.org/auth/"
        realm = "SoftwareHeritage"
        client = "swh-web"
        oidc_client = KeycloakOpenIDConnect(url, realm, client)
        scope = "openid offline_access"
        oidc_info = oidc_client.login(username, password, scope)
        token = oidc_info["refresh_token"]
        msg = "token retrieved successfully"
        click.echo(click.style(msg, fg="green"))
    except KeycloakError as ke:
        # Report on stderr like the other errors of this module, then abort
        # through the click context (the click module has no `exit` function).
        msg = keycloak_error_message(ke)
        click.echo(click.style(msg, fg="red"), file=sys.stderr)
        ctx.exit(1)

    # Write the new token into the file.
    web_api_config["auth-token"] = token
    # TODO use ruamel.yaml to preserve comments in config file
    config_path.parent.mkdir(parents=True, exist_ok=True)
    config_path.write_text(yaml.safe_dump(context["config"]))

    shown_path = click.format_filename(bytes(config_path))
    msg = f"\nConfiguration file '{shown_path}' written successfully."
    click.echo(click.style(msg, fg="green"))
    click.echo("`swh scanner` will now be authenticated with the new token.")
@scanner.command(name="scan")
@click.argument("root_path", default=".", type=click.Path(exists=True))
@click.option(
    "-u",
    "--api-url",
    default=None,
    metavar="API_URL",
    show_default=True,
    help="URL for the api request",
)
@click.option(
    "--exclude",
    "-x",
    "patterns",
    metavar="PATTERN",
    multiple=True,
    help="Exclude directories using glob patterns "
    "(e.g., ``*.git`` to exclude all .git directories)",
)
@click.option(
    "-f",
    "--output-format",
    "out_fmt",
    default="summary",
    show_default=True,
    type=click.Choice(
        ["summary", "text", "json", "ndjson", "sunburst"], case_sensitive=False
    ),
    help="The output format",
)
@click.option(
    "-i", "--interactive", is_flag=True, help="Show the result in a dashboard"
)
@click.option(
    "-p",
    "--policy",
    default="auto",
    show_default=True,
    type=click.Choice(
        ["auto", "bfs", "greedybfs", "filepriority", "dirpriority", "randomdir"]
    ),
    help="The scan policy.",
)
@click.option(
    "-e",
    "--extra-info",
    "extra_info",
    multiple=True,
    type=click.Choice(["origin"]),
    help="Add selected additional information about known software artifacts.",
)
@click.pass_context
def scan(ctx, root_path, api_url, patterns, out_fmt, interactive, policy, extra_info):
    """Scan a source code project to discover files and directories already
    present in the archive.

    The command can provide different output using the --output-format option:\n
    \b
    summary: display a general summary of what the scanner found
    text: display the scan result as a text based tree-like view of all the
    file, using color to indicate the file status.
    json: write all collected data on standard output as JSON
    ndjson: write all collected data on standard output as Newline Delimited JSON
    sunburst: produce a dynamic chart as .html file. (in $PWD/chart.html)

    The source code project can be checked using different policies that can be set
    using the -p/--policy option:\n
    \b
    auto: it selects the best policy based on the source code, for codebase(s)
    with less than 1000 file/dir contents all the nodes will be queried.
    bfs: scan the source code in the BFS order, checking unknown directories only.

    \b
    greedybfs: same as "bfs" policy, but lookup the status of source code artifacts
    in chunks, in order to minimize the number of Web API round-trips with the
    archive.

    \b
    filepriority: scan all the source code file contents, checking only unset
    directories. (useful if the codebase contains a lot of source files)
    dirpriority: scan all the source code directories and check only unknown
    directory contents.
    randomdir: scan the source code using a random Merkle search on directories.

    Other information about software artifacts could be specified with the -e/
    --extra-info option:\n
    \b
    origin: search the origin url of each source code files/dirs using the in-memory
    compressed graph."""
    # NOTE: the docstring above is the help text rendered by click. Each "\b"
    # line starts its own paragraph (blank line before it) so that click keeps
    # the line breaks of the list that follows.

    # Imported lazily to keep CLI start-up fast; the name shadows the `scanner`
    # command group inside this function only.
    import swh.scanner.scanner as scanner

    # Apply the optional --api-url override, then warn if unauthenticated.
    config = setup_config(ctx, api_url)
    check_auth(config)
    # click delivers a `multiple=True` option as a tuple; drop duplicates.
    extra_info = set(extra_info)
    scanner.scan(config, root_path, patterns, out_fmt, interactive, policy, extra_info)
# Command group gathering the `db` subcommands ("import" and "serve" below).
# The help text is given to the decorator, so the callback itself has nothing
# to do.
@scanner.group("db", help="Manage local knowledge base for swh-scanner")
@click.pass_context
def db(ctx):
    pass
@db.command("import")
@click.option(
    "-i",
    "--input",
    "input_file",
    metavar="INPUT_FILE",
    required=True,
    type=click.File("r"),
    help="A file containing SWHIDs",
)
@click.option(
    "-o",
    "--output",
    "output_file_db",
    metavar="OUTPUT_DB_FILE",
    required=True,
    show_default=True,
    help="The name of the generated sqlite database",
)
@click.option(
    "-s",
    "--chunk-size",
    "chunk_size",
    default="10000",
    metavar="SIZE",
    show_default=True,
    type=int,
    help="The chunk size ",
)
@click.pass_context
def import_(ctx, chunk_size, input_file, output_file_db):
    """Create SQLite database of known SWHIDs from a textual list of SWHIDs"""
    # Imported lazily to keep CLI start-up fast.
    from .db import Db

    db = Db(output_file_db)
    cur = db.conn.cursor()
    try:
        try:
            db.create_from(input_file, chunk_size, cur)
        finally:
            # Close the database whether or not the import succeeded. Kept
            # inside the outer `try` so that a DBError raised while closing is
            # still reported through ctx.fail, as before.
            db.close()
    except DBError as e:
        ctx.fail("Failed to import SWHIDs into database: {0}".format(e))
@db.command("serve")
@click.option(
    "-h",
    "--host",
    metavar="HOST",
    default="127.0.0.1",
    show_default=True,
    help="The host of the API server",
)
@click.option(
    "-p",
    "--port",
    metavar="PORT",
    default=f"{BACKEND_DEFAULT_PORT}",
    show_default=True,
    help="The port of the API server",
)
@click.option(
    "-f",
    "--db-file",
    "db_file",
    metavar="DB_FILE",
    default="SWHID_DB.sqlite",
    show_default=True,
    type=click.Path(exists=True),
    help="An sqlite database file (it can be generated with: 'swh scanner db import')",
)
@click.pass_context
def serve(ctx, host, port, db_file):
    """Start an API service using the sqlite database generated with the "db import"
    option."""
    # Imported lazily to keep CLI start-up fast.
    import swh.scanner.backend as backend

    from .db import Db

    db = Db(db_file)
    try:
        # NOTE(review): `port` is forwarded as the string produced by click
        # (no `type=int` on the option) - confirm backend.run accepts it.
        backend.run(host, port, db)
    finally:
        # Close the database even when the server terminates with an exception.
        db.close()
[docs]
def main():
    """Invoke the ``scanner`` command group as a standalone program.

    The group is called with ``auto_envvar_prefix="SWH_SCANNER"`` and its
    result is returned.
    """
    return scanner(auto_envvar_prefix="SWH_SCANNER")


# Allow running this module directly as a script.
if __name__ == "__main__":
    main()