swh.scheduler package

Submodules

swh.scheduler.backend module

swh.scheduler.backend.adapt_arrow(arrow)[source]
swh.scheduler.backend.format_query(query, keys)[source]

Format a query with the given keys

class swh.scheduler.backend.SchedulerBackend(db, min_pool_conns=1, max_pool_conns=10)[source]

Bases: object

Backend for the Software Heritage scheduling database.

__init__(db, min_pool_conns=1, max_pool_conns=10)[source]
Parameters:db – either a libpq connection string, or a psycopg2 connection
get_db()[source]
put_db(db)[source]
task_type_keys = ['type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', 'num_retries', 'retry_delay']
create_task_type(task_type, db=None, cur=None)[source]

Create a new task type ready for scheduling.

Parameters:task_type (dict) –

a dictionary with the following keys:

  • type (str): an identifier for the task type
  • description (str): a human-readable description of what the task does
  • backend_name (str): the name of the task in the job-scheduling backend
  • default_interval (datetime.timedelta): the default interval between two task runs
  • min_interval (datetime.timedelta): the minimum interval between two task runs
  • max_interval (datetime.timedelta): the maximum interval between two task runs
  • backoff_factor (float): the factor by which the interval changes at each run
  • max_queue_length (int): the maximum length of the task queue for this task type
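
As an illustration, registering a hypothetical task type could look like the following sketch (the connection string and all field values are made up; num_retries and retry_delay are left to their defaults):

    import datetime

    from swh.scheduler.backend import SchedulerBackend

    # Hypothetical libpq connection string.
    backend = SchedulerBackend(db="dbname=softwareheritage-scheduler-dev")

    backend.create_task_type({
        "type": "load-hg",               # illustrative task type identifier
        "description": "Load a Mercurial repository",
        "backend_name": "swh.loader.mercurial.tasks.LoadMercurial",  # assumed backend task name
        "default_interval": datetime.timedelta(days=64),
        "min_interval": datetime.timedelta(hours=12),
        "max_interval": datetime.timedelta(days=64),
        "backoff_factor": 2.0,
        "max_queue_length": 1000,
    })
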
get_task_type(task_type_name, db=None, cur=None)[source]

Retrieve the task type with id task_type_name

get_task_types(db=None, cur=None)[source]

Retrieve all registered task types

task_create_keys = ['type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority']
task_keys = ['type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority', 'id', 'current_interval']
create_tasks(tasks, policy='recurring', db=None, cur=None)[source]

Create new tasks.

Parameters:tasks (list) –

each task is a dictionary with the following keys:

  • type (str): the task type
  • arguments (dict): the arguments for the task runner, keys:
    • args (list of str): arguments
    • kwargs (dict str -> str): keyword arguments
  • next_run (datetime.datetime): the next scheduled run for the task
Returns:a list of created tasks.
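
For example, creating a single one-shot task might look like this sketch (the task type and loader arguments are hypothetical):

    import datetime

    from swh.scheduler.backend import SchedulerBackend

    backend = SchedulerBackend(db="dbname=softwareheritage-scheduler-dev")  # hypothetical DSN

    created = backend.create_tasks([{
        "type": "load-hg",                                  # hypothetical task type
        "arguments": {
            "args": [],
            "kwargs": {"url": "https://example.org/repo"},  # illustrative loader arguments
        },
        "next_run": datetime.datetime.now(tz=datetime.timezone.utc),
    }], policy="oneshot")

    print(created[0]["id"], created[0]["status"])
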
set_status_tasks(task_ids, status='disabled', next_run=None, db=None, cur=None)[source]

Set the status of the tasks whose ids are listed.

If given, also set the next_run date.

disable_tasks(task_ids, db=None, cur=None)[source]

Disable the tasks whose ids are listed.

search_tasks(task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, db=None, cur=None)[source]

Search tasks according to the selected criteria

get_tasks(task_ids, db=None, cur=None)[source]

Retrieve the info of tasks whose ids are listed.

peek_ready_tasks(task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, db=None, cur=None)[source]

Fetch the list of ready tasks

Parameters:
  • task_type (str) – filter tasks by their type
  • timestamp (datetime.datetime) – peek tasks that need to be executed before that timestamp
  • num_tasks (int) – only peek at num_tasks tasks (with no priority)
  • num_tasks_priority (int) – only peek at num_tasks_priority tasks (with priority)
Returns:

a list of tasks

grab_ready_tasks(task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, db=None, cur=None)[source]

Fetch the list of ready tasks, and mark them as scheduled

Parameters:
  • task_type (str) – filter tasks by their type
  • timestamp (datetime.datetime) – grab tasks that need to be executed before that timestamp
  • num_tasks (int) – only grab num_tasks tasks (with no priority)
  • num_tasks_priority (int) – only grab num_tasks_priority tasks (with priority)
Returns:

a list of tasks
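
A scheduler runner could combine peek_ready_tasks and grab_ready_tasks along these lines (a sketch; the task type is hypothetical):

    import datetime

    from swh.scheduler.backend import SchedulerBackend

    backend = SchedulerBackend(db="dbname=softwareheritage-scheduler-dev")  # hypothetical DSN
    now = datetime.datetime.now(tz=datetime.timezone.utc)

    # Peek first (read-only), then grab: grabbing also marks the returned tasks as scheduled.
    pending = backend.peek_ready_tasks("load-hg", timestamp=now, num_tasks=10)
    grabbed = backend.grab_ready_tasks("load-hg", timestamp=now, num_tasks=10)

    for task in grabbed:
        print(task["id"], task["type"], task["arguments"])
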

task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata']
schedule_task_run(task_id, backend_id, metadata=None, timestamp=None, db=None, cur=None)[source]

Mark a given task as scheduled, adding a task_run entry in the database.

Parameters:
  • task_id (int) – the identifier for the task being scheduled
  • backend_id (str) – the identifier of the job in the backend
  • metadata (dict) – metadata to add to the task_run entry
  • timestamp (datetime.datetime) – the instant the event occurred
Returns:

a fresh task_run entry

mass_schedule_task_runs(task_runs, db=None, cur=None)[source]

Schedule a bunch of task runs.

Parameters:task_runs (list) –

a list of dicts with keys:

  • task (int): the identifier for the task being scheduled
  • backend_id (str): the identifier of the job in the backend
  • metadata (dict): metadata to add to the task_run entry
  • scheduled (datetime.datetime): the instant the event occurred
Returns:None
start_task_run(backend_id, metadata=None, timestamp=None, db=None, cur=None)[source]

Mark a given task as started, updating the corresponding task_run entry in the database.

Parameters:
  • backend_id (str) – the identifier of the job in the backend
  • metadata (dict) – metadata to add to the task_run entry
  • timestamp (datetime.datetime) – the instant the event occurred
Returns:

the updated task_run entry

end_task_run(backend_id, status, metadata=None, timestamp=None, result=None, db=None, cur=None)[source]

Mark a given task as ended, updating the corresponding task_run entry in the database.

Parameters:
  • backend_id (str) – the identifier of the job in the backend
  • status (str) – how the task ended; one of: ‘eventful’, ‘uneventful’, ‘failed’
  • metadata (dict) – metadata to add to the task_run entry
  • timestamp (datetime.datetime) – the instant the event occurred
Returns:

the updated task_run entry
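
Putting these methods together, a typical task_run lifecycle might look like this sketch (the task type, worker name and backend_id are hypothetical; the latter would usually be a job id from the job-scheduling backend):

    import uuid

    from swh.scheduler.backend import SchedulerBackend

    backend = SchedulerBackend(db="dbname=softwareheritage-scheduler-dev")  # hypothetical DSN

    # Take one ready task of a hypothetical type and walk it through its run lifecycle.
    [task] = backend.grab_ready_tasks("load-hg", num_tasks=1)
    backend_id = str(uuid.uuid4())  # stand-in for the job id in the job-scheduling backend

    backend.schedule_task_run(task["id"], backend_id)                    # handed to the backend
    backend.start_task_run(backend_id, metadata={"worker": "worker01"})  # picked up by a worker
    backend.end_task_run(backend_id, status="eventful", result={"status": "done"})
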

filter_task_to_archive(after_ts, before_ts, limit=10, last_id=-1, db=None, cur=None)[source]

Return the list of task/task_run entries, between after_ts and before_ts, that can be archived.

delete_archived_tasks(task_ids, db=None, cur=None)[source]

Delete archived tasks as much as possible. Only the tasks whose associated task_run entries have all been cleaned up will be deleted.

task_run_keys = ['id', 'task', 'backend_id', 'scheduled', 'started', 'ended', 'metadata', 'status']
get_task_runs(task_ids, limit=None, db=None, cur=None)[source]

Search the task runs for the given task ids

swh.scheduler.backend_es module

Elasticsearch backend

class swh.scheduler.backend_es.SWHElasticSearchClient(**config)[source]

Bases: object

__init__(**config)[source]

Initialize self. See help(type(self)) for accurate signature.

compute_index_name(year, month)[source]

Given a year and a month, compute the index name.

index(data)[source]

Index the given data into Elasticsearch.

The ‘ended’ field of data is used to compute the name of the index to write to.

mget(index_name, doc_ids, chunk_size=500, source=True, log=None)[source]

Retrieve the documents’ full content for the given ids, as per the source setup.

The source parameter controls what is returned:

  • source=True: gives back the original indexed data
  • source=False: returns without the original _source field
  • source=[‘task_id’]: returns only task_id in the _source field

Parameters:
  • index_name (str) – Name of the concerned index.
  • doc_ids (generator) – Generator of ids to retrieve
  • chunk_size (int) – Number of documents chunk to send for retrieval
  • source (bool/[str]) – Source of information to return
Yields:

document indexed as per source’s setup

_streaming_bulk(index_name, doc_stream, chunk_size=500, log=None)[source]

Bulk-index the data and yield the identifiers of the successfully indexed documents.

Parameters:
  • index_name (str) – Name of the concerned index.
  • doc_stream (generator) – Generator of documents to index
  • chunk_size (int) – Number of documents chunk to send for indexation
Yields:

document id indexed

streaming_bulk(index_name, doc_stream, chunk_size=500, source=True, log=None)[source]

Bulk-index the data and yield the successfully indexed documents, as per the source setup.

The source parameter controls what is returned:

  • source=True: gives back the original indexed data
  • source=False: returns without the original _source field
  • source=[‘task_id’]: returns only task_id in the _source field
Parameters:
  • index_name (str) – Name of the concerned index.
  • doc_stream (generator) – Document generator to index
  • chunk_size (int) – Number of documents chunk to send
  • source (bool, [str]) – the information to return
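
A usage sketch, assuming a reachable Elasticsearch instance and a default client configuration (the document shape and index contents are illustrative):

    from swh.scheduler.backend_es import SWHElasticSearchClient

    es = SWHElasticSearchClient()                # accepted configuration keys depend on the deployment

    index_name = es.compute_index_name(2018, 6)  # name of the monthly index for June 2018

    documents = iter([
        {"task_id": 1, "ended": "2018-06-01T12:00:00+00:00"},  # illustrative documents
        {"task_id": 2, "ended": "2018-06-02T12:00:00+00:00"},
    ])

    # Index the documents and get back only their task_id, as per the source setup.
    for doc in es.streaming_bulk(index_name, documents, chunk_size=100, source=["task_id"]):
        print(doc)
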

swh.scheduler.cli_utils module

swh.scheduler.task module

class swh.scheduler.task.SWHTask[source]

Bases: celery.app.task.Task

A schedulable task (abstract class).

The current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated.
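
As a sketch, one way to define a concrete task is to register a plain function with Celery using SWHTask as the base class (the task name below is hypothetical):

    from celery import current_app as app

    from swh.scheduler.task import SWHTask


    @app.task(base=SWHTask, bind=True, name="swh.example.tasks.ping")  # hypothetical name
    def ping(self, **kwargs):
        # The decorated function becomes the task body (run); SWHTask wraps the call
        # with the logging and statsd instrumentation exposed by the attributes below.
        return "OK"
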

_statsd = None
_log = None
statsd
__call__(*args, **kwargs)[source]

Call self as a function.

on_failure(exc, task_id, args, kwargs, einfo)[source]

Error handler.

This is run by the worker when the task fails.

Parameters:
  • exc (Exception) – The exception raised by the task.
  • task_id (str) – Unique id of the failed task.
  • args (Tuple) – Original arguments for the task that failed.
  • kwargs (Dict) – Original keyword arguments for the task that failed.
  • einfo (ExceptionInfo) – Exception information.
Returns:

The return value of this handler is ignored.

Return type:

None

on_success(retval, task_id, args, kwargs)[source]

Success handler.

Run by the worker if the task executes successfully.

Parameters:
  • retval (Any) – The return value of the task.
  • task_id (str) – Unique id of the executed task.
  • args (Tuple) – Original arguments for the executed task.
  • kwargs (Dict) – Original keyword arguments for the executed task.
Returns:

The return value of this handler is ignored.

Return type:

None

log
run(*args, **kwargs)[source]

The body of the task executed by workers.

ignore_result = False
priority = None
rate_limit = None
reject_on_worker_lost = None
request_stack = <celery.utils.threads._LocalStack object>
serializer = 'msgpack'
store_errors_even_if_ignored = False
track_started = True
typing = True

swh.scheduler.utils module

swh.scheduler.utils.get_task(task_name)[source]

Retrieve a task object from our application instance by its fully qualified Python name.

Parameters:task_name (str) – the task’s name (e.g. swh.loader.git.tasks.LoadDiskGitRepository)
Returns:Instance of task
swh.scheduler.utils.create_task_dict(type, policy, *args, **kwargs)[source]
Create a task with the given type and policy, scheduled for as soon as possible.

Parameters:
  • type (str) – type of the task, as registered in swh-scheduler’s task_type database table (e.g. load-git, check-deposit)
  • policy (str) – oneshot or recurring policy
Returns:

Expected dictionary for the task scheduling API (swh.scheduler.backend.SchedulerBackend.create_tasks)

swh.scheduler.utils.create_oneshot_task_dict(type, *args, **kwargs)[source]

Create a oneshot task scheduled for as soon as possible.

Parameters:type (str) – type of the oneshot task, as registered in swh-scheduler’s task_type database table (e.g. load-git, check-deposit)
Returns:Expected dictionary for the one-shot task scheduling API (swh.scheduler.backend.SchedulerBackend.create_tasks)
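
For example (the task type, URL and task name are hypothetical):

    from swh.scheduler.utils import create_oneshot_task_dict, get_task

    # Build a one-shot task dict, ready to be passed to SchedulerBackend.create_tasks.
    task = create_oneshot_task_dict("load-git", url="https://example.org/repo.git")
    # Roughly expected shape:
    #   {'type': 'load-git', 'policy': 'oneshot', 'next_run': <now>,
    #    'arguments': {'args': [], 'kwargs': {'url': 'https://example.org/repo.git'}}}

    # Look up a registered Celery task by its fully qualified Python name.
    loader = get_task("swh.loader.git.tasks.LoadDiskGitRepository")
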

Module contents

swh.scheduler.compute_nb_tasks_from(num_tasks)[source]
Compute and return the tuple (number of tasks without priority, number of tasks with priority).

Parameters:num_tasks (int) –
Returns:a tuple of the number of tasks without priority (int) and the number of tasks with priority (int)
swh.scheduler.get_scheduler(cls, args={})[source]

Get a scheduler object of class cls with arguments args.

Parameters:
  • cls (str) – the scheduler’s class, either ‘local’ or ‘remote’
  • args (dict) – dictionary of arguments passed to the scheduler class; defaults to empty
Returns:

local: swh.scheduler.backend.SchedulerBackend
remote: swh.scheduler.api.client.RemoteScheduler

Return type:

an instance of swh.scheduler, either local or remote

Raises:

ValueError if passed an unknown scheduler class.
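
A sketch of instantiating both flavors (the connection string and URL are hypothetical):

    from swh.scheduler import get_scheduler

    # Local scheduler, directly backed by the postgresql scheduling database.
    local = get_scheduler(
        cls="local",
        args={"db": "dbname=softwareheritage-scheduler-dev"},  # hypothetical DSN
    )

    # Remote scheduler, talking to a swh-scheduler RPC server.
    remote = get_scheduler(cls="remote", args={"url": "http://localhost:5008/"})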