Source code for swh.scheduler.task
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
from celery import current_app
import celery.app.task
from celery.utils.log import get_task_logger
from swh.core.statsd import Statsd
[docs]
def ts():
return int(datetime.utcnow().timestamp())
[docs]
class SWHTask(celery.app.task.Task):
"""a schedulable task (abstract class)
Current implementation is based on Celery. See
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
how to use tasks once instantiated
"""
_statsd = None
_log = None
reject_on_worker_lost = None
"""Inherited from :class:`celery.app.task.Task`, but we need to override
its docstring because it uses a custom ReST role"""
@property
def statsd(self):
if self._statsd:
return self._statsd
worker_name = current_app.conf.get("worker_name")
if worker_name:
self._statsd = Statsd(
constant_tags={
"task": self.name,
"worker": worker_name,
}
)
return self._statsd
else:
statsd = Statsd(
constant_tags={
"task": self.name,
"worker": "unknown worker",
}
)
return statsd
def __call__(self, *args, **kwargs):
self.statsd.increment("swh_task_called_count")
self.statsd.gauge("swh_task_start_ts", ts())
with self.statsd.timed("swh_task_duration_seconds"):
result = super().__call__(*args, **kwargs)
try:
status = result["status"]
if status == "success":
status = "eventful" if result.get("eventful") else "uneventful"
except Exception:
status = "eventful" if result else "uneventful"
self.statsd.gauge("swh_task_end_ts", ts(), tags={"status": status})
return result
[docs]
def on_failure(self, exc, task_id, args, kwargs, einfo):
self.statsd.increment("swh_task_failure_count")
[docs]
def on_success(self, retval, task_id, args, kwargs):
self.statsd.increment("swh_task_success_count")
# this is a swh specific event. Used to attach the retval to the
# task_run
self.send_event("task-result", result=retval)
@property
def log(self):
if self._log is None:
self._log = get_task_logger(self.name)
return self._log
[docs]
def run(self, *args, **kwargs):
self.log.debug("%s: args=%s, kwargs=%s", self.name, args, kwargs)
ret = super().run(*args, **kwargs)
self.log.debug("%s: OK => %s", self.name, ret)
return ret