swh.scheduler.api.client module

class swh.scheduler.api.client.RemoteScheduler(url: str, timeout: None | Tuple[float, float] | List[float] | float = None, chunk_size: int = 4096, max_retries: int = 3, pool_connections: int = 20, pool_maxsize: int = 100, adapter_kwargs: Dict[str, Any] | None = None, api_exception: Type[Exception] | None = None, reraise_exceptions: List[Type[Exception]] | None = None, **kwargs)

Bases: RPCClient

Proxy to a remote scheduler API

backend_class

alias of SchedulerInterface

reraise_exceptions: List[Type[Exception]] = [<class 'swh.scheduler.exc.SchedulerException'>, <class 'swh.scheduler.exc.StaleData'>, <class 'swh.scheduler.exc.UnknownPolicy'>]

On server errors, if any of the exception classes in this list has the same name as the error name, then the exception will be instantiated and raised instead of a generic RemoteException.
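The name-matching rule can be pictured with the following minimal sketch. It is not the RPCClient implementation: `rebuild_exception` and the local `RemoteException` and `StaleData` classes are hypothetical stand-ins, and the only assumption is that the server reports an error name together with a message.

```python
from typing import List, Type


class RemoteException(Exception):
    """Stand-in for the generic error raised when no listed class matches."""


class StaleData(Exception):
    """Stand-in for swh.scheduler.exc.StaleData (illustration only)."""


def rebuild_exception(
    error_name: str, message: str, reraise_exceptions: List[Type[Exception]]
) -> Exception:
    """Return an instance of the class whose __name__ equals error_name.

    Hypothetical helper mimicking the documented rule; it falls back to a
    generic RemoteException when nothing in the list matches.
    """
    for exc_class in reraise_exceptions:
        if exc_class.__name__ == error_name:
            return exc_class(message)
    return RemoteException(f"{error_name}: {message}")
```

A caller would then `raise rebuild_exception(name, message, RemoteScheduler.reraise_exceptions)`, so that code catching `StaleData` keeps working across the RPC boundary.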

extra_type_decoders: Dict[str, Callable] = {'last_visit_status': <enum 'LastVisitStatus'>, 'scheduler_model': <function <lambda>>}

Value of extra_decoders passed to json_loads or msgpack_loads to be able to deserialize more object types.

extra_type_encoders: List[Tuple[type, str, Callable]] = [(<class 'swh.scheduler.model.BaseSchedulerModel'>, 'scheduler_model', <function _encode_model_object>), (<enum 'LastVisitStatus'>, 'last_visit_status', <function _encode_enum>)]

Value of extra_encoders passed to json_dumps or msgpack_dumps to be able to serialize more object types.
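The encoder and decoder tables work as tagged values: an encoder turns an object into a plain payload stored under a tag, and the decoder registered for that tag rebuilds the object. The sketch below shows that round trip with the standard json module. The `{"swhtype": ..., "d": ...}` wrapper, the `dumps`/`loads` helpers and the local `LastVisitStatus` enum are assumptions for illustration, not necessarily the exact wire format produced by swh.core.

```python
import enum
import json
from typing import Any, Callable, Dict, List, Tuple


class LastVisitStatus(enum.Enum):
    # Stand-in enum; the real one is defined in swh.scheduler.model
    successful = "successful"
    failed = "failed"
    not_found = "not_found"


def _encode_enum(obj: enum.Enum) -> str:
    return obj.value


# Same shapes as extra_type_encoders / extra_type_decoders above
ENCODERS: List[Tuple[type, str, Callable]] = [
    (LastVisitStatus, "last_visit_status", _encode_enum),
]
DECODERS: Dict[str, Callable] = {"last_visit_status": LastVisitStatus}


def dumps(obj: Any) -> str:
    def default(o: Any) -> Any:
        # Called by json only for objects it cannot serialize natively
        for type_, tag, encoder in ENCODERS:
            if isinstance(o, type_):
                return {"swhtype": tag, "d": encoder(o)}
        raise TypeError(f"cannot serialize {type(o).__name__}")

    return json.dumps(obj, default=default)


def loads(data: str) -> Any:
    def object_hook(d: Dict[str, Any]) -> Any:
        # Rebuild tagged payloads; leave ordinary dicts untouched
        if d.get("swhtype") in DECODERS:
            return DECODERS[d["swhtype"]](d["d"])
        return d

    return json.loads(data, object_hook=object_hook)
```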

create_task_type(task_type: TaskType) → None

Create a new task type in the database, ready for scheduling.

Parameters:

task_type – a TaskType object

create_tasks(tasks: List[Task], policy: Literal['recurring', 'oneshot'] = 'recurring') → List[Task]

Register new tasks in the database.

Parameters:
  • tasks

    each task is a Task object created with at least the following parameters:

    • type

    • arguments

    • next_run

  • policy – default task policy (either recurring or oneshot) to use if not set in input task objects

Returns:

a list of created tasks with database ids filled.
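The policy argument only acts as a fallback. A minimal sketch of that rule, assuming a task "does not set" its policy when the field is None; the `Task` dataclass below is a hypothetical stand-in holding the documented minimum fields, and `apply_default_policy` is a hypothetical helper.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class Task:
    # Hypothetical stand-in for swh.scheduler.model.Task (subset of fields)
    type: str
    arguments: Dict[str, Any]
    next_run: datetime
    policy: Optional[str] = None
    id: Optional[int] = None  # filled in by the database on creation


def apply_default_policy(tasks: List[Task], policy: str = "recurring") -> List[Task]:
    """Use `policy` only for tasks that do not already set one."""
    if policy not in ("recurring", "oneshot"):
        raise ValueError(f"unknown policy: {policy!r}")
    return [t if t.policy is not None else replace(t, policy=policy) for t in tasks]
```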

delete_archived_tasks(task_ids)

Delete as many of the given archived tasks as possible. Only tasks whose associated task_run entries have all been cleaned up are deleted.
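The selection rule amounts to a filter on the requested ids. In this sketch, `deletable_task_ids` and the `remaining_runs` mapping (task_run rows still present per task id) are hypothetical; they only illustrate which ids survive the filter.

```python
from typing import Dict, Iterable, List


def deletable_task_ids(
    task_ids: Iterable[int], remaining_runs: Dict[int, int]
) -> List[int]:
    """Keep the ids whose task_run entries have all been cleaned up.

    `remaining_runs` is a hypothetical count of task_run rows still
    present per task id; a missing id means no rows remain.
    """
    return [tid for tid in task_ids if remaining_runs.get(tid, 0) == 0]
```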

disable_tasks(task_ids: List[int]) → None

Disable the tasks whose ids are listed.

Parameters:

task_ids – list of tasks’ identifiers

end_task_run(backend_id: str, status: Literal['scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost'], metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) → TaskRun | None

Mark a given task as ended, updating the corresponding task_run entry in the database.

Parameters:
  • backend_id – the identifier of the job in the backend

  • status – how the task ended

  • metadata – metadata to add to the task_run entry

  • timestamp – the instant the event occurred

Returns:

a TaskRun object with updated fields

filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: str | None = None) → Dict[str, Any]

Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result.

Returns:

  • next_page_token: opaque token to be used as page_token to retrieve the next page of results. If absent, there are no more pages to gather.

  • tasks: list of task dictionaries with the following keys:

    • id (str): origin task id

    • started (Optional[datetime]): started date

    • scheduled (datetime): scheduled date

    • arguments (json dict): task’s arguments

    • …

Return type:

dict with the following keys
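A caller that needs every task in the interval has to follow next_page_token until it is absent. The sketch below relies only on the documented return shape; `fake_filter_task_to_archive` and its `TASKS` rows are hypothetical in-memory stand-ins, which also show the half-open interval (after_ts included, before_ts excluded).

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

# Hypothetical rows; ISO date strings compare correctly as plain strings
TASKS: List[Dict[str, Any]] = [
    {"id": str(day), "scheduled": f"2024-01-{day:02d}"} for day in range(1, 8)
]


def fake_filter_task_to_archive(
    after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None
) -> Dict[str, Any]:
    """In-memory stand-in honoring the half-open interval [after_ts, before_ts[."""
    rows = [t for t in TASKS if after_ts <= t["scheduled"] < before_ts]
    start = int(page_token) if page_token else 0
    result: Dict[str, Any] = {"tasks": rows[start : start + limit]}
    if start + limit < len(rows):
        # Opaque to callers; here it is simply the next offset
        result["next_page_token"] = str(start + limit)
    return result


def iter_tasks_to_archive(
    filter_task_to_archive: Callable[..., Dict[str, Any]],
    after_ts: str,
    before_ts: str,
    limit: int = 10,
) -> Iterator[Dict[str, Any]]:
    """Yield tasks page by page until next_page_token is absent."""
    page_token: Optional[str] = None
    while True:
        result = filter_task_to_archive(
            after_ts, before_ts, limit=limit, page_token=page_token
        )
        yield from result["tasks"]
        page_token = result.get("next_page_token")
        if page_token is None:
            break
```

With `limit=3` and the interval `["2024-01-02", "2024-01-06"[`, the loop fetches two pages and yields the tasks with ids 2 to 5.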

get_listed_origins(lister_id: UUID | None = None, url: str | None = None, urls: List[str] | None = None, enabled: bool | None = True, limit: int = 1000, page_token: Tuple[str, str] | None = None) → PaginatedListedOriginList

Get information on listed origins, possibly filtered, in a paginated way.

Parameters:
  • lister_id – if provided, return origins discovered with that lister

  • url – (deprecated, use urls parameter instead) if provided, return origins matching that URL

  • urls – if provided, return origins matching these URLs

  • enabled – if True, return only enabled origins; if False, return only disabled origins; if None, return all origins.

  • limit – maximum number of origins per page

  • page_token – token for retrieving the next page of origins, as returned in the PaginatedListedOriginList object

Returns:

A page of listed origins
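The three-valued enabled filter, combined with the urls filter, can be expressed as a simple predicate. This is a client-side sketch with a hypothetical `Origin` stand-in and a hypothetical `filter_origins` helper, not how the scheduler backend implements the query.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class Origin:
    # Hypothetical stand-in for a listed origin; only the filtered fields
    url: str
    enabled: bool


def filter_origins(
    origins: Iterable[Origin],
    urls: Optional[List[str]] = None,
    enabled: Optional[bool] = True,
) -> List[Origin]:
    """True/False keep only enabled/disabled origins; None keeps both."""
    return [
        o
        for o in origins
        if (urls is None or o.url in urls)
        and (enabled is None or o.enabled == enabled)
    ]
```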

get_lister(name: str, instance_name: str | None = None) → Lister | None

Retrieve information about the given instance of the lister from the database.

get_listers() → List[Lister]

Retrieve information about all listers from the database.

get_listers_by_id(lister_ids: List[str]) → List[Lister]

Retrieve listers in batch, using their UUIDs.

get_metrics(lister_id: UUID | None = None, visit_type: str | None = None) → List[SchedulerMetrics]

Retrieve the performance metrics of this scheduler instance.

Parameters:
  • lister_id – filter the metrics for this lister instance only

  • visit_type – filter the metrics for this visit type only

get_or_create_lister(name: str, instance_name: str | None = None) → Lister

Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist.
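Because the entry is created only when missing, repeated calls with the same (name, instance_name) pair return the same lister. A hypothetical in-memory sketch of that contract, in which `Lister` and `InMemoryListers` are stand-ins rather than swh.scheduler classes:

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class Lister:
    # Hypothetical stand-in for swh.scheduler.model.Lister
    name: str
    instance_name: Optional[str] = None
    id: uuid.UUID = field(default_factory=uuid.uuid4)


class InMemoryListers:
    """Toy lister table keyed by (name, instance_name)."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, Optional[str]], Lister] = {}

    def get_lister(self, name: str, instance_name: Optional[str] = None) -> Optional[Lister]:
        return self._rows.get((name, instance_name))

    def get_or_create_lister(self, name: str, instance_name: Optional[str] = None) -> Lister:
        key = (name, instance_name)
        if key not in self._rows:
            # First call for this pair: create the entry with a fresh UUID
            self._rows[key] = Lister(name, instance_name)
        return self._rows[key]
```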

get_task_runs(task_ids: List[int], limit: int | None = None) → List[TaskRun]

Search the task runs of the given task ids.

get_task_type(task_type_name: str) → TaskType | None

Retrieve the registered task type with a given name

Parameters:

task_type_name – name of the task type to retrieve

Returns:

a TaskType object or None if no such task type exists

get_task_types() → List[TaskType]

Retrieve all registered task types

Returns:

a list of TaskType objects

get_tasks(task_ids: List[int]) → List[Task]

Retrieve the info of tasks whose ids are listed.

Parameters:

task_ids – list of tasks’ identifiers

Returns:

a list of tasks

grab_next_visits(visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: str | None = None, lister_name: str | None = None, lister_instance_name: str | None = None, timestamp: datetime | None = None, absolute_cooldown: timedelta | None = datetime.timedelta(seconds=43200), scheduled_cooldown: timedelta | None = datetime.timedelta(days=7), failed_cooldown: timedelta | None = datetime.timedelta(days=14), not_found_cooldown: timedelta | None = datetime.timedelta(days=31), tablesample: float | None = None) → List[ListedOrigin]

Get the next origins (at most count of them) that need to be visited by the visit_type loader, according to the given scheduling policy.

This will mark the origins as scheduled in the origin_visit_stats table, to avoid scheduling multiple visits to the same origin.

Parameters:
  • visit_type – type of visits to schedule

  • count – number of visits to schedule

  • policy – the scheduling policy used to select which visits to schedule

  • enabled – whether to list enabled or disabled origins. By default, only enabled origins are considered; some edge cases may require the disabled ones.

  • lister_uuid – restrict to origins listed by the lister with this UUID

  • lister_name – restrict to origins listed by the lister with this name

  • lister_instance_name – restrict to origins listed by the lister with this instance name

  • timestamp – the mocked timestamp at which we’re recording that the visits are being scheduled (defaults to the current time)

  • absolute_cooldown – the minimal interval between two visits of the same origin

  • scheduled_cooldown – the minimal interval before which we can schedule the same origin again if it’s not been visited

  • failed_cooldown – the minimal interval before which we can reschedule a failed origin

  • not_found_cooldown – the minimal interval before which we can reschedule a not_found origin

  • tablesample – the percentage of the table on which we run the query (None: no sampling)
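The cooldown parameters can be read as eligibility checks on each origin’s visit statistics, followed by marking the picked origins as scheduled. The sketch below is a simplified reading of the parameter descriptions, not the actual SQL query: `VisitStats`, `is_eligible` and `grab` are hypothetical, the status strings are assumptions, and ordering by URL stands in for the real scheduling policy.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional


@dataclass
class VisitStats:
    # Hypothetical subset of per-origin visit statistics
    last_visit: Optional[datetime] = None
    last_visit_status: Optional[str] = None  # "successful", "failed" or "not_found"
    last_scheduled: Optional[datetime] = None


def is_eligible(
    stats: VisitStats,
    now: datetime,
    absolute_cooldown: timedelta = timedelta(seconds=43200),
    scheduled_cooldown: timedelta = timedelta(days=7),
    failed_cooldown: timedelta = timedelta(days=14),
    not_found_cooldown: timedelta = timedelta(days=31),
) -> bool:
    """Simplified reading of the cooldown parameters (not the real query)."""
    if stats.last_visit is not None:
        since_visit = now - stats.last_visit
        if since_visit < absolute_cooldown:
            return False
        if stats.last_visit_status == "failed" and since_visit < failed_cooldown:
            return False
        if stats.last_visit_status == "not_found" and since_visit < not_found_cooldown:
            return False
    # Scheduled but not visited since then: wait for scheduled_cooldown
    if stats.last_scheduled is not None and (
        stats.last_visit is None or stats.last_scheduled > stats.last_visit
    ):
        if now - stats.last_scheduled < scheduled_cooldown:
            return False
    return True


def grab(stats_by_url: Dict[str, VisitStats], count: int, now: datetime) -> List[str]:
    """Pick at most `count` eligible origins and mark them as scheduled.

    Sorting by URL is a placeholder for the real scheduling policy.
    """
    picked = [url for url, s in sorted(stats_by_url.items()) if is_eligible(s, now)]
    picked = picked[:count]
    for url in picked:
        stats_by_url[url].last_scheduled = now
    return picked
```

Marking origins as scheduled is what prevents a second call from handing out the same origins before scheduled_cooldown expires.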

grab_ready_priority_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) → List[Task]

Fetch and schedule the list of tasks (with any priority) ready to be scheduled.

Parameters:
  • task_type – filter tasks by their type

  • timestamp – grab tasks that need to be executed before that timestamp

  • num_tasks – only grab num_tasks tasks (with no priority)

Returns:

the list of scheduled tasks

grab_ready_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) → List[Task]

Fetch and schedule the list of tasks (with no priority) ready to be scheduled.

Parameters:
  • task_type – filter tasks by their type

  • timestamp – grab tasks that need to be executed before that timestamp

  • num_tasks – only grab num_tasks tasks (with no priority)

Returns:

the list of scheduled tasks

mass_schedule_task_runs(task_runs: List[TaskRun]) → None

Schedule a batch of task runs.

Parameters:

task_runs – a list of TaskRun objects created with at least the following parameters:

  • task

  • backend_id

  • scheduled

origin_visit_stats_get(ids: Iterable[Tuple[str, str]]) → List[OriginVisitStats]

Retrieve the visit statistics for the given (origin URL, visit type) pairs.

Warning

If some visit statistics are not found, they are filtered out of the result. So the output list may be shorter than the input list.

Parameters:

ids – an iterable of (origin_url, visit_type) tuples

Returns:

a list of origin visit statistics
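Since missing statistics are dropped, the result cannot be zipped positionally with the input ids. One way to pair them is to index the result by key, as in this sketch; `OriginVisitStats` here is a hypothetical stand-in carrying only the `url` and `visit_type` fields, and `stats_by_key` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple

Key = Tuple[str, str]  # (origin_url, visit_type)


@dataclass(frozen=True)
class OriginVisitStats:
    # Hypothetical stand-in carrying only the lookup key fields
    url: str
    visit_type: str


def stats_by_key(
    requested: Iterable[Key], returned: List[OriginVisitStats]
) -> Dict[Key, Optional[OriginVisitStats]]:
    """Map every requested id to its statistics, or None if filtered out."""
    found = {(s.url, s.visit_type): s for s in returned}
    return {key: found.get(key) for key in requested}
```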

origin_visit_stats_upsert(origin_visit_stats: Iterable[OriginVisitStats]) → None

Create or update the given origin visit statistics.

peek_ready_priority_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) → List[Task]

Fetch the list of tasks (with any priority) ready to be scheduled.

Parameters:
  • task_type – filter tasks by their type

  • timestamp – peek tasks that need to be executed before that timestamp

  • num_tasks – only peek at num_tasks tasks (with no priority)

Returns:

the list of tasks which would be scheduled

peek_ready_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) → List[Task]

Fetch the list of tasks (with no priority) to be scheduled.

Parameters:
  • task_type – filter tasks by their type

  • timestamp – peek tasks that need to be executed before that timestamp

  • num_tasks – only peek at num_tasks tasks (with no priority)

Returns:

the list of tasks which would be scheduled
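The difference between the peek_* and grab_* methods is that peeking leaves the tasks untouched, while grabbing also marks them as scheduled. A hypothetical in-memory sketch of the no-priority variants, assuming "ready" means a next_run at or before timestamp and a next_run_not_scheduled status; the `Task` dataclass and both functions are stand-ins, not the backend code.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Task:
    # Hypothetical stand-in; only the fields this sketch needs
    id: int
    type: str
    next_run: datetime
    status: str = "next_run_not_scheduled"
    priority: Optional[str] = None


def peek_ready_tasks(
    tasks: List[Task], task_type: str, timestamp: datetime, num_tasks: Optional[int] = None
) -> List[Task]:
    """Return ready tasks without priority; nothing is modified."""
    ready = sorted(
        (
            t
            for t in tasks
            if t.type == task_type
            and t.priority is None
            and t.status == "next_run_not_scheduled"
            and t.next_run <= timestamp
        ),
        key=lambda t: t.next_run,
    )
    return ready if num_tasks is None else ready[:num_tasks]


def grab_ready_tasks(
    tasks: List[Task], task_type: str, timestamp: datetime, num_tasks: Optional[int] = None
) -> List[Task]:
    """Same selection as peek_ready_tasks, then mark the tasks as scheduled."""
    grabbed = peek_ready_tasks(tasks, task_type, timestamp, num_tasks)
    for t in grabbed:
        t.status = "next_run_scheduled"
    return grabbed
```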

record_listed_origins(listed_origins: Iterable[ListedOrigin]) → List[ListedOrigin]

Record a set of origins that a lister has listed.

This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.
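The upsert semantics can be modeled with a dictionary keyed on (lister_id, url, visit_type). In this sketch `ListedOrigin` is a hypothetical stand-in (lister_id is a plain string here rather than a UUID) and the function only mirrors the documented behavior in memory.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple

Key = Tuple[str, str, str]  # (lister_id, url, visit_type)


@dataclass
class ListedOrigin:
    # Hypothetical stand-in for swh.scheduler.model.ListedOrigin
    lister_id: str
    url: str
    visit_type: str
    extra_loader_arguments: Dict[str, Any] = field(default_factory=dict)
    last_update: Optional[datetime] = None
    last_seen: Optional[datetime] = None


def record_listed_origins(
    table: Dict[Key, ListedOrigin], listed_origins: Iterable[ListedOrigin]
) -> List[ListedOrigin]:
    """Insert new origins; for existing keys, refresh only the documented fields."""
    recorded = []
    for origin in listed_origins:
        key = (origin.lister_id, origin.url, origin.visit_type)
        existing = table.get(key)
        if existing is None:
            table[key] = origin
        else:
            existing.extra_loader_arguments = origin.extra_loader_arguments
            existing.last_update = origin.last_update
            existing.last_seen = origin.last_seen
        recorded.append(table[key])
    return recorded
```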

schedule_task_run(task_id: int, backend_id: str, metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) → TaskRun

Mark a given task as scheduled, adding a task_run entry in the database.

Parameters:
  • task_id – the identifier for the task being scheduled

  • backend_id – the identifier of the job in the backend

  • metadata – metadata to add to the task_run entry

  • timestamp – the instant the event occurred

Returns:

a TaskRun object

search_tasks(task_id: int | None = None, task_type: str | None = None, status: Literal['next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'] | None = None, priority: Literal['high', 'normal', 'low'] | None = None, policy: Literal['recurring', 'oneshot'] | None = None, before: datetime | None = None, after: datetime | None = None, limit: int | None = None) → List[Task]

Search tasks matching the selected criteria.

Parameters:
  • task_id – search a task with given identifier

  • task_type – search tasks with given type

  • status – search tasks with given status

  • priority – search tasks with given priority

  • policy – search tasks with given policy

  • before – search tasks created before given date

  • after – search tasks created after given date

  • limit – maximum number of tasks to return

Returns:

a list of found tasks

set_status_tasks(task_ids: List[int], status: Literal['next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'] = 'disabled', next_run: datetime | None = None) → None

Set the status of the tasks whose ids are listed.

Parameters:
  • task_ids – list of tasks’ identifiers

  • status – the status to set for the tasks

  • next_run – if provided, also set the next_run date

start_task_run(backend_id: str, metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) → TaskRun | None

Mark a given task as started, updating the corresponding task_run entry in the database.

Parameters:
  • backend_id – the identifier of the job in the backend

  • metadata – metadata to add to the task_run entry

  • timestamp – the instant the event occurred

Returns:

a TaskRun object with updated fields
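Taken together, schedule_task_run, start_task_run and end_task_run move one task_run entry through its lifecycle, looked up by backend_id after creation. The in-memory sketch below is hypothetical (`TaskRun` and `TaskRunLog` are stand-ins); it assumes that metadata is merged into the existing entry and that None signals an unknown backend_id, as the `TaskRun | None` return types suggest.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class TaskRun:
    # Hypothetical stand-in with only the fields the sketch touches
    task: int
    backend_id: str
    status: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    scheduled: Optional[datetime] = None
    started: Optional[datetime] = None
    ended: Optional[datetime] = None


class TaskRunLog:
    """In-memory task_run table indexed by backend_id (illustration only)."""

    def __init__(self) -> None:
        self._runs: Dict[str, TaskRun] = {}

    def schedule_task_run(self, task_id: int, backend_id: str, metadata=None, timestamp=None) -> TaskRun:
        run = TaskRun(task_id, backend_id, "scheduled", dict(metadata or {}),
                      scheduled=timestamp or datetime.now(timezone.utc))
        self._runs[backend_id] = run
        return run

    def _update(self, backend_id, status, metadata, timestamp, time_field) -> Optional[TaskRun]:
        run = self._runs.get(backend_id)
        if run is None:
            return None  # unknown backend_id: nothing to update
        run.status = status
        run.metadata.update(metadata or {})
        setattr(run, time_field, timestamp or datetime.now(timezone.utc))
        return run

    def start_task_run(self, backend_id, metadata=None, timestamp=None):
        return self._update(backend_id, "started", metadata, timestamp, "started")

    def end_task_run(self, backend_id, status, metadata=None, timestamp=None):
        return self._update(backend_id, status, metadata, timestamp, "ended")
```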

update_lister(lister: Lister) → Lister

Update the state for the given lister instance in the database.

Returns:

a new Lister object, with all fields updated from the database

Raises:

StaleData if the updated timestamp for the lister instance in the database doesn’t match the one passed by the user.
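This is a form of optimistic concurrency control: an update is accepted only if it was based on the latest stored version. A hypothetical sketch, in which `Lister`, `ListerStore` and the local `StaleData` are stand-ins and the new `updated` value is simply the current time:

```python
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import Any, Dict


class StaleData(Exception):
    """Stand-in for swh.scheduler.exc.StaleData."""


@dataclass(frozen=True)
class Lister:
    # Hypothetical stand-in for swh.scheduler.model.Lister
    name: str
    current_state: Dict[str, Any]
    updated: datetime


class ListerStore:
    """Holds one lister row and rejects updates based on an outdated copy."""

    def __init__(self, lister: Lister) -> None:
        self._row = lister

    def update_lister(self, lister: Lister) -> Lister:
        if lister.updated != self._row.updated:
            raise StaleData(f"lister {lister.name!r} was modified concurrently")
        # Store the new state with a fresh timestamp and return the stored row
        self._row = replace(lister, updated=datetime.now(timezone.utc))
        return self._row
```

A lister that hits StaleData would typically re-read its state with get_lister before retrying.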

update_metrics(lister_id: UUID | None = None, timestamp: datetime | None = None) → List[SchedulerMetrics]

Update the performance metrics of this scheduler instance.

Returns the updated metrics.

Parameters:
  • lister_id – if passed, update the metrics only for this lister instance

  • timestamp – if passed, the date at which we’re updating the metrics, defaults to the database NOW()

visit_scheduler_queue_position_get() → Dict[str, int]

Retrieve all current queue positions for the recurrent visit scheduler.

Returns:

Mapping of visit type to their current queue position

visit_scheduler_queue_position_set(visit_type: str, position: int) → None

Set the current queue position of the recurrent visit scheduler for visit_type.