Source code for swh.scheduler.celery_backend.first_visits
# Copyright (C) 2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from swh.scheduler.interface import SchedulerInterface
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
[docs]
def schedule_first_visits(backend: SchedulerInterface):
    """Schedule first visits with high priority for origins registered by listers
    having the first_visits_priority_queue attribute set.

    For every lister returned by
    ``backend.get_listers(with_first_visits_to_schedule=True)``, a mapping from
    each visit type of its listed origins to a dedicated queue name
    (``"<lister.first_visits_queue_prefix>:<task_type.backend_name>"``) is built
    and handed to ``send_to_celery`` with the ``first_visits_after_listing``
    policy. Once every listed origin of a lister has been scheduled after its
    last listing finished, the lister's ``first_visits_scheduled_at`` is set so
    that later runs no longer consider it.

    Args:
        backend: the scheduler backend to query and update.

    Returns:
        The sum of the values returned by ``send_to_celery`` for every
        processed lister (presumably the number of first visits sent).

    Raises:
        ValueError: if no loader task type is found for one of the visit types
            of a lister's listed origins.
    """
    from swh.scheduler.utils import utcnow

    from .utils import get_loader_task_type, send_to_celery

    nb_first_visits = 0
    for lister in backend.get_listers(with_first_visits_to_schedule=True):
        visit_types = backend.get_visit_types_for_listed_origins(lister)
        visit_type_to_queue = {}
        for visit_type in visit_types:
            task_type = get_loader_task_type(backend, visit_type)
            if not task_type:
                raise ValueError(f"Unknown task type for visit type {visit_type}.")
            # Each visit type gets its own queue, namespaced by the lister's
            # first visits queue prefix.
            visit_type_to_queue[visit_type] = (
                f"{lister.first_visits_queue_prefix}:{task_type.backend_name}"
            )
        nb_first_visits += send_to_celery(
            backend,
            visit_type_to_queue=visit_type_to_queue,
            policy="first_visits_after_listing",
            lister_name=lister.name,
            lister_instance_name=lister.instance_name,
        )

        if _all_first_visits_scheduled(backend, lister):
            # mark that all first visits were scheduled to no longer consider that
            # lister in future execution of that command
            logger.info(
                "All first visits of origins registered by lister with name '%s' "
                "and instance '%s' were scheduled.",
                lister.name,
                lister.instance_name,
            )
            lister.first_visits_scheduled_at = utcnow()
            backend.update_lister(lister)

    return nb_first_visits


def _all_first_visits_scheduled(backend: SchedulerInterface, lister) -> bool:
    """Check whether every origin listed by ``lister`` has been scheduled since
    the lister's last listing finished.

    Listed origins are streamed from the backend and checked in batches of 1000
    against their visit stats.

    Returns:
        False as soon as a listed origin was never scheduled or was last
        scheduled before ``lister.last_listing_finished_at``; False as well if
        some listed origins have no visit stats at all; True otherwise.
    """
    from functools import partial

    from swh.core.api.classes import stream_results
    from swh.core.utils import grouper

    listing_finished_at = lister.last_listing_finished_at
    if listing_finished_at is None:
        # NOTE(review): presumably already excluded by
        # get_listers(with_first_visits_to_schedule=True). Without a reference
        # date we cannot tell whether origins were scheduled after the listing,
        # so do not mark the lister as done (comparing against None would
        # otherwise raise TypeError).
        return False

    nb_listed_origins = 0
    nb_scheduled_origins = 0
    listed_origins = stream_results(
        partial(backend.get_listed_origins, lister_id=lister.id)
    )
    for batch in grouper(listed_origins, n=1000):
        origins = list(batch)
        nb_listed_origins += len(origins)
        for stats in backend.origin_visit_stats_get(
            ids=((origin.url, origin.visit_type) for origin in origins)
        ):
            nb_scheduled_origins += 1
            # last_scheduled may be None when visit stats exist for an origin
            # that was never scheduled (TODO confirm against the model); treat
            # it as "not scheduled yet" instead of comparing None to a datetime.
            last_scheduled = stats.last_scheduled
            if last_scheduled is None or last_scheduled < listing_finished_at:
                return False
    # origin_visit_stats_get only yields existing stats, so a count mismatch
    # means some listed origins have never been visited or scheduled.
    return nb_scheduled_origins == nb_listed_origins