Source code for swh.scheduler.api.server

# Copyright (C) 2018-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
import os
from typing import Dict

from swh.core import config
from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler, negotiate
from swh.scheduler import get_scheduler
from swh.scheduler.exc import SchedulerException
from swh.scheduler.interface import SchedulerInterface

from .serializers import DECODERS, ENCODERS

scheduler = None


[docs] def get_global_scheduler(): global scheduler if not scheduler: scheduler = get_scheduler(**app.config["scheduler"]) return scheduler
[docs] class SchedulerServerApp(RPCServerApp): extra_type_decoders = DECODERS extra_type_encoders = ENCODERS
app = SchedulerServerApp( __name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler )
[docs] @app.errorhandler(SchedulerException) def argument_error_handler(exception): return error_handler(exception, encode_data, status_code=400)
[docs] @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data)
[docs] def has_no_empty_params(rule): return len(rule.defaults or ()) >= len(rule.arguments or ())
[docs] @app.route("/") def index(): return """<html> <head><title>Software Heritage scheduler RPC server</title></head> <body> <p>You have reached the <a href="https://www.softwareheritage.org/">Software Heritage</a> scheduler RPC server.<br /> See its <a href="https://docs.softwareheritage.org/devel/swh-scheduler/">documentation and API</a> for more information</p> </body> </html>"""
[docs] @app.route("/site-map") @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def site_map(): links = [] for rule in app.url_map.iter_rules(): if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint): links.append( dict( rule=rule.rule, description=getattr(SchedulerInterface, rule.endpoint).__doc__, ) ) # links is now a list of url, endpoint tuples return links
[docs] def load_and_check_config(config_path: str, type: str = "postgresql") -> Dict: """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_path: Configuration file path to load type: Configuration type, for 'postgresql' type (the default), more checks are done. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_path: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_path): raise FileNotFoundError(f"Configuration file {config_path} does not exist") cfg = config.read(config_path) vcfg = cfg.get("scheduler") if not vcfg: raise KeyError("Missing '%scheduler' configuration") if type == "postgresql": cls = vcfg.get("cls") if cls not in ("local", "postgresql"): raise ValueError( "The scheduler backend can only be started with a 'postgresql' " "configuration" ) db = vcfg.get("db") if not db: raise KeyError("Invalid configuration; missing 'db' config entry") return cfg
api_cfg = None
[docs] def make_app_from_configfile(): """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. """ global api_cfg if not api_cfg: config_path = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_path) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app
if __name__ == "__main__": print('Please use the "swh-scheduler api-server" command')