Source code for swh.storage.api.server

# Copyright (C) 2015-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
import os
from typing import Any, Dict, Optional

from swh.core import config
from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler
from swh.storage import get_storage as get_swhstorage

from ..exc import StorageArgumentException
from ..interface import StorageInterface
from ..metrics import send_metric, timed
from .serializers import DECODERS, ENCODERS


[docs] def get_storage(): global storage if not storage: storage = get_swhstorage(**app.config["storage"]) return storage
[docs] class StorageServerApp(RPCServerApp): extra_type_decoders = DECODERS extra_type_encoders = ENCODERS method_decorators = [timed] def _process_metrics(self, metrics, endpoint): for metric, count in metrics.items(): send_metric(metric=metric, count=count, method_name=endpoint)
[docs] def post_content_add(self, ret, kw): self._process_metrics(ret, "content_add")
[docs] def post_content_add_metadata(self, ret, kw): self._process_metrics(ret, "content_add_metadata")
[docs] def post_skipped_content_add(self, ret, kw): self._process_metrics(ret, "skipped_content_add")
[docs] def post_directory_add(self, ret, kw): self._process_metrics(ret, "directory_add")
[docs] def post_revision_add(self, ret, kw): self._process_metrics(ret, "revision_add")
[docs] def post_release_add(self, ret, kw): self._process_metrics(ret, "release_add")
[docs] def post_snapshot_add(self, ret, kw): self._process_metrics(ret, "snapshot_add")
[docs] def post_origin_visit_status_add(self, ret, kw): self._process_metrics(ret, "origin_visit_status_add")
[docs] def post_origin_add(self, ret, kw): self._process_metrics(ret, "origin_add")
[docs] def post_raw_extrinsic_metadata_add(self, ret, kw): self._process_metrics(ret, "raw_extrinsic_metadata_add")
[docs] def post_metadata_fetcher_add(self, ret, kw): self._process_metrics(ret, "metadata_fetcher_add")
[docs] def post_metadata_authority_add(self, ret, kw): self._process_metrics(ret, "metadata_authority_add")
[docs] def post_extid_add(self, ret, kw): self._process_metrics(ret, "extid_add")
[docs] def post_origin_visit_add(self, ret, kw): nb_visits = len(ret) send_metric( "origin_visit:add", count=nb_visits, # method_name should be "origin_visit_add", but changing it now would break # existing metrics method_name="origin_visit", )
app = StorageServerApp( __name__, backend_class=StorageInterface, backend_factory=get_storage ) storage = None
[docs] @app.errorhandler(StorageArgumentException) def argument_error_handler(exception): return error_handler(exception, encode_data, status_code=400)
app.setup_psycopg2_errorhandlers()
[docs] @app.errorhandler(Exception) def default_error_handler(exception): return error_handler(exception, encode_data)
[docs] @app.route("/") @timed def index(): return """<html> <head><title>Software Heritage storage server</title></head> <body> <p>You have reached the <a href="https://www.softwareheritage.org/">Software Heritage</a> storage server.<br /> See its <a href="https://docs.softwareheritage.org/devel/swh-storage/">documentation and API</a> for more information</p> </body> </html>"""
[docs] @app.route("/stat/counters", methods=["GET"]) @timed def stat_counters(): return encode_data(get_storage().stat_counters())
[docs] @app.route("/stat/refresh", methods=["GET"]) @timed def refresh_stat_counters(): return encode_data(get_storage().refresh_stat_counters())
api_cfg = None
[docs] def load_and_check_config(config_path: Optional[str]) -> Dict[str, Any]: """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_path: Path to the configuration file to load Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_path: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_path): raise FileNotFoundError(f"Configuration file {config_path} does not exist") cfg = config.read(config_path) if "storage" not in cfg: raise KeyError("Missing 'storage' configuration") return cfg
[docs] def make_app_from_configfile() -> StorageServerApp: """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. """ global api_cfg if not api_cfg: config_path = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_path) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app
if __name__ == "__main__": print("Deprecated. Use swh-storage")