swh.scheduler.celery_backend.recurrent_visits module

This module schedules the recurrent visits of listed origins in Celery.

For “oneshot” (save code now, lister) tasks, check the swh.scheduler.celery_backend.runner and swh.scheduler.celery_backend.pika_listener modules.

swh.scheduler.celery_backend.recurrent_visits.DEFAULT_POLICY_CONFIG: Dict[str, List[Dict[str, Any]]] = {'bzr': [{'policy': 'already_visited_order_by_lag', 'weight': 49}, {'policy': 'never_visited_oldest_update_first', 'weight': 49}, {'policy': 'origins_without_last_update', 'weight': 2}], 'cvs': [{'policy': 'already_visited_order_by_lag', 'weight': 49}, {'policy': 'never_visited_oldest_update_first', 'weight': 49}, {'policy': 'origins_without_last_update', 'weight': 2}], 'default': [{'policy': 'already_visited_order_by_lag', 'weight': 40}, {'policy': 'never_visited_oldest_update_first', 'weight': 40}, {'policy': 'origins_without_last_update', 'weight': 20}], 'git': [{'policy': 'already_visited_order_by_lag', 'tablesample': 0.1, 'weight': 49}, {'policy': 'never_visited_oldest_update_first', 'tablesample': 0.1, 'weight': 49}, {'policy': 'origins_without_last_update', 'tablesample': 0.1, 'weight': 2}], 'hg': [{'policy': 'already_visited_order_by_lag', 'weight': 49}, {'policy': 'never_visited_oldest_update_first', 'weight': 49}, {'policy': 'origins_without_last_update', 'weight': 2}], 'svn': [{'policy': 'already_visited_order_by_lag', 'weight': 49}, {'policy': 'never_visited_oldest_update_first', 'weight': 49}, {'policy': 'origins_without_last_update', 'weight': 2}]}

Scheduling policies used to retrieve visits for each visit type, with their relative weights.

swh.scheduler.celery_backend.recurrent_visits.MIN_SLOTS_RATIO = 0.05

Minimum fraction of slots, relative to max_queue_length, that must be available for grab_next_visits() to trigger.

swh.scheduler.celery_backend.recurrent_visits.QUEUE_FULL_BACKOFF = 60

Backoff time (in seconds) when fewer than MIN_SLOTS_RATIO of the slots are available in the queue.
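The check implied by MIN_SLOTS_RATIO and QUEUE_FULL_BACKOFF can be sketched as follows. This is an illustration only: the helper name enough_slots_available and its parameters are hypothetical, and it assumes the number of free slots is max_queue_length minus the current queue length.

```python
MIN_SLOTS_RATIO = 0.05
QUEUE_FULL_BACKOFF = 60


def enough_slots_available(queue_length: int, max_queue_length: int) -> bool:
    """Hypothetical helper: is enough of the queue free to run a scheduling query?

    Assumption: free slots = max_queue_length - queue_length.
    """
    available = max_queue_length - queue_length
    # Proceed only when the free share of the queue reaches MIN_SLOTS_RATIO;
    # otherwise the caller would wait QUEUE_FULL_BACKOFF seconds and retry.
    return available >= max_queue_length * MIN_SLOTS_RATIO
```

With a max_queue_length of 1000, a queue holding 990 tasks has only 10 free slots (1%), so the caller would back off; a queue holding 900 tasks has 100 free slots (10%), so scheduling would proceed.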

swh.scheduler.celery_backend.recurrent_visits.DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 1200

Backoff time (in seconds) when no origins have been scheduled in the current iteration.

swh.scheduler.celery_backend.recurrent_visits.BACKOFF_SPLAY = 5.0

Amplitude of the random variation applied to the backoffs.
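One way to apply such a variation is to add a uniformly distributed offset to each backoff, so that threads do not all wake up at the same instant. The sketch below assumes a uniform draw between 0 and BACKOFF_SPLAY; the helper name jittered_backoff is hypothetical and the module's actual distribution is not stated here.

```python
import random

BACKOFF_SPLAY = 5.0


def jittered_backoff(base_backoff: float) -> float:
    """Hypothetical helper: return base_backoff plus a small random offset.

    Assumption: the offset is drawn uniformly from [0, BACKOFF_SPLAY].
    """
    return base_backoff + random.uniform(0, BACKOFF_SPLAY)
```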

swh.scheduler.celery_backend.recurrent_visits.TERMINATE = <object object>

Termination request received from the command queue (singleton used for identity comparison).
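The sentinel pattern described here relies on `is` rather than `==`, so no other value placed on the queue can be mistaken for a termination request. A minimal sketch, in which the helper name should_terminate is hypothetical:

```python
import queue

# A bare object() is only ever identical to itself.
TERMINATE = object()


def should_terminate(command_queue: queue.Queue) -> bool:
    """Hypothetical helper: drain pending commands without blocking.

    Returns True as soon as the TERMINATE sentinel is found, False once
    the queue is empty.
    """
    while True:
        try:
            command = command_queue.get_nowait()
        except queue.Empty:
            return False
        if command is TERMINATE:
            return True
```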

swh.scheduler.celery_backend.recurrent_visits.grab_next_visits_policy_weights(scheduler: SchedulerInterface, visit_type: str, num_visits: int, policy_cfg: List[Dict[str, Any]]) → List[ListedOrigin]

Get the next num_visits for the given visit_type using the corresponding set of scheduling policies.

The policy_cfg list sets, for the current visit type, the scheduling policies used to pull the next tasks. Each policy config entry in the list must have at least a ‘policy’ (policy name) and a ‘weight’ entry. The number of origins returned for each policy is proportional to its weight, so that the total number of returned origins is around num_visits. Any other key/value entry in the policy configuration is passed to the scheduler.grab_next_visits() method.

This function emits a warning if the ratio of retrieved origins deviates from the requested ratio by more than 5%.

Returns: at most num_visits ListedOrigin objects
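To illustrate how weights translate into per-policy counts, here is a deterministic sketch that splits num_visits proportionally and separates out the extra keys that would be forwarded to the scheduler. The helper name split_visits_by_weight and the largest-remainder rounding are assumptions made for this example; the module itself may distribute visits differently (for instance by random sampling).

```python
from typing import Any, Dict, List, Tuple


def split_visits_by_weight(
    policy_cfg: List[Dict[str, Any]], num_visits: int
) -> List[Tuple[str, int, Dict[str, Any]]]:
    """Hypothetical helper: return (policy, count, extra_kwargs) per entry.

    Counts are proportional to each entry's weight and sum to num_visits.
    """
    total_weight = sum(cfg["weight"] for cfg in policy_cfg)
    exact = [num_visits * cfg["weight"] / total_weight for cfg in policy_cfg]
    counts = [int(share) for share in exact]
    # Hand out the visits lost to rounding, largest fractional part first.
    leftover = num_visits - sum(counts)
    by_fraction = sorted(
        range(len(policy_cfg)), key=lambda i: exact[i] - counts[i], reverse=True
    )
    for i in by_fraction[:leftover]:
        counts[i] += 1
    return [
        (
            cfg["policy"],
            count,
            # Everything except 'policy' and 'weight' is an extra option.
            {k: v for k, v in cfg.items() if k not in ("policy", "weight")},
        )
        for cfg, count in zip(policy_cfg, counts)
    ]


git_cfg = [
    {"policy": "already_visited_order_by_lag", "tablesample": 0.1, "weight": 49},
    {"policy": "never_visited_oldest_update_first", "tablesample": 0.1, "weight": 49},
    {"policy": "origins_without_last_update", "tablesample": 0.1, "weight": 2},
]
plan = split_visits_by_weight(git_cfg, 100)
```

For the ‘git’ entry of DEFAULT_POLICY_CONFIG and 100 visits, this yields 49, 49 and 2 visits respectively, each with {'tablesample': 0.1} as the extra options.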


Return a random short interval by which to vary the backoffs for the visit scheduling threads

swh.scheduler.celery_backend.recurrent_visits.send_visits_for_visit_type(scheduler: SchedulerInterface, app, visit_type: str, task_type: TaskType, policy_cfg: List[Dict[str, Any]], no_origins_scheduled_backoff: int = 1200) → float

Schedule the next batch of visits for the given visit_type.

First, we determine the number of available slots by introspecting the RabbitMQ queue.

If fewer than MIN_SLOTS_RATIO of the slots are available in the queue, we wait for QUEUE_FULL_BACKOFF seconds. This avoids running the expensive grab_next_visits() queries when there are few jobs to queue.

Once more than MIN_SLOTS_RATIO of the slots are available, we run grab_next_visits_policy_weights() to retrieve the next set of origin visits to schedule, and we send them to Celery.

If the last scheduling attempt didn’t return any origins, we sleep for no_origins_scheduled_backoff seconds (DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF by default). This avoids running the expensive grab_next_visits() queries too often if there’s nothing left to schedule.

The policy_cfg argument is the policy configuration used to choose the next origins to visit. It is passed directly to the grab_next_visits_policy_weights() function.

Returns: the earliest time.monotonic() value at which to run the next iteration of the loop.
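The backoff rules above can be condensed into a small decision function. This is a sketch under stated assumptions: the helper name next_run_time and its parameters are hypothetical, the current time is passed in explicitly to keep the example deterministic, and the case where origins were scheduled is assumed to need no extra delay (the random splay is left out).

```python
QUEUE_FULL_BACKOFF = 60
DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 1200


def next_run_time(
    now: float,
    queue_has_room: bool,
    num_scheduled: int,
    no_origins_scheduled_backoff: int = DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF,
) -> float:
    """Hypothetical helper: earliest monotonic time for the next iteration.

    `now` stands for the time.monotonic() value at the start of the
    current iteration.
    """
    if not queue_has_room:
        # Queue too full: skip the expensive query and retry later.
        return now + QUEUE_FULL_BACKOFF
    if num_scheduled == 0:
        # Nothing left to schedule: wait longer before querying again.
        return now + no_origins_scheduled_backoff
    # Assumption: origins were sent, so the loop may run again right away.
    return now
```

A caller would compare the returned value with time.monotonic() and sleep for the difference when it is positive.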

swh.scheduler.celery_backend.recurrent_visits.visit_scheduler_thread(config: Dict, visit_type: str, command_queue: Queue[object], exc_queue: Queue[Tuple[str, BaseException]])

Target function for the visit sending thread, which initializes local connections and handles exceptions by sending them back to the main thread.
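The exception hand-off described here follows a common threading pattern: the thread target wraps its work in a try/except and puts any exception on a shared queue, tagged with the visit type, for the main thread to inspect. A minimal sketch, where guarded_target and failing_work are hypothetical names and the real function additionally sets up its own connections:

```python
import queue
import threading
from typing import Callable, Tuple


def guarded_target(
    visit_type: str,
    work: Callable[[], None],
    exc_queue: "queue.Queue[Tuple[str, BaseException]]",
) -> None:
    """Hypothetical thread target: run `work`, report failures to exc_queue."""
    try:
        work()
    except BaseException as exc:
        # Exceptions do not propagate out of threads, so send this one back.
        exc_queue.put((visit_type, exc))


def failing_work() -> None:
    raise ValueError("simulated failure")


exc_queue: "queue.Queue[Tuple[str, BaseException]]" = queue.Queue()
thread = threading.Thread(
    target=guarded_target, args=("git", failing_work, exc_queue), daemon=True
)
thread.start()
thread.join()
failed_visit_type, failure = exc_queue.get_nowait()
```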


Dict storing the visit scheduler threads and their command queues

alias of Dict[str, Tuple[Thread, Queue]]

swh.scheduler.celery_backend.recurrent_visits.spawn_visit_scheduler_thread(threads: Dict[str, Tuple[Thread, Queue]], exc_queue: Queue[Tuple[str, BaseException]], config: Dict[str, Any], visit_type: str)

Spawn a new thread to schedule the visits of type visit_type.

swh.scheduler.celery_backend.recurrent_visits.terminate_visit_scheduler_threads(threads: Dict[str, Tuple[Thread, Queue]]) → List[str]

Terminate all visit scheduler threads.
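A termination routine over such a dict of (thread, command queue) pairs can be sketched as follows. The names terminate_all and worker are hypothetical, and the meaning of the returned list is an assumption: this documentation does not state what the List[str] contains, so the sketch returns the visit types whose threads did not stop within the timeout.

```python
import queue
import threading
from typing import Dict, List, Tuple

TERMINATE = object()


def worker(command_queue: queue.Queue) -> None:
    """Hypothetical worker: block on the queue until TERMINATE arrives."""
    while command_queue.get() is not TERMINATE:
        pass


def terminate_all(
    threads: Dict[str, Tuple[threading.Thread, queue.Queue]],
    join_timeout: float = 10.0,
) -> List[str]:
    """Hypothetical helper: ask every thread to stop, then wait for it.

    Assumption: returns the visit types whose threads are still alive.
    """
    # Signal all threads first so that they shut down concurrently.
    for _, command_queue in threads.values():
        command_queue.put(TERMINATE)
    still_alive = []
    for visit_type, (thread, _) in threads.items():
        thread.join(timeout=join_timeout)
        if thread.is_alive():
            still_alive.append(visit_type)
    return still_alive


threads: Dict[str, Tuple[threading.Thread, queue.Queue]] = {}
for visit_type in ("git", "svn"):
    command_queue: queue.Queue = queue.Queue()
    thread = threading.Thread(target=worker, args=(command_queue,), daemon=True)
    thread.start()
    threads[visit_type] = (thread, command_queue)

remaining = terminate_all(threads)
```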