# Copyright (C) 2021-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This package runs the scheduler in a simulated environment, to evaluate
various metrics. See :ref:`swh-scheduler-simulator`.
This module orchestrates the simulator by initializing processes and connecting
them together; these processes are defined in modules in the package and
simulate/call specific components."""
from datetime import datetime, timedelta, timezone
import logging
from typing import Dict, Generator, Optional
from simpy import Event
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.utils import create_origin_tasks, utcnow
from . import origin_scheduler, task_scheduler
from .common import Environment, Queue, SimulationReport, Task
from .origins import generate_listed_origin, lister_process, load_task_process
logger = logging.getLogger(__name__)
def update_metrics_process(
    env: Environment, update_interval: int
) -> Generator[Event, None, None]:
    """Periodically refresh the scheduler metrics and store them in the report.

    On every iteration the scheduler metrics are recomputed for the current
    simulated time, appended to ``env.report``, and a progress line (elapsed
    simulated time and total visits so far) is logged. The process then
    sleeps for ``update_interval`` simulated seconds. It never terminates on
    its own.

    Args:
        env: simulation environment giving access to the scheduler, the
            report and the simulated clock
        update_interval: delay between two refreshes, in simulated seconds
    """
    started_at = env.time
    while True:
        snapshot = env.scheduler.update_metrics(timestamp=env.time)
        env.report.record_metrics(env.time, snapshot)

        elapsed = env.time - started_at
        logger.info("time:%s visits:%s", elapsed, env.report.total_visits)

        # Hand control back to the simulation until the next refresh is due.
        yield env.timeout(update_interval)
def worker_process(
    env: Environment, name: str, task_queue: Queue, status_queue: Queue
) -> Generator[Event, Task, None]:
    """Simulate one worker: forever pull a task and run it to completion.

    Each task taken from ``task_queue`` is executed through
    :func:`load_task_process`, which is handed ``status_queue`` so that the
    task can publish its visit statuses there. The worker only asks for the
    next task once the current one has finished.

    Args:
        env: simulation environment
        name: label identifying this worker in the debug logs
        task_queue: queue the worker takes its tasks from
        status_queue: queue passed on to the task being run
    """
    logger.debug("%s worker %s: Start", env.time, name)
    while True:
        # Block (in simulated time) until a task is available.
        next_task = task_queue.get()
        task = yield next_task

        logger.debug(
            "%s worker %s: Run task %s origin=%s",
            env.time,
            name,
            task.visit_type,
            task.origin,
        )

        # Wait for the load to finish before picking up another task.
        load = load_task_process(env, task, status_queue=status_queue)
        yield env.process(load)
def setup(
    env: Environment,
    scheduler_type: str,
    policy: Optional[str],
    workers_per_type: Dict[str, int],
    task_queue_capacity: int,
    min_batch_size: int,
    metrics_update_interval: int,
):
    """Register every simulated process on ``env`` and connect their queues.

    The following processes are registered, in this order: the scheduler
    runner, the scheduler status consumer (journal client or listener,
    depending on ``scheduler_type``), the metrics updater, the workers, and
    finally the lister.

    Args:
        env: simulation environment the processes are attached to
        scheduler_type: either ``"origin_scheduler"`` or ``"task_scheduler"``
        policy: scheduling policy; required for ``origin_scheduler`` and
            rejected for ``task_scheduler``
        workers_per_type: number of workers to start for each visit type
        task_queue_capacity: capacity given to each per-visit-type task queue
        min_batch_size: forwarded to the scheduler runner process
        metrics_update_interval: delay between two metrics updates, in
            simulated seconds

    Raises:
        ValueError: if ``scheduler_type`` is unknown, or if ``policy`` does not
            match what the selected scheduler type expects
    """
    # One task queue per visit type, plus one status queue shared by everyone.
    queues_by_type: Dict[str, Queue] = {}
    for visit_type in workers_per_type:
        queues_by_type[visit_type] = Queue(env, capacity=task_queue_capacity)
    statuses = Queue(env)

    if scheduler_type not in ("origin_scheduler", "task_scheduler"):
        raise ValueError(f"Unknown scheduler type to simulate: {scheduler_type}")

    if scheduler_type == "origin_scheduler":
        if policy is None:
            raise ValueError("origin_scheduler needs a scheduling policy")
        runner = origin_scheduler.scheduler_runner_process(
            env, queues_by_type, policy, min_batch_size=min_batch_size
        )
        env.process(runner)
        journal_client = origin_scheduler.scheduler_journal_client_process(
            env, statuses
        )
        env.process(journal_client)
    else:
        if policy is not None:
            raise ValueError("task_scheduler doesn't support a scheduling policy")
        runner = task_scheduler.scheduler_runner_process(
            env, queues_by_type, min_batch_size=min_batch_size
        )
        env.process(runner)
        listener = task_scheduler.scheduler_listener_process(env, statuses)
        env.process(listener)

    env.process(update_metrics_process(env, metrics_update_interval))

    # Start the requested number of workers on each visit type's queue.
    for visit_type, num_workers in workers_per_type.items():
        queue_for_type = queues_by_type[visit_type]
        for index in range(num_workers):
            env.process(
                worker_process(
                    env, f"worker-{visit_type}-{index}", queue_for_type, statuses
                )
            )

    lister = env.scheduler.get_or_create_lister(name="example")
    assert lister.id
    env.process(lister_process(env, lister.id))
def fill_test_data(scheduler: SchedulerInterface, num_origins: int = 100000):
    """Populate the scheduler with generated origins and matching tasks.

    ``num_origins`` origins are generated for the ``example`` lister and
    recorded in the scheduler. The tasks built from these origins are then
    registered as recurring tasks with a 64-day interval, first due at the
    origin's ``last_update`` (or now when that value is falsy).

    Args:
        scheduler: scheduler backend to fill
        num_origins: how many origins to generate
    """
    stored_lister = scheduler.get_or_create_lister(name="example")
    assert stored_lister.id is not None

    # Generate 'num_origins' new origins
    origins = []
    for _ in range(num_origins):
        origins.append(generate_listed_origin(stored_lister.id))
    scheduler.record_listed_origins(origins)

    origin_tasks = create_origin_tasks(origins, scheduler)
    recurring_tasks = []
    for origin, task in zip(origins, origin_tasks):
        first_run = origin.last_update or utcnow()
        recurring_tasks.append(
            task.evolve(
                policy="recurring",
                next_run=first_run,
                current_interval=timedelta(days=64),
            )
        )
    scheduler.create_tasks(recurring_tasks)
def run(
    scheduler: SchedulerInterface,
    scheduler_type: str,
    policy: Optional[str],
    runtime: Optional[int],
) -> SimulationReport:
    """Run a full simulation against ``scheduler`` and return its report.

    A fresh environment is created with the current UTC time as its start
    time, wired up through :func:`setup` with 48 ``git`` workers, and run
    until ``runtime`` is reached. A ``KeyboardInterrupt`` stops the simulation
    early without being treated as an error.

    However the simulation ends, the total simulated time is printed and one
    last metrics snapshot is recorded in the report.

    Args:
        scheduler: scheduler backend the simulation runs against
        scheduler_type: forwarded to :func:`setup`
        policy: forwarded to :func:`setup`
        runtime: passed as ``until`` to ``env.run``

    Returns:
        The report attached to the simulation environment.

    Raises:
        ValueError: propagated from :func:`setup` on an invalid
            ``scheduler_type``/``policy`` combination
    """
    NUM_WORKERS = 48

    start_time = datetime.now(tz=timezone.utc)
    env = Environment(start_time=start_time)
    env.scheduler = scheduler
    env.report = SimulationReport()

    setup(
        env,
        scheduler_type=scheduler_type,
        policy=policy,
        workers_per_type={"git": NUM_WORKERS},
        task_queue_capacity=10000,
        min_batch_size=1000,
        metrics_update_interval=3600,
    )

    try:
        env.run(until=runtime)
    except KeyboardInterrupt:
        # Ctrl-C only cuts the simulation short; the data gathered so far is
        # still reported below.
        pass
    finally:
        # Always record a closing snapshot, even when the run failed.
        end_time = env.time
        print("total simulated time:", end_time - start_time)
        metrics = env.scheduler.update_metrics(timestamp=end_time)
        env.report.record_metrics(end_time, metrics)

    # Kept outside the ``finally`` clause on purpose: returning from within it
    # would discard any in-flight exception and hide simulation failures.
    return env.report