swh.scheduler.celery_backend.runner module

This is the first scheduler runner. It is in charge of scheduling “oneshot” tasks (e.g. save code now, indexer, vault, deposit, …). To do this, it reads tasks out of the scheduler backend and pushes them to their associated RabbitMQ queues.

The scheduler listener swh.scheduler.celery_backend.pika_listener is the module in charge of finalizing the task results.

swh.scheduler.celery_backend.runner.run_ready_tasks(backend: SchedulerInterface, app, task_types: List[Dict] = [], with_priority: bool = False) -> List[Dict]

Schedule tasks ready to be scheduled.

This looks up ready tasks for each task type and mass-schedules them: it sends the corresponding messages to RabbitMQ and marks the matching tasks as scheduled in the scheduler backend.

If tasks with a priority exist for a task type, they are redirected to a dedicated high-priority queue (the standard queue name prefixed with save_code_now:).
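The lookup-and-dispatch behaviour described above can be sketched with in-memory stand-ins. This is only an illustration under stated assumptions, not the module's implementation: `FakeTask`, `dispatch_ready_tasks`, the `send` callback, the `"ready"`/`"scheduled"` status strings, and the queue names are all hypothetical, and a random UUID stands in for the id Celery would return. How the real function treats prioritized tasks when `with_priority` is False is also an assumption here.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
from uuid import uuid4

PRIORITY_PREFIX = "save_code_now:"  # prefix named in the description above


@dataclass
class FakeTask:
    """Hypothetical stand-in for a task stored in the scheduler backend."""

    id: int
    type: str
    priority: Optional[str] = None
    status: str = "ready"  # hypothetical status value


def dispatch_ready_tasks(
    tasks: List[FakeTask],
    queues: Dict[str, str],
    send: Callable[[str, FakeTask], None],
    with_priority: bool = False,
) -> List[Dict]:
    """Send each ready task to its queue, per task type, and mark it scheduled."""
    dispatched = []
    for task_type, standard_queue in queues.items():
        for task in tasks:
            if task.type != task_type or task.status != "ready":
                continue
            if with_priority and task.priority is None:
                continue  # only prioritized tasks were requested
            # Prioritized tasks go to the prefixed, dedicated queue.
            queue = standard_queue
            if task.priority is not None:
                queue = PRIORITY_PREFIX + standard_queue
            send(queue, task)
            task.status = "scheduled"  # mirrors the update in the backend
            dispatched.append({"task": task.id, "backend_id": str(uuid4())})
    return dispatched


# Usage with made-up task types and queue names.
sent = []
tasks = [FakeTask(1, "load-git"), FakeTask(2, "load-git", priority="high")]
result = dispatch_ready_tasks(
    tasks, {"load-git": "loader.git"}, lambda queue, task: sent.append((queue, task.id))
)
```

After this call, `sent` holds one entry for the standard queue and one for the prefixed queue, and both tasks are marked as scheduled.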

Parameters:
  • backend – scheduler backend to interact with (read/update tasks)

  • app (App) – Celery application to send tasks to

  • task_types – List of task type dicts to iterate over. Defaults to an empty list, in which case the full list of task types referenced in the scheduler is used.

  • with_priority – If True, only tasks with priority set will be fetched and scheduled. By default, False.
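The fallback described for task_types can be pictured as follows. This is a minimal sketch, assuming a hypothetical helper `resolve_task_types` and a hypothetical callable `list_all_task_types` that returns every task type known to the scheduler; neither name comes from the module.

```python
from typing import Callable, Dict, List


def resolve_task_types(
    task_types: List[Dict],
    list_all_task_types: Callable[[], List[Dict]],
) -> List[Dict]:
    """Return the caller's task types, or every known type when none are given."""
    if task_types:
        return list(task_types)
    # Empty list: fall back to the full list of task types.
    return list_all_task_types()
```

The lookup callable is only invoked when the caller passes an empty list, so an explicit selection never triggers a full listing.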

Returns:

A list of dictionaries:

{
  'task': the scheduler's task id,
  'backend_id': Celery's task id,
  'scheduler': utcnow()
}
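As an illustration of working with this return value, the sketch below indexes the Celery ids by scheduler task id. The helper name `backend_ids_by_task` and the sample values are made up for the example; only the dictionary keys are taken from the description above.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def backend_ids_by_task(results: List[Dict]) -> Dict[Any, str]:
    """Map each scheduler task id to the Celery task id it was sent as."""
    return {entry["task"]: entry["backend_id"] for entry in results}


# Made-up sample shaped like the dictionaries listed above.
sample = [
    {"task": 42, "backend_id": "celery-id-a", "scheduler": datetime.now(timezone.utc)},
    {"task": 43, "backend_id": "celery-id-b", "scheduler": datetime.now(timezone.utc)},
]
lookup = backend_ids_by_task(sample)
```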

The result can be used to block-wait for the tasks’ results:

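from celery.result import AsyncResult

backend_tasks = run_ready_tasks(self.scheduler, app)
for task in backend_tasks:
    # Blocks until the Celery task identified by backend_id has finished
    AsyncResult(id=task['backend_id']).get()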

swh.scheduler.celery_backend.runner.main()