Source code for swh.scheduler.simulator.origin_scheduler
# Copyright (C) 2021-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Agents using the new origin-aware scheduler."""
import logging
from typing import Any, Dict, Generator, Iterator, List
from simpy import Event
from swh.scheduler.journal_client import process_journal_objects
from .common import Environment, Queue, Task, TaskEvent
logger = logging.getLogger(__name__)
[docs]
def scheduler_runner_process(
env: Environment,
task_queues: Dict[str, Queue],
policy: str,
min_batch_size: int,
) -> Iterator[Event]:
"""Scheduler runner. Grabs next visits from the database according to the
scheduling policy, and fills the task_queues accordingly."""
while True:
for visit_type, queue in task_queues.items():
remaining = queue.slots_remaining()
if remaining < min_batch_size:
continue
next_origins = env.scheduler.grab_next_visits(
visit_type, remaining, policy=policy, timestamp=env.time
)
logger.debug(
"%s runner: running %s %s tasks",
env.time,
visit_type,
len(next_origins),
)
for origin in next_origins:
yield queue.put(Task(visit_type=origin.visit_type, origin=origin.url))
yield env.timeout(10.0)
[docs]
def scheduler_journal_client_process(
env: Environment, status_queue: Queue
) -> Generator[Event, TaskEvent, None]:
"""Scheduler journal client. Every once in a while, pulls
:class:`OriginVisitStatuses <swh.model.model.OriginVisitStatus>`
from the status_queue to update the scheduler origin_visit_stats table."""
BATCH_SIZE = 100
statuses: List[Dict[str, Any]] = []
while True:
task_event = yield status_queue.get()
statuses.append(task_event.status.to_dict())
if len(statuses) < BATCH_SIZE:
continue
logger.debug(
"%s journal client: processing %s statuses", env.time, len(statuses)
)
process_journal_objects(
{"origin_visit_status": statuses}, scheduler=env.scheduler
)
statuses = []