Source code for swh.objstorage.api.server

# Copyright (C) 2015-2025  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import contextlib
import functools
import logging
import os

from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler
from swh.core.config import read as config_read
from swh.core.statsd import statsd
from swh.objstorage.exc import (
    Error,
    NoBackendsLeftError,
    ObjCorruptedError,
    ObjNotFoundError,
)
from swh.objstorage.factory import get_objstorage as get_swhobjstorage
from swh.objstorage.interface import ObjStorageInterface


[docs] def timed(f): @functools.wraps(f) def w(*a, **kw): with statsd.timed( "swh_objstorage_request_duration_seconds", tags={"endpoint": f.__name__} ): return f(*a, **kw) return w
[docs] @contextlib.contextmanager def timed_context(f_name): with statsd.timed( "swh_objstorage_request_duration_seconds", tags={"endpoint": f_name} ): yield
[docs] def get_objstorage(): global objstorage if objstorage is None: objstorage = get_swhobjstorage(**app.config["objstorage"]) return objstorage
[docs] class ObjStorageServerApp(RPCServerApp): client_exception_classes = ( Error, NoBackendsLeftError, ObjCorruptedError, ObjNotFoundError, ) method_decorators = [timed]
[docs] def pre_add(self, kw): """Called before the 'add' method.""" statsd.increment( "swh_objstorage_in_bytes_total", len(kw["content"]), tags={"endpoint": "add_bytes"}, )
[docs] def post_get(self, ret, kw): """Called after the 'get' method.""" statsd.increment( "swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"} )
app = ObjStorageServerApp( __name__, backend_class=ObjStorageInterface, backend_factory=get_objstorage, ) objstorage = None
[docs] @app.errorhandler(Error) def argument_error_handler(exception): return error_handler(exception, encode_data, status_code=400)
[docs] @app.errorhandler(PermissionError) def permission_error_handler(exception): return error_handler(exception, encode_data, status_code=403)
[docs] @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data)
[docs] @app.route("/") @timed def index(): return "SWH Objstorage API server"
api_cfg = None
[docs] def load_and_check_config(config_file): """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_file (str): Path to the configuration file to load Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_file: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_file): raise FileNotFoundError("Configuration file %s does not exist" % (config_file,)) cfg = config_read(config_file) return validate_config(cfg)
[docs] def validate_config(cfg): """Check the minimal configuration is set to run the api or raise an explanatory error. Args: cfg (dict): Loaded configuration. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if "objstorage" not in cfg: raise KeyError("Invalid configuration; missing objstorage config entry") missing_keys = [] vcfg = cfg["objstorage"] if "cls" not in vcfg: raise KeyError("Invalid configuration; missing cls config entry") cls = vcfg["cls"] if cls == "pathslicing": for key in ("root", "slicing"): v = vcfg.get(key) if v is None: missing_keys.append(key) if missing_keys: raise KeyError( "Invalid configuration; missing %s config entry" % (", ".join(missing_keys),) ) return cfg
[docs] def make_app_from_configfile(): """Load configuration and then build application to run""" global api_cfg if not api_cfg: config_path = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_path) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app
if __name__ == "__main__": print("Deprecated. Use swh-objstorage")