# Source code for swh.scheduler.simulator.task_scheduler
# Copyright (C) 2021-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Agents using the "old" task-based scheduler."""
import logging
from typing import Dict, Generator, Iterator
from simpy import Event
from swh.scheduler.model import TaskRun, TaskRunStatus
from .common import Environment, Queue, Task, TaskEvent
logger = logging.getLogger(__name__)
# [docs]
def scheduler_runner_process(
    env: Environment,
    task_queues: Dict[str, Queue],
    min_batch_size: int,
) -> Iterator[Event]:
    """Simulated scheduler runner.

    Loops forever. On each pass, every queue in ``task_queues`` that still has
    at least ``min_batch_size`` free slots is refilled: ready ``load-<type>``
    tasks are grabbed from ``env.scheduler``, a task run is recorded for each
    of them, and the matching simulated tasks are put on the queue.

    Args:
        env: simulation environment; provides ``scheduler`` and ``time``.
        task_queues: queues to fill, keyed by visit type.
        min_batch_size: minimum number of free slots a queue must have before
            it is refilled.

    Yields:
        The event of each ``queue.put`` call, then a timeout event at the end
        of every pass over the queues.
    """
    while True:
        for visit_type, queue in task_queues.items():
            free_slots = queue.slots_remaining()
            if free_slots < min_batch_size:
                # Not enough room to be worth a batch; try again next pass.
                continue

            task_type = f"load-{visit_type}"
            ready_tasks = env.scheduler.grab_ready_tasks(
                task_type, num_tasks=free_slots, timestamp=env.time
            )
            logger.debug(
                "%s runner: running %s %s tasks",
                env.time,
                visit_type,
                len(ready_tasks),
            )

            # One simulated task per grabbed scheduler task, in the same order.
            simulated = [
                Task(visit_type=visit_type, origin=ready.arguments.kwargs["url"])
                for ready in ready_tasks
            ]

            # Record the runs before enqueueing, keyed by the simulated task's
            # backend id.
            task_runs = [
                TaskRun(
                    task=ready.id,
                    scheduled=env.time,
                    backend_id=str(sim.backend_id),
                )
                for ready, sim in zip(ready_tasks, simulated)
            ]
            env.scheduler.mass_schedule_task_runs(task_runs)

            for sim in simulated:
                yield queue.put(sim)

        # Wait before starting the next pass over all the queues.
        yield env.timeout(10.0)
# [docs]
def scheduler_listener_process(
    env: Environment, status_queue: Queue
) -> Generator[Event, TaskEvent, None]:
    """Simulated scheduler listener.

    In the real world this would listen to celery events; here the events are
    read from ``status_queue`` and turned into the matching scheduler calls.

    An event whose status is ``"ongoing"`` starts the task run. Any other
    event ends it: with ``"eventful"`` or ``"uneventful"`` (depending on
    ``event.eventful``) when the status is ``"full"``, and with ``"failed"``
    otherwise.

    Args:
        env: simulation environment; provides ``scheduler`` and ``time``.
        status_queue: queue the task events are read from.
    """
    while True:
        task_event = yield status_queue.get()
        backend_id = str(task_event.task.backend_id)
        visit_state = task_event.status.status

        if visit_state == "ongoing":
            env.scheduler.start_task_run(backend_id, timestamp=env.time)
            continue

        # The visit is over: work out the final status of the task run.
        outcome: TaskRunStatus
        if visit_state != "full":
            outcome = "failed"
        elif task_event.eventful:
            outcome = "eventful"
        else:
            outcome = "uneventful"

        env.scheduler.end_task_run(backend_id, status=outcome, timestamp=env.time)