# Source code for swh.fuse.cli

# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
from pathlib import Path
import re
from typing import Any, Dict

import click

from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.model.cli import CoreSWHIDParamType

# All generic config code should reside in swh.core.config

# Configuration file used when none is given on the command line. The
# SWH_CONFIG_FILE environment variable, when set, takes precedence over the
# per-user location computed by click.
DEFAULT_CONFIG_PATH = os.environ.get(
    "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml")
)

# Base directory of the on-disk caches. Following the XDG Base Directory
# Specification, an unset *or empty* XDG_CACHE_HOME falls back to $HOME/.cache;
# using the empty string as a path would silently place the caches in the
# current working directory. `or` short-circuits, so Path.home() is only
# evaluated when the fallback is actually needed.
CACHE_HOME_DIR: Path = Path(
    os.environ.get("XDG_CACHE_HOME") or Path.home() / ".cache"
)

# Built-in configuration, used as the base that user settings are merged with.
DEFAULT_CONFIG: Dict[str, Any] = {
    "cache": {
        "metadata": {"path": str(CACHE_HOME_DIR / "swh/fuse/metadata.sqlite")},
        "blob": {"path": str(CACHE_HOME_DIR / "swh/fuse/blob.sqlite")},
        "direntry": {"maxram": "10%"},
    },
    "web-api": {
        "url": "https://archive.softwareheritage.org/api/1",
        "auth-token": None,
    },
    "json-indent": 2,
}


def load_config(config_file=None) -> dict:
    """Load the swh-fuse configuration.

    Args:
        config_file: path of the YAML configuration file to read; when falsy,
            ``DEFAULT_CONFIG_PATH`` is used instead.

    Returns:
        The effective configuration. When the file exists, this is its
        ``swh: fuse:`` block (or the whole file content if that block cannot
        be found) combined with ``DEFAULT_CONFIG`` through
        ``swh.core.config.merge_configs``. When the file does not exist, an
        independent copy of ``DEFAULT_CONFIG`` is returned.

    Raises:
        ValueError: if the configuration file exists but reading it yields
            nothing.
    """
    # Imported lazily to keep CLI startup time under control
    import copy
    import logging

    import yaml

    from swh.core import config
    from swh.fuse import LOGGER_NAME

    logger = logging.getLogger(LOGGER_NAME)

    if not config_file:
        config_file = DEFAULT_CONFIG_PATH

    if not os.path.isfile(config_file):
        logger.info("Using default configuration")
        # Hand out a copy: returning the module-level dict itself would let a
        # caller's in-place changes leak into the defaults of later calls.
        return copy.deepcopy(DEFAULT_CONFIG)

    conf = config.read_raw_config(config_file)
    if not conf:
        raise ValueError(f"Cannot parse configuration file: {config_file}")

    try:
        conf = conf["swh"]["fuse"]
    except (KeyError, TypeError):
        # KeyError: one of the keys is missing. TypeError: "swh" is present
        # but is not a mapping (e.g. an empty ``swh:`` entry). In both cases
        # the file content is kept as-is and merged below.
        logger.warning("No swh:fuse: block found in configuration (%s)", config_file)

    # recursive merge not done by config.read
    conf = config.merge_configs(DEFAULT_CONFIG, conf)

    # printing the effective configuration is helpful in complicated
    # environments, but let's avoid leaking a token in logs
    if logger.getEffectiveLevel() <= logging.DEBUG:
        conf_dump = yaml.dump(conf)
        # Blank out the value of every dumped line whose key mentions "token"
        remove_token = re.compile(r"(.+token.*:).+\n", re.MULTILINE | re.IGNORECASE)
        conf_safe = remove_token.sub(lambda m: m.group(1) + " [redacted]\n", conf_dump)
        logger.debug("Active configuration:\n%s", conf_safe)

    return conf
@swh_cli_group.group(name="fs", context_settings=CONTEXT_SETTINGS)
@click.option(
    "-C",
    "--config-file",
    default=None,
    type=click.Path(exists=True, dir_okay=False, path_type=str),
    help=f"Configuration file (default: {DEFAULT_CONFIG_PATH})",
)
@click.pass_context
def fuse(ctx, config_file):
    """Software Heritage virtual file system"""
    import logging
    from shutil import which

    # Checked in the group callback so that the dependency is verified before
    # any subcommand runs.
    if which("fusermount3") is None:
        logging.error("Missing dependency: 'fusermount3'")
        ctx.exit(1)

    # Make the effective configuration available to subcommands via ctx.obj
    conf = load_config(config_file)
    ctx.ensure_object(dict)
    ctx.obj["config"] = conf


@fuse.command(name="mount")
@click.argument(
    "path",
    required=True,
    metavar="PATH",
    type=click.Path(exists=True, dir_okay=True, file_okay=False),
)
@click.argument("swhids", nargs=-1, metavar="[SWHID]...", type=CoreSWHIDParamType())
@click.option(
    "-f/-d",
    "--foreground/--daemon",
    default=False,
    help=(
        "whether to run FUSE attached to the console (foreground) "
        "or daemonized in the background (default: daemon)"
    ),
)
@click.pass_context
def mount(ctx, swhids, path, foreground):
    """Mount the Software Heritage virtual file system at PATH.

    If specified, objects referenced by the given SWHIDs will be prefetched and used
    to populate the virtual file system (VFS). Otherwise the VFS will be populated
    on-demand, when accessing its content.

    Example:

    .. code-block:: bash

      $ mkdir swhfs
      $ swh fs mount swhfs/
      $ grep printf swhfs/archive/swh:1:cnt:c839dea9e8e6f0528b468214348fee8669b305b2
          printf("Hello, World!");
      $

    """
    import asyncio
    from contextlib import ExitStack
    import logging

    # `import logging` alone does not load the `logging.config` submodule;
    # import it explicitly instead of relying on another module having done so.
    import logging.config

    from daemon import DaemonContext

    # NOTE: inside this function `fuse` refers to the swh.fuse.fuse module,
    # not to the click group defined in this file.
    from swh.fuse import LOGGER_NAME, fuse

    with ExitStack() as stack:
        if not foreground:
            # Disable logging config before daemonizing, and reset it once
            # daemonized to be sure to not close file handlers
            log_level = logging.getLogger(LOGGER_NAME).getEffectiveLevel()
            logging.shutdown()
            # Stay in the current working directory when spawning daemon
            cwd = os.getcwd()
            stack.enter_context(DaemonContext(working_directory=cwd))
            # Once daemonized, route the swh-fuse logger to syslog at the
            # level that was in effect before daemonizing.
            logging.config.dictConfig(
                {
                    "version": 1,
                    "handlers": {
                        "syslog": {
                            "class": "logging.handlers.SysLogHandler",
                            "address": "/dev/log",
                        },
                    },
                    "loggers": {
                        LOGGER_NAME: {
                            "level": log_level,
                            "handlers": ["syslog"],
                        },
                    },
                }
            )

        conf = ctx.obj["config"]
        asyncio.run(fuse.main(swhids, path, conf))


@fuse.command()
@click.argument(
    "path",
    required=True,
    metavar="PATH",
    type=click.Path(exists=True, dir_okay=True, file_okay=False),
)
@click.pass_context
def umount(ctx, path):
    """Unmount a mounted virtual file system.

    Note: this is equivalent to ``fusermount3 -u PATH``, which can be used to unmount
    any FUSE-based virtual file system. See ``man fusermount3``.

    """
    import logging
    import subprocess

    try:
        # Use the same binary whose presence the `fs` group already verified;
        # a plain `fusermount` is not guaranteed to exist on FUSE 3-only systems.
        subprocess.run(["fusermount3", "-u", path], check=True)
    except subprocess.CalledProcessError as err:
        logging.error(
            "cannot unmount virtual file system: '%s' returned exit status %d",
            " ".join(err.cmd),
            err.returncode,
        )
        ctx.exit(1)


@fuse.command()
@click.pass_context
def clean(ctx):
    """Clean on-disk cache(s)."""

    def rm_cache(conf, cache_name):
        # Best effort: a cache that is not configured or whose file does not
        # exist is simply skipped.
        try:
            Path(conf["cache"][cache_name]["path"]).unlink()
        except (FileNotFoundError, KeyError):
            pass

    conf = ctx.obj["config"]
    for cache_name in ["blob", "metadata"]:
        rm_cache(conf, cache_name)