swh.scheduler.celery_backend.config module

swh.scheduler.celery_backend.config.setup_log_handler(loglevel=None, logfile=None, format=None, colorize=None, log_console=None, log_journal=None, **kwargs)

Set up logging according to Software Heritage preferences.

If the environment variable SWH_LOG_CONFIG is set, logging is configured from the configuration file it points to. Otherwise, the default configuration hard-coded in this module is used.
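The environment-variable fallback described above can be sketched with the standard library alone. This is a minimal illustration, not the module's implementation: the function name `setup_logging_sketch`, the JSON file format, and the return values are assumptions made for the example.

```python
import json
import logging
import logging.config
import os


def setup_logging_sketch(default_level=logging.INFO):
    """Configure logging from SWH_LOG_CONFIG if set, else use a built-in default.

    Hypothetical helper for illustration only.
    """
    config_path = os.environ.get("SWH_LOG_CONFIG")
    if config_path:
        # Assumption: the file holds a JSON-encoded logging dictConfig schema.
        with open(config_path) as f:
            logging.config.dictConfig(json.load(f))
        return "file"
    # Fallback: a minimal hard-coded configuration.
    logging.basicConfig(level=default_level)
    return "default"
```

Returning which branch was taken is purely a convenience for the sketch; it makes the choice between the two configuration sources easy to observe.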

swh.scheduler.celery_backend.config.setup_queues_and_tasks(sender, instance, **kwargs)

Signal handler called on worker start.

This automatically registers swh.scheduler.task.Task subclasses as available celery tasks.

This also subscribes the worker to the “implicit” per-task queues defined for these task classes.

swh.scheduler.celery_backend.config.on_worker_init(*args, **kwargs)

swh.scheduler.celery_backend.config.monotonic(state)

Get the current value of the monotonic clock.

swh.scheduler.celery_backend.config.route_for_task(name, args, kwargs, options, task=None, **kw)

Route tasks according to the task_queue attribute of the task class.
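A router with this signature can be plugged into Celery's `task_routes` setting. The sketch below shows one plausible shape for attribute-based routing. The names `route_for_task_sketch` and `HypotheticalTask` are invented for the example, and returning `None` when no `task_queue` is defined is an assumption rather than documented behaviour of this module.

```python
def route_for_task_sketch(name, args, kwargs, options, task=None, **kw):
    """Return a route for tasks whose class defines a task_queue attribute."""
    queue = getattr(task, "task_queue", None)
    if queue is None:
        # Assumption: no explicit queue means "let Celery use its default routing".
        return None
    return {"queue": queue}


class HypotheticalTask:
    """Stand-in for a task class; not part of swh.scheduler."""

    task_queue = "hypothetical_queue"
```

Calling `route_for_task_sketch("some.task", (), {}, {}, task=HypotheticalTask())` yields `{"queue": "hypothetical_queue"}`.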

swh.scheduler.celery_backend.config.get_queue_stats(app, queue_name)

Get the statistics regarding a queue on the broker.

Parameters:

queue_name – name of the queue to check

Returns the raw dictionary from the RabbitMQ management API, or None if the current configuration does not use RabbitMQ.

Interesting keys:
  • consumers (number of consumers for the queue)

  • messages (number of messages in queue)

  • messages_unacknowledged (number of messages currently being processed)

Documentation: https://www.rabbitmq.com/management.html#http-api
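Because the result may be None, callers need to guard before reading keys. The following sketch extracts the keys listed above from a stats dictionary. The helper name `summarize_queue_stats` and the sample values are made up for illustration, and the keys are written in lowercase as the RabbitMQ management API reports them.

```python
def summarize_queue_stats(stats):
    """Pick the commonly used keys out of a queue stats dictionary.

    Returns None when no stats are available (e.g. the broker is not RabbitMQ).
    """
    if stats is None:
        return None
    return {
        "consumers": stats.get("consumers"),
        "messages": stats.get("messages"),
        "messages_unacknowledged": stats.get("messages_unacknowledged"),
    }


# Invented sample payload; a real response contains many more keys.
sample_stats = {
    "name": "hypothetical_queue",
    "consumers": 2,
    "messages": 10,
    "messages_unacknowledged": 3,
}
summary = summarize_queue_stats(sample_stats)
```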

swh.scheduler.celery_backend.config.get_queue_length(app, queue_name)

Shortcut to get a queue’s length

swh.scheduler.celery_backend.config.get_available_slots(app, queue_name: str, max_length: int | None)

Get the number of tasks that can be sent to queue_name, when the queue is limited to max_length.

Returns:

The number of available slots in the queue. That result should be positive.
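The arithmetic behind "available slots" can be sketched as the room left under the limit, clamped at zero. This is an illustrative guess at the calculation, not the module's code: the function name `available_slots_sketch` and the `unlimited` fallback used when `max_length` is None are assumptions.

```python
def available_slots_sketch(queue_length, max_length, unlimited=None):
    """Return how many more tasks fit in a queue capped at max_length.

    `unlimited` is a hypothetical fallback returned when no cap is set.
    """
    if max_length is None:
        # Assumption: with no cap, the caller decides what "unlimited" means.
        return unlimited
    # Clamp at zero so an over-full queue never yields a negative count.
    return max(0, max_length - queue_length)
```

For a queue holding 30 messages with a limit of 100, the sketch reports 70 free slots; a queue already over its limit reports 0.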

swh.scheduler.celery_backend.config.register_task_class(app, name, cls)

Register a class-based task under the given name.

swh.scheduler.celery_backend.config.build_app(config=None)

swh.scheduler.celery_backend.config.celery_task_prerun(task_id, task, *args, **kwargs)