Source code for swh.objstorage.backends.winery.gunicorn

# Copyright (C) 2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import Counter
import logging
import sys
from time import monotonic
from typing import Tuple

from swh.core.api.gunicorn_config import *  # noqa

# Module-level logger; the worker hooks defined in this module log through it.
logger = logging.getLogger(__name__)


def worker_exit(arbiter, worker):
    """Log the call, then run the storage backend shutdown callback.

    The signature matches gunicorn's ``worker_exit`` server hook; neither
    `arbiter` nor `worker` is used here.
    """
    logger.info("Calling worker_exit")
    shutdown_storage_backend()


def worker_int(worker):
    """Log a warning, then run the storage backend shutdown callback.

    The signature matches gunicorn's ``worker_int`` server hook; `worker`
    is not used here.
    """
    logger.warning("Calling worker_int")
    shutdown_storage_backend()


def shutdown_storage_backend():
    """Invoke the objstorage ``on_shutdown`` callback when a worker terminates.

    Nothing happens when ``swh.objstorage.api.server`` has not been imported,
    or when its ``objstorage`` attribute has no truthy ``on_shutdown``.
    """
    try:
        # Only look the module up: never trigger an import of the server
        # from a terminating worker.
        server_module = sys.modules["swh.objstorage.api.server"]
    except KeyError:
        return

    callback = getattr(server_module.objstorage, "on_shutdown", None)
    if callback:
        callback()


[docs] class ThrottledAccessLog(logging.Filter): """Throttle gunicorn access log lines for `status_codes` to at most one every `interval` seconds""" def __init__(self, interval: int = 60, status_codes: Tuple[int, ...] = (200,)): super().__init__(name="gunicorn.access") self.status_codes = set(str(code) for code in status_codes) self.endpoints: Counter[str] = Counter() self.interval = interval self.previous_flush = monotonic() self.deadline = self.previous_flush + self.interval
[docs] def filter(self, record): # gunicorn.access records are using `s` for status code and `U` for the # requested path if not (ret := super().filter(record)): return ret if record.args["s"] not in self.status_codes: return True # If we quiesce different status codes, stick them in different buckets if len(self.status_codes) > 1: bucket = f"{record.args['U']}({record.args['s']})" else: bucket = record.args["U"] self.endpoints[bucket] += 1 now = monotonic() if self.deadline > now: # Quiesce record return False # logging.Filter must mutate the log record instead of creating a # new one, up to and including Python 3.11 record.msg = ( "Served %(total)s requests in the last %(interval).1fs, " "including %(most_common)s" ) record.args = { "total": self.endpoints.total(), "most_common": ", ".join( f"{v} {path}" for path, v in self.endpoints.most_common(3) ), "interval": now - self.previous_flush, } self.previous_flush = now self.deadline = now + self.interval self.endpoints = Counter() return True