Source code for swh.vault.api.server

# Copyright (C) 2016-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import asyncio
import collections
import os

import aiohttp.web

from swh.core import config
from swh.core.api.asynchronous import RPCServerApp, decode_request
from swh.core.api.asynchronous import encode_data_server as encode_data
from swh.model import hashutil
from swh.vault import get_vault
from swh.vault.backend import NotFoundExc
from swh.vault.cookers import COOKER_TYPES

DEFAULT_CONFIG_PATH = "vault/server"
DEFAULT_CONFIG = {
    "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}),
    "cache": (
        "dict",
        {
            "cls": "pathslicing",
            "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5",},
        },
    ),
    "client_max_size": ("int", 1024 ** 3),
    "vault": (
        "dict",
        {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",},},
    ),
    "scheduler": ("dict", {"cls": "remote", "url": "http://localhost:5008/",},),
}


[docs]@asyncio.coroutine def index(request): return aiohttp.web.Response(body="SWH Vault API server")
# Web API endpoints
[docs]@asyncio.coroutine def vault_fetch(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] if not request.app["backend"].is_available(obj_type, obj_id): raise NotFoundExc(f"{obj_type} {obj_id} is not available.") return encode_data(request.app["backend"].fetch(obj_type, obj_id))
[docs]def user_info(task_info): return { "id": task_info["id"], "status": task_info["task_status"], "progress_message": task_info["progress_msg"], "obj_type": task_info["type"], "obj_id": hashutil.hash_to_hex(task_info["object_id"]), }
[docs]@asyncio.coroutine def vault_cook(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] email = request.query.get("email") sticky = request.query.get("sticky") in ("true", "1") if obj_type not in COOKER_TYPES: raise NotFoundExc(f"{obj_type} is an unknown type.") info = request.app["backend"].cook_request( obj_type, obj_id, email=email, sticky=sticky ) # TODO: return 201 status (Created) once the api supports it return encode_data(user_info(info))
[docs]@asyncio.coroutine def vault_progress(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] info = request.app["backend"].task_info(obj_type, obj_id) if not info: raise NotFoundExc(f"{obj_type} {obj_id} was not found.") return encode_data(user_info(info))
# Cookers endpoints
[docs]@asyncio.coroutine def set_progress(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] progress = yield from decode_request(request) request.app["backend"].set_progress(obj_type, obj_id, progress) return encode_data(True) # FIXME: success value?
[docs]@asyncio.coroutine def set_status(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] status = yield from decode_request(request) request.app["backend"].set_status(obj_type, obj_id, status) return encode_data(True) # FIXME: success value?
[docs]@asyncio.coroutine def put_bundle(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] # TODO: handle streaming properly content = yield from decode_request(request) request.app["backend"].cache.add(obj_type, obj_id, content) return encode_data(True) # FIXME: success value?
[docs]@asyncio.coroutine def send_notif(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] request.app["backend"].send_all_notifications(obj_type, obj_id) return encode_data(True) # FIXME: success value?
# Batch endpoints
[docs]@asyncio.coroutine def batch_cook(request): batch = yield from decode_request(request) for obj_type, obj_id in batch: if obj_type not in COOKER_TYPES: raise NotFoundExc(f"{obj_type} is an unknown type.") batch_id = request.app["backend"].batch_cook(batch) return encode_data({"id": batch_id})
[docs]@asyncio.coroutine def batch_progress(request): batch_id = request.match_info["batch_id"] bundles = request.app["backend"].batch_info(batch_id) if not bundles: raise NotFoundExc(f"Batch {batch_id} does not exist.") bundles = [user_info(bundle) for bundle in bundles] counter = collections.Counter(b["status"] for b in bundles) res = { "bundles": bundles, "total": len(bundles), **{k: 0 for k in ("new", "pending", "done", "failed")}, **dict(counter), } return encode_data(res)
# Web server
[docs]def make_app(backend, **kwargs): app = RPCServerApp(**kwargs) app.router.add_route("GET", "/", index) app.client_exception_classes = (NotFoundExc,) # Endpoints used by the web API app.router.add_route("GET", "/fetch/{type}/{id}", vault_fetch) app.router.add_route("POST", "/cook/{type}/{id}", vault_cook) app.router.add_route("GET", "/progress/{type}/{id}", vault_progress) # Endpoints used by the Cookers app.router.add_route("POST", "/set_progress/{type}/{id}", set_progress) app.router.add_route("POST", "/set_status/{type}/{id}", set_status) app.router.add_route("POST", "/put_bundle/{type}/{id}", put_bundle) app.router.add_route("POST", "/send_notif/{type}/{id}", send_notif) # Endpoints for batch requests app.router.add_route("POST", "/batch_cook", batch_cook) app.router.add_route("GET", "/batch_progress/{batch_id}", batch_progress) app["backend"] = backend return app
[docs]def get_local_backend(cfg): if "vault" not in cfg: raise ValueError("missing '%vault' configuration") vcfg = cfg["vault"] if vcfg["cls"] != "local": raise EnvironmentError( "The vault backend can only be started with a 'local' " "configuration", err=True, ) args = vcfg["args"] if "cache" not in args: args["cache"] = cfg.get("cache") if "storage" not in args: args["storage"] = cfg.get("storage") if "scheduler" not in args: args["scheduler"] = cfg.get("scheduler") for key in ("cache", "storage", "scheduler"): if not args.get(key): raise ValueError("invalid configuration; missing %s config entry." % key) return get_vault("local", args)
[docs]def make_app_from_configfile(config_file=None, **kwargs): if config_file is None: config_file = DEFAULT_CONFIG_PATH config_file = os.environ.get("SWH_CONFIG_FILENAME", config_file) if os.path.isfile(config_file): cfg = config.read(config_file, DEFAULT_CONFIG) else: cfg = config.load_named_config(config_file, DEFAULT_CONFIG) vault = get_local_backend(cfg) return make_app(backend=vault, client_max_size=cfg["client_max_size"], **kwargs)
if __name__ == "__main__": print("Deprecated. Use swh-vault ")