swh.scheduler.api.client module

class swh.scheduler.api.client.RemoteScheduler(url, api_exception=None, timeout=None, chunk_size=4096, reraise_exceptions=None, **kwargs)

Bases: swh.core.api.RPCClient

Proxy to a remote scheduler API

backend_class

alias of swh.scheduler.interface.SchedulerInterface

reraise_exceptions: ClassVar[List[Type[Exception]]] = [<class 'swh.scheduler.exc.SchedulerException'>, <class 'swh.scheduler.exc.StaleData'>, <class 'swh.scheduler.exc.UnknownPolicy'>]

extra_type_decoders: Dict[str, Callable] = {'scheduler_model': <function <lambda>>}

extra_type_encoders: List[Tuple[type, str, Callable]] = [(<class 'swh.scheduler.model.BaseSchedulerModel'>, 'scheduler_model', <function _encode_model_object>)]

create_task_type(task_type)

Create a new task type ready for scheduling.

Parameters

task_type (dict) –

a dictionary with the following keys:

  • type (str): an identifier for the task type

  • description (str): a human-readable description of what the task does

  • backend_name (str): the name of the task in the job-scheduling backend

  • default_interval (datetime.timedelta): the default interval between two task runs

  • min_interval (datetime.timedelta): the minimum interval between two task runs

  • max_interval (datetime.timedelta): the maximum interval between two task runs

  • backoff_factor (float): the factor by which the interval changes at each run

  • max_queue_length (int): the maximum length of the task queue for this task type
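Here is a minimal sketch of assembling this dictionary. It relies on nothing beyond the keys listed above. The helper make_task_type, its interval sanity check, and all example values (such as the made-up backend name example.tasks.LoadExample) are hypothetical. The scheduler call is left as a comment because it needs a running scheduler.

```python
from datetime import timedelta
from typing import Any, Dict

# The keys documented for create_task_type.
TASK_TYPE_KEYS = frozenset({
    "type", "description", "backend_name", "default_interval",
    "min_interval", "max_interval", "backoff_factor", "max_queue_length",
})


def make_task_type(
    type_: str,
    description: str,
    backend_name: str,
    default_interval: timedelta,
    min_interval: timedelta,
    max_interval: timedelta,
    backoff_factor: float,
    max_queue_length: int,
) -> Dict[str, Any]:
    """Hypothetical helper: build a task_type dict with the documented keys."""
    # Assumed sanity check (not documented): the default lies between the bounds.
    if not min_interval <= default_interval <= max_interval:
        raise ValueError("default_interval must lie between min_interval and max_interval")
    return {
        "type": type_,
        "description": description,
        "backend_name": backend_name,
        "default_interval": default_interval,
        "min_interval": min_interval,
        "max_interval": max_interval,
        "backoff_factor": backoff_factor,
        "max_queue_length": max_queue_length,
    }


# Illustrative values only; "load-example" and its backend name are made up.
task_type = make_task_type(
    "load-example",
    "Load an example origin",
    "example.tasks.LoadExample",
    default_interval=timedelta(days=1),
    min_interval=timedelta(hours=12),
    max_interval=timedelta(days=32),
    backoff_factor=2.0,
    max_queue_length=1000,
)
# With a RemoteScheduler instance `scheduler`: scheduler.create_task_type(task_type)
```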

create_tasks(tasks, policy='recurring')

Create new tasks.

Parameters

tasks (list) –

each task is a dictionary with the following keys:

  • type (str): the task type

  • arguments (dict): the arguments for the task runner, keys:

    • args (list of str): arguments

    • kwargs (dict str -> str): keyword arguments

  • next_run (datetime.datetime): the next scheduled run for the task

Returns

a list of created tasks.
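The task payload can be sketched the same way. In this hypothetical helper, each URL becomes one task whose runner receives it as a url keyword argument. The task type name, the url keyword, and the policy="oneshot" value in the comment are assumptions made for illustration.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List


def build_tasks(task_type: str, urls: Iterable[str],
                next_run: datetime) -> List[Dict[str, Any]]:
    """Hypothetical helper: one task per URL, passed as a keyword argument."""
    return [
        {
            "type": task_type,
            "arguments": {"args": [], "kwargs": {"url": url}},
            "next_run": next_run,
        }
        for url in urls
    ]


tasks = build_tasks(
    "load-example",  # made-up task type
    ["https://example.org/repo1", "https://example.org/repo2"],
    datetime.now(tz=timezone.utc),
)
# With a RemoteScheduler instance `scheduler`:
# created = scheduler.create_tasks(tasks, policy="oneshot")
```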

delete_archived_tasks(task_ids)

Delete archived tasks, as far as possible. Only the tasks in task_ids whose associated task_run entries have all been cleaned up are deleted.

disable_tasks(task_ids)

Disable the tasks whose ids are listed.

end_task_run(backend_id, status, metadata=None, timestamp=None, result=None)

Mark a given task as ended, updating the corresponding task_run entry in the database.

Parameters
  • backend_id (str) – the identifier of the job in the backend

  • status (str) – how the task ended; one of: ‘eventful’, ‘uneventful’, ‘failed’

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

the updated task_run entry

filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None) → Dict[str, Any]

Compute the tasks to archive within the half-open datetime interval [after_ts, before_ts), i.e. after_ts included and before_ts excluded. The result is paginated.

Returns

a dict with the following keys:

  • next_page_token: an opaque token to pass as page_token to retrieve the next page of results. If absent, there are no more pages to gather.

  • tasks: a list of task dictionaries with the following keys:

    • id (str): origin task id

    • started (Optional[datetime]): start date

    • scheduled (datetime): scheduled date

    • arguments (json dict): the task’s arguments

    • …

Return type

Dict[str, Any]
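Because the result is paginated, a caller typically loops until next_page_token is absent. The generator below sketches that loop. _StandInScheduler is a hypothetical in-memory stand-in for a RemoteScheduler that lets the example run offline, and its page contents are invented.

```python
from typing import Any, Dict, Iterator, Optional


def iter_tasks_to_archive(scheduler: Any, after_ts: str, before_ts: str,
                          limit: int = 10) -> Iterator[Dict[str, Any]]:
    """Yield every task to archive in [after_ts, before_ts), one page at a time."""
    page_token: Optional[str] = None
    while True:
        page = scheduler.filter_task_to_archive(
            after_ts, before_ts, limit=limit, page_token=page_token)
        yield from page["tasks"]
        # An absent next_page_token means this was the last page.
        page_token = page.get("next_page_token")
        if page_token is None:
            return


class _StandInScheduler:
    """Hypothetical offline stand-in serving two canned pages."""

    _pages = {
        None: {"tasks": [{"id": "1"}, {"id": "2"}], "next_page_token": "p2"},
        "p2": {"tasks": [{"id": "3"}]},
    }

    def filter_task_to_archive(self, after_ts, before_ts, limit=10, page_token=None):
        return self._pages[page_token]


ids = [t["id"] for t in iter_tasks_to_archive(_StandInScheduler(), "2021-01-01", "2021-02-01")]
```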

get_listed_origins(lister_id: Optional[uuid.UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[Tuple[str, str]] = None) → swh.scheduler.interface.PaginatedListedOriginList

Get information on the listed origins matching either the url or lister_id, or both arguments.

Use the limit and page_token arguments for continuation. The next page token, if any, is returned in the PaginatedListedOriginList object.
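A similar loop works for get_listed_origins. This sketch assumes that PaginatedListedOriginList exposes results and next_page_token attributes. That assumption is based on swh.core's PagedResult and is not stated on this page. The stand-in scheduler and its opaque page token are hypothetical.

```python
from types import SimpleNamespace
from typing import Any, Iterator, Optional, Tuple


def iter_listed_origins(scheduler: Any, lister_id: Any = None, url: Optional[str] = None,
                        limit: int = 1000) -> Iterator[Any]:
    """Yield all matching listed origins, following the page tokens."""
    page_token: Optional[Tuple[str, str]] = None
    while True:
        page = scheduler.get_listed_origins(
            lister_id=lister_id, url=url, limit=limit, page_token=page_token)
        # Assumption: the page object has .results and .next_page_token.
        yield from page.results
        page_token = page.next_page_token
        if page_token is None:
            return


class _StandInScheduler:
    """Hypothetical offline stand-in serving two pages."""

    def get_listed_origins(self, lister_id=None, url=None, limit=1000, page_token=None):
        if page_token is None:
            return SimpleNamespace(results=["origin-a", "origin-b"],
                                   next_page_token=("opaque", "token"))
        return SimpleNamespace(results=["origin-c"], next_page_token=None)


origins = list(iter_listed_origins(_StandInScheduler()))
```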

get_lister(name: str, instance_name: Optional[str] = None) → Optional[swh.scheduler.model.Lister]

Retrieve information about the given instance of the lister from the database.

get_metrics(lister_id: Optional[uuid.UUID] = None, visit_type: Optional[str] = None) → List[swh.scheduler.model.SchedulerMetrics]

Retrieve the performance metrics of this scheduler instance.

Parameters
  • lister_id – filter the metrics for this lister instance only

  • visit_type – filter the metrics for this visit type only

get_or_create_lister(name: str, instance_name: Optional[str] = None) → swh.scheduler.model.Lister

Retrieve information about the given instance of the lister from the database, or create the entry if it does not exist yet.

get_priority_ratios()

get_task_runs(task_ids, limit=None)

Retrieve the task runs of the tasks whose ids are listed.

get_task_type(task_type_name)

Retrieve the task type with id task_type_name

get_task_types()

Retrieve all registered task types

get_tasks(task_ids)

Retrieve the info of tasks whose ids are listed.

grab_next_visits(visit_type: str, count: int, policy: str) → List[swh.scheduler.model.ListedOrigin]

Get at most the next count origins that need to be visited by the visit_type loader, according to the given scheduling policy.

This will mark the origins as “being visited” in the listed_origins table, to avoid scheduling multiple visits to the same origin.
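Grabbed origins are marked as being visited, so repeated calls should not hand out the same origin twice. The following sketch collects several batches under that assumption. The batch size, the batch cap, the policy name "example-policy", and the stand-in scheduler are all hypothetical. With a real scheduler, an unrecognised policy would presumably surface as UnknownPolicy, which is one of the reraise_exceptions.

```python
from typing import Any, List


def collect_next_visits(scheduler: Any, visit_type: str, policy: str,
                        batch_size: int = 100, max_batches: int = 10) -> List[Any]:
    """Grab batches of origins until a short batch signals nothing is left."""
    collected: List[Any] = []
    for _ in range(max_batches):
        batch = scheduler.grab_next_visits(visit_type, batch_size, policy)
        collected.extend(batch)
        if len(batch) < batch_size:
            break
    return collected


class _StandInScheduler:
    """Hypothetical stand-in: each grabbed origin leaves the pool, mimicking
    the "being visited" marker."""

    def __init__(self, n_origins: int):
        self.pool = [f"https://example.org/repo{i}" for i in range(n_origins)]

    def grab_next_visits(self, visit_type, count, policy):
        batch, self.pool = self.pool[:count], self.pool[count:]
        return batch


stand_in = _StandInScheduler(250)
origins = collect_next_visits(stand_in, "git", "example-policy")
```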

grab_ready_tasks(task_type, timestamp=None, num_tasks=None, num_tasks_priority=None)

Fetch the list of ready tasks, and mark them as scheduled

Parameters
  • task_type (str) – filter tasks by their type

  • timestamp (datetime.datetime) – grab tasks that need to be executed before that timestamp

  • num_tasks (int) – only grab num_tasks tasks (with no priority)

  • num_tasks_priority (int) – only grab num_tasks_priority oneshot tasks (with priorities)

Returns

a list of tasks
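One way a caller might size its requests is to split a queue's free slots between regular and priority tasks. The 25% priority share below is an arbitrary, hypothetical policy and not the scheduler's own. The stand-in only records the counts it was asked for.

```python
from typing import Any, List


def grab_for_free_slots(scheduler: Any, task_type: str, free_slots: int,
                        priority_share: float = 0.25) -> List[Any]:
    """Hypothetical policy: reserve part of the free queue slots for priority tasks."""
    if free_slots <= 0:
        return []
    num_priority = int(free_slots * priority_share)
    num_normal = free_slots - num_priority
    # grab_ready_tasks marks the returned tasks as scheduled; peek_ready_tasks
    # takes the same arguments and only looks, without marking anything.
    return scheduler.grab_ready_tasks(
        task_type, num_tasks=num_normal, num_tasks_priority=num_priority)


class _StandInScheduler:
    """Hypothetical offline stand-in that records the requested counts."""

    def __init__(self):
        self.calls = []

    def grab_ready_tasks(self, task_type, timestamp=None, num_tasks=None,
                         num_tasks_priority=None):
        self.calls.append((task_type, num_tasks, num_tasks_priority))
        return []


stand_in = _StandInScheduler()
grab_for_free_slots(stand_in, "load-example", free_slots=10)
```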

mass_schedule_task_runs(task_runs)

Schedule a bunch of task runs.

Parameters

task_runs (list) –

a list of dicts with keys:

  • task (int): the identifier for the task being scheduled

  • backend_id (str): the identifier of the job in the backend

  • metadata (dict): metadata to add to the task_run entry

  • scheduled (datetime.datetime): the instant the event occurred

Returns

None
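This is presumably the batched counterpart of schedule_task_run. Here is a sketch of building its task_runs argument from (task id, backend job id) pairs. The helper name, the ids, and the empty metadata are illustrative assumptions.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Tuple


def build_task_runs(jobs: Iterable[Tuple[int, str]],
                    scheduled: datetime) -> List[Dict[str, Any]]:
    """Hypothetical helper: one task_run dict per (task id, backend job id)."""
    return [
        {"task": task_id, "backend_id": backend_id, "metadata": {}, "scheduled": scheduled}
        for task_id, backend_id in jobs
    ]


now = datetime.now(tz=timezone.utc)
task_runs = build_task_runs([(42, "job-aaa"), (43, "job-bbb")], now)
# With a RemoteScheduler instance `scheduler`: scheduler.mass_schedule_task_runs(task_runs)
```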

origin_visit_stats_get(ids: Iterable[Tuple[str, str]]) → List[swh.scheduler.model.OriginVisitStats]

Retrieve the visit stats for the given origins, each identified in ids by a (url, visit_type) pair.

Visit stats that are not found are filtered out of the result, so the output list may be shorter than the input list.
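Missing entries are silently dropped, so a caller that needs to know which pairs had no stats can diff the input against the output. This sketch assumes that each returned OriginVisitStats has url and visit_type attributes. The stand-in scheduler and the URLs are hypothetical.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Set, Tuple

VisitKey = Tuple[str, str]  # (origin url, visit type)


def missing_visit_stats(scheduler: Any, ids: Iterable[VisitKey]) -> Set[VisitKey]:
    """Return the requested (url, visit_type) pairs that have no visit stats."""
    wanted = list(ids)  # materialize: ids may be a one-shot iterator
    found = scheduler.origin_visit_stats_get(wanted)
    # Assumption: each returned OriginVisitStats exposes .url and .visit_type.
    returned = {(stats.url, stats.visit_type) for stats in found}
    return set(wanted) - returned


class _StandInScheduler:
    """Hypothetical offline stand-in that only knows one origin."""

    def origin_visit_stats_get(self, ids):
        known = {("https://example.org/a", "git")}
        return [SimpleNamespace(url=u, visit_type=v) for (u, v) in ids if (u, v) in known]


missing = missing_visit_stats(
    _StandInScheduler(),
    [("https://example.org/a", "git"), ("https://example.org/b", "hg")],
)
```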

origin_visit_stats_upsert(origin_visit_stats: Iterable[swh.scheduler.model.OriginVisitStats]) → None

Create new origin visit stats entries, or update the existing ones.

peek_ready_tasks(task_type, timestamp=None, num_tasks=None, num_tasks_priority=None)

Fetch the list of ready tasks

Parameters
  • task_type (str) – filter tasks by their type

  • timestamp (datetime.datetime) – peek tasks that need to be executed before that timestamp

  • num_tasks (int) – only peek at num_tasks tasks (with no priority)

  • num_tasks_priority (int) – only peek at num_tasks_priority tasks (with priority)

Returns

a list of tasks

record_listed_origins(listed_origins: Iterable[swh.scheduler.model.ListedOrigin]) → List[swh.scheduler.model.ListedOrigin]

Record a set of origins that a lister has listed.

This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.
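To make the upsert rule concrete, the following pure-Python model applies it to an in-memory dict keyed by (lister_id, url, visit_type). It mirrors only the behaviour described above and is not the scheduler backend's implementation. Lister ids are plain strings here rather than UUIDs, and the example rows are invented.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Tuple

ListedOriginKey = Tuple[str, str, str]  # (lister_id, url, visit_type)
UPDATED_FIELDS = ("extra_loader_arguments", "last_update", "last_seen")


def upsert_listed_origins(table: Dict[ListedOriginKey, Dict[str, Any]],
                          rows: Iterable[Dict[str, Any]]) -> None:
    """In-memory model of the documented upsert rule (not the real backend)."""
    for row in rows:
        key = (row["lister_id"], row["url"], row["visit_type"])
        if key in table:
            # Existing origin: refresh only the documented mutable fields.
            for field in UPDATED_FIELDS:
                table[key][field] = row[field]
        else:
            table[key] = dict(row)


table: Dict[ListedOriginKey, Dict[str, Any]] = {}
base = {"lister_id": "lister-1", "url": "https://example.org/a", "visit_type": "git",
        "extra_loader_arguments": {}, "last_update": None,
        "last_seen": datetime(2021, 1, 1, tzinfo=timezone.utc)}
upsert_listed_origins(table, [base])
# Same key with a newer last_seen updates in place; a new visit_type is a new row.
upsert_listed_origins(table, [
    {**base, "last_seen": datetime(2021, 2, 1, tzinfo=timezone.utc)},
    {**base, "visit_type": "svn"},
])
```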

schedule_task_run(task_id, backend_id, metadata=None, timestamp=None)

Mark a given task as scheduled, adding a task_run entry in the database.

Parameters
  • task_id (int) – the identifier for the task being scheduled

  • backend_id (str) – the identifier of the job in the backend

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

a fresh task_run entry

search_tasks(task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None)

Search tasks matching the selected criteria.

set_status_tasks(task_ids, status='disabled', next_run=None)

Set the status of the tasks whose ids are listed.

If given, also set the next_run date.

start_task_run(backend_id, metadata=None, timestamp=None)

Mark a given task as started, updating the corresponding task_run entry in the database.

Parameters
  • backend_id (str) – the identifier of the job in the backend

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

the updated task_run entry
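Judging by their descriptions, the three task_run methods follow a run through its life: schedule_task_run, then start_task_run, then end_task_run. The sketch below chains them in one function only to show that order and which identifier each call takes. In a real deployment the calls may come from different components. The mapping from outcome to status is an assumption, and the stand-in scheduler is hypothetical.

```python
from datetime import datetime, timezone
from typing import Any


def now_utc() -> datetime:
    return datetime.now(tz=timezone.utc)


def run_lifecycle(scheduler: Any, task_id: int, backend_id: str,
                  succeeded: bool, found_changes: bool) -> Any:
    """Walk one task run through scheduled -> started -> ended."""
    # schedule_task_run adds the task_run entry, referencing the task id...
    scheduler.schedule_task_run(task_id, backend_id, timestamp=now_utc())
    # ...later calls only need the backend job id.
    scheduler.start_task_run(backend_id, timestamp=now_utc())
    # Assumed mapping from outcome to the three documented statuses.
    if not succeeded:
        status = "failed"
    elif found_changes:
        status = "eventful"
    else:
        status = "uneventful"
    return scheduler.end_task_run(backend_id, status, timestamp=now_utc())


class _StandInScheduler:
    """Hypothetical offline stand-in that records each call."""

    def __init__(self):
        self.calls = []

    def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None):
        self.calls.append(("schedule_task_run", backend_id))

    def start_task_run(self, backend_id, metadata=None, timestamp=None):
        self.calls.append(("start_task_run", backend_id))

    def end_task_run(self, backend_id, status, metadata=None, timestamp=None, result=None):
        self.calls.append(("end_task_run", backend_id))
        return {"backend_id": backend_id, "status": status}


stand_in = _StandInScheduler()
run = run_lifecycle(stand_in, 42, "job-aaa", succeeded=True, found_changes=False)
```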

update_lister(lister: swh.scheduler.model.Lister) → swh.scheduler.model.Lister

Update the state for the given lister instance in the database.

Returns

a new Lister object, with all fields updated from the database

Raises

StaleData if the updated timestamp for the lister instance in the database doesn’t match the one passed by the user.
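StaleData is one of the reraise_exceptions, so a caller of the remote client can catch it and retry with a fresh copy of the lister. In this hypothetical helper, mutate is a caller-supplied function that returns the modified Lister. In real use it might be built with attr.evolve, assuming Lister is an attrs class with a current_state field, and stale_exc would be swh.scheduler.exc.StaleData. Offline stand-ins replace both here.

```python
from types import SimpleNamespace
from typing import Any, Callable, Type


def update_lister_with_retry(scheduler: Any, name: str,
                             mutate: Callable[[Any], Any],
                             stale_exc: Type[Exception],
                             max_attempts: int = 3) -> Any:
    """Re-read the lister and re-apply `mutate` whenever the update is stale."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        lister = scheduler.get_or_create_lister(name)
        try:
            return scheduler.update_lister(mutate(lister))
        except stale_exc:
            if attempt == max_attempts - 1:
                raise  # give up: another writer keeps winning


class _StaleData(Exception):
    """Offline stand-in for swh.scheduler.exc.StaleData."""


class _StandInScheduler:
    """Hypothetical stand-in whose first update loses a race."""

    def __init__(self):
        self.lister = SimpleNamespace(name="example", current_state={}, updated=0)
        self.lose_next_race = True

    def get_or_create_lister(self, name, instance_name=None):
        return self.lister

    def update_lister(self, lister):
        if self.lose_next_race or lister.updated != self.lister.updated:
            self.lose_next_race = False
            raise _StaleData()
        self.lister = SimpleNamespace(**{**vars(lister), "updated": lister.updated + 1})
        return self.lister


stand_in = _StandInScheduler()
saved = update_lister_with_retry(
    stand_in, "example",
    mutate=lambda lst: SimpleNamespace(**{**vars(lst), "current_state": {"page": 2}}),
    stale_exc=_StaleData,
)
```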

update_metrics(lister_id: Optional[uuid.UUID] = None, timestamp: Optional[datetime.datetime] = None) → List[swh.scheduler.model.SchedulerMetrics]

Update the performance metrics of this scheduler instance.

Returns the updated metrics.

Parameters
  • lister_id – if passed, update the metrics only for this lister instance

  • timestamp – if passed, the date at which we’re updating the metrics, defaults to the database NOW()
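update_metrics already returns the refreshed metrics, so a caller need not follow it with get_metrics. The sketch indexes the result by (lister_id, visit_type), assuming SchedulerMetrics exposes those two attributes. The stand-in and its lister UUID are hypothetical.

```python
import uuid
from types import SimpleNamespace
from typing import Any, Dict, Optional, Tuple


def refresh_metrics(scheduler: Any, lister_id: Optional[uuid.UUID] = None
                    ) -> Dict[Tuple[Any, str], Any]:
    """Refresh the metrics and index them by (lister_id, visit_type)."""
    # update_metrics returns the updated metrics directly.
    metrics = scheduler.update_metrics(lister_id=lister_id)
    # Assumption: each SchedulerMetrics has .lister_id and .visit_type.
    return {(m.lister_id, m.visit_type): m for m in metrics}


LISTER = uuid.UUID(int=1)  # made-up lister id


class _StandInScheduler:
    """Hypothetical offline stand-in returning one entry per visit type."""

    def update_metrics(self, lister_id=None, timestamp=None):
        return [SimpleNamespace(lister_id=LISTER, visit_type=v) for v in ("git", "hg")]


by_key = refresh_metrics(_StandInScheduler(), lister_id=LISTER)
```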