Source code for swh.loader.pytest_plugin

# Copyright (C) 2019-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os
from typing import Any, Dict

import pytest
import yaml

from swh.scheduler.model import ListedOrigin, Lister
from swh.scheduler.utils import create_origin_task


[docs] @pytest.fixture(autouse=True) def mock_sleep(mocker): return mocker.patch("time.sleep")
[docs] @pytest.fixture def swh_storage_backend_config(swh_storage_postgresql) -> Dict[str, Any]: return { "cls": "retry", "storage": { "cls": "filter", "storage": { "cls": "buffer", "storage": { "cls": "postgresql", "db": swh_storage_postgresql.info.dsn, "objstorage": {"cls": "memory"}, }, }, }, }
[docs] @pytest.fixture def swh_loader_config(swh_storage_backend_config) -> Dict[str, Any]: return { "storage": swh_storage_backend_config, }
[docs] @pytest.fixture def swh_config(swh_loader_config, monkeypatch, tmp_path) -> str: conffile = os.path.join(str(tmp_path), "loader.yml") with open(conffile, "w") as f: f.write(yaml.dump(swh_loader_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conffile) return conffile
[docs] @pytest.fixture(autouse=True, scope="session") def swh_proxy(): """Automatically inject this fixture in all tests to ensure no outside connection takes place. """ os.environ["http_proxy"] = "http://localhost:999" os.environ["https_proxy"] = "http://localhost:999"
[docs] @pytest.fixture def loading_task_creation_for_listed_origin_test( mocker, swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_config, mock_sleep, ): # unset mocking of time.sleep as celery task execution takes # too many time otherwise mocker.stop(mock_sleep) def test_implementation( loader_class_name: str, task_function_name: str, lister: Lister, listed_origin: ListedOrigin, ): mock_load = mocker.patch(f"{loader_class_name}.load") mock_load.return_value = {"status": "eventful"} task = create_origin_task(listed_origin, lister) res = swh_scheduler_celery_app.send_task( task_function_name, kwargs=task.arguments.kwargs, ) assert res res.wait() assert res.successful() assert mock_load.called assert res.result == {"status": "eventful"} return test_implementation