swh.scheduler.celery_backend.recurrent_visits module
This module schedules recurrent visits of listed origins in Celery.
For “oneshot” tasks (save code now, listers), see the swh.scheduler.celery_backend.runner and swh.scheduler.celery_backend.pika_listener modules.
- swh.scheduler.celery_backend.recurrent_visits.DEFAULT_POLICY_CONFIG: Dict[str, List[Dict[str, Any]]] = {
    'bzr': [{'policy': 'already_visited_order_by_lag', 'weight': 49},
            {'policy': 'never_visited_oldest_update_first', 'weight': 49},
            {'policy': 'origins_without_last_update', 'weight': 2}],
    'cvs': [{'policy': 'already_visited_order_by_lag', 'weight': 49},
            {'policy': 'never_visited_oldest_update_first', 'weight': 49},
            {'policy': 'origins_without_last_update', 'weight': 2}],
    'default': [{'policy': 'already_visited_order_by_lag', 'weight': 40},
                {'policy': 'never_visited_oldest_update_first', 'weight': 40},
                {'policy': 'origins_without_last_update', 'weight': 20}],
    'git': [{'policy': 'already_visited_order_by_lag', 'tablesample': 0.1, 'weight': 49},
            {'policy': 'never_visited_oldest_update_first', 'tablesample': 0.1, 'weight': 49},
            {'policy': 'origins_without_last_update', 'tablesample': 0.1, 'weight': 2}],
    'hg': [{'policy': 'already_visited_order_by_lag', 'weight': 49},
           {'policy': 'never_visited_oldest_update_first', 'weight': 49},
           {'policy': 'origins_without_last_update', 'weight': 2}],
    'svn': [{'policy': 'already_visited_order_by_lag', 'weight': 49},
            {'policy': 'never_visited_oldest_update_first', 'weight': 49},
            {'policy': 'origins_without_last_update', 'weight': 2}]}
Scheduling policies used to retrieve visits for each visit type, with their relative weights.
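To illustrate what "relative weights" means, the sketch below normalizes the weights of a visit type's policies into fractions of the total. It works on a trimmed copy of the configuration above. The helper `policy_fractions` is hypothetical, and the fallback to the 'default' entry for visit types missing from the configuration is an assumption suggested by the key name, not something this page states.

```python
from typing import Any, Dict, List

# Trimmed excerpt of DEFAULT_POLICY_CONFIG (values copied from the constant above).
POLICY_CONFIG: Dict[str, List[Dict[str, Any]]] = {
    "default": [
        {"policy": "already_visited_order_by_lag", "weight": 40},
        {"policy": "never_visited_oldest_update_first", "weight": 40},
        {"policy": "origins_without_last_update", "weight": 20},
    ],
    "git": [
        {"policy": "already_visited_order_by_lag", "tablesample": 0.1, "weight": 49},
        {"policy": "never_visited_oldest_update_first", "tablesample": 0.1, "weight": 49},
        {"policy": "origins_without_last_update", "tablesample": 0.1, "weight": 2},
    ],
}


def policy_fractions(
    config: Dict[str, List[Dict[str, Any]]], visit_type: str
) -> Dict[str, float]:
    """Hypothetical helper: map each policy name to its share of the total weight.

    Assumption: visit types without their own entry fall back to "default".
    """
    policies = config.get(visit_type, config["default"])
    total = sum(entry["weight"] for entry in policies)
    return {entry["policy"]: entry["weight"] / total for entry in policies}


print(policy_fractions(POLICY_CONFIG, "git"))
print(policy_fractions(POLICY_CONFIG, "npm"))  # not configured: uses "default"
```

With the git weights (49, 49, 2), the two lag/update-ordered policies each get about 49% of the visits and origins without a last update get about 2%.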
- swh.scheduler.celery_backend.recurrent_visits.MIN_SLOTS_RATIO = 0.05
Fraction of the queue slots (relative to max_queue_length) that must be available for grab_next_visits() to be triggered.
- swh.scheduler.celery_backend.recurrent_visits.QUEUE_FULL_BACKOFF = 60
Backoff time (in seconds) if fewer than MIN_SLOTS_RATIO of the queue slots are available.
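The two constants above combine into a simple threshold check. The sketch below shows the arithmetic only; `queue_full_wait` is a hypothetical helper, and the exact comparison (`available_slots < MIN_SLOTS_RATIO * max_queue_length`) is an assumption about where the boundary falls.

```python
MIN_SLOTS_RATIO = 0.05  # documented value
QUEUE_FULL_BACKOFF = 60  # documented value, in seconds


def queue_full_wait(available_slots: int, max_queue_length: int) -> float:
    """Hypothetical helper: how long to back off before querying for visits.

    Assumption: the queue counts as "too full" when
    available_slots < MIN_SLOTS_RATIO * max_queue_length.
    """
    if available_slots < MIN_SLOTS_RATIO * max_queue_length:
        # Queue nearly full: skip the expensive grab_next_visits() queries for now.
        return float(QUEUE_FULL_BACKOFF)
    # Enough room in the queue: querying for new visits may proceed immediately.
    return 0.0


# With a hypothetical max_queue_length of 10000, the threshold is 500 free slots.
print(queue_full_wait(100, 10_000))  # 60.0
print(queue_full_wait(2_000, 10_000))  # 0.0
```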
- swh.scheduler.celery_backend.recurrent_visits.DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 1200
Backoff time (in seconds) if no origins have been scheduled in the current iteration.
- swh.scheduler.celery_backend.recurrent_visits.BACKOFF_SPLAY = 5.0
Amplitude of the random fuzziness (jitter) applied to backoffs.
- swh.scheduler.celery_backend.recurrent_visits.TERMINATE = <object object>
Termination request received from the command queue (a singleton, used for identity comparison).
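A bare `object()` instance compares equal only to itself, which makes it a safe sentinel: no ordinary command value can be mistaken for it. The sketch below shows the identity check on a command queue; the helper `termination_requested` and the timeout-based polling are hypothetical illustrations, not this module's code.

```python
import queue
from typing import Optional

# Stand-in for the module's TERMINATE sentinel: a unique object compared by identity.
TERMINATE = object()


def termination_requested(
    command_queue: "queue.Queue[object]", timeout: Optional[float]
) -> bool:
    """Hypothetical helper: wait up to `timeout` seconds for a command.

    Returns True only if the command received is the TERMINATE sentinel itself.
    """
    try:
        command = command_queue.get(timeout=timeout)
    except queue.Empty:
        return False  # no command arrived within the timeout
    return command is TERMINATE  # identity, not equality


commands: "queue.Queue[object]" = queue.Queue()
print(termination_requested(commands, timeout=0.01))  # False: nothing queued
commands.put(TERMINATE)
print(termination_requested(commands, timeout=0.01))  # True
```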
- swh.scheduler.celery_backend.recurrent_visits.grab_next_visits_policy_weights(scheduler: SchedulerInterface, visit_type: str, num_visits: int, policy_cfg: List[Dict[str, Any]]) → List[ListedOrigin]
Get the next num_visits origins to visit for the given visit_type, using the corresponding set of scheduling policies.
The policy_cfg list sets, for the current visit type, the scheduling policies used to pull the next tasks. Each policy config entry in the list should have at least a ‘policy’ (policy name) and a ‘weight’ entry. For each policy in this list, the number of returned origins is weighted by its ‘weight’ option, so that the total number of returned origins is around num_visits. Any other key/value entry in the policy configuration is passed to the scheduler.grab_next_visits() method.
This function emits a warning if the ratio of retrieved origins differs from the requested ratio by more than 5%.
- Returns:
at most num_visits ListedOrigin objects
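The weighted split described above can be sketched as follows, under stated assumptions. The scheduler call is replaced by a hypothetical `grab` callable shaped like `grab(visit_type, count, policy, **options)`; per-policy counts use floor division, which keeps the total at most num_visits; and "off by more than 5%" is read as an absolute difference of 0.05 between observed and requested fractions. The names `grab_with_weights` and `ratio_mismatches` are illustrative, not the module's API.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)

# Hypothetical stand-in for scheduler.grab_next_visits():
# grab(visit_type, count, policy, **extra_options) -> list of origins.
GrabFn = Callable[..., List[str]]


def ratio_mismatches(
    counts: Dict[str, int], policy_cfg: List[Dict[str, Any]], tolerance: float = 0.05
) -> List[str]:
    """Return the policies whose share of fetched origins is off by more than `tolerance`.

    Assumption: the tolerance is an absolute difference between fractions.
    """
    total_fetched = sum(counts.values())
    if total_fetched == 0:
        return []
    total_weight = sum(entry["weight"] for entry in policy_cfg)
    return [
        entry["policy"]
        for entry in policy_cfg
        if abs(counts[entry["policy"]] / total_fetched - entry["weight"] / total_weight)
        > tolerance
    ]


def grab_with_weights(
    grab: GrabFn, visit_type: str, num_visits: int, policy_cfg: List[Dict[str, Any]]
) -> List[str]:
    """Sketch: split num_visits across policies in proportion to their weights."""
    total_weight = sum(entry["weight"] for entry in policy_cfg)
    fetched: Dict[str, List[str]] = {}
    for entry in policy_cfg:
        options = dict(entry)  # copy, so the caller's config is left untouched
        policy = options.pop("policy")
        weight = options.pop("weight")
        # Floor division keeps the total number of requested origins <= num_visits.
        count = num_visits * weight // total_weight
        # Remaining keys (e.g. "tablesample") are forwarded as keyword arguments.
        fetched[policy] = grab(visit_type, count, policy, **options)

    counts = {policy: len(origins) for policy, origins in fetched.items()}
    for policy in ratio_mismatches(counts, policy_cfg):
        logger.warning("Policy %s returned an unexpected share of origins", policy)

    return [origin for origins in fetched.values() for origin in origins]


def fake_grab(visit_type: str, count: int, policy: str, **options: Any) -> List[str]:
    # Hypothetical scheduler stub that always returns exactly `count` origins.
    return [f"{visit_type}:{policy}:{i}" for i in range(count)]


git_cfg = [
    {"policy": "already_visited_order_by_lag", "tablesample": 0.1, "weight": 49},
    {"policy": "never_visited_oldest_update_first", "tablesample": 0.1, "weight": 49},
    {"policy": "origins_without_last_update", "tablesample": 0.1, "weight": 2},
]
print(len(grab_with_weights(fake_grab, "git", 100, git_cfg)))  # 100
```

Floor division is one possible rounding choice; it trades a slightly smaller batch for the guarantee that no more than num_visits origins are requested.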
- swh.scheduler.celery_backend.recurrent_visits.splay()
Return a random short interval by which to vary the backoffs for the visit scheduling threads.
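A minimal sketch of a splay function, assuming the jitter is drawn uniformly between 0 and BACKOFF_SPLAY seconds (the distribution is an assumption; this page only gives the amplitude). The helper `jittered_backoff` is hypothetical and shows how the jitter would be added to a base backoff.

```python
import random

BACKOFF_SPLAY = 5.0  # documented amplitude, in seconds


def splay() -> float:
    """Sketch: random jitter in [0, BACKOFF_SPLAY] seconds.

    Assumption: uniform distribution; the real module may draw it differently.
    """
    return random.uniform(0, BACKOFF_SPLAY)


def jittered_backoff(base_backoff: float) -> float:
    """Hypothetical helper: base backoff plus jitter, so threads do not wake in lockstep."""
    return base_backoff + splay()


# A 60-second backoff becomes somewhere between 60 and 65 seconds.
print(jittered_backoff(60))
```

Adding jitter spreads out the wake-ups of the per-visit-type threads, so they are less likely to hit the scheduler database at the same moment.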
- swh.scheduler.celery_backend.recurrent_visits.send_visits_for_visit_type(scheduler: SchedulerInterface, app, visit_type: str, task_type: TaskType, policy_cfg: List[Dict[str, Any]], no_origins_scheduled_backoff: int = 1200) → float
Schedule the next batch of visits for the given visit_type.
First, we determine the number of available slots by introspecting the RabbitMQ queue.
If fewer than MIN_SLOTS_RATIO of the slots are available in the queue, we wait for QUEUE_FULL_BACKOFF seconds. This avoids running the expensive grab_next_visits() queries when there are not many jobs to queue.
Once more than MIN_SLOTS_RATIO of the slots are available, we run grab_next_visits_policy_weights() to retrieve the next set of origin visits to schedule, and we send them to Celery.
If the last scheduling attempt didn’t return any origins, we sleep for no_origins_scheduled_backoff seconds (by default DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF, i.e. 1200). This avoids running the expensive grab_next_visits() queries too often when there is nothing left to schedule.
The policy_cfg argument is the policy configuration used to choose the next origins to visit. It is passed directly to the grab_next_visits_policy_weights() function.
- Returns:
the earliest time.monotonic() value at which to run the next iteration of the loop.
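The control flow described above can be sketched as one function that returns the next time.monotonic() deadline. This is an illustration, not the module's code: the RabbitMQ introspection is replaced by hypothetical available_slots and max_queue_length arguments, grab_visits and send_to_celery are hypothetical callables standing in for grab_next_visits_policy_weights() and the Celery submission, and both the uniform jitter and returning the current time after a successful batch (so the loop can continue right away) are assumptions.

```python
import random
import time
from typing import Callable, List

MIN_SLOTS_RATIO = 0.05
QUEUE_FULL_BACKOFF = 60
DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 1200
BACKOFF_SPLAY = 5.0


def splay() -> float:
    # Assumption: uniform jitter in [0, BACKOFF_SPLAY] seconds.
    return random.uniform(0, BACKOFF_SPLAY)


def schedule_one_batch(
    available_slots: int,
    max_queue_length: int,
    grab_visits: Callable[[int], List[str]],
    send_to_celery: Callable[[List[str]], None],
    no_origins_scheduled_backoff: int = DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF,
) -> float:
    """Sketch of one scheduling iteration; returns the next monotonic deadline.

    All parameters except no_origins_scheduled_backoff are hypothetical stand-ins.
    """
    now = time.monotonic()

    # Queue nearly full: back off without running the expensive queries.
    if available_slots < MIN_SLOTS_RATIO * max_queue_length:
        return now + QUEUE_FULL_BACKOFF + splay()

    # Assumption: request as many visits as there are free slots.
    origins = grab_visits(available_slots)
    if not origins:
        # Nothing left to schedule: wait longer before querying again.
        return now + no_origins_scheduled_backoff + splay()

    send_to_celery(origins)
    # Assumption: after a successful batch, the next iteration may run immediately.
    return now
```

A caller loop would sleep until the returned deadline (for example with `time.sleep(max(0.0, deadline - time.monotonic()))`) before calling the function again.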
- swh.scheduler.celery_backend.recurrent_visits.visit_scheduler_thread(config: Dict, visit_type: str, command_queue: Queue[object], exc_queue: Queue[Tuple[str, BaseException]])
Target function for the visit sending thread, which initializes local connections and handles exceptions by sending them back to the main thread.
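The exception-forwarding pattern described above can be sketched with the standard library alone. Here `scheduler_thread_target` is a hypothetical name, and `run_loop` is a hypothetical stand-in for the per-visit-type scheduling loop (which, in the real thread, would also open its own local connections). The `(visit_type, exception)` tuple mirrors the documented type of exc_queue.

```python
import queue
import threading
from typing import Callable, Tuple


def scheduler_thread_target(
    visit_type: str,
    run_loop: Callable[[], None],
    exc_queue: "queue.Queue[Tuple[str, BaseException]]",
) -> None:
    """Sketch of a thread target that reports failures to the main thread.

    run_loop is a hypothetical stand-in for the scheduling loop of one visit type.
    """
    try:
        run_loop()
    except BaseException as exc:
        # Hand the exception to the main thread instead of letting it vanish.
        exc_queue.put((visit_type, exc))


def failing_loop() -> None:
    raise RuntimeError("lost connection")


exc_queue: "queue.Queue[Tuple[str, BaseException]]" = queue.Queue()
thread = threading.Thread(
    target=scheduler_thread_target, args=("git", failing_loop, exc_queue)
)
thread.start()
thread.join()
failed_type, exc = exc_queue.get_nowait()
print(failed_type, repr(exc))  # git RuntimeError('lost connection')
```

Without this, an exception raised in a worker thread would only be printed by the threading machinery, and the main thread would not know which visit type stopped scheduling.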
- swh.scheduler.celery_backend.recurrent_visits.VisitSchedulerThreads
Dict storing the visit scheduler threads and their command queues.
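A minimal sketch of how such a dict could be used to start one thread per visit type and later shut them all down. It assumes VisitSchedulerThreads maps a visit type to a `(thread, command queue)` pair; that shape, the `idle_scheduler_loop` body, and the helpers `start_threads` and `terminate_threads` are all hypothetical illustrations.

```python
import queue
import threading
from typing import Dict, Iterable, Tuple

# Stand-in for the module's TERMINATE sentinel.
TERMINATE = object()

# Assumed shape of VisitSchedulerThreads: visit type -> (thread, command queue).
VisitSchedulerThreads = Dict[str, Tuple[threading.Thread, queue.Queue]]


def idle_scheduler_loop(command_queue: queue.Queue) -> None:
    """Hypothetical thread body: block on the command queue until TERMINATE arrives."""
    while command_queue.get() is not TERMINATE:
        pass


def start_threads(visit_types: Iterable[str]) -> VisitSchedulerThreads:
    threads: VisitSchedulerThreads = {}
    for visit_type in visit_types:
        command_queue: queue.Queue = queue.Queue()
        thread = threading.Thread(
            target=idle_scheduler_loop,
            args=(command_queue,),
            name=f"visit-scheduler-{visit_type}",
            daemon=True,
        )
        thread.start()
        threads[visit_type] = (thread, command_queue)
    return threads


def terminate_threads(threads: VisitSchedulerThreads) -> None:
    # Signal every thread first so they shut down concurrently, then wait for each.
    for _thread, command_queue in threads.values():
        command_queue.put(TERMINATE)
    for thread, _command_queue in threads.values():
        thread.join()


threads = start_threads(["git", "hg", "svn"])
terminate_threads(threads)
print(sorted(threads))  # ['git', 'hg', 'svn']
```

Keeping each thread next to its own command queue lets the main thread address a single visit type, or all of them, without any shared global flag.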