Source code for swh.scheduler.celery_backend.pika_listener
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This is the scheduler listener. It is in charge of listening to rabbitmq events (the
task result) and flushes the "oneshot" tasks' status in the scheduler backend. It's the
final step after a task is done.
The scheduler runner :mod:`swh.scheduler.celery_backend.runner` is the module in charge
of pushing tasks in the queue.
"""
import json
import logging
import sys

import pika

from swh.core.statsd import statsd
from swh.scheduler import get_scheduler
from swh.scheduler.utils import utcnow
logger = logging.getLogger(__name__)
def get_listener(broker_url, queue_name, scheduler_backend):
    connection = pika.BlockingConnection(pika.URLParameters(broker_url))
    channel = connection.channel()

    channel.queue_declare(queue=queue_name, durable=True)

    # Bind the queue to celery's event exchange with a wildcard routing key, so
    # that every task event published by the workers ends up in this queue.
    exchange = "celeryev"
    routing_key = "#"
    channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key)

    channel.basic_qos(prefetch_count=1000)

    channel.basic_consume(
        queue=queue_name,
        on_message_callback=get_on_message(scheduler_backend),
    )

    return channel
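
# Note: get_listener() only declares the queue, binds it and registers the consumer
# callback; nothing is processed until the caller invokes start_consuming() on the
# returned channel (see the __main__ block at the bottom of this module), e.g.:
#
#   channel = get_listener(broker_url, queue_name, scheduler_backend)
#   channel.start_consuming()
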
def get_on_message(scheduler_backend):
    def on_message(channel, method_frame, properties, body):
        try:
            events = json.loads(body)
        except Exception:
            logger.warning("Could not parse body %r", body)
            events = []
        # The message body may hold a single event or a batch of events;
        # normalize it to a list either way.
        if not isinstance(events, list):
            events = [events]
        for event in events:
            logger.debug("Received event %r", event)
            process_event(event, scheduler_backend)

        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

    return on_message
def process_event(event, scheduler_backend):
    uuid = event.get("uuid")
    if not uuid:
        # Events without a task uuid (e.g. worker heartbeats) are not task
        # events; there is nothing to record for them.
        return

    event_type = event["type"]
    statsd.increment(
        "swh_scheduler_listener_handled_event_total", tags={"event_type": event_type}
    )

    if event_type == "task-started":
        scheduler_backend.start_task_run(
            uuid,
            timestamp=utcnow(),
            metadata={"worker": event.get("hostname")},
        )
    elif event_type == "task-result":
        result = event["result"]
        status = None
        # XXX we probably should expect result to always be a dict at this point...
        if isinstance(result, dict):
            status = result.get("status")
            if status == "success":
                # Refine a plain "success" into "eventful"/"uneventful", depending
                # on whether the task reported having produced something.
                status = "eventful" if result.get("eventful") else "uneventful"
        if status is None:
            # XXX when is this supposed to be used?
            status = "eventful" if result else "uneventful"
        metadata = {}
        if status == "failed" and isinstance(result, dict):
            if "error" in result:
                metadata["error"] = result["error"]
        scheduler_backend.end_task_run(
            uuid, timestamp=utcnow(), status=status, metadata=metadata
        )
    elif event_type == "task-failed":
        scheduler_backend.end_task_run(uuid, timestamp=utcnow(), status="failed")
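
# Ad hoc invocation sketch (the broker URL is only an example; any URL understood
# by pika.URLParameters works):
#
#   python -m swh.scheduler.celery_backend.pika_listener amqp://guest:guest@localhost:5672//
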
if __name__ == "__main__":
    url = sys.argv[1]
    logging.basicConfig(level=logging.DEBUG)
    scheduler_backend = get_scheduler(
        "postgresql", args={"db": "service=swh-scheduler"}
    )
    channel = get_listener(url, "celeryev.test", scheduler_backend)
    logger.info("Start consuming")
    channel.start_consuming()