Source code for swh.objstorage.api.server
# Copyright (C) 2015-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import contextlib
import functools
import logging
import os
from typing import Iterator, Optional
from flask import request
import msgpack
from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler
from swh.core.config import read as config_read
from swh.core.statsd import statsd
from swh.objstorage.exc import (
Error,
NoBackendsLeftError,
ObjCorruptedError,
ObjNotFoundError,
)
from swh.objstorage.factory import get_objstorage as get_swhobjstorage
from swh.objstorage.interface import ObjStorageInterface
[docs]
def timed(f):
@functools.wraps(f)
def w(*a, **kw):
with statsd.timed(
"swh_objstorage_request_duration_seconds", tags={"endpoint": f.__name__}
):
return f(*a, **kw)
return w
[docs]
@contextlib.contextmanager
def timed_context(f_name):
with statsd.timed(
"swh_objstorage_request_duration_seconds", tags={"endpoint": f_name}
):
yield
[docs]
def get_objstorage():
global objstorage
if objstorage is None:
objstorage = get_swhobjstorage(**app.config["objstorage"])
return objstorage
[docs]
class ObjStorageServerApp(RPCServerApp):
client_exception_classes = (
Error,
NoBackendsLeftError,
ObjCorruptedError,
ObjNotFoundError,
)
method_decorators = [timed]
[docs]
def pre_add(self, kw):
"""Called before the 'add' method."""
statsd.increment(
"swh_objstorage_in_bytes_total",
len(kw["content"]),
tags={"endpoint": "add_bytes"},
)
[docs]
def post_get(self, ret, kw):
"""Called after the 'get' method."""
statsd.increment(
"swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"}
)
app = ObjStorageServerApp(
__name__,
backend_class=ObjStorageInterface,
backend_factory=get_objstorage,
)
objstorage = None
[docs]
@app.errorhandler(Error)
def argument_error_handler(exception):
return error_handler(exception, encode_data, status_code=400)
[docs]
@app.errorhandler(PermissionError)
def permission_error_handler(exception):
return error_handler(exception, encode_data, status_code=403)
[docs]
@app.errorhandler(Exception)
def my_error_handler(exception):
return error_handler(exception, encode_data)
[docs]
@app.route("/")
@timed
def index():
return "SWH Objstorage API server"
[docs]
@app.route("/content")
def list_content():
last_obj_id = request.args.get("last_obj_id")
if last_obj_id:
last_obj_id = bytes.fromhex(last_obj_id)
limit: Optional[str] = request.args.get("limit")
if limit:
limit = int(limit)
def generate() -> Iterator[bytes]:
with timed_context("list_content"):
packer = msgpack.Packer(use_bin_type=True)
for obj in get_objstorage().list_content(last_obj_id, limit=limit):
yield packer.pack(obj)
return app.response_class(generate())
api_cfg = None
[docs]
def load_and_check_config(config_file):
"""Check the minimal configuration is set to run the api or raise an
error explanation.
Args:
config_file (str): Path to the configuration file to load
Raises:
Error if the setup is not as expected
Returns:
configuration as a dict
"""
if not config_file:
raise EnvironmentError("Configuration file must be defined")
if not os.path.exists(config_file):
raise FileNotFoundError("Configuration file %s does not exist" % (config_file,))
cfg = config_read(config_file)
return validate_config(cfg)
[docs]
def validate_config(cfg):
"""Check the minimal configuration is set to run the api or raise an
explanatory error.
Args:
cfg (dict): Loaded configuration.
Raises:
Error if the setup is not as expected
Returns:
configuration as a dict
"""
if "objstorage" not in cfg:
raise KeyError("Invalid configuration; missing objstorage config entry")
missing_keys = []
vcfg = cfg["objstorage"]
if "cls" not in vcfg:
raise KeyError("Invalid configuration; missing cls config entry")
cls = vcfg["cls"]
if cls == "pathslicing":
for key in ("root", "slicing"):
v = vcfg.get(key)
if v is None:
missing_keys.append(key)
if missing_keys:
raise KeyError(
"Invalid configuration; missing %s config entry"
% (", ".join(missing_keys),)
)
return cfg
[docs]
def make_app_from_configfile():
"""Load configuration and then build application to run"""
global api_cfg
if not api_cfg:
config_path = os.environ.get("SWH_CONFIG_FILENAME")
api_cfg = load_and_check_config(config_path)
app.config.update(api_cfg)
handler = logging.StreamHandler()
app.logger.addHandler(handler)
return app
if __name__ == "__main__":
print("Deprecated. Use swh-objstorage")