swh.scheduler.celery_backend.runner module#
This is the first scheduler runner. It is in charge of scheduling “oneshot” tasks (e.g. save code now, indexer, vault, deposit, …). To do this, it reads tasks out of the scheduler backend and pushes them to their associated rabbitmq queues.
The scheduler listener swh.scheduler.celery_backend.pika_listener is the module in charge of finalizing the task results.
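As a minimal sketch of how the runner's entry point can be driven from Python: this assumes a scheduler backend obtained through swh.scheduler.get_scheduler and an already configured Celery application; the cls value, the connection string, and the import path of the Celery app are assumptions about the deployment.

```python
# Minimal driving sketch (assumptions noted inline), not the module's actual CLI.
from swh.scheduler import get_scheduler
from swh.scheduler.celery_backend.runner import run_ready_tasks

# Scheduler backend used to read and update tasks
# (cls and connection string are hypothetical deployment values).
backend = get_scheduler(cls="postgresql", db="dbname=swh-scheduler")

# Celery application configured to reach the rabbitmq broker
# (import path assumed; adapt to your setup).
from swh.scheduler.celery_backend.config import app

# Read ready "oneshot" tasks and push them to their rabbitmq queues.
scheduled_runs = run_ready_tasks(backend, app)
print(f"scheduled {len(scheduled_runs)} task runs")
```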
- swh.scheduler.celery_backend.runner.write_to_backends(backend: SchedulerInterface, app, backend_tasks: List, celery_tasks: List)[source]#
Utility function to unify writes to rabbitmq and to the scheduler backend in a consistent way (towards atomicity).
Messages are first sent to rabbitmq, then to postgresql.
In the nominal case where all writes succeed, this changes nothing compared to the previous implementation (postgresql first, then rabbitmq).
In degraded situations, however, this ordering is expected to behave better:
1. If we cannot write to rabbitmq, then we won’t write to postgresql either: the function raises and stops.
2. If we can write to rabbitmq first, the messages will be consumed independently of this function. If we then fail to write to postgresql (for some reason), we simply lose the information that the task was already sent. This means the same task will be rescheduled and we’ll have another go at it. As those kinds of tasks are supposed to be idempotent, that should not be a major issue for their upstream.
Also, those tasks are mostly listers now, and they have state management of their own, so rescheduled runs should mostly be no-ops (if the ingestion from the previous run went fine). Edge-case scenarios, such as a site being down, behave as before. A sketch of this ordering is shown below.
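The following is an illustration of the ordering described above, a sketch under assumptions rather than the module’s actual implementation: the tuple layout of celery_tasks and the exact backend call used to record the runs are assumed.

```python
from typing import List


def write_to_backends_sketch(backend, app, backend_tasks: List, celery_tasks: List):
    """Sketch of the rabbitmq-first, postgresql-second ordering (assumed shapes)."""
    # 1. Publish the celery messages first. If this raises, nothing is recorded
    #    in the scheduler backend and the tasks remain schedulable.
    for task_name, args, kwargs, queue_name in celery_tasks:  # tuple layout assumed
        app.send_task(task_name, args=args, kwargs=kwargs, queue=queue_name)
    # 2. Only then record the task runs in postgresql. If this step fails, the
    #    same tasks get rescheduled later; since they are idempotent, the retry
    #    is expected to be harmless.
    backend.mass_schedule_task_runs(backend_tasks)  # backend call name assumed
```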
- swh.scheduler.celery_backend.runner.run_ready_tasks(backend: SchedulerInterface, app, task_types: List[TaskType] = [], task_type_patterns: List[str] = [], with_priority: bool = False) List[TaskRun] [source]#
Schedule tasks ready to be scheduled.
This looks up pending tasks per task type and mass-schedules them accordingly (sends messages to rabbitmq and marks the equivalent tasks as scheduled in the scheduler backend).
If tasks (per task type) with priority exist, they get redirected to a dedicated high-priority queue (the standard queue name prefixed with save_code_now:).
- Parameters:
backend – scheduler backend to interact with (read/update tasks)
app (App) – Celery application to send tasks to
task_types – The list of task type dicts to iterate over. When empty (the default), the full list of task types referenced in the scheduler backend is used.
task_type_patterns – List of task type patterns allowed to be scheduled. Task types that do not match any pattern are skipped from scheduling. When empty (the default), there is no filtering on task types.
with_priority – If True, only tasks with priority set will be fetched and scheduled. By default, False.
- Returns:
The list of TaskRun instances that were scheduled
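A hedged usage example exercising the filtering parameters; the pattern string "list-*" and its matching semantics are assumptions, and backend / app are the objects from the sketch near the top of this page.

```python
# Schedule only task types matching a (hypothetical) pattern, without priority.
lister_runs = run_ready_tasks(
    backend, app, task_type_patterns=["list-*"], with_priority=False
)

# Separate pass for tasks with a priority set; those land on the
# high-priority ("save_code_now:"-prefixed) queues.
priority_runs = run_ready_tasks(backend, app, with_priority=True)
```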