swh.scheduler.api.client module#

class swh.scheduler.api.client.RemoteScheduler(url: str, timeout: None | Tuple[float, float] | List[float] | float = None, chunk_size: int = 4096, max_retries: int = 3, pool_connections: int = 20, pool_maxsize: int = 100, adapter_kwargs: Dict[str, Any] | None = None, api_exception: Type[Exception] | None = None, reraise_exceptions: List[Type[Exception]] | None = None, enable_requests_retry: bool | None = None, **kwargs)#

Bases: RPCClient

Proxy to a remote scheduler API

backend_class#

alias of SchedulerInterface

reraise_exceptions: List[Type[Exception]] = [<class 'swh.scheduler.exc.SchedulerException'>, <class 'swh.scheduler.exc.StaleData'>, <class 'swh.scheduler.exc.UnknownPolicy'>]#

On server errors, if any of the exception classes in this list has the same name as the error name, then the exception will be instantiated and raised instead of a generic RemoteException.
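The name-based matching described above can be pictured with a small sketch. It is not the RPCClient implementation: RemoteException and StaleData below are local stand-ins, and build_exception is a hypothetical helper written only for illustration.

```python
class RemoteException(Exception):
    """Local stand-in for the generic exception raised on server errors."""


class StaleData(Exception):
    """Local stand-in for one of the classes listed in reraise_exceptions."""


def build_exception(error_name, args, reraise_exceptions):
    """Return the exception to raise for a server error named error_name."""
    for exc_class in reraise_exceptions:
        # Matching is done on the class name, as described above.
        if exc_class.__name__ == error_name:
            return exc_class(*args)
    # No class with that name: fall back to the generic exception.
    return RemoteException(error_name, *args)


specific = build_exception("StaleData", ("lister changed",), [StaleData])
generic = build_exception("KeyError", ("oops",), [StaleData])
```

Here specific is a StaleData instance, while generic falls back to RemoteException because no listed class is named KeyError.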

extra_type_decoders: Dict[str, Callable] = {'last_visit_status': <enum 'LastVisitStatus'>, 'scheduler_model': <function <lambda>>}#

Value of extra_decoders passed to json_loads or msgpack_loads to be able to deserialize more object types.

extra_type_encoders: List[Tuple[type, str, Callable]] = [(<class 'swh.scheduler.model.BaseSchedulerModel'>, 'scheduler_model', <function _encode_model_object>), (<enum 'LastVisitStatus'>, 'last_visit_status', <function _encode_enum>)]#

Value of extra_encoders passed to json_dumps or msgpack_dumps to be able to serialize more object types.
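As a rough illustration of how the (type, name, encoder) tuples and the name-to-decoder mapping fit together, the sketch below round-trips an enum through the standard json module. The Status enum, the "type_tag" and "d" keys, and both helper functions are assumptions made for this example; the actual wire format used by json_dumps and msgpack_dumps may differ.

```python
import json
from enum import Enum


class Status(Enum):
    """Hypothetical enum standing in for LastVisitStatus."""

    SUCCESSFUL = "successful"
    FAILED = "failed"


# Same shapes as extra_type_encoders and extra_type_decoders above.
extra_encoders = [(Status, "status", lambda value: value.value)]
extra_decoders = {"status": Status}


def encode_extra(obj):
    """json.dumps fallback: wrap known types in a tagged dictionary."""
    for type_, type_name, encoder in extra_encoders:
        if isinstance(obj, type_):
            return {"type_tag": type_name, "d": encoder(obj)}
    raise TypeError(f"cannot serialize {obj!r}")


def decode_extra(dct):
    """json.loads object hook: unwrap tagged dictionaries, keep the others."""
    decoder = extra_decoders.get(dct.get("type_tag"))
    return decoder(dct["d"]) if decoder else dct


payload = json.dumps({"last_visit_status": Status.FAILED}, default=encode_extra)
restored = json.loads(payload, object_hook=decode_extra)
```

The encoder list is scanned in order on the way out, and the decoder is looked up by name on the way in, which is why both attributes must agree on the type names.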

enable_requests_retry: bool = True#

If set to True, requests sent by the client will be retried when encountering specific errors. Default policy is to retry when connection errors or transient remote exceptions are raised. Subclasses can change that policy by overriding the retry_policy() method.

create_task_type(task_type: TaskType) None#

Create a new task type in database ready for scheduling.

Parameters:

task_type – a TaskType object

create_tasks(tasks: List[Task], policy: Literal['recurring', 'oneshot'] = 'recurring') List[Task]#

Register new tasks in database.

Parameters:
  • tasks

    each task is a Task object created with at least the following parameters:

    • type

    • arguments

    • next_run

  • policy – default task policy (either recurring or oneshot) to use if not set in input task objects

Returns:

a list of created tasks with database ids filled.

delete_archived_tasks(task_ids)#

Delete as many archived tasks as possible. Only the tasks whose associated task_run entries have all been cleaned up are deleted.

disable_tasks(task_ids: List[int]) None#

Disable the tasks whose ids are listed.

Parameters:

task_ids – list of tasks’ identifiers

end_task_run(backend_id: str, status: Literal['scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost'], metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) TaskRun | None#

Mark a given task as ended, updating the corresponding task_run entry in the database.

Parameters:
  • backend_id – the identifier of the job in the backend

  • status – how the task ended

  • metadata – metadata to add to the task_run entry

  • timestamp – the instant the event occurred

Returns:

a TaskRun object with updated fields

filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: str | None = None) Dict[str, Any]#

Compute the tasks to archive within the half-open datetime interval [after_ts, before_ts), that is, after_ts included and before_ts excluded. The result is paginated.

Returns:

a dict with the following keys:

  • next_page_token: opaque token to be used as page_token to retrieve the next page of results. If absent, there are no more pages to gather.

  • tasks: list of task dictionaries with the following keys:

    • id (str): origin task id

    • started (Optional[datetime]): started date

    • scheduled (datetime): scheduled date

    • arguments (json dict): task’s arguments

    • …

Return type:

dict
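The pagination contract described above (follow next_page_token until it is absent) can be sketched as a generator. iter_tasks_to_archive is a hypothetical helper, and the scheduler is replaced by a unittest.mock stand-in so that the snippet runs without a server; the timestamps are arbitrary example values.

```python
from unittest.mock import Mock


def iter_tasks_to_archive(scheduler, after_ts, before_ts, limit=10):
    """Yield every task dictionary in the interval, following page tokens."""
    page_token = None
    while True:
        page = scheduler.filter_task_to_archive(
            after_ts, before_ts, limit=limit, page_token=page_token
        )
        yield from page["tasks"]
        # A missing next_page_token means this was the last page.
        page_token = page.get("next_page_token")
        if page_token is None:
            break


# Stand-in scheduler returning two pages.
scheduler = Mock()
scheduler.filter_task_to_archive.side_effect = [
    {"tasks": [{"id": "1"}, {"id": "2"}], "next_page_token": "t1"},
    {"tasks": [{"id": "3"}]},
]
tasks = iter_tasks_to_archive(scheduler, "2024-01-01", "2024-02-01")
task_ids = [task["id"] for task in tasks]
```

With a real client, the same helper would take a RemoteScheduler instance in place of the Mock.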

get_listed_origins(lister_id: UUID | None = None, url: str | None = None, urls: List[str] | None = None, enabled: bool | None = True, limit: int = 1000, page_token: Tuple[str, str] | None = None) PaginatedListedOriginList#

Get information on listed origins, possibly filtered, in a paginated way.

Parameters:
  • lister_id – if provided, return origins discovered with that lister

  • url – (deprecated, use urls parameter instead) if provided, return origins matching that URL

  • urls – if provided, return origins matching these URLs

  • enabled – If True return only enabled origins, if False return only disabled origins, if None return all origins.

  • limit – maximum number of origins per page

  • page_token – token used to fetch the next page of origins; it is returned in the PaginatedListedOriginList object of the previous page

Returns:

A page of listed origins
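Walking through all pages might look like the sketch below. It assumes that the returned page object exposes results and next_page_token attributes, which this page does not spell out; iter_listed_origins is a hypothetical helper, and the pages are faked with SimpleNamespace objects.

```python
from types import SimpleNamespace
from unittest.mock import Mock


def iter_listed_origins(scheduler, **filters):
    """Yield listed origins page by page (assumed attributes, see above)."""
    page_token = None
    while True:
        page = scheduler.get_listed_origins(page_token=page_token, **filters)
        yield from page.results
        page_token = page.next_page_token
        if page_token is None:
            break


# Stand-in scheduler returning two fake pages.
scheduler = Mock()
scheduler.get_listed_origins.side_effect = [
    SimpleNamespace(results=["origin-1", "origin-2"], next_page_token=("a", "b")),
    SimpleNamespace(results=["origin-3"], next_page_token=None),
]
origins = list(iter_listed_origins(scheduler, enabled=True, limit=2))
```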

get_lister(name: str, instance_name: str | None = None) Lister | None#

Retrieve information about the given instance of the lister from the database.

get_listers(with_first_visits_to_schedule: bool = False) List[Lister]#

Retrieve information about all listers from the database.

Parameters:

with_first_visits_to_schedule – if True, only retrieve listers for which the high-priority first visits of their listed origins have not been scheduled yet (such listers have the first_visits_queue_prefix attribute set).

get_listers_by_id(lister_ids: List[str]) List[Lister]#

Retrieve listers in batch, using their UUIDs

get_metrics(lister_id: UUID | None = None, visit_type: str | None = None) List[SchedulerMetrics]#

Retrieve the performance metrics of this scheduler instance.

Parameters:
  • lister_id – filter the metrics for this lister instance only

  • visit_type – filter the metrics for this visit type only

get_or_create_lister(name: str, instance_name: str | None = None, first_visits_queue_prefix: str | None = None) Lister#

Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist.

get_task_runs(task_ids: List[int], limit: int | None = None) List[TaskRun]#

Search the task runs associated with the given task ids

get_task_type(task_type_name: str) TaskType | None#

Retrieve the registered task type with a given name

Parameters:

task_type_name – name of the task type to retrieve

Returns:

a TaskType object or None if no such task type exists
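Because get_task_type returns None for unknown names, it combines naturally with create_task_type in a "create if missing" helper. The sketch below is only an illustration: ensure_task_type and make_task_type are hypothetical names, plain dictionaries stand in for TaskType objects, and the scheduler is a unittest.mock stand-in.

```python
from unittest.mock import Mock


def ensure_task_type(scheduler, name, make_task_type):
    """Return the task type called name, registering it first if needed.

    make_task_type is a hypothetical zero-argument factory returning the
    TaskType object to register.
    """
    existing = scheduler.get_task_type(name)
    if existing is not None:
        return existing
    scheduler.create_task_type(make_task_type())
    return scheduler.get_task_type(name)


# Stand-in scheduler backed by a dictionary, so the snippet runs offline.
registry = {}
scheduler = Mock()
scheduler.get_task_type.side_effect = registry.get
scheduler.create_task_type.side_effect = lambda tt: registry.update({tt["type"]: tt})

created = ensure_task_type(scheduler, "load-git", lambda: {"type": "load-git"})
again = ensure_task_type(scheduler, "load-git", lambda: {"type": "other"})
```

The second call finds the existing entry and never invokes its factory.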

get_task_types() List[TaskType]#

Retrieve all registered task types

Returns:

a list of TaskType objects

get_tasks(task_ids: List[int]) List[Task]#

Retrieve the info of tasks whose ids are listed.

Parameters:

task_ids – list of tasks’ identifiers

Returns:

a list of tasks

get_visit_types_for_listed_origins(lister: Lister) List[str]#

Return list of distinct visit types for the origins listed by a given lister.

grab_next_visits(visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: str | None = None, lister_name: str | None = None, lister_instance_name: str | None = None, timestamp: datetime | None = None, absolute_cooldown: timedelta | None = datetime.timedelta(seconds=43200), scheduled_cooldown: timedelta | None = datetime.timedelta(days=7), failed_cooldown: timedelta | None = datetime.timedelta(days=14), not_found_cooldown: timedelta | None = datetime.timedelta(days=31), tablesample: float | None = None) List[ListedOrigin]#

Get at most count origins that need to be visited next with the visit_type loader, according to the given scheduling policy.

This will mark the origins as scheduled in the origin_visit_stats table, to avoid scheduling multiple visits to the same origin.

Parameters:
  • visit_type – type of visits to schedule

  • count – number of visits to schedule

  • policy – the scheduling policy used to select which visits to schedule

  • enabled – whether to list enabled or disabled origins. By default, only enabled origins are listed; some edge cases may require the disabled ones.

  • lister_uuid – only consider origins listed by the lister with this UUID

  • lister_name – only consider origins listed by the lister with this name

  • lister_instance_name – only consider origins listed by the lister with this instance name

  • timestamp – the mocked timestamp at which we’re recording that the visits are being scheduled (defaults to the current time)

  • absolute_cooldown – the minimal interval between two visits of the same origin

  • scheduled_cooldown – the minimal interval before which we can schedule the same origin again if it’s not been visited

  • failed_cooldown – the minimal interval before which we can reschedule a failed origin

  • not_found_cooldown – the minimal interval before which we can reschedule a not_found origin

  • tablesample – the percentage of the table on which we run the query (None: no sampling)
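The cooldown defaults in the signature are easier to compare once written as a table; note that timedelta(seconds=43200) is 12 hours. The sketch below overrides a single cooldown while keeping the other defaults. grab_visits is a hypothetical wrapper, "example_policy" is a placeholder (this page does not list the valid policy names), and the scheduler is a unittest.mock stand-in.

```python
from datetime import timedelta
from unittest.mock import Mock

# Defaults copied from the grab_next_visits signature above.
DEFAULT_COOLDOWNS = {
    "absolute_cooldown": timedelta(seconds=43200),
    "scheduled_cooldown": timedelta(days=7),
    "failed_cooldown": timedelta(days=14),
    "not_found_cooldown": timedelta(days=31),
}


def grab_visits(scheduler, visit_type, count, policy, **cooldown_overrides):
    """Call grab_next_visits with the default cooldowns, overriding some."""
    unknown = set(cooldown_overrides) - set(DEFAULT_COOLDOWNS)
    if unknown:
        raise TypeError(f"unknown cooldown(s): {sorted(unknown)}")
    cooldowns = {**DEFAULT_COOLDOWNS, **cooldown_overrides}
    return scheduler.grab_next_visits(visit_type, count, policy, **cooldowns)


scheduler = Mock()
scheduler.grab_next_visits.return_value = []
# "example_policy" is a placeholder, not a documented policy name.
grab_visits(scheduler, "git", 100, "example_policy", failed_cooldown=timedelta(days=2))
sent = scheduler.grab_next_visits.call_args.kwargs
```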

grab_ready_priority_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Task]#

Fetch and schedule the list of tasks (with any priority) ready to be scheduled.

Parameters:
  • task_type – only consider tasks of this type

  • timestamp – grab tasks that need to be executed before that timestamp

  • num_tasks – grab at most num_tasks tasks (with any priority)

Returns:

the list of scheduled tasks

grab_ready_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Task]#

Fetch and schedule the list of tasks (with no priority) ready to be scheduled.

Parameters:
  • task_type – only consider tasks of this type

  • timestamp – grab tasks that need to be executed before that timestamp

  • num_tasks – only grab num_tasks tasks (with no priority)

Returns:

the list of scheduled tasks

mass_schedule_task_runs(task_runs: List[TaskRun]) None#

Schedule a batch of task runs.

Parameters:

task_runs

a list of TaskRun objects created at least with the following parameters:

  • task

  • backend_id

  • scheduled

origin_visit_stats_get(ids: Iterable[Tuple[str, str]]) List[OriginVisitStats]#

Retrieve the visit statistics for the given (origin URL, visit type) pairs.

Warning

If some visit statistics are not found, they are filtered out of the result. So the output list may be shorter than the input list.

Parameters:

ids – an iterable of (origin_url, visit_type) tuples

Returns:

a list of origin visit statistics
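Since missing entries are silently dropped, callers that need to know which pairs had no statistics have to compare the result with their input. A possible sketch follows, assuming that each returned object exposes url and visit_type attributes; missing_visit_stats is a hypothetical helper, and FakeStats and the Mock scheduler are stand-ins.

```python
from collections import namedtuple
from unittest.mock import Mock


def missing_visit_stats(scheduler, ids):
    """Return the (origin_url, visit_type) pairs that have no statistics."""
    ids = list(ids)
    found = {
        (stats.url, stats.visit_type)
        for stats in scheduler.origin_visit_stats_get(ids)
    }
    return [pair for pair in ids if pair not in found]


# Stand-in for OriginVisitStats, with only the two assumed attributes.
FakeStats = namedtuple("FakeStats", ["url", "visit_type"])
scheduler = Mock()
scheduler.origin_visit_stats_get.return_value = [
    FakeStats("https://example.org/a", "git")
]
missing = missing_visit_stats(
    scheduler,
    [("https://example.org/a", "git"), ("https://example.org/b", "git")],
)
```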

origin_visit_stats_upsert(origin_visit_stats: Iterable[OriginVisitStats]) None#

Create new origin visit stats, or update them if they already exist

peek_ready_priority_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Task]#

Fetch list of tasks (with any priority) ready to be scheduled.

Parameters:
  • task_type – only consider tasks of this type

  • timestamp – peek tasks that need to be executed before that timestamp

  • num_tasks – only peek at num_tasks tasks (with any priority)

Returns:

the list of tasks which would be scheduled

peek_ready_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Task]#

Fetch the list of tasks (with no priority) to be scheduled.

Parameters:
  • task_type – only consider tasks of this type

  • timestamp – peek tasks that need to be executed before that timestamp

  • num_tasks – only peek at num_tasks tasks (with no priority)

Returns:

the list of tasks which would be scheduled
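The peek_* methods share their signatures with the grab_* methods but, per the descriptions above, only report what would be scheduled. That makes a dry-run switch straightforward, as in this sketch; ready_tasks is a hypothetical helper, and the scheduler is a unittest.mock stand-in returning placeholder values.

```python
from unittest.mock import Mock


def ready_tasks(scheduler, task_type, num_tasks=None, dry_run=True):
    """Return ready tasks; they are only scheduled when dry_run is False."""
    fetch = scheduler.peek_ready_tasks if dry_run else scheduler.grab_ready_tasks
    return fetch(task_type, num_tasks=num_tasks)


scheduler = Mock()
scheduler.peek_ready_tasks.return_value = ["task-a", "task-b"]
preview = ready_tasks(scheduler, "load-git", num_tasks=2)
```

Defaulting dry_run to True is a choice made for the example, so that a caller has to opt in to the call that changes state.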

record_listed_origins(listed_origins: Iterable[ListedOrigin]) List[ListedOrigin]#

Record a set of origins that a lister has listed.

This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.

schedule_task_run(task_id: int, backend_id: str, metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) TaskRun#

Mark a given task as scheduled, adding a task_run entry in the database.

Parameters:
  • task_id – the identifier for the task being scheduled

  • backend_id – the identifier of the job in the backend

  • metadata – metadata to add to the task_run entry

  • timestamp – the instant the event occurred

Returns:

a TaskRun object

search_tasks(task_id: int | None = None, task_type: str | None = None, status: Literal['next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'] | None = None, priority: Literal['high', 'normal', 'low'] | None = None, policy: Literal['recurring', 'oneshot'] | None = None, before: datetime | None = None, after: datetime | None = None, limit: int | None = None) List[Task]#

Search tasks matching the selected criteria

Parameters:
  • task_id – search a task with given identifier

  • task_type – search tasks with given type

  • status – search tasks with given status

  • priority – search tasks with given priority

  • policy – search tasks with given policy

  • before – search tasks created before given date

  • after – search tasks created after given date

  • limit – maximum number of tasks to return

Returns:

a list of found tasks

set_status_tasks(task_ids: List[int], status: Literal['next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'] = 'disabled', next_run: datetime | None = None) None#

Set the status of the tasks whose ids are listed.

Parameters:
  • task_ids – list of tasks’ identifiers

  • status – the status to set for the tasks

  • next_run – if provided, also set the next_run date

start_task_run(backend_id: str, metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) TaskRun | None#

Mark a given task as started, updating the corresponding task_run entry in the database.

Parameters:
  • backend_id – the identifier of the job in the backend

  • metadata – metadata to add to the task_run entry

  • timestamp – the instant the event occurred

Returns:

a TaskRun object with updated fields
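schedule_task_run, start_task_run and end_task_run record successive stages of the same task run, tied together by backend_id. The sketch below strings them around a unit of work. run_with_tracking is a hypothetical helper, the "error" metadata key and the mapping from the return value of work() to a status are choices made for the example, and the scheduler is a unittest.mock stand-in.

```python
from unittest.mock import Mock


def run_with_tracking(scheduler, task_id, backend_id, work):
    """Record the schedule, start and end of one task run around work()."""
    scheduler.schedule_task_run(task_id, backend_id)
    scheduler.start_task_run(backend_id)
    try:
        eventful = work()
    except Exception as exc:
        # Record the failure before letting the exception propagate.
        scheduler.end_task_run(backend_id, "failed", metadata={"error": str(exc)})
        raise
    status = "eventful" if eventful else "uneventful"
    return scheduler.end_task_run(backend_id, status)


scheduler = Mock()
run_with_tracking(scheduler, 42, "job-1", lambda: True)
call_names = [c[0] for c in scheduler.method_calls]
final_status = scheduler.end_task_run.call_args.args[1]
```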

update_lister(lister: Lister) Lister#

Update the state for the given lister instance in the database.

Returns:

a new Lister object, with all fields updated from the database

Raises:

StaleData if the updated timestamp for the lister instance in database doesn’t match the one passed by the user.
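A common way to cope with StaleData is to re-read the lister and apply the change again. The sketch below shows one such retry loop. update_lister_with_retry and its modify callback are hypothetical, the StaleData class is a local stand-in for swh.scheduler.exc.StaleData, strings stand in for Lister objects, and the scheduler is a unittest.mock stand-in.

```python
from unittest.mock import Mock


class StaleData(Exception):
    """Local stand-in for swh.scheduler.exc.StaleData."""


def update_lister_with_retry(scheduler, name, instance_name, modify, attempts=3):
    """Re-read the lister and retry when the update hits stale data."""
    for attempt in range(attempts):
        lister = scheduler.get_lister(name, instance_name)
        if lister is None:
            raise LookupError(f"no lister {name!r} / {instance_name!r}")
        try:
            return scheduler.update_lister(modify(lister))
        except StaleData:
            # Give up after the last attempt, otherwise fetch a fresh copy.
            if attempt == attempts - 1:
                raise


scheduler = Mock()
scheduler.get_lister.side_effect = ["lister-v1", "lister-v2"]
scheduler.update_lister.side_effect = [StaleData(), "lister-v3"]
result = update_lister_with_retry(scheduler, "github", None, lambda lister: lister)
```

The first update fails on stale data, so the helper fetches the lister again and the second update succeeds.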

update_metrics(lister_id: UUID | None = None, timestamp: datetime | None = None) List[SchedulerMetrics]#

Update the performance metrics of this scheduler instance.

Returns the updated metrics.

Parameters:
  • lister_id – if passed, update the metrics only for this lister instance

  • timestamp – if passed, the date at which we’re updating the metrics, defaults to the database NOW()

visit_scheduler_queue_position_get() Dict[str, int]#

Retrieve all current queue positions for the recurrent visit scheduler.

Returns:

Mapping of visit types to their current queue positions

visit_scheduler_queue_position_set(visit_type: str, position: int) None#

Set the current queue position of the recurrent visit scheduler for visit_type.
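The getter and setter can be combined into a read-modify-write step, as in the sketch below. advance_queue_position is a hypothetical helper, starting from 0 when a visit type has no recorded position is an assumption, and nothing here makes the two calls atomic. The scheduler is a unittest.mock stand-in.

```python
from unittest.mock import Mock


def advance_queue_position(scheduler, visit_type, step):
    """Move the stored queue position of visit_type forward by step."""
    positions = scheduler.visit_scheduler_queue_position_get()
    # Assumption: a visit type without a recorded position starts at 0.
    new_position = positions.get(visit_type, 0) + step
    scheduler.visit_scheduler_queue_position_set(visit_type, new_position)
    return new_position


scheduler = Mock()
scheduler.visit_scheduler_queue_position_get.return_value = {"git": 10}
git_position = advance_queue_position(scheduler, "git", 5)
svn_position = advance_queue_position(scheduler, "svn", 5)
```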