swh.scheduler.api.client module

class swh.scheduler.api.client.RemoteScheduler(url, api_exception=None, timeout=None, chunk_size=4096, reraise_exceptions=None, **kwargs)

Bases: swh.core.api.RPCClient

Proxy to a remote scheduler API

backend_class

alias of swh.scheduler.interface.SchedulerInterface

reraise_exceptions: ClassVar[List[Type[Exception]]] = [<class 'swh.scheduler.exc.SchedulerException'>, <class 'swh.scheduler.exc.StaleData'>]
extra_type_decoders: Dict[str, Callable] = {'scheduler_model': <function <lambda>>}
extra_type_encoders: List[Tuple[type, str, Callable]] = [(<class 'swh.scheduler.model.BaseSchedulerModel'>, 'scheduler_model', <function _encode_model_object>)]

create_task_type(task_type)

Create a new task type ready for scheduling.

Parameters

task_type (dict) –

a dictionary with the following keys:

  • type (str): an identifier for the task type

  • description (str): a human-readable description of what the task does

  • backend_name (str): the name of the task in the job-scheduling backend

  • default_interval (datetime.timedelta): the default interval between two task runs

  • min_interval (datetime.timedelta): the minimum interval between two task runs

  • max_interval (datetime.timedelta): the maximum interval between two task runs

  • backoff_factor (float): the factor by which the interval changes at each run

  • max_queue_length (int): the maximum length of the task queue for this task type
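A minimal sketch of a task_type dictionary with the keys listed above, plus a client-side sanity check before calling create_task_type. The task type name, backend name and interval values are hypothetical, and the min_interval <= default_interval <= max_interval ordering is an assumption, not a documented constraint.

```python
from datetime import timedelta
from typing import Any, Dict

# Hypothetical task type: every value below is illustrative, not a real SWH task type.
task_type: Dict[str, Any] = {
    "type": "load-example",                       # identifier for the task type
    "description": "Load an example origin",      # human-readable description
    "backend_name": "example.tasks.LoadExample",  # hypothetical name in the job-scheduling backend
    "default_interval": timedelta(days=1),
    "min_interval": timedelta(hours=12),
    "max_interval": timedelta(days=32),
    "backoff_factor": 2.0,
    "max_queue_length": 1000,
}

# The keys listed in the create_task_type documentation.
REQUIRED_KEYS = frozenset({
    "type", "description", "backend_name", "default_interval",
    "min_interval", "max_interval", "backoff_factor", "max_queue_length",
})


def check_task_type(tt: Dict[str, Any]) -> Dict[str, Any]:
    """Client-side sanity check before sending tt to create_task_type."""
    missing = REQUIRED_KEYS - set(tt)
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    # Assumption: a coherent task type keeps min_interval <= default_interval <= max_interval.
    if not tt["min_interval"] <= tt["default_interval"] <= tt["max_interval"]:
        raise ValueError("expected min_interval <= default_interval <= max_interval")
    return tt
```

With a RemoteScheduler instance named scheduler, the call would then be scheduler.create_task_type(check_task_type(task_type)).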

create_tasks(tasks, policy='recurring')

Create new tasks.

Parameters

tasks (list) –

each task is a dictionary with the following keys:

  • type (str): the task type

  • arguments (dict): the arguments for the task runner, keys:

    • args (list of str): arguments

    • kwargs (dict str -> str): keyword arguments

  • next_run (datetime.datetime): the next scheduled run for the task

Returns

a list of created tasks.
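A small helper, sketched here as an illustration, that builds task dictionaries in the shape create_tasks documents. The helper name make_task, the task type "load-example" and the origin URL are hypothetical; timezone-aware datetimes are used as a precaution, not because the API is known to require them.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def make_task(task_type: str, *args: str, next_run: Optional[datetime] = None,
              **kwargs: str) -> Dict[str, Any]:
    """Build one task dictionary with the keys create_tasks documents."""
    return {
        "type": task_type,
        "arguments": {"args": list(args), "kwargs": dict(kwargs)},
        # Default to "run as soon as possible" (hypothetical choice).
        "next_run": next_run if next_run is not None else datetime.now(tz=timezone.utc),
    }


# Hypothetical task type name and origin URL.
tasks = [
    make_task(
        "load-example",
        "https://example.org/repo.git",
        next_run=datetime(2021, 1, 1, tzinfo=timezone.utc),
        visit_type="git",
    ),
]
```

scheduler.create_tasks(tasks) would then use the default 'recurring' policy. The grab_ready_tasks entry mentions "oneshot" tasks, which suggests policy="oneshot" for single runs, but that value is an assumption here.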

delete_archived_tasks(task_ids)

Delete as many of the given archived tasks as possible: only the tasks whose associated task_run entries have all been cleaned up are deleted.

disable_tasks(task_ids)

Disable the tasks whose ids are listed.

end_task_run(backend_id, status, metadata=None, timestamp=None, result=None)

Mark a given task as ended, updating the corresponding task_run entry in the database.

Parameters
  • backend_id (str) – the identifier of the job in the backend

  • status (str) – how the task ended; one of: ‘eventful’, ‘uneventful’, ‘failed’

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

the updated task_run entry
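Since status must be one of three documented values, a thin wrapper can reject typos before the RPC round-trip. This is a sketch: finish_run and VALID_END_STATUSES are hypothetical names, and scheduler is assumed to be a RemoteScheduler (or anything with the same end_task_run signature).

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# The three statuses documented for end_task_run.
VALID_END_STATUSES = frozenset({"eventful", "uneventful", "failed"})


def finish_run(scheduler, backend_id: str, status: str,
               metadata: Optional[Dict[str, Any]] = None):
    """Validate status locally, then mark the run as ended at the current instant."""
    if status not in VALID_END_STATUSES:
        raise ValueError(f"unexpected status {status!r}")
    return scheduler.end_task_run(
        backend_id, status, metadata=metadata,
        timestamp=datetime.now(tz=timezone.utc),
    )
```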

filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None) → Dict[str, Any]

Compute the tasks to archive within the half-open datetime interval [after_ts, before_ts), i.e. after_ts included and before_ts excluded. The method returns a paginated result.

Returns

a dict with the following keys:

  • next_page_token: opaque token to be used as page_token to retrieve the next page of results. If absent, there are no more pages to gather.

  • tasks: list of task dictionaries with the following keys:

    • id (str): origin task id

    • started (Optional[datetime]): started date

    • scheduled (datetime): scheduled date

    • arguments (json dict): task’s arguments

    • …

Return type

dict
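The pagination contract above (pass next_page_token back as page_token until it is absent) can be wrapped in a generator. A sketch: iter_tasks_to_archive is a hypothetical name, and the ISO 8601 date strings in the usage line are an assumed format for after_ts and before_ts.

```python
from typing import Any, Dict, Iterator, Optional


def iter_tasks_to_archive(scheduler, after_ts: str, before_ts: str,
                          limit: int = 10) -> Iterator[Dict[str, Any]]:
    """Yield every task to archive in [after_ts, before_ts), one page at a time."""
    page_token: Optional[str] = None
    while True:
        page = scheduler.filter_task_to_archive(
            after_ts, before_ts, limit=limit, page_token=page_token
        )
        yield from page["tasks"]
        # An absent next_page_token means there are no more pages.
        page_token = page.get("next_page_token")
        if page_token is None:
            break
```

Usage, assuming ISO 8601 strings are accepted: for task in iter_tasks_to_archive(scheduler, "2021-01-01", "2021-02-01"): ...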

get_listed_origins(lister_id: Optional[uuid.UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[Tuple[uuid.UUID, str]] = None) → swh.scheduler.model.PaginatedListedOriginList

Get information on the listed origins matching the url, the lister_id, or both.

Use the limit and page_token arguments for continuation. The next page token, if any, is returned in the PaginatedListedOriginList object.
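The same continuation pattern applies here, except that the page is an object rather than a dict. A sketch: iter_listed_origins is a hypothetical name, and the attribute names origins and next_page_token on PaginatedListedOriginList are assumptions to check against swh.scheduler.model.

```python
from typing import Any, Iterator, Optional


def iter_listed_origins(scheduler, lister_id=None, url: Optional[str] = None,
                        limit: int = 1000) -> Iterator[Any]:
    """Yield listed origins across all pages."""
    page_token = None
    while True:
        page = scheduler.get_listed_origins(
            lister_id=lister_id, url=url, limit=limit, page_token=page_token
        )
        # Assumption: PaginatedListedOriginList exposes `origins` and `next_page_token`.
        yield from page.origins
        page_token = page.next_page_token
        if page_token is None:
            break
```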

get_or_create_lister(name: str, instance_name: Optional[str] = None) → swh.scheduler.model.Lister

Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist.

get_priority_ratios()

get_task_runs(task_ids, limit=None)

Search the task runs for the given task ids.

get_task_type(task_type_name)

Retrieve the task type whose identifier is task_type_name.

get_task_types()

Retrieve all registered task types

get_tasks(task_ids)

Retrieve the info of tasks whose ids are listed.

grab_ready_tasks(task_type, timestamp=None, num_tasks=None, num_tasks_priority=None)

Fetch the list of ready tasks, and mark them as scheduled

Parameters
  • task_type (str) – the type of the tasks to fetch

  • timestamp (datetime.datetime) – grab tasks that need to be executed before that timestamp

  • num_tasks (int) – only grab num_tasks tasks (with no priority)

  • num_tasks_priority (int) – only grab num_tasks_priority oneshot tasks (with priority)

Returns

a list of tasks

mass_schedule_task_runs(task_runs)

Schedule several task runs at once.

Parameters

task_runs (list) –

a list of dicts with keys:

  • task (int): the identifier for the task being scheduled

  • backend_id (str): the identifier of the job in the backend

  • metadata (dict): metadata to add to the task_run entry

  • scheduled (datetime.datetime): the instant the event occurred

Returns

None
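One way the two calls above might be combined: grab the ready tasks (which marks them as scheduled), then record one task_run per task in a single mass_schedule_task_runs call. This is a sketch, not the scheduler runner's actual code: schedule_ready_tasks is hypothetical, each task dict is assumed to carry its id under "id", and the uuid4 backend_id stands in for the job id a real job backend would return.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List


def schedule_ready_tasks(scheduler, task_type: str, num_tasks: int = 100) -> List[Dict[str, Any]]:
    """Grab ready tasks of task_type and record a task_run entry for each."""
    now = datetime.now(tz=timezone.utc)
    # grab_ready_tasks marks the returned tasks as scheduled.
    tasks = scheduler.grab_ready_tasks(task_type, timestamp=now, num_tasks=num_tasks)
    task_runs = [
        {
            "task": task["id"],               # assumption: the task id is under "id"
            "backend_id": str(uuid.uuid4()),  # hypothetical stand-in for the backend job id
            "metadata": {},
            "scheduled": now,
        }
        for task in tasks
    ]
    if task_runs:
        # One RPC call for the whole batch instead of one schedule_task_run per task.
        scheduler.mass_schedule_task_runs(task_runs)
    return task_runs
```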

peek_ready_tasks(task_type, timestamp=None, num_tasks=None, num_tasks_priority=None)

Fetch the list of ready tasks, without marking them as scheduled.

Parameters
  • task_type (str) – the type of the tasks to peek at

  • timestamp (datetime.datetime) – peek tasks that need to be executed before that timestamp

  • num_tasks (int) – only peek at num_tasks tasks (with no priority)

  • num_tasks_priority (int) – only peek at num_tasks_priority tasks (with priority)

Returns

a list of tasks

record_listed_origins(listed_origins: Iterable[swh.scheduler.model.ListedOrigin]) → List[swh.scheduler.model.ListedOrigin]

Record a set of origins that a lister has listed.

This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.
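To make the upsert semantics concrete, here is an in-memory model keyed on (lister_id, url, visit_type). It illustrates the described behaviour only and is not the database implementation; Origin is a hypothetical, simplified stand-in for swh.scheduler.model.ListedOrigin, and its first_seen field (kept from the first insert) is an assumption.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple


@dataclass
class Origin:
    """Hypothetical, simplified stand-in for ListedOrigin."""
    lister_id: str
    url: str
    visit_type: str
    extra_loader_arguments: dict
    last_update: Optional[datetime]
    last_seen: datetime
    first_seen: Optional[datetime] = None  # assumption: set once, on first insert


Key = Tuple[str, str, str]


def upsert(store: Dict[Key, Origin], origins: Iterable[Origin]) -> List[Origin]:
    """Insert new origins; for existing keys, refresh only the three documented fields."""
    recorded = []
    for o in origins:
        key = (o.lister_id, o.url, o.visit_type)
        old = store.get(key)
        if old is None:
            new = replace(o, first_seen=o.first_seen or o.last_seen)
        else:
            new = replace(
                old,
                extra_loader_arguments=o.extra_loader_arguments,
                last_update=o.last_update,
                last_seen=o.last_seen,
            )
        store[key] = new
        recorded.append(new)
    return recorded
```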

schedule_task_run(task_id, backend_id, metadata=None, timestamp=None)

Mark a given task as scheduled, adding a task_run entry in the database.

Parameters
  • task_id (int) – the identifier for the task being scheduled

  • backend_id (str) – the identifier of the job in the backend

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

a fresh task_run entry

search_tasks(task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None)

Search tasks matching the selected criteria.

set_status_tasks(task_ids, status='disabled', next_run=None)

Set the status of the tasks whose ids are listed.

If given, also set the next_run date.

start_task_run(backend_id, metadata=None, timestamp=None)

Mark a given task as started, updating the corresponding task_run entry in the database.

Parameters
  • backend_id (str) – the identifier of the job in the backend

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

the updated task_run entry
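schedule_task_run, start_task_run and end_task_run describe one run's lifecycle, linked by backend_id. The sketch below drives all three from a single process to show the order of the calls; in a real deployment these calls are likely made by different components. run_once and the work callable are hypothetical, and the uuid4 backend_id stands in for a real job id.

```python
import uuid
from datetime import datetime, timezone
from typing import Callable


def run_once(scheduler, task_id: int, work: Callable[[], bool]) -> str:
    """Drive one task run through scheduled -> started -> ended.

    work is a hypothetical callable returning True when the run was eventful.
    """
    backend_id = str(uuid.uuid4())  # hypothetical job id; the same id links all three calls
    scheduler.schedule_task_run(task_id, backend_id, timestamp=datetime.now(tz=timezone.utc))
    scheduler.start_task_run(backend_id, timestamp=datetime.now(tz=timezone.utc))
    try:
        status = "eventful" if work() else "uneventful"
    except Exception:
        # Any error in the work itself is recorded as a failed run.
        status = "failed"
    scheduler.end_task_run(backend_id, status, timestamp=datetime.now(tz=timezone.utc))
    return status
```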

update_lister(lister: swh.scheduler.model.Lister) → swh.scheduler.model.Lister

Update the state for the given lister instance in the database.

Returns

a new Lister object, with all fields updated from the database

Raises

StaleData if the updated timestamp for the lister instance in the database doesn’t match the one passed by the user.
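StaleData is listed in reraise_exceptions, so the client presumably raises it locally, which makes an optimistic-concurrency retry possible: re-read the lister, re-apply the change, and try again. A sketch under these assumptions: update_with_retry is hypothetical, apply_change is a hypothetical callable returning a modified copy of the Lister, and stale_exc stands in for swh.scheduler.exc.StaleData so the example runs without the package installed.

```python
from typing import Any, Callable, Type


def update_with_retry(
    scheduler,
    name: str,
    instance_name: str,
    apply_change: Callable[[Any], Any],
    stale_exc: Type[Exception],
    max_attempts: int = 3,
) -> Any:
    """Re-read the lister and re-apply apply_change until update_lister accepts it."""
    for attempt in range(max_attempts):
        # Fetch the current database state (creating the entry if needed).
        lister = scheduler.get_or_create_lister(name, instance_name)
        try:
            return scheduler.update_lister(apply_change(lister))
        except stale_exc:
            # Another writer updated the lister in between: retry from fresh
            # state, unless this was the last attempt.
            if attempt == max_attempts - 1:
                raise
    raise ValueError("max_attempts must be >= 1")
```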