swh.scheduler.api.client module
- class swh.scheduler.api.client.RemoteScheduler(url: str, timeout: None | Tuple[float, float] | List[float] | float = None, chunk_size: int = 4096, max_retries: int = 3, pool_connections: int = 20, pool_maxsize: int = 100, adapter_kwargs: Dict[str, Any] | None = None, api_exception: Type[Exception] | None = None, reraise_exceptions: List[Type[Exception]] | None = None, enable_requests_retry: bool | None = None, **kwargs)
Bases: RPCClient
Proxy to a remote scheduler API.
- backend_class
alias of SchedulerInterface
- reraise_exceptions: List[Type[Exception]] = [<class 'swh.scheduler.exc.SchedulerException'>, <class 'swh.scheduler.exc.StaleData'>, <class 'swh.scheduler.exc.UnknownPolicy'>]
On server errors, if any of the exception classes in this list has the same name as the error name, then the exception will be instantiated and raised instead of a generic RemoteException.
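The name-matching rule above can be sketched as a small lookup. This is a simplified illustration, not the swh.core implementation: the exception classes, the `RERAISE_EXCEPTIONS` list and the `exception_for` helper are hypothetical local stand-ins so the snippet runs on its own.

```python
from typing import List, Type


# Hypothetical local stand-ins; the real classes live in swh.scheduler.exc
# (and the generic error in swh.core.api).
class RemoteException(Exception):
    """Generic error used when no class in the list matches."""


class SchedulerException(Exception):
    pass


class StaleData(Exception):
    pass


class UnknownPolicy(Exception):
    pass


RERAISE_EXCEPTIONS: List[Type[Exception]] = [SchedulerException, StaleData, UnknownPolicy]


def exception_for(error_name: str, *args: object) -> Exception:
    """Map a server-side error name to the exception the client would raise."""
    for exc_class in RERAISE_EXCEPTIONS:
        # Matching is by class name only, as described above.
        if exc_class.__name__ == error_name:
            return exc_class(*args)
    return RemoteException(error_name, *args)
```

A caller can therefore write `except StaleData:` around a remote call and catch the same exception type it would get from a local backend.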
- extra_type_decoders: Dict[str, Callable] = {'last_visit_status': <enum 'LastVisitStatus'>, 'scheduler_model': <function <lambda>>}
Value of extra_decoders passed to json_loads or msgpack_loads to be able to deserialize more object types.
- extra_type_encoders: List[Tuple[type, str, Callable]] = [(<class 'swh.scheduler.model.BaseSchedulerModel'>, 'scheduler_model', <function _encode_model_object>), (<enum 'LastVisitStatus'>, 'last_visit_status', <function _encode_enum>)]
Value of extra_encoders passed to json_dumps or msgpack_dumps to be able to serialize more object types.
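To show how tables of these two shapes fit together, the sketch below plugs an encoder list `(type, name, encoder)` and a decoder mapping `name -> callable` into the standard `json` module. The `{"swhtype": ..., "d": ...}` envelope, the enum members and the `encode`/`decode`/`encode_enum_value` helpers are assumptions for illustration; the actual wire format is defined by the serializers that receive these tables.

```python
import enum
import json
from typing import Any, Callable, Dict, List, Tuple


class LastVisitStatus(enum.Enum):
    """Hypothetical local enum; the real members may differ."""
    successful = "successful"
    failed = "failed"


def encode_enum_value(obj: enum.Enum) -> Any:
    return obj.value


# Same shape as extra_type_encoders: (type, type name, encoder function).
ENCODERS: List[Tuple[type, str, Callable]] = [
    (LastVisitStatus, "last_visit_status", encode_enum_value),
]
# Same shape as extra_type_decoders: type name -> decoder callable.
# Calling the enum class with a value returns the matching member.
DECODERS: Dict[str, Callable] = {"last_visit_status": LastVisitStatus}


def encode(obj: Any) -> Any:
    """json.dumps `default` hook: wrap registered types in a tagged dict (assumed envelope)."""
    for type_, name, encoder in ENCODERS:
        if isinstance(obj, type_):
            return {"swhtype": name, "d": encoder(obj)}
    raise TypeError(f"cannot serialize {type(obj).__name__}")


def decode(obj: Dict[str, Any]) -> Any:
    """json.loads `object_hook`: rebuild registered types from tagged dicts."""
    decoder = DECODERS.get(obj.get("swhtype"))
    if decoder is not None and "d" in obj:
        return decoder(obj["d"])
    return obj


payload = json.dumps({"status": LastVisitStatus.failed}, default=encode)
restored = json.loads(payload, object_hook=decode)
```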
- enable_requests_retry: bool = True
If set to True, requests sent by the client are retried when they encounter specific errors. The default policy is to retry on connection errors and transient remote exceptions. Subclasses can change that policy by overriding the retry_policy() method.
- create_task_type(task_type: TaskType) → None
Create a new task type in database ready for scheduling.
- Parameters:
task_type – a TaskType object
- create_tasks(tasks: List[Task], policy: Literal['recurring', 'oneshot'] = 'recurring') → List[Task]
Register new tasks in database.
- Parameters:
tasks – each task is a Task object created with at least the following parameters: type, arguments, next_run
policy – default task policy (either recurring or oneshot) to use if not set in input task objects
- Returns:
a list of created tasks with database ids filled.
- delete_archived_tasks(task_ids)
Delete archived tasks as far as possible. Only the tasks whose associated task_run entries have all been cleaned up are deleted.
- disable_tasks(task_ids: List[int]) → None
Disable the tasks whose ids are listed.
- Parameters:
task_ids – list of tasks’ identifiers
- end_task_run(backend_id: str, status: Literal['scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost'], metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) → TaskRun | None
Mark a given task as ended, updating the corresponding task_run entry in the database.
- Parameters:
backend_id – the identifier of the job in the backend
status – how the task ended
metadata – metadata to add to the task_run entry
timestamp – the instant the event occurred
- Returns:
a TaskRun object with updated fields
- filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: str | None = None) → Dict[str, Any]
Compute the tasks to archive within the half-open datetime interval [after_ts, before_ts) (after_ts included, before_ts excluded). The method returns a paginated result.
- Returns:
a dict with the following keys:
next_page_token: opaque token to be used as page_token to retrieve the next page of results. If absent, there are no more pages to gather.
tasks: list of task dictionaries with the following keys:
id (str): origin task id
started (Optional[datetime]): started date
scheduled (datetime): scheduled date
arguments (json dict): task’s arguments
…
- Return type:
dict
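Because filter_task_to_archive returns one page per call, a caller that wants every task in the interval loops until next_page_token is absent. A minimal sketch, assuming `scheduler` is any object exposing the method as documented (for instance a RemoteScheduler instance, not exercised here); `iter_tasks_to_archive` and `FakeArchiveScheduler` are hypothetical, and the integer-offset token is only a stand-in for the real opaque token.

```python
from typing import Any, Dict, Iterator, List, Optional


def iter_tasks_to_archive(scheduler: Any, after_ts: str, before_ts: str,
                          limit: int = 10) -> Iterator[Dict[str, Any]]:
    """Yield every task to archive in [after_ts, before_ts), page by page."""
    page_token: Optional[str] = None
    while True:
        page = scheduler.filter_task_to_archive(
            after_ts, before_ts, limit=limit, page_token=page_token
        )
        yield from page["tasks"]
        page_token = page.get("next_page_token")
        if page_token is None:  # absent token: no more pages
            break


class FakeArchiveScheduler:
    """Hypothetical in-memory stand-in mimicking the documented return shape."""

    def __init__(self, tasks: List[Dict[str, Any]]) -> None:
        self.tasks = tasks

    def filter_task_to_archive(self, after_ts: str, before_ts: str, limit: int = 10,
                               page_token: Optional[str] = None) -> Dict[str, Any]:
        # Hypothetical token: the offset of the next page, as a string.
        start = int(page_token) if page_token is not None else 0
        result: Dict[str, Any] = {"tasks": self.tasks[start:start + limit]}
        if start + limit < len(self.tasks):
            result["next_page_token"] = str(start + limit)
        return result
```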
- get_listed_origins(lister_id: UUID | None = None, url: str | None = None, urls: List[str] | None = None, enabled: bool | None = True, limit: int = 1000, page_token: Tuple[str, str] | None = None) → PaginatedListedOriginList
Get information on listed origins, possibly filtered, in a paginated way.
- Parameters:
lister_id – if provided, return origins discovered with that lister
url – (deprecated, use the urls parameter instead) if provided, return origins matching that URL
urls – if provided, return origins matching these URLs
enabled – if True, return only enabled origins; if False, return only disabled origins; if None, return all origins
limit – maximum number of origins per page
page_token – to get the next page of origins, pass the token returned in the PaginatedListedOriginList object
- Returns:
A page of listed origins
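get_listed_origins returns one page per call; to walk every matching origin, feed each page's token back as page_token until none is returned. This sketch assumes the returned PaginatedListedOriginList exposes `results` and `next_page_token` attributes; `Page`, `FakeOriginScheduler` and the meaning given to the token tuple are hypothetical stand-ins so the snippet runs without a server.

```python
from dataclasses import dataclass
from typing import Any, Iterator, List, Optional, Tuple


@dataclass
class Page:
    """Hypothetical stand-in for PaginatedListedOriginList."""
    results: List[Any]
    next_page_token: Optional[Tuple[str, str]] = None


def iter_listed_origins(scheduler: Any, **filters: Any) -> Iterator[Any]:
    """Yield every listed origin matching `filters`, one page at a time."""
    page_token: Optional[Tuple[str, str]] = None
    while True:
        page = scheduler.get_listed_origins(page_token=page_token, **filters)
        yield from page.results
        if page.next_page_token is None:
            return
        page_token = page.next_page_token


class FakeOriginScheduler:
    """Hypothetical in-memory scheduler holding sorted origin URLs."""

    def __init__(self, urls: List[str]) -> None:
        self.urls = sorted(urls)

    def get_listed_origins(self, limit: int = 1000,
                           page_token: Optional[Tuple[str, str]] = None,
                           **filters: Any) -> Page:
        # Hypothetical token: ("lister", last URL of the previous page).
        start = 0 if page_token is None else self.urls.index(page_token[1]) + 1
        chunk = self.urls[start:start + limit]
        has_more = start + limit < len(self.urls)
        return Page(chunk, ("lister", chunk[-1]) if has_more else None)
```

Filters such as `enabled=False` or `urls=[...]` are forwarded unchanged on every page, so the whole walk sees a consistent selection.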
- get_lister(name: str, instance_name: str | None = None) → Lister | None
Retrieve information about the given instance of the lister from the database.
- get_listers(with_first_visits_to_schedule: bool = False) → List[Lister]
Retrieve information about all listers from the database.
- Parameters:
with_first_visits_to_schedule – if True, only retrieve listers for which the high-priority first visits of their listed origins have not been scheduled yet (such listers have the first_visits_queue_prefix attribute set).
- get_metrics(lister_id: UUID | None = None, visit_type: str | None = None) → List[SchedulerMetrics]
Retrieve the performance metrics of this scheduler instance.
- Parameters:
lister_id – filter the metrics for this lister instance only
visit_type – filter the metrics for this visit type only
- get_or_create_lister(name: str, instance_name: str | None = None, first_visits_queue_prefix: str | None = None) → Lister
Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist.
- get_task_runs(task_ids: List[int], limit: int | None = None) → List[TaskRun]
Search the task runs of the given task ids.
- get_task_type(task_type_name: str) → TaskType | None
Retrieve the registered task type with a given name.
- Parameters:
task_type_name – name of the task type to retrieve
- Returns:
a TaskType object, or None if no such task type exists
- get_task_types() → List[TaskType]
Retrieve all registered task types.
- Returns:
a list of TaskType objects
- get_tasks(task_ids: List[int]) → List[Task]
Retrieve the info of tasks whose ids are listed.
- Parameters:
task_ids – list of tasks’ identifiers
- Returns:
a list of tasks
- get_visit_types_for_listed_origins(lister: Lister) → List[str]
Return list of distinct visit types for the origins listed by a given lister.
- grab_next_visits(visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: str | None = None, lister_name: str | None = None, lister_instance_name: str | None = None, timestamp: datetime | None = None, absolute_cooldown: timedelta | None = datetime.timedelta(seconds=43200), scheduled_cooldown: timedelta | None = datetime.timedelta(days=7), failed_cooldown: timedelta | None = datetime.timedelta(days=14), not_found_cooldown: timedelta | None = datetime.timedelta(days=31), tablesample: float | None = None) → List[ListedOrigin]
Get at most count origins that need to be visited next with the visit_type loader, according to the given scheduling policy.
This will mark the origins as scheduled in the origin_visit_stats table, to avoid scheduling multiple visits to the same origin.
- Parameters:
visit_type – type of visits to schedule
count – number of visits to schedule
policy – the scheduling policy used to select which visits to schedule
enabled – whether to select enabled or disabled origins. By default, only enabled origins are considered; some edge cases may require the disabled ones instead.
lister_uuid – restrict the selection to origins listed by the lister with this UUID
lister_name – restrict the selection to origins listed by the lister with this name
lister_instance_name – restrict the selection to origins listed by the lister with this instance name
timestamp – the mocked timestamp at which we’re recording that the visits are being scheduled (defaults to the current time)
absolute_cooldown – the minimal interval between two visits of the same origin
scheduled_cooldown – the minimal interval before which we can schedule the same origin again if it’s not been visited
failed_cooldown – the minimal interval before which we can reschedule a failed origin
not_found_cooldown – the minimal interval before which we can reschedule a not_found origin
tablesample – the percentage of the table on which we run the query (None: no sampling)
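To make the four cooldown parameters concrete, here is a simplified model of how they could gate a single origin, using the defaults from the signature above. It is an assumption-laden illustration, not the backend's query: the `is_eligible` function, the status strings "failed" and "not_found", and measuring the failed/not-found cooldowns from the last visit are all hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_eligible(
    now: datetime,
    last_visit: Optional[datetime],
    last_scheduled: Optional[datetime],
    last_visit_status: Optional[str],
    absolute_cooldown: timedelta = timedelta(seconds=43200),  # 12 hours
    scheduled_cooldown: timedelta = timedelta(days=7),
    failed_cooldown: timedelta = timedelta(days=14),
    not_found_cooldown: timedelta = timedelta(days=31),
) -> bool:
    """Hypothetical, simplified check of the grab_next_visits cooldowns."""
    # Never revisit an origin sooner than absolute_cooldown after its last visit.
    if last_visit is not None and now - last_visit < absolute_cooldown:
        return False
    # Scheduled but not visited since then: wait scheduled_cooldown.
    pending = last_scheduled is not None and (
        last_visit is None or last_scheduled > last_visit
    )
    if pending and now - last_scheduled < scheduled_cooldown:
        return False
    # Failed or not-found origins wait longer before being retried (assumed
    # to be measured from the last visit).
    status_cooldowns = {"failed": failed_cooldown, "not_found": not_found_cooldown}
    cooldown = status_cooldowns.get(last_visit_status)
    if cooldown is not None and last_visit is not None and now - last_visit < cooldown:
        return False
    return True
```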
- grab_ready_priority_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) → List[Task]
Fetch and schedule the list of tasks (with any priority) ready to be scheduled.
- Parameters:
task_type – filtering task per their type
timestamp – grab tasks that need to be executed before that timestamp
num_tasks – only grab num_tasks tasks
- Returns:
the list of scheduled tasks
- grab_ready_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) → List[Task]
Fetch and schedule the list of tasks (with no priority) ready to be scheduled.
- Parameters:
task_type – filtering task per their type
timestamp – grab tasks that need to be executed before that timestamp
num_tasks – only grab num_tasks tasks (with no priority)
- Returns:
the list of scheduled tasks
- mass_schedule_task_runs(task_runs: List[TaskRun]) → None
Schedule a bunch of task runs.
- Parameters:
task_runs – a list of TaskRun objects created with at least the following parameters: task, backend_id, scheduled
- origin_visit_stats_get(ids: Iterable[Tuple[str, str]]) → List[OriginVisitStats]
Retrieve the visit statistics for an origin with a given visit type.
Warning
If some visit statistics are not found, they are filtered out of the result. So the output list may be shorter than the input list.
- Parameters:
ids – an iterable of (origin_url, visit_type) tuples
- Returns:
a list of origin visit statistics
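Since missing statistics are silently dropped, a caller that needs to know which ids had no statistics can compare the request with the response. This sketch assumes each OriginVisitStats object exposes `url` and `visit_type` attributes; `FakeStats` and `missing_visit_stats` are hypothetical.

```python
from typing import Any, Iterable, NamedTuple, Set, Tuple

# (origin_url, visit_type), as accepted by origin_visit_stats_get
StatsId = Tuple[str, str]


class FakeStats(NamedTuple):
    """Hypothetical stand-in for OriginVisitStats, reduced to its key fields."""
    url: str
    visit_type: str


def missing_visit_stats(requested: Iterable[StatsId], returned: Iterable[Any]) -> Set[StatsId]:
    """Return the requested ids that have no entry in the returned list.

    Assumes each returned object exposes `url` and `visit_type` attributes.
    """
    found = {(stats.url, stats.visit_type) for stats in returned}
    return set(requested) - found
```

With a real client, `returned` would be the result of `origin_visit_stats_get(requested)`; if `requested` is a generator, turn it into a list first, since it is iterated twice.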
- origin_visit_stats_upsert(origin_visit_stats: Iterable[OriginVisitStats]) → None
Create or update origin visit statistics.
- peek_ready_priority_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) → List[Task]
Fetch the list of tasks (with any priority) ready to be scheduled.
- Parameters:
task_type – filtering task per their type
timestamp – peek tasks that need to be executed before that timestamp
num_tasks – only peek at num_tasks tasks
- Returns:
the list of tasks which would be scheduled
- peek_ready_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) → List[Task]
Fetch the list of tasks (with no priority) ready to be scheduled.
- Parameters:
task_type – filtering task per their type
timestamp – peek tasks that need to be executed before that timestamp
num_tasks – only peek at num_tasks tasks (with no priority)
- Returns:
the list of tasks which would be scheduled
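The peek_* methods return the tasks that would be scheduled without changing anything, while the grab_* methods also mark the returned tasks as scheduled. The in-memory model below illustrates that distinction; `FakeTask`, `FakeTaskQueue`, the task type names and the selection rule (due at or before timestamp, earliest next_run first) are hypothetical simplifications, not the backend's query.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class FakeTask:
    """Hypothetical, simplified task record."""
    id: int
    type: str
    next_run: datetime
    status: str = "next_run_not_scheduled"


class FakeTaskQueue:
    """Hypothetical in-memory model of the peek/grab distinction."""

    def __init__(self, tasks: List[FakeTask]) -> None:
        self.tasks = tasks

    def _ready(self, task_type: str, timestamp: Optional[datetime],
               num_tasks: Optional[int]) -> List[FakeTask]:
        now = timestamp or datetime.now(timezone.utc)
        ready = [
            t for t in self.tasks
            if t.type == task_type
            and t.status == "next_run_not_scheduled"
            and t.next_run <= now
        ]
        ready.sort(key=lambda t: t.next_run)
        return ready if num_tasks is None else ready[:num_tasks]

    def peek_ready_tasks(self, task_type: str, timestamp: Optional[datetime] = None,
                         num_tasks: Optional[int] = None) -> List[FakeTask]:
        # Read-only: report what *would* be scheduled.
        return self._ready(task_type, timestamp, num_tasks)

    def grab_ready_tasks(self, task_type: str, timestamp: Optional[datetime] = None,
                         num_tasks: Optional[int] = None) -> List[FakeTask]:
        # Same selection, but the tasks are marked as scheduled.
        tasks = self._ready(task_type, timestamp, num_tasks)
        for task in tasks:
            task.status = "next_run_scheduled"
        return tasks
```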
- record_listed_origins(listed_origins: Iterable[ListedOrigin]) → List[ListedOrigin]
Record a set of origins that a lister has listed.
This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.
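The upsert rule can be modeled with a dict keyed by (lister_id, url, visit_type). In this sketch, `FakeListedOrigin` and `upsert` are hypothetical; fields other than the three that are updated (here only `enabled`) are assumed to keep their stored values, which is one reading of the description above.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple


@dataclass
class FakeListedOrigin:
    """Hypothetical stand-in for ListedOrigin, reduced to the fields used here."""
    lister_id: str
    url: str
    visit_type: str
    extra_loader_arguments: Dict[str, Any]
    last_update: Optional[datetime] = None
    last_seen: Optional[datetime] = None
    enabled: bool = True


OriginKey = Tuple[str, str, str]  # (lister_id, url, visit_type)


def upsert(store: Dict[OriginKey, FakeListedOrigin],
           origins: Iterable[FakeListedOrigin]) -> List[FakeListedOrigin]:
    """Insert new origins; for known keys, refresh only the three updated fields."""
    recorded = []
    for origin in origins:
        key = (origin.lister_id, origin.url, origin.visit_type)
        previous = store.get(key)
        if previous is None:
            store[key] = origin
        else:
            # Other stored fields (here `enabled`) are kept as they were.
            store[key] = replace(
                previous,
                extra_loader_arguments=origin.extra_loader_arguments,
                last_update=origin.last_update,
                last_seen=origin.last_seen,
            )
        recorded.append(store[key])
    return recorded
```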
- schedule_task_run(task_id: int, backend_id: str, metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) → TaskRun
Mark a given task as scheduled, adding a task_run entry in the database.
- Parameters:
task_id – the identifier for the task being scheduled
backend_id – the identifier of the job in the backend
metadata – metadata to add to the task_run entry
timestamp – the instant the event occurred
- Returns:
a TaskRun object
- search_tasks(task_id: int | None = None, task_type: str | None = None, status: Literal['next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'] | None = None, priority: Literal['high', 'normal', 'low'] | None = None, policy: Literal['recurring', 'oneshot'] | None = None, before: datetime | None = None, after: datetime | None = None, limit: int | None = None) → List[Task]
Search tasks matching the selected criteria.
- Parameters:
task_id – search a task with given identifier
task_type – search tasks with given type
status – search tasks with given status
priority – search tasks with given priority
policy – search tasks with given policy
before – search tasks created before given date
after – search tasks created after given date
limit – maximum number of tasks to return
- Returns:
a list of found tasks
- set_status_tasks(task_ids: List[int], status: Literal['next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'] = 'disabled', next_run: datetime | None = None) → None
Set the status of the tasks whose ids are listed.
- Parameters:
task_ids – list of tasks’ identifiers
status – the status to set for the tasks
next_run – if provided, also set the next_run date
- start_task_run(backend_id: str, metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) → TaskRun | None
Mark a given task as started, updating the corresponding task_run entry in the database.
- Parameters:
backend_id – the identifier of the job in the backend
metadata – metadata to add to the task_run entry
timestamp – the instant the event occurred
- Returns:
a TaskRun object with updated fields
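schedule_task_run, start_task_run and end_task_run together move a task_run entry through its lifecycle, keyed by the backend job identifier. The in-memory model below sketches that flow; `FakeTaskRun` and `FakeRunLog` are hypothetical, and returning None for an unknown backend_id is an assumption based on the `TaskRun | None` return types.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class FakeTaskRun:
    """Hypothetical, simplified task_run record."""
    task: int
    backend_id: str
    status: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    started: Optional[datetime] = None
    ended: Optional[datetime] = None


class FakeRunLog:
    """Hypothetical in-memory model of the task_run lifecycle."""

    def __init__(self) -> None:
        self.runs: Dict[str, FakeTaskRun] = {}

    def schedule_task_run(self, task_id: int, backend_id: str,
                          metadata: Optional[Dict[str, Any]] = None,
                          timestamp: Optional[datetime] = None) -> FakeTaskRun:
        run = FakeTaskRun(task_id, backend_id, "scheduled", dict(metadata or {}))
        self.runs[backend_id] = run
        return run

    def start_task_run(self, backend_id: str,
                       metadata: Optional[Dict[str, Any]] = None,
                       timestamp: Optional[datetime] = None) -> Optional[FakeTaskRun]:
        run = self.runs.get(backend_id)
        if run is None:
            return None  # assumed meaning of the None return: no matching run
        run.status = "started"
        run.started = timestamp or datetime.now(timezone.utc)
        run.metadata.update(metadata or {})
        return run

    def end_task_run(self, backend_id: str, status: str,
                     metadata: Optional[Dict[str, Any]] = None,
                     timestamp: Optional[datetime] = None) -> Optional[FakeTaskRun]:
        run = self.runs.get(backend_id)
        if run is None:
            return None
        run.status = status  # e.g. "eventful", "uneventful", "failed"
        run.ended = timestamp or datetime.now(timezone.utc)
        run.metadata.update(metadata or {})
        return run
```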
- update_lister(lister: Lister) → Lister
Update the state for the given lister instance in the database.
- Returns:
a new Lister object, with all fields updated from the database
- Raises:
StaleData – if the updated timestamp for the lister instance in the database doesn’t match the one passed by the user.
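Because update_lister raises StaleData when the stored timestamp has moved on, a caller updating lister state concurrently can re-read the lister and retry. A minimal sketch of that optimistic-concurrency loop, assuming lister objects carry `current_state` and `updated` fields; `StaleData`, `FakeLister`, `FakeListerStore` and `update_state_with_retry` are local hypothetical stand-ins. With a real client you would catch swh.scheduler.exc.StaleData and copy the Lister with that model class's own copy mechanism rather than `dataclasses.replace`.

```python
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional


class StaleData(Exception):
    """Hypothetical local stand-in for swh.scheduler.exc.StaleData."""


@dataclass
class FakeLister:
    """Hypothetical stand-in for Lister, reduced to the fields used here."""
    name: str
    instance_name: str
    current_state: Dict[str, Any]
    updated: Optional[datetime] = None


class FakeListerStore:
    """Hypothetical single-row store enforcing the `updated` timestamp check."""

    def __init__(self, lister: FakeLister) -> None:
        self.row = lister
        self.clock = 0

    def get_lister(self, name: str, instance_name: Optional[str] = None) -> Optional[FakeLister]:
        return self.row  # single-row store: ignores the lookup keys

    def update_lister(self, lister: FakeLister) -> FakeLister:
        if lister.updated != self.row.updated:
            raise StaleData("lister was updated concurrently")
        self.clock += 1
        self.row = replace(lister, updated=datetime.fromtimestamp(self.clock, timezone.utc))
        return self.row


def update_state_with_retry(
    scheduler: Any,
    name: str,
    instance_name: str,
    change: Callable[[Dict[str, Any]], Dict[str, Any]],
    max_attempts: int = 3,
) -> Any:
    """Apply `change` to the lister state, re-reading it after StaleData."""
    for _ in range(max_attempts):
        lister = scheduler.get_lister(name, instance_name)
        if lister is None:
            raise LookupError(f"unknown lister {name}/{instance_name}")
        new_state = change(dict(lister.current_state))
        try:
            return scheduler.update_lister(replace(lister, current_state=new_state))
        except StaleData:
            continue  # another writer won the race: re-read and try again
    raise StaleData(f"gave up after {max_attempts} attempts")
```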
- update_metrics(lister_id: UUID | None = None, timestamp: datetime | None = None) → List[SchedulerMetrics]
Update the performance metrics of this scheduler instance.
Returns the updated metrics.
- Parameters:
lister_id – if passed, update the metrics only for this lister instance
timestamp – if passed, the date at which we’re updating the metrics, defaults to the database NOW()