Source code for swh.storage.metrics
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from contextlib import contextmanager
from functools import wraps
import logging
from time import monotonic
from types import TracebackType
from typing import Dict, Optional, Type
from swh.core.statsd import statsd
logger = logging.getLogger(__name__)
OPERATIONS_METRIC = "swh_storage_operations_total"
OPERATIONS_UNIT_METRIC = "swh_storage_operations_{unit}_total"
DURATION_METRIC = "swh_storage_request_duration_seconds"
[docs]
def timed(f):
"""Time that function!"""
@wraps(f)
def d(*a, **kw):
with statsd.timed(DURATION_METRIC, tags={"endpoint": f.__name__}):
return f(*a, **kw)
return d
[docs]
class DifferentialTimer:
"""Compute differential timing metrics and send them to StatsD.
The DifferentialTimer will send the ``metric`` to statsd, computing the
difference between the time spent within the whole block, with the time
spent within any blocks wrapped in the :meth:`inner` decorator.
If an exception is raised,when sending the metric, the DifferentialTimer
will add a ``inner_exc`` (when the exception is raised by an inner-timed
section) or ``outer_exc`` tag with the name of the exception class that was
raised .
Arguments:
metric: name of the timing metric that is sent to StatsD when the context
manager exits
tags: tags to attach to the metric when it is sent
Example:
To generate a ``metric_seconds`` timed metric, recording the overhead of
running ``run_pre_processing`` and ``run_post_processing``, use:
.. code-block:: python
with DifferentialTimer("metric_seconds") as t:
run_pre_processing()
with t.inner():
run_inner_method()
run_post_processing()
"""
def __init__(self, metric: str, tags: Optional[Dict[str, str]] = None) -> None:
self.start_ts: Optional[float] = None
self.inner_elapsed = 0
self.exc_source = "outer"
self.metric = metric
self.tags = tags or {}
def __enter__(self) -> "DifferentialTimer":
self.start_ts = monotonic()
return self
[docs]
@contextmanager
def inner(self):
"""Add the duration of this block to the inner elapsed time counter."""
start = monotonic()
try:
yield
except BaseException:
self.exc_source = "inner"
raise
finally:
self.inner_elapsed += monotonic() - start
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
_exc_value: Optional[BaseException],
_traceback: Optional[TracebackType],
) -> None:
if self.start_ts is None:
logger.warning("DifferentialTimer exited without being started")
return
total_elapsed = monotonic() - self.start_ts
extra_tags = {}
if exc_type:
extra_tags[f"{self.exc_source}_exc"] = exc_type.__name__
if self.inner_elapsed == 0 and not exc_type:
logger.warning("DifferentialTimer didn't record calls to `inner`?")
# Statsd expects timings in milliseconds
value = (total_elapsed - self.inner_elapsed) * 1000
statsd.timing(
self.metric,
value,
tags={**self.tags, **extra_tags},
)
[docs]
def send_metric(metric, count, method_name):
"""Send statsd metric with count for method `method_name`
If count is 0, the metric is discarded. If the metric is not
parseable, the metric is discarded with a log message.
Args:
metric (str): Metric's name (e.g content:add, content:add:bytes)
count (int): Associated value for the metric
method_name (str): Method's name
Returns:
Bool to explicit if metric has been set or not
"""
if count == 0:
return False
metric_type = metric.split(":")
_length = len(metric_type)
if _length == 2:
object_type, operation = metric_type
metric_name = OPERATIONS_METRIC
elif _length == 3:
object_type, operation, unit = metric_type
metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit)
else:
logging.warning("Skipping unknown metric {%s: %s}" % (metric, count))
return False
statsd.increment(
metric_name,
count,
tags={
"endpoint": method_name,
"object_type": object_type,
"operation": operation,
},
)
return True
[docs]
def process_metrics(f):
"""Increment object counters for the decorated function."""
@wraps(f)
def d(*a, **kw):
r = f(*a, **kw)
for metric, count in r.items():
send_metric(metric=metric, count=count, method_name=f.__name__)
return r
return d