Source code for swh.storage.api.server

# Copyright (C) 2015-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from functools import partial
import logging
import os
from typing import Any, Dict, Optional

from swh.core import config
from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler
from swh.storage import get_storage as get_swhstorage

from ..exc import NonRetryableException
from ..interface import StorageInterface
from ..metrics import send_metric, timed
from .serializers import DECODERS, ENCODERS


[docs] def get_storage(): global storage if not storage: storage = get_swhstorage(**app.config["storage"]) return storage
[docs] class StorageServerApp(RPCServerApp): extra_type_decoders = DECODERS extra_type_encoders = ENCODERS method_decorators = [timed] def _process_metrics(self, metrics, endpoint): for metric, count in metrics.items(): send_metric(metric=metric, count=count, method_name=endpoint)
[docs] def post_content_add(self, ret, kw): self._process_metrics(ret, "content_add")
[docs] def post_content_add_metadata(self, ret, kw): self._process_metrics(ret, "content_add_metadata")
[docs] def post_skipped_content_add(self, ret, kw): self._process_metrics(ret, "skipped_content_add")
[docs] def post_directory_add(self, ret, kw): self._process_metrics(ret, "directory_add")
[docs] def post_revision_add(self, ret, kw): self._process_metrics(ret, "revision_add")
[docs] def post_release_add(self, ret, kw): self._process_metrics(ret, "release_add")
[docs] def post_snapshot_add(self, ret, kw): self._process_metrics(ret, "snapshot_add")
[docs] def post_origin_visit_status_add(self, ret, kw): self._process_metrics(ret, "origin_visit_status_add")
[docs] def post_origin_add(self, ret, kw): self._process_metrics(ret, "origin_add")
[docs] def post_raw_extrinsic_metadata_add(self, ret, kw): self._process_metrics(ret, "raw_extrinsic_metadata_add")
[docs] def post_metadata_fetcher_add(self, ret, kw): self._process_metrics(ret, "metadata_fetcher_add")
[docs] def post_metadata_authority_add(self, ret, kw): self._process_metrics(ret, "metadata_authority_add")
[docs] def post_extid_add(self, ret, kw): self._process_metrics(ret, "extid_add")
[docs] def post_origin_visit_add(self, ret, kw): nb_visits = len(ret) send_metric( "origin_visit:add", count=nb_visits, # method_name should be "origin_visit_add", but changing it now would break # existing metrics method_name="origin_visit", )
app = StorageServerApp( __name__, backend_class=StorageInterface, backend_factory=get_storage ) storage = None
[docs] @app.errorhandler(NonRetryableException) def non_retryable_error_handler(exception): """Send all non-retryable errors with a 400 status code so the client can re-raise them.""" return error_handler( exception, partial(encode_data, extra_type_encoders=ENCODERS), status_code=400 )
app.setup_psycopg2_errorhandlers()
[docs] @app.errorhandler(Exception) def default_error_handler(exception): return error_handler(exception, encode_data)
[docs] @app.route("/") @timed def index(): return """<html> <head><title>Software Heritage storage server</title></head> <body> <p>You have reached the <a href="https://www.softwareheritage.org/">Software Heritage</a> storage server.<br /> See its <a href="https://docs.softwareheritage.org/devel/swh-storage/">documentation and API</a> for more information</p> </body> </html>"""
[docs] @app.route("/stat/counters", methods=["GET"]) @timed def stat_counters(): return encode_data(get_storage().stat_counters())
[docs] @app.route("/stat/refresh", methods=["GET"]) @timed def refresh_stat_counters(): return encode_data(get_storage().refresh_stat_counters())
api_cfg = None
[docs] def load_and_check_config(config_path: Optional[str]) -> Dict[str, Any]: """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_path: Path to the configuration file to load Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_path: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_path): raise FileNotFoundError(f"Configuration file {config_path} does not exist") cfg = config.read(config_path) if "storage" not in cfg: raise KeyError("Missing 'storage' configuration") return cfg
[docs] def make_app_from_configfile() -> StorageServerApp: """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. """ global api_cfg if not api_cfg: config_path = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_path) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app
if __name__ == "__main__": print("Deprecated. Use swh-storage")