Source code for swh.objstorage.api.server
# Copyright (C) 2015-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import contextlib
import functools
import logging
import os
from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler
from swh.core.config import read as config_read
from swh.core.statsd import statsd
from swh.objstorage.exc import (
Error,
NoBackendsLeftError,
ObjCorruptedError,
ObjNotFoundError,
)
from swh.objstorage.factory import get_objstorage as get_swhobjstorage
from swh.objstorage.interface import ObjStorageInterface
[docs]
def timed(f):
@functools.wraps(f)
def w(*a, **kw):
with statsd.timed(
"swh_objstorage_request_duration_seconds", tags={"endpoint": f.__name__}
):
return f(*a, **kw)
return w
[docs]
@contextlib.contextmanager
def timed_context(f_name):
with statsd.timed(
"swh_objstorage_request_duration_seconds", tags={"endpoint": f_name}
):
yield
[docs]
def get_objstorage():
global objstorage
if objstorage is None:
objstorage = get_swhobjstorage(**app.config["objstorage"])
return objstorage
[docs]
class ObjStorageServerApp(RPCServerApp):
client_exception_classes = (
Error,
NoBackendsLeftError,
ObjCorruptedError,
ObjNotFoundError,
)
method_decorators = [timed]
[docs]
def pre_add(self, kw):
"""Called before the 'add' method."""
statsd.increment(
"swh_objstorage_in_bytes_total",
len(kw["content"]),
tags={"endpoint": "add_bytes"},
)
[docs]
def post_get(self, ret, kw):
"""Called after the 'get' method."""
statsd.increment(
"swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"}
)
app = ObjStorageServerApp(
__name__,
backend_class=ObjStorageInterface,
backend_factory=get_objstorage,
)
objstorage = None
[docs]
@app.errorhandler(Error)
def argument_error_handler(exception):
return error_handler(exception, encode_data, status_code=400)
[docs]
@app.errorhandler(PermissionError)
def permission_error_handler(exception):
return error_handler(exception, encode_data, status_code=403)
[docs]
@app.errorhandler(Exception)
def my_error_handler(exception):
return error_handler(exception, encode_data)
[docs]
@app.route("/")
@timed
def index():
return "SWH Objstorage API server"
api_cfg = None
[docs]
def load_and_check_config(config_file):
"""Check the minimal configuration is set to run the api or raise an
error explanation.
Args:
config_file (str): Path to the configuration file to load
Raises:
Error if the setup is not as expected
Returns:
configuration as a dict
"""
if not config_file:
raise EnvironmentError("Configuration file must be defined")
if not os.path.exists(config_file):
raise FileNotFoundError("Configuration file %s does not exist" % (config_file,))
cfg = config_read(config_file)
return validate_config(cfg)
[docs]
def validate_config(cfg):
"""Check the minimal configuration is set to run the api or raise an
explanatory error.
Args:
cfg (dict): Loaded configuration.
Raises:
Error if the setup is not as expected
Returns:
configuration as a dict
"""
if "objstorage" not in cfg:
raise KeyError("Invalid configuration; missing objstorage config entry")
missing_keys = []
vcfg = cfg["objstorage"]
if "cls" not in vcfg:
raise KeyError("Invalid configuration; missing cls config entry")
cls = vcfg["cls"]
if cls == "pathslicing":
for key in ("root", "slicing"):
v = vcfg.get(key)
if v is None:
missing_keys.append(key)
if missing_keys:
raise KeyError(
"Invalid configuration; missing %s config entry"
% (", ".join(missing_keys),)
)
return cfg
[docs]
def make_app_from_configfile():
"""Load configuration and then build application to run"""
global api_cfg
if not api_cfg:
config_path = os.environ.get("SWH_CONFIG_FILENAME")
api_cfg = load_and_check_config(config_path)
app.config.update(api_cfg)
handler = logging.StreamHandler()
app.logger.addHandler(handler)
return app
if __name__ == "__main__":
print("Deprecated. Use swh-objstorage")