swh.scheduler.api.client module
- class swh.scheduler.api.client.RemoteScheduler(url, api_exception=None, timeout=None, chunk_size=4096, reraise_exceptions=None, **kwargs)
Bases: RPCClient
Proxy to a remote scheduler API.
- backend_class
alias of SchedulerInterface
- reraise_exceptions: ClassVar[List[Type[Exception]]] = [<class 'swh.scheduler.exc.SchedulerException'>, <class 'swh.scheduler.exc.StaleData'>, <class 'swh.scheduler.exc.UnknownPolicy'>]
On server errors, if any of the exception classes in this list has the same name as the error name, then the exception will be instantiated and raised instead of a generic RemoteException.
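The matching is done on the exception class *name*, not on the class object itself. A minimal sketch of that lookup, assuming the server reports the error as a name string plus a message; `pick_exception` is a hypothetical helper, and the exception classes below are local stand-ins for the `swh.scheduler.exc` ones and for the generic `RemoteException`, not their real definitions.

```python
from typing import List, Type


class RemoteException(Exception):
    """Local stand-in for the generic error raised when no class matches."""


# Local stand-ins for swh.scheduler.exc classes (hypothetical, illustration only).
class SchedulerException(Exception):
    pass


class StaleData(SchedulerException):
    pass


class UnknownPolicy(SchedulerException):
    pass


RERAISE: List[Type[Exception]] = [SchedulerException, StaleData, UnknownPolicy]


def pick_exception(error_name: str, message: str) -> Exception:
    """Instantiate the first listed class whose __name__ equals error_name,
    falling back to a generic RemoteException (hypothetical helper)."""
    for cls in RERAISE:
        if cls.__name__ == error_name:
            return cls(message)
    return RemoteException(f"{error_name}: {message}")
```

With this kind of mapping, client code can write `except StaleData:` around a remote call instead of inspecting a generic error.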
- extra_type_decoders: Dict[str, Callable] = {'last_visit_status': <enum 'LastVisitStatus'>, 'scheduler_model': <function <lambda>>}
Value of extra_decoders passed to json_loads or msgpack_loads to be able to deserialize more object types.
- extra_type_encoders: List[Tuple[type, str, Callable]] = [(<class 'swh.scheduler.model.BaseSchedulerModel'>, 'scheduler_model', <function _encode_model_object>), (<enum 'LastVisitStatus'>, 'last_visit_status', <function _encode_enum>)]
Value of extra_encoders passed to json_dumps or msgpack_dumps to be able to serialize more object types.
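The two attributes work as a pair: each encoder triple `(type, name, encode_fn)` tags a value with a name, and the decoder registered under that name rebuilds it. A minimal JSON-only sketch of that round trip, assuming a `{"swhtype": name, "d": payload}` wrapper (an assumed wire format, not necessarily the one swh.core uses); `LastVisitStatus` here is a hypothetical stand-in enum, and `dumps`/`loads` are illustrative helpers.

```python
import json
from enum import Enum
from typing import Any, Callable, Dict, List, Tuple


class LastVisitStatus(Enum):
    # Hypothetical stand-in for swh.scheduler.model.LastVisitStatus.
    successful = "successful"
    failed = "failed"


def _encode_enum(obj: Enum) -> str:
    return obj.value


# Same shapes as extra_type_encoders / extra_type_decoders above.
EXTRA_ENCODERS: List[Tuple[type, str, Callable]] = [
    (LastVisitStatus, "last_visit_status", _encode_enum),
]
EXTRA_DECODERS: Dict[str, Callable] = {"last_visit_status": LastVisitStatus}


def _default(obj: Any) -> Any:
    # Called by json.dumps for objects it cannot serialize natively.
    for typ, name, encode in EXTRA_ENCODERS:
        if isinstance(obj, typ):
            return {"swhtype": name, "d": encode(obj)}  # assumed wrapper format
    raise TypeError(f"cannot serialize {type(obj).__name__}")


def _object_hook(d: Dict[str, Any]) -> Any:
    # Called by json.loads for every decoded JSON object.
    if set(d) == {"swhtype", "d"} and d["swhtype"] in EXTRA_DECODERS:
        return EXTRA_DECODERS[d["swhtype"]](d["d"])
    return d


def dumps(obj: Any) -> str:
    return json.dumps(obj, default=_default)


def loads(s: str) -> Any:
    return json.loads(s, object_hook=_object_hook)
```

Registering the enum class itself as the decoder works because `LastVisitStatus("failed")` looks a member up by value.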
- create_task_type(task_type)
Create a new task type ready for scheduling.
- Parameters:
task_type (dict) –
a dictionary with the following keys:
type (str): an identifier for the task type
description (str): a human-readable description of what the task does
backend_name (str): the name of the task in the job-scheduling backend
default_interval (datetime.timedelta): the default interval between two task runs
min_interval (datetime.timedelta): the minimum interval between two task runs
max_interval (datetime.timedelta): the maximum interval between two task runs
backoff_factor (float): the factor by which the interval changes at each run
max_queue_length (int): the maximum length of the task queue for this task type
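A task type is a plain dict with the keys listed above. The sketch below builds one and checks it before submission; every value (including the `type` and `backend_name` strings) is made up, `check_task_type` is a hypothetical helper, and the `min_interval <= default_interval <= max_interval` check is an assumed invariant that these docs do not state.

```python
from datetime import timedelta
from typing import Any, Dict, List

TASK_TYPE_KEYS = (
    "type", "description", "backend_name", "default_interval",
    "min_interval", "max_interval", "backoff_factor", "max_queue_length",
)


def check_task_type(task_type: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a task type dict (hypothetical helper)."""
    problems = [f"missing key: {k}" for k in TASK_TYPE_KEYS if k not in task_type]
    if not problems:
        # Assumed invariant, not stated by the API docs.
        lo, default, hi = (task_type[k] for k in ("min_interval", "default_interval", "max_interval"))
        if not lo <= default <= hi:
            problems.append("default_interval outside [min_interval, max_interval]")
    return problems


# Illustrative values only.
task_type = {
    "type": "load-example",
    "description": "Load an example origin",
    "backend_name": "example.tasks.LoadExample",
    "default_interval": timedelta(days=64),
    "min_interval": timedelta(hours=12),
    "max_interval": timedelta(days=64),
    "backoff_factor": 2.0,
    "max_queue_length": 1000,
}
```

Assuming `scheduler` is a `RemoteScheduler` instance, the dict would then be passed as `scheduler.create_task_type(task_type)`.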
- create_tasks(tasks, policy='recurring')
Create new tasks.
- Parameters:
tasks (list) –
each task is a dictionary with the following keys:
type (str): the task type
arguments (dict): the arguments for the task runner, keys:
args (list of str): arguments
kwargs (dict str -> str): keyword arguments
next_run (datetime.datetime): the next scheduled run for the task
- Returns:
a list of created tasks.
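Each task nests its runner arguments under `arguments` as `args` and `kwargs`. A minimal sketch of building that structure; `make_task` is a hypothetical helper, the task type name and URL are illustrative, and defaulting `next_run` to the current UTC time (as a timezone-aware datetime) is an assumption of this sketch.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def make_task(task_type: str, args: List[str], kwargs: Dict[str, str],
              next_run: Optional[datetime] = None) -> Dict[str, Any]:
    """Build one task dict in the shape described above (hypothetical helper)."""
    return {
        "type": task_type,
        "arguments": {"args": args, "kwargs": kwargs},
        # Assumption: default to "now", as an aware UTC datetime.
        "next_run": next_run or datetime.now(tz=timezone.utc),
    }


tasks = [
    make_task("load-example", [], {"url": "https://example.org/repo.git"}),
]
# scheduler.create_tasks(tasks)  # 'scheduler' is assumed to be a RemoteScheduler
```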
- delete_archived_tasks(task_ids)
Delete archived tasks as far as possible. Only the tasks whose associated task_run entries have all been cleaned up will be deleted.
- disable_tasks(task_ids)
Disable the tasks whose ids are listed.
- end_task_run(backend_id, status, metadata=None, timestamp=None, result=None)
Mark a given task as ended, updating the corresponding task_run entry in the database.
- Parameters:
backend_id (str) – the identifier of the job in the backend
status (str) – how the task ended; one of: ‘eventful’, ‘uneventful’, ‘failed’
metadata (dict) – metadata to add to the task_run entry
timestamp (datetime.datetime) – the instant the event occurred
- Returns:
the updated task_run entry
- filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None) → Dict[str, Any]
Compute the tasks to archive within the half-open datetime interval [after_ts, before_ts). The method returns a paginated result.
- Returns:
a dict with the following keys:
next_page_token: opaque token to be used as page_token to retrieve the next page of results. If absent, there are no more pages to gather.
tasks: list of task dictionaries with the following keys:
id (str): origin task id
started (Optional[datetime]): started date
scheduled (datetime): scheduled date
arguments (json dict): task’s arguments
…
- Return type:
Dict[str, Any]
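The paginated result is consumed by passing each `next_page_token` back as `page_token` until the token is absent. A minimal sketch, assuming `iter_tasks_to_archive` (a hypothetical helper) receives any callable with the same signature as `filter_task_to_archive`, for instance the bound method of a `RemoteScheduler` instance.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def iter_tasks_to_archive(
    filter_fn: Callable[..., Dict[str, Any]],
    after_ts: str,
    before_ts: str,
    limit: int = 10,
) -> Iterator[Dict[str, Any]]:
    """Yield every task across all pages (hypothetical helper)."""
    page_token: Optional[str] = None
    while True:
        page = filter_fn(after_ts, before_ts, limit=limit, page_token=page_token)
        yield from page["tasks"]
        # An absent next_page_token means this was the last page.
        page_token = page.get("next_page_token")
        if page_token is None:
            break
```

Usage would look like `for task in iter_tasks_to_archive(scheduler.filter_task_to_archive, after_ts, before_ts): ...`, where `scheduler` and the two timestamp strings are placeholders.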
- get_listed_origins(lister_id: Optional[UUID] = None, url: Optional[str] = None, enabled: Optional[bool] = True, limit: int = 1000, page_token: Optional[Tuple[str, str]] = None) → PaginatedListedOriginList
Get information on listed origins, possibly filtered, in a paginated way.
- Parameters:
lister_id – if provided, return origins discovered with that lister
url – if provided, return origins matching that URL
enabled – if True, return only enabled origins; if False, return only disabled origins; if None, return all origins.
limit – maximum number of origins per page
page_token – token used to get the next page of origins; it is returned in the PaginatedListedOriginList object
- Returns:
A page of listed origins
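The `enabled` parameter is tri-state, and its default of `True` hides disabled origins unless the caller asks otherwise. A minimal in-memory sketch of the documented semantics; `Origin` is a hypothetical stand-in for `ListedOrigin` and `filter_enabled` is a hypothetical helper, not the server-side query.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Origin:
    # Hypothetical, minimal stand-in for ListedOrigin.
    url: str
    enabled: bool


def filter_enabled(origins: List[Origin], enabled: Optional[bool] = True) -> List[Origin]:
    """True -> only enabled, False -> only disabled, None -> all origins."""
    if enabled is None:
        return list(origins)
    return [o for o in origins if o.enabled == enabled]
```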
- get_lister(name: str, instance_name: Optional[str] = None) → Optional[Lister]
Retrieve information about the given instance of the lister from the database.
- get_metrics(lister_id: Optional[UUID] = None, visit_type: Optional[str] = None) → List[SchedulerMetrics]
Retrieve the performance metrics of this scheduler instance.
- Parameters:
lister_id – filter the metrics for this lister instance only
visit_type – filter the metrics for this visit type only
- get_or_create_lister(name: str, instance_name: Optional[str] = None) → Lister
Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist.
- get_task_runs(task_ids, limit=None)
Search the task runs of the given task ids.
- get_task_type(task_type_name)
Retrieve the task type identified by task_type_name.
- get_task_types()
Retrieve all registered task types.
- get_tasks(task_ids)
Retrieve the info of tasks whose ids are listed.
- grab_next_visits(visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: Optional[str] = None, lister_name: Optional[str] = None, lister_instance_name: Optional[str] = None, timestamp: Optional[datetime] = None, absolute_cooldown: Optional[timedelta] = datetime.timedelta(seconds=43200), scheduled_cooldown: Optional[timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[timedelta] = datetime.timedelta(days=14), not_found_cooldown: Optional[timedelta] = datetime.timedelta(days=31), tablesample: Optional[float] = None) → List[ListedOrigin]
Get at most count origins that need to be visited next with the visit_type loader, according to the given scheduling policy.
This will mark the origins as scheduled in the origin_visit_stats table, to avoid scheduling multiple visits to the same origin.
- Parameters:
visit_type – type of visits to schedule
count – number of visits to schedule
policy – the scheduling policy used to select which visits to schedule
enabled – whether to select enabled or disabled origins. By default, only enabled origins are selected; some edge cases may call for the disabled ones instead.
lister_uuid – restrict the selection to origins listed by the lister with this UUID
lister_name – restrict the selection to origins listed by the lister with this name
lister_instance_name – restrict the selection to origins listed by the lister with this instance name
timestamp – the mocked timestamp at which we’re recording that the visits are being scheduled (defaults to the current time)
absolute_cooldown – the minimal interval between two visits of the same origin
scheduled_cooldown – the minimal interval before which we can schedule the same origin again if it’s not been visited
failed_cooldown – the minimal interval before which we can reschedule a failed origin
not_found_cooldown – the minimal interval before which we can reschedule a not_found origin
tablesample – the percentage of the table on which we run the query (None: no sampling)
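To make the four cooldown parameters concrete, here is a simplified, hypothetical eligibility check for a single origin. It is not the scheduler's actual query (which also applies the scheduling policy, ordering and sampling); `VisitState`, `is_eligible` and the status strings are made up, and measuring the failure cooldowns from the last visit is an assumption. The defaults mirror the signature above (43200 seconds is 12 hours).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class VisitState:
    # Hypothetical, simplified view of one origin's visit stats.
    last_visit: Optional[datetime]
    last_scheduled: Optional[datetime]
    last_status: Optional[str]  # hypothetical values: "successful", "failed", "not_found"


def is_eligible(
    s: VisitState,
    now: datetime,
    absolute_cooldown: timedelta = timedelta(hours=12),
    scheduled_cooldown: timedelta = timedelta(days=7),
    failed_cooldown: timedelta = timedelta(days=14),
    not_found_cooldown: timedelta = timedelta(days=31),
) -> bool:
    """Simplified reading of the cooldown parameters (not the real SQL)."""
    if s.last_visit is not None:
        since_visit = now - s.last_visit
        if since_visit < absolute_cooldown:
            return False
        # Assumption: failure cooldowns are measured from the last visit.
        if s.last_status == "failed" and since_visit < failed_cooldown:
            return False
        if s.last_status == "not_found" and since_visit < not_found_cooldown:
            return False
    # Scheduled but not visited since then: wait scheduled_cooldown.
    if s.last_scheduled is not None and (s.last_visit is None or s.last_scheduled > s.last_visit):
        if now - s.last_scheduled < scheduled_cooldown:
            return False
    return True
```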
- grab_ready_priority_tasks(task_type: str, timestamp: Optional[datetime] = None, num_tasks: Optional[int] = None) → List[Dict]
Fetch and schedule the list of tasks (with any priority) ready to be scheduled.
- Parameters:
task_type – filter tasks by their type
timestamp – grab tasks that need to be executed before that timestamp
num_tasks – only grab num_tasks tasks (with no priority)
- Returns:
a list of tasks
- grab_ready_tasks(task_type: str, timestamp: Optional[datetime] = None, num_tasks: Optional[int] = None) → List[Dict]
Fetch and schedule the list of tasks (with no priority) ready to be scheduled.
- Parameters:
task_type – filter tasks by their type
timestamp – grab tasks that need to be executed before that timestamp
num_tasks – only grab num_tasks tasks (with no priority)
- Returns:
the list of scheduled tasks
- mass_schedule_task_runs(task_runs)
Schedule several task runs at once.
- Parameters:
task_runs (list) –
a list of dicts with keys:
task (int): the identifier for the task being scheduled
backend_id (str): the identifier of the job in the backend
metadata (dict): metadata to add to the task_run entry
scheduled (datetime.datetime): the instant the event occurred
- Returns:
None
- origin_visit_stats_get(ids: Iterable[Tuple[str, str]]) → List[OriginVisitStats]
Retrieve the visit stats for the given (origin, visit type) pairs.
If some visit stats are not found, they are filtered out of the result, so the output list may be shorter than the input list.
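Because missing entries are dropped, the output cannot be zipped positionally with the input. A minimal in-memory sketch of the behaviour and of re-indexing results by key; the `(url, visit_type)` key order and the `url`/`visit_type` field names are assumptions, and both helpers are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

# Assumed key shape: (origin URL, visit type).
Key = Tuple[str, str]


def visit_stats_get(store: Dict[Key, dict], ids: Iterable[Key]) -> List[dict]:
    """Return the stats for the requested keys, silently dropping unknown keys."""
    return [store[k] for k in ids if k in store]


def index_by_key(stats: List[dict]) -> Dict[Key, dict]:
    """Re-index results by key, since positions no longer match the input."""
    return {(s["url"], s["visit_type"]): s for s in stats}
```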
- origin_visit_stats_upsert(origin_visit_stats: Iterable[OriginVisitStats]) → None
Create or update origin visit stats.
- peek_ready_priority_tasks(task_type: str, timestamp: Optional[datetime] = None, num_tasks: Optional[int] = None) → List[Dict]
Fetch list of tasks (with any priority) ready to be scheduled.
- Parameters:
task_type – filter tasks by their type
timestamp – peek tasks that need to be executed before that timestamp
num_tasks – only peek at num_tasks tasks (with no priority)
- Returns:
a list of tasks
- peek_ready_tasks(task_type: str, timestamp: Optional[datetime] = None, num_tasks: Optional[int] = None) → List[Dict]
Fetch the list of tasks (with no priority) to be scheduled.
- Parameters:
task_type – filter tasks by their type
timestamp – peek tasks that need to be executed before that timestamp
num_tasks – only peek at num_tasks tasks (with no priority)
- Returns:
the list of tasks which would be scheduled
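The peek_* methods only report which tasks are ready, while the grab_* methods also mark them as scheduled. A toy in-memory model of that distinction, ignoring `task_type` and `timestamp` filtering; `ReadyQueue` and its private `scheduled` flag are hypothetical.

```python
from typing import Dict, List, Optional


class ReadyQueue:
    """Toy in-memory model of peek vs grab (hypothetical)."""

    def __init__(self, tasks: List[Dict]) -> None:
        # Each task gets a made-up "scheduled" flag.
        self._tasks = [dict(t, scheduled=False) for t in tasks]

    def _ready(self, num_tasks: Optional[int]) -> List[Dict]:
        ready = [t for t in self._tasks if not t["scheduled"]]
        return ready if num_tasks is None else ready[:num_tasks]

    def peek(self, num_tasks: Optional[int] = None) -> List[Dict]:
        # Read-only: return copies and leave the queue untouched.
        return [dict(t) for t in self._ready(num_tasks)]

    def grab(self, num_tasks: Optional[int] = None) -> List[Dict]:
        # Mark the selected tasks as scheduled so later calls skip them.
        grabbed = self._ready(num_tasks)
        for t in grabbed:
            t["scheduled"] = True
        return [dict(t) for t in grabbed]
```

In this model, repeated peeks return the same tasks, whereas a second grab returns only tasks not already grabbed.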
- record_listed_origins(listed_origins: Iterable[ListedOrigin]) → List[ListedOrigin]
Record a set of origins that a lister has listed.
This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.
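The upsert key is the triple `(lister_id, url, visit_type)`: a new triple inserts a row, and an existing one only has its mutable fields overwritten. A minimal in-memory sketch; `Listed` is a hypothetical subset of `ListedOrigin` fields and `record` is a hypothetical helper, not the database implementation.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple


@dataclass
class Listed:
    # Hypothetical subset of ListedOrigin fields.
    lister_id: str
    url: str
    visit_type: str
    extra_loader_arguments: Dict[str, Any] = field(default_factory=dict)
    last_update: Optional[datetime] = None
    last_seen: Optional[datetime] = None


Key = Tuple[str, str, str]


def record(store: Dict[Key, Listed], origins: Iterable[Listed]) -> List[Listed]:
    """Upsert keyed on (lister_id, url, visit_type) (hypothetical helper)."""
    out = []
    for o in origins:
        key = (o.lister_id, o.url, o.visit_type)
        existing = store.get(key)
        if existing is None:
            store[key] = o
        else:
            # Only these three fields are refreshed on an existing row.
            existing.extra_loader_arguments = o.extra_loader_arguments
            existing.last_update = o.last_update
            existing.last_seen = o.last_seen
        out.append(store[key])
    return out
```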
- schedule_task_run(task_id, backend_id, metadata=None, timestamp=None)
Mark a given task as scheduled, adding a task_run entry in the database.
- Parameters:
task_id (int) – the identifier for the task being scheduled
backend_id (str) – the identifier of the job in the backend
metadata (dict) – metadata to add to the task_run entry
timestamp (datetime.datetime) – the instant the event occurred
- Returns:
a fresh task_run entry
- search_tasks(task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None)
Search tasks matching the selected criteria.
- set_status_tasks(task_ids: List[int], status: str = 'disabled', next_run: Optional[datetime] = None)
Set the status of the tasks whose ids are listed.
If given, also set the next_run date.
- start_task_run(backend_id, metadata=None, timestamp=None)
Mark a given task as started, updating the corresponding task_run entry in the database.
- Parameters:
backend_id (str) – the identifier of the job in the backend
metadata (dict) – metadata to add to the task_run entry
timestamp (datetime.datetime) – the instant the event occurred
- Returns:
the updated task_run entry
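Taken together, schedule_task_run, start_task_run and end_task_run walk one task_run entry through its lifecycle, with the last two locating the entry by backend_id. A toy in-memory model of that sequence; `TaskRunLog`, its field names and the intermediate `"scheduled"`/`"started"` status strings are hypothetical, while the three end statuses come from the end_task_run documentation. Merging metadata into the entry is an assumed reading of "metadata to add to the task_run entry".

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

END_STATUSES = {"eventful", "uneventful", "failed"}  # as listed for end_task_run


class TaskRunLog:
    """Toy, in-memory model of the task_run lifecycle (hypothetical)."""

    def __init__(self) -> None:
        self.runs: Dict[str, Dict[str, Any]] = {}

    def schedule(self, task_id: int, backend_id: str, metadata: Optional[dict] = None) -> Dict[str, Any]:
        run = {
            "task": task_id, "backend_id": backend_id, "metadata": dict(metadata or {}),
            "scheduled": datetime.now(tz=timezone.utc), "started": None, "ended": None,
            "status": "scheduled",  # hypothetical intermediate status
        }
        self.runs[backend_id] = run
        return run

    def start(self, backend_id: str, metadata: Optional[dict] = None) -> Dict[str, Any]:
        run = self.runs[backend_id]
        run["metadata"].update(metadata or {})  # assumed merge semantics
        run["started"] = datetime.now(tz=timezone.utc)
        run["status"] = "started"  # hypothetical intermediate status
        return run

    def end(self, backend_id: str, status: str, metadata: Optional[dict] = None) -> Dict[str, Any]:
        if status not in END_STATUSES:
            raise ValueError(f"unexpected end status: {status!r}")
        run = self.runs[backend_id]
        run["metadata"].update(metadata or {})
        run["ended"] = datetime.now(tz=timezone.utc)
        run["status"] = status
        return run
```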
- update_lister(lister: Lister) → Lister
Update the state for the given lister instance in the database.
- Returns:
a new Lister object, with all fields updated from the database
- Raises:
StaleData – if the updated timestamp for the lister instance in the database doesn’t match the one passed by the user.
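This is an optimistic-concurrency check: the caller's copy carries the `updated` timestamp it was read with, and the write is refused if the stored value has moved on. A minimal in-memory sketch; `ListerRow`, the dict-based `db` and this `update_lister` function are hypothetical stand-ins (not the real `Lister` model or backend), `StaleData` is a local stand-in, and forcing the new timestamp to strictly increase is an assumption of the sketch.

```python
from dataclasses import dataclass, replace
from datetime import datetime, timedelta, timezone
from typing import Dict, Tuple


class StaleData(Exception):
    """Local stand-in for swh.scheduler.exc.StaleData."""


@dataclass(frozen=True)
class ListerRow:
    # Hypothetical subset of Lister fields.
    name: str
    instance_name: str
    current_state: dict
    updated: datetime


def update_lister(db: Dict[Tuple[str, str], ListerRow], lister: ListerRow) -> ListerRow:
    key = (lister.name, lister.instance_name)
    stored = db[key]
    if stored.updated != lister.updated:
        # Someone else wrote this lister since the caller read it.
        raise StaleData(f"{key}: expected updated={lister.updated}, found {stored.updated}")
    # Assumption: the new timestamp strictly increases.
    now = datetime.now(tz=timezone.utc)
    new = replace(lister, updated=max(now, stored.updated + timedelta(microseconds=1)))
    db[key] = new
    return new
```

On `StaleData`, a caller would typically re-read the lister (for example with get_lister), reapply its state change, and retry.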
- update_metrics(lister_id: Optional[UUID] = None, timestamp: Optional[datetime] = None) → List[SchedulerMetrics]
Update the performance metrics of this scheduler instance.
Returns the updated metrics.
- Parameters:
lister_id – if passed, update the metrics only for this lister instance
timestamp – if passed, the date at which we’re updating the metrics, defaults to the database NOW()