swh.scheduler.backend module

swh.scheduler.backend.adapt_arrow(arrow)[source]
swh.scheduler.backend.format_query(query, keys)[source]

Format a query with the given keys

class swh.scheduler.backend.SchedulerBackend(db, min_pool_conns=1, max_pool_conns=10)[source]

Bases: object

Backend for the Software Heritage scheduling database.

get_db()[source]
put_db(db)[source]
task_type_keys = ['type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', 'num_retries', 'retry_delay']
create_task_type(task_type)[source]

Create a new task type ready for scheduling.

Parameters

task_type (dict) –

a dictionary with the following keys:

  • type (str): an identifier for the task type

  • description (str): a human-readable description of what the task does

  • backend_name (str): the name of the task in the job-scheduling backend

  • default_interval (datetime.timedelta): the default interval between two task runs

  • min_interval (datetime.timedelta): the minimum interval between two task runs

  • max_interval (datetime.timedelta): the maximum interval between two task runs

  • backoff_factor (float): the factor by which the interval changes at each run

  • max_queue_length (int): the maximum length of the task queue for this task type
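As an illustration, a task_type dictionary using the keys documented above could be built as follows. The identifier, description, backend name and interval values are made-up placeholders, and the final call is left commented out because it assumes an already-configured SchedulerBackend instance named backend.

```python
import datetime

# Hypothetical values: only the key names and value types come from the
# parameter list above.
task_type = {
    "type": "example-task",
    "description": "Example task used for illustration",
    "backend_name": "example.tasks.ExampleTask",
    "default_interval": datetime.timedelta(days=64),
    "min_interval": datetime.timedelta(hours=12),
    "max_interval": datetime.timedelta(days=64),
    "backoff_factor": 2.0,
    "max_queue_length": 1000,
}

# Assumes `backend` is an existing SchedulerBackend instance:
# backend.create_task_type(task_type)
```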

get_task_type(task_type_name)[source]

Retrieve the task type with id task_type_name

get_task_types()[source]

Retrieve all registered task types

get_or_create_lister(name: str, instance_name: Optional[str] = None) → swh.scheduler.model.Lister[source]

Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist.

update_lister(lister: swh.scheduler.model.Lister) → swh.scheduler.model.Lister[source]

Update the state for the given lister instance in the database.

Returns

a new Lister object, with all fields updated from the database

Raises

StaleData if the updated timestamp for the lister instance in the database doesn’t match the one passed by the user.

record_listed_origins(listed_origins: Iterable[swh.scheduler.model.ListedOrigin]) → List[swh.scheduler.model.ListedOrigin][source]

Record a set of origins that a lister has listed.

This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.
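The upsert semantics can be sketched with a plain dictionary keyed by (lister_id, url, visit_type). This is only a toy model: origins are represented as dicts rather than ListedOrigin objects, and the record function and sample values are hypothetical.

```python
# Toy in-memory table keyed by (lister_id, url, visit_type).
table = {}


def record(origins):
    """Insert new rows, or refresh the mutable fields of existing ones."""
    for origin in origins:
        key = (origin["lister_id"], origin["url"], origin["visit_type"])
        row = table.setdefault(key, dict(origin))
        # Only these three fields are overwritten when the key already exists.
        for field in ("extra_loader_arguments", "last_update", "last_seen"):
            row[field] = origin.get(field)
    return list(table.values())


first = {
    "lister_id": "lister-1",
    "url": "https://example.org/repo.git",
    "visit_type": "git",
    "extra_loader_arguments": {},
    "last_update": "2021-01-01",
    "last_seen": "2021-01-02",
}
# Same key, newer timestamps: updates the existing row instead of adding one.
second = dict(first, last_update="2021-02-01", last_seen="2021-02-02")

record([first])
rows = record([second])
```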

get_listed_origins(lister_id: Optional[uuid.UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[Tuple[uuid.UUID, str]] = None) → swh.scheduler.model.PaginatedListedOriginList[source]

Get information on the listed origins matching either the url or lister_id, or both arguments.

task_create_keys = ['type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority']
task_keys = ['type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority', 'id', 'current_interval']
create_tasks(tasks, policy='recurring')[source]

Create new tasks.

Parameters

tasks (list) –

each task is a dictionary with the following keys:

  • type (str): the task type

  • arguments (dict): the arguments for the task runner, keys:

    • args (list of str): arguments

    • kwargs (dict str -> str): keyword arguments

  • next_run (datetime.datetime): the next scheduled run for the task

Returns

a list of created tasks.
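A task list matching the structure described above might look like the following sketch. The task type identifier and the argument values are placeholders, and the call itself is commented out because it assumes an existing SchedulerBackend instance named backend.

```python
import datetime

tasks = [
    {
        "type": "example-task",  # hypothetical task type identifier
        "arguments": {
            "args": ["https://example.org/repo.git"],
            "kwargs": {"visit_type": "git"},
        },
        "next_run": datetime.datetime.now(tz=datetime.timezone.utc),
    },
]

# Assumes `backend` is an existing SchedulerBackend instance; "recurring"
# is the documented default policy.
# created = backend.create_tasks(tasks, policy="recurring")
```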

set_status_tasks(task_ids, status='disabled', next_run=None)[source]

Set the status of the tasks whose ids are listed.

If given, also set the next_run date.

disable_tasks(task_ids)[source]

Disable the tasks whose ids are listed.

search_tasks(task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None)[source]

Search for tasks matching the selected criteria.

get_tasks(task_ids)[source]

Retrieve the info of tasks whose ids are listed.

peek_ready_tasks(task_type, timestamp=None, num_tasks=None, num_tasks_priority=None)[source]

Fetch the list of ready tasks

Parameters
  • task_type (str) – filter tasks by their type

  • timestamp (datetime.datetime) – peek tasks that need to be executed before that timestamp

  • num_tasks (int) – only peek at num_tasks tasks (with no priority)

  • num_tasks_priority (int) – only peek at num_tasks_priority tasks (with priority)

Returns

a list of tasks

grab_ready_tasks(task_type, timestamp=None, num_tasks=None, num_tasks_priority=None)[source]

Fetch the list of ready tasks, and mark them as scheduled

Parameters
  • task_type (str) – filter tasks by their type

  • timestamp (datetime.datetime) – grab tasks that need to be executed before that timestamp

  • num_tasks (int) – only grab num_tasks tasks (with no priority)

  • num_tasks_priority (int) – only grab num_tasks_priority oneshot tasks (with priority)

Returns

a list of tasks
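The difference between peeking and grabbing can be illustrated with a toy in-memory model. Everything here is a simplification for illustration: the "ready" and "scheduled" status strings are placeholders, priorities are ignored, and the peek and grab functions are not the backend's implementation.

```python
import datetime

# Toy task list; the "ready"/"scheduled" status strings are placeholders.
tasks = [
    {"id": 1, "type": "t", "status": "ready",
     "next_run": datetime.datetime(2021, 1, 1)},
    {"id": 2, "type": "t", "status": "ready",
     "next_run": datetime.datetime(2021, 6, 1)},
]


def peek(task_type, timestamp, num_tasks=None):
    """Return ready tasks due by timestamp, without modifying them."""
    due = [t for t in tasks
           if t["type"] == task_type
           and t["status"] == "ready"
           and t["next_run"] <= timestamp]
    return due[:num_tasks]


def grab(task_type, timestamp, num_tasks=None):
    """Same selection as peek, but also mark the tasks as scheduled."""
    grabbed = peek(task_type, timestamp, num_tasks)
    for t in grabbed:
        t["status"] = "scheduled"
    return grabbed


now = datetime.datetime(2021, 3, 1)
# Peeking is repeatable; grabbing consumes the ready tasks.
peeked_twice = [len(peek("t", now)), len(peek("t", now))]
grabbed_twice = [len(grab("t", now)), len(grab("t", now))]
```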

task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata']
schedule_task_run(task_id, backend_id, metadata=None, timestamp=None)[source]

Mark a given task as scheduled, adding a task_run entry in the database.

Parameters
  • task_id (int) – the identifier for the task being scheduled

  • backend_id (str) – the identifier of the job in the backend

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

a fresh task_run entry

mass_schedule_task_runs(task_runs)[source]

Schedule a bunch of task runs.

Parameters

task_runs (list) –

a list of dicts with keys:

  • task (int): the identifier for the task being scheduled

  • backend_id (str): the identifier of the job in the backend

  • metadata (dict): metadata to add to the task_run entry

  • scheduled (datetime.datetime): the instant the event occurred

Returns

None
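A task_runs list with the documented keys could be assembled as in this sketch. The task ids are hypothetical and the backend ids are random UUID strings generated only for illustration; the call is commented out because it assumes an existing SchedulerBackend instance named backend.

```python
import datetime
import uuid

now = datetime.datetime.now(tz=datetime.timezone.utc)

# Hypothetical task ids; backend ids are generated here for illustration.
task_runs = [
    {
        "task": task_id,
        "backend_id": str(uuid.uuid4()),
        "metadata": {},
        "scheduled": now,
    }
    for task_id in (1, 2, 3)
]

# Assumes `backend` is an existing SchedulerBackend instance:
# backend.mass_schedule_task_runs(task_runs)
```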

start_task_run(backend_id, metadata=None, timestamp=None)[source]
Mark a given task as started, updating the corresponding task_run entry in the database.

Parameters
  • backend_id (str) – the identifier of the job in the backend

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

the updated task_run entry

end_task_run(backend_id, status, metadata=None, timestamp=None, result=None)[source]

Mark a given task as ended, updating the corresponding task_run entry in the database.

Parameters
  • backend_id (str) – the identifier of the job in the backend

  • status (str) – how the task ended; one of: ‘eventful’, ‘uneventful’, ‘failed’

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

the updated task_run entry
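Taken together, schedule_task_run, start_task_run and end_task_run describe the life cycle of one task run. The sketch below shows one way a caller might chain them; run_once and RecordingBackend are hypothetical helpers written for this example, and the stand-in backend only records calls so that the snippet runs without a database.

```python
def run_once(backend, task_id, backend_id, work):
    """Drive one task run through scheduled -> started -> ended."""
    backend.schedule_task_run(task_id, backend_id)
    backend.start_task_run(backend_id)
    try:
        eventful = work()
    except Exception:
        return backend.end_task_run(backend_id, "failed")
    status = "eventful" if eventful else "uneventful"
    return backend.end_task_run(backend_id, status)


class RecordingBackend:
    """Stand-in that records calls instead of touching a database."""

    def __init__(self):
        self.calls = []

    def schedule_task_run(self, task_id, backend_id, metadata=None,
                          timestamp=None):
        self.calls.append(("schedule", task_id, backend_id))

    def start_task_run(self, backend_id, metadata=None, timestamp=None):
        self.calls.append(("start", backend_id))

    def end_task_run(self, backend_id, status, metadata=None,
                     timestamp=None, result=None):
        self.calls.append(("end", backend_id, status))
        return {"backend_id": backend_id, "status": status}


fake = RecordingBackend()
entry = run_once(fake, task_id=1, backend_id="job-1", work=lambda: True)
```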

filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None) → Dict[str, Any][source]

Compute the tasks to archive within the half-open datetime interval [after_ts, before_ts). The method returns a paginated result.

Returns

a dict with the following keys:

  • next_page_token: opaque token to be used as page_token to retrieve the next page of results. If absent, there are no more pages to gather.

  • tasks: list of task dictionaries with the following keys:

    • id (str): origin task id

    • started (Optional[datetime]): started date

    • scheduled (datetime): scheduled date

    • arguments (json dict): task’s arguments

    • …

Return type

dict
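Since the result is paginated through the next_page_token and tasks keys, a caller usually loops until the token is absent. The following sketch shows such a loop; iter_tasks_to_archive and FakeBackend are hypothetical helpers, and the fake serves two fixed pages so the snippet runs without a database.

```python
def iter_tasks_to_archive(backend, after_ts, before_ts, limit=10):
    """Yield every task in the interval, following next_page_token."""
    page_token = None
    while True:
        page = backend.filter_task_to_archive(
            after_ts, before_ts, limit=limit, page_token=page_token
        )
        yield from page["tasks"]
        # An absent token means there are no more pages.
        page_token = page.get("next_page_token")
        if page_token is None:
            break


class FakeBackend:
    """Stand-in serving two fixed pages; not the real backend."""

    def filter_task_to_archive(self, after_ts, before_ts, limit=10,
                               page_token=None):
        if page_token is None:
            return {"tasks": [{"id": "1"}, {"id": "2"}],
                    "next_page_token": "p2"}
        return {"tasks": [{"id": "3"}]}


ids = [
    task["id"]
    for task in iter_tasks_to_archive(FakeBackend(), "2021-01-01", "2021-02-01")
]
```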

delete_archived_tasks(task_ids)[source]

Delete as many archived tasks as possible. Only the tasks whose associated task_run entries have all been cleaned up will be deleted.

task_run_keys = ['id', 'task', 'backend_id', 'scheduled', 'started', 'ended', 'metadata', 'status']
get_task_runs(task_ids, limit=None)[source]

Search for the task runs of the given task ids.

get_priority_ratios()[source]