Source code for swh.scheduler.pytest_plugin
# Copyright (C) 2020-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
from functools import partial
from celery.contrib.testing import worker
from celery.contrib.testing.app import TestApp, setup_default_app
import pkg_resources
import pytest
from pytest_postgresql import factories
from swh.core.db.db_utils import initialize_database_for_module
from swh.scheduler import get_scheduler
from swh.scheduler.backend import SchedulerBackend
from swh.scheduler.model import TaskType
# Names of the Celery tasks used for testing purposes; the tasks themselves
# should be in swh/scheduler/tests/tasks.py. The swh_scheduler fixture
# registers one "swh-test-<name>" task type for each name listed here.
TASK_NAMES = ["ping", "multiping", "add", "error", "echo"]
# PostgreSQL process fixture built with the pytest-postgresql factory. Its
# ``load`` list holds a single callable: initialize_database_for_module bound to
# modname="scheduler" and to the version given by
# SchedulerBackend.current_version (presumably run by pytest-postgresql when
# the database is set up -- confirm against the pytest-postgresql docs).
scheduler_postgresql_proc = factories.postgresql_proc(
    load=[
        partial(
            initialize_database_for_module,
            modname="scheduler",
            version=SchedulerBackend.current_version,
        )
    ],
)
# Fixture created from the "scheduler_postgresql_proc" fixture by name; the
# swh_scheduler_config fixture reads its ``info.dsn`` attribute.
postgresql_scheduler = factories.postgresql("scheduler_postgresql_proc")
[docs]
@pytest.fixture
def swh_scheduler_class():
    """Name of the scheduler class handed to ``get_scheduler``.

    Returns:
        The string ``"postgresql"``.
    """
    scheduler_cls = "postgresql"
    return scheduler_cls
[docs]
@pytest.fixture
def swh_scheduler_config(postgresql_scheduler):
    """Keyword arguments used to instantiate the scheduler.

    Args:
        postgresql_scheduler: the fixture whose ``info.dsn`` attribute provides
            the database connection string.

    Returns:
        A dict with a single ``"db"`` key holding that DSN.
    """
    dsn = postgresql_scheduler.info.dsn
    return dict(db=dsn)
[docs]
@pytest.fixture
def swh_scheduler(swh_scheduler_class, swh_scheduler_config):
    """Instantiate a scheduler and register the test task types on it.

    The scheduler is obtained from ``get_scheduler`` with the class name and
    keyword arguments supplied by the ``swh_scheduler_class`` and
    ``swh_scheduler_config`` fixtures. One ``swh-test-<name>`` task type is then
    created for each name in ``TASK_NAMES``, with ``backend_name`` set to
    ``swh.scheduler.tests.tasks.<name>``.

    Returns:
        The scheduler instance, with the test task types created.
    """
    backend = get_scheduler(swh_scheduler_class, **swh_scheduler_config)

    for name in TASK_NAMES:
        task_type = TaskType(
            type=f"swh-test-{name}",
            description=f"The {name} testing task",
            backend_name=f"swh.scheduler.tests.tasks.{name}",
            default_interval=timedelta(days=1),
            min_interval=timedelta(hours=6),
            max_interval=timedelta(days=12),
        )
        backend.create_task_type(task_type)

    return backend
# This alias makes it easy to instantiate a db-backed scheduler, e.g. for the
# RPC client/server test suite.
swh_db_scheduler = swh_scheduler
[docs]
@pytest.fixture(scope="session")
def swh_scheduler_celery_app():
    """Provide a Celery test app set up the way swh.scheduler and swh worker
    tests expect it.

    The app uses ``swh.scheduler.task:SWHTask`` as task class, the
    ``memory://guest@localhost//`` broker, msgpack task serialization and
    JSON result serialization. While the fixture is active the app is also
    assigned to ``swh.scheduler.celery_backend.config.app`` and set as the
    default and current Celery app.

    Yields:
        The configured ``TestApp`` instance.
    """
    celery_settings = {
        "accept_content": ["application/x-msgpack", "application/json"],
        "broker_url": "memory://guest@localhost//",
        "task_serializer": "msgpack",
        "result_serializer": "json",
    }
    app = TestApp(
        set_as_current=True,
        enable_logging=True,
        task_cls="swh.scheduler.task:SWHTask",
        config=celery_settings,
    )

    with setup_default_app(app, use_trap=False):
        from swh.scheduler.celery_backend import config as backend_config

        # Point the swh.scheduler celery backend at the test app before
        # handing it over to the tests.
        backend_config.app = app
        app.set_default()
        app.set_current()
        yield app
[docs]
@pytest.fixture(scope="session")
def swh_scheduler_celery_includes():
    """Collect the task modules that swh_scheduler_celery_worker loads on startup.

    The list starts with ``swh.scheduler.tests.tasks``. Each entry point of the
    ``swh.workers`` group is then loaded and called, and the ``task_modules``
    entry of its result (if any) is appended.

    Returns:
        The list of task module names.
    """
    # NOTE(review): setuptools documents pkg_resources as deprecated in favour
    # of importlib.metadata -- consider migrating this lookup file-wide.
    modules = ["swh.scheduler.tests.tasks"]
    for entry_point in pkg_resources.iter_entry_points("swh.workers"):
        register = entry_point.load()
        modules += register().get("task_modules", [])
    return modules
[docs]
@pytest.fixture(scope="session")
def swh_scheduler_celery_worker(
    swh_scheduler_celery_app,
    swh_scheduler_celery_includes,
):
    """Spawn a Celery test worker for the session.

    Every module listed by ``swh_scheduler_celery_includes`` is first imported
    through the app loader; a worker using the ``solo`` pool is then started
    on ``swh_scheduler_celery_app``.

    Yields:
        The object produced by ``worker.start_worker``.
    """
    for module_name in swh_scheduler_celery_includes:
        swh_scheduler_celery_app.loader.import_task_module(module_name)

    with worker.start_worker(swh_scheduler_celery_app, pool="solo") as live_worker:
        yield live_worker