swh.scheduler.celery_backend.recurrent_visits module

This module schedules the recurrent visits of listed origins in Celery.

For “oneshot” (save code now, lister) tasks, check the swh.scheduler.celery_backend.runner and swh.scheduler.celery_backend.pika_listener modules.

swh.scheduler.celery_backend.recurrent_visits.DEFAULT_POLICY_CONFIG: Dict[str, List[Dict[str, Any]]] = {
    'bzr': [
        {'policy': 'already_visited_order_by_lag', 'weight': 49},
        {'policy': 'never_visited_oldest_update_first', 'weight': 49},
        {'policy': 'origins_without_last_update', 'weight': 2}
    ],
    'cvs': [
        {'policy': 'already_visited_order_by_lag', 'weight': 49},
        {'policy': 'never_visited_oldest_update_first', 'weight': 49},
        {'policy': 'origins_without_last_update', 'weight': 2}
    ],
    'default': [
        {'policy': 'already_visited_order_by_lag', 'weight': 40},
        {'policy': 'never_visited_oldest_update_first', 'weight': 40},
        {'policy': 'origins_without_last_update', 'weight': 20}
    ],
    'git': [
        {'policy': 'already_visited_order_by_lag', 'tablesample': 0.1, 'weight': 49},
        {'policy': 'never_visited_oldest_update_first', 'tablesample': 0.1, 'weight': 49},
        {'policy': 'origins_without_last_update', 'tablesample': 0.1, 'weight': 2}
    ],
    'hg': [
        {'policy': 'already_visited_order_by_lag', 'weight': 49},
        {'policy': 'never_visited_oldest_update_first', 'weight': 49},
        {'policy': 'origins_without_last_update', 'weight': 2}
    ],
    'svn': [
        {'policy': 'already_visited_order_by_lag', 'weight': 49},
        {'policy': 'never_visited_oldest_update_first', 'weight': 49},
        {'policy': 'origins_without_last_update', 'weight': 2}
    ]
}

Scheduling policies used to retrieve the visits for each visit type, with their relative weights.
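To make the weights concrete, the sketch below turns a policy list into the share of visits each policy is expected to provide (for 'git', 49/49/2 out of 100). The helper policy_ratios is hypothetical, and the fallback to the 'default' entry for visit types missing from the config is an assumption, not something this page documents.

```python
from typing import Any, Dict, List

# Excerpt of DEFAULT_POLICY_CONFIG as documented above (two entries kept).
POLICY_CONFIG: Dict[str, List[Dict[str, Any]]] = {
    "default": [
        {"policy": "already_visited_order_by_lag", "weight": 40},
        {"policy": "never_visited_oldest_update_first", "weight": 40},
        {"policy": "origins_without_last_update", "weight": 20},
    ],
    "git": [
        {"policy": "already_visited_order_by_lag", "tablesample": 0.1, "weight": 49},
        {"policy": "never_visited_oldest_update_first", "tablesample": 0.1, "weight": 49},
        {"policy": "origins_without_last_update", "tablesample": 0.1, "weight": 2},
    ],
}


def policy_ratios(config: Dict[str, List[Dict[str, Any]]], visit_type: str) -> Dict[str, float]:
    """Hypothetical helper: map each policy name to its share of the visits.

    Assumption: visit types absent from the config use the "default" entry.
    """
    entries = config.get(visit_type, config["default"])
    total = sum(entry["weight"] for entry in entries)
    return {entry["policy"]: entry["weight"] / total for entry in entries}


print(policy_ratios(POLICY_CONFIG, "git"))  # shares 0.49 / 0.49 / 0.02
print(policy_ratios(POLICY_CONFIG, "npm"))  # not listed: falls back to "default"
```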

swh.scheduler.celery_backend.recurrent_visits.MIN_SLOTS_RATIO = 0.05

Fraction of the queue slots (relative to max_queue_length) that must be available for grab_next_visits() to be triggered.
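As a rough illustration of the threshold: with max_queue_length = 10000 and MIN_SLOTS_RATIO = 0.05, at least 500 free slots are needed before the query runs. The function names below are hypothetical, and whether the real comparison is inclusive or strict is an assumption.

```python
MIN_SLOTS_RATIO = 0.05


def available_slots(max_queue_length: int, current_queue_length: int) -> int:
    """Slots left in the queue, clamped at zero if the queue overflows."""
    return max(0, max_queue_length - current_queue_length)


def should_grab_next_visits(max_queue_length: int, current_queue_length: int) -> bool:
    """Hypothetical check: is it worth running the expensive query now?

    Assumption: the threshold is inclusive; the module may compare differently.
    """
    min_available = MIN_SLOTS_RATIO * max_queue_length
    return available_slots(max_queue_length, current_queue_length) >= min_available


print(should_grab_next_visits(10000, 9600))  # only 400 free slots: skip
print(should_grab_next_visits(10000, 9000))  # 1000 free slots: grab
```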

swh.scheduler.celery_backend.recurrent_visits.QUEUE_FULL_BACKOFF = 60

Backoff time (in seconds) if fewer than MIN_SLOTS_RATIO of the queue slots are available.

swh.scheduler.celery_backend.recurrent_visits.DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF = 1200

Backoff time (in seconds) if no origins have been scheduled in the current iteration.

swh.scheduler.celery_backend.recurrent_visits.BACKOFF_SPLAY = 5.0

Amplitude of the random jitter applied to the backoffs.

swh.scheduler.celery_backend.recurrent_visits.TERMINATE = <object object>

Termination request received from the command queue (a singleton used for identity comparison).
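The sentinel pattern works because a bare object() is equal only to itself, so an `is` check cannot be fooled by another message that happens to compare equal. A minimal sketch, assuming a thread drains its command queue without blocking and ignores any other command (the helper drain_commands is hypothetical):

```python
import queue

TERMINATE = object()  # unique sentinel; only ever compared with `is`


def drain_commands(command_queue: "queue.Queue[object]") -> bool:
    """Hypothetical helper: return True if a termination request is queued.

    Assumption: commands other than TERMINATE are simply discarded.
    """
    while True:
        try:
            command = command_queue.get_nowait()
        except queue.Empty:
            return False
        if command is TERMINATE:
            return True


commands: "queue.Queue[object]" = queue.Queue()
commands.put("noop")
commands.put(TERMINATE)
print(drain_commands(commands))  # True
```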

swh.scheduler.celery_backend.recurrent_visits.grab_next_visits_policy_weights(scheduler: SchedulerInterface, visit_type: str, num_visits: int, policy_cfg: List[Dict[str, Any]]) → List[ListedOrigin]

Get the next num_visits origins to visit for the given visit_type, using the corresponding set of scheduling policies.

The policy_cfg list sets, for the current visit type, the scheduling policies used to pull the next tasks. Each policy config entry in the list must have at least a ‘policy’ (policy name) and a ‘weight’ entry. For each policy in the list, the number of returned origins to visit is weighted by that entry’s weight, so that the total number of returned origins is around num_visits. Any other key/value entry in the policy configuration is passed to the scheduler.grab_next_visits() method.

This function emits a warning if the ratio of origins retrieved for a policy deviates from the requested ratio by more than 5%.

Returns:

at most num_visits ListedOrigin objects
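The weighting can be sketched as follows. Instead of a SchedulerInterface, this version takes a hypothetical fetch(policy=..., count=..., **extra) callable standing in for scheduler.grab_next_visits(); reading "more than 5%" as an absolute gap of 0.05 between the actual and requested shares is also an assumption. Rounding each per-policy count down keeps the total at most num_visits.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def grab_weighted(
    fetch: Callable[..., List[Any]],
    num_visits: int,
    policy_cfg: List[Dict[str, Any]],
) -> List[Any]:
    """Split num_visits across policies by weight; return at most num_visits items."""
    total_weight = sum(entry["weight"] for entry in policy_cfg)
    per_policy: Dict[str, List[Any]] = {}
    for entry in policy_cfg:
        # Every key except "policy" and "weight" (e.g. "tablesample") is forwarded.
        extra = {k: v for k, v in entry.items() if k not in ("policy", "weight")}
        count = int(num_visits * entry["weight"] / total_weight)
        per_policy[entry["policy"]] = fetch(policy=entry["policy"], count=count, **extra)

    origins = [origin for batch in per_policy.values() for origin in batch]
    if origins:
        for entry in policy_cfg:
            expected = entry["weight"] / total_weight
            actual = len(per_policy[entry["policy"]]) / len(origins)
            # Assumption: "more than 5%" means an absolute gap of 0.05.
            if abs(actual - expected) > 0.05:
                logger.warning(
                    "Policy %s returned %.1f%% of origins, expected %.1f%%",
                    entry["policy"], 100 * actual, 100 * expected,
                )
    return origins
```

With weights 49/49/2 and num_visits = 100, the fetch callable is asked for 49, 49 and 2 origins; if one policy comes back empty, the shares of the other two drift far from their targets and warnings are logged.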

swh.scheduler.celery_backend.recurrent_visits.splay()

Return a random short interval by which to vary the backoffs of the visit scheduling threads.
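A plausible shape for the jitter, assuming splay() draws uniformly from [0, BACKOFF_SPLAY) and is added on top of a base backoff such as QUEUE_FULL_BACKOFF; the exact distribution is not documented here. The helper queue_full_delay is hypothetical.

```python
import random

BACKOFF_SPLAY = 5.0
QUEUE_FULL_BACKOFF = 60


def splay() -> float:
    """Assumed behaviour: uniform random jitter in [0, BACKOFF_SPLAY)."""
    return random.random() * BACKOFF_SPLAY


def queue_full_delay() -> float:
    """Hypothetical helper: full-queue backoff plus jitter, so that threads
    for different visit types do not all wake up at the same instant."""
    return QUEUE_FULL_BACKOFF + splay()


print(queue_full_delay())  # somewhere in [60.0, 65.0)
```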

swh.scheduler.celery_backend.recurrent_visits.send_visits_for_visit_type(scheduler: SchedulerInterface, app, visit_type: str, task_type: TaskType, policy_cfg: List[Dict[str, Any]], no_origins_scheduled_backoff: int = 1200) → float

Schedule the next batch of visits for the given visit_type.

First, we determine the number of available slots by introspecting the RabbitMQ queue.

If fewer than MIN_SLOTS_RATIO of the queue slots are available, we wait for QUEUE_FULL_BACKOFF seconds. This avoids running the expensive grab_next_visits() queries when there are not many jobs to queue.

Once more than MIN_SLOTS_RATIO of the queue slots are available, we run grab_next_visits_policy_weights() to retrieve the next set of origin visits to schedule, and we send them to Celery.

If the last scheduling attempt didn’t return any origins, we sleep for no_origins_scheduled_backoff seconds (DEFAULT_NO_ORIGINS_SCHEDULED_BACKOFF by default). This avoids running the expensive grab_next_visits() queries too often when there is nothing left to schedule.

The policy_cfg argument is the policy configuration used to choose the next origins to visit. It is passed directly to the grab_next_visits_policy_weights() function.

Returns:

the earliest time.monotonic() value at which to run the next iteration of the loop.
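The control flow described above can be sketched as a single iteration that returns a time.monotonic() deadline. The callables get_queue_length, grab and send are hypothetical stand-ins for the RabbitMQ introspection, grab_next_visits_policy_weights() and the Celery send; returning "now" after a successful batch, and adding splay() to both backoffs, are assumptions.

```python
import random
import time
from typing import Any, Callable, List

MIN_SLOTS_RATIO = 0.05
QUEUE_FULL_BACKOFF = 60
BACKOFF_SPLAY = 5.0


def splay() -> float:
    return random.random() * BACKOFF_SPLAY


def send_visits_sketch(
    max_queue_length: int,
    get_queue_length: Callable[[], int],
    grab: Callable[[int], List[Any]],
    send: Callable[[Any], None],
    no_origins_scheduled_backoff: int = 1200,
) -> float:
    """One scheduling iteration; returns the earliest time for the next one."""
    available = max_queue_length - get_queue_length()
    if available < MIN_SLOTS_RATIO * max_queue_length:
        # Queue (almost) full: skip the expensive query for a while.
        return time.monotonic() + QUEUE_FULL_BACKOFF + splay()

    origins = grab(available)
    for origin in origins:
        send(origin)

    if not origins:
        # Nothing left to schedule: back off for longer.
        return time.monotonic() + no_origins_scheduled_backoff + splay()
    # Assumption: after a successful batch, the next iteration may run at once.
    return time.monotonic()
```

A caller loop would then sleep until the returned deadline (for example `time.sleep(max(0.0, deadline - time.monotonic()))`) before the next iteration.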

swh.scheduler.celery_backend.recurrent_visits.visit_scheduler_thread(config: Dict, visit_type: str, command_queue: Queue[object], exc_queue: Queue[Tuple[str, BaseException]])

Target function for the visit sending thread, which initializes local connections and handles exceptions by sending them back to the main thread.
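An exception raised inside a thread does not propagate to the thread that started it, which is why the target reports failures through exc_queue. A minimal sketch of that hand-off, where the work callable is a hypothetical placeholder for connection setup and the scheduling loop:

```python
import queue
import threading
from typing import Callable, Tuple


def thread_target(
    visit_type: str,
    work: Callable[[], None],
    exc_queue: "queue.Queue[Tuple[str, BaseException]]",
) -> None:
    """Run `work`; forward any exception to the main thread, tagged by visit type."""
    try:
        work()
    except BaseException as exc:  # matches the Queue[Tuple[str, BaseException]] type
        exc_queue.put((visit_type, exc))


def failing_work() -> None:
    raise RuntimeError("lost connection")


errors: "queue.Queue[Tuple[str, BaseException]]" = queue.Queue()
worker = threading.Thread(target=thread_target, args=("git", failing_work, errors))
worker.start()
worker.join()
print(errors.get_nowait())  # ('git', RuntimeError('lost connection'))
```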

swh.scheduler.celery_backend.recurrent_visits.VisitSchedulerThreads

Dict storing the visit scheduler threads and their command queues, keyed by visit type.

alias of Dict[str, Tuple[Thread, Queue]]

swh.scheduler.celery_backend.recurrent_visits.spawn_visit_scheduler_thread(threads: Dict[str, Tuple[Thread, Queue]], exc_queue: Queue[Tuple[str, BaseException]], config: Dict[str, Any], visit_type: str)

Spawn a new thread to schedule the visits of type visit_type.
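A simplified sketch of spawning one thread per visit type and recording it, together with its command queue, in a VisitSchedulerThreads-style dict. The spawn_thread helper and its target(visit_type, command_queue) signature are hypothetical; the documented function also takes config and exc_queue, which are omitted here.

```python
import queue
import threading
from typing import Callable, Dict, Tuple

VisitSchedulerThreads = Dict[str, Tuple[threading.Thread, queue.Queue]]


def spawn_thread(
    threads: VisitSchedulerThreads,
    visit_type: str,
    target: Callable[[str, queue.Queue], None],
) -> threading.Thread:
    """Hypothetical helper: start `target` in a thread with its own command queue."""
    command_queue: queue.Queue = queue.Queue()
    thread = threading.Thread(
        target=target,
        args=(visit_type, command_queue),
        name=f"visit-scheduler-{visit_type}",
    )
    # Record the pair before starting, so the main thread can always reach it.
    threads[visit_type] = (thread, command_queue)
    thread.start()
    return thread
```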

swh.scheduler.celery_backend.recurrent_visits.terminate_visit_scheduler_threads(threads: Dict[str, Tuple[Thread, Queue]]) → List[str]

Terminate all visit scheduler threads.
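Shutdown can reuse the TERMINATE sentinel: signal every command queue first so all threads wind down in parallel, then join them. This is a sketch; that the returned List[str] holds the visit types of the terminated threads is an assumption based only on the return type.

```python
import queue
import threading
from typing import Dict, List, Tuple

TERMINATE = object()  # same identity-compared sentinel as above


def terminate_threads(threads: Dict[str, Tuple[threading.Thread, queue.Queue]]) -> List[str]:
    """Ask every thread to stop, wait for each one, and return their visit types."""
    # First signal all threads, so they can stop concurrently ...
    for _thread, command_queue in threads.values():
        command_queue.put(TERMINATE)
    # ... then wait for each of them to exit.
    terminated = []
    for visit_type, (thread, _command_queue) in threads.items():
        thread.join()
        terminated.append(visit_type)
    return terminated
```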