swh.scheduler.interface module

class swh.scheduler.interface.PaginatedListedOriginList(results: List[swh.scheduler.model.ListedOrigin], next_page_token: Union[None, Tuple[str, str], List[str]])[source]

Bases: swh.core.api.classes.PagedResult[swh.scheduler.model.ListedOrigin, Tuple[str, str]]

A list of listed origins, with a continuation token

results: List[swh.core.api.classes.TResult]
class swh.scheduler.interface.SchedulerInterface(*args, **kwds)[source]

Bases: typing_extensions.Protocol

create_task_type(task_type)[source]

Create a new task type ready for scheduling.

Parameters

task_type (dict) –

a dictionary with the following keys:

  • type (str): an identifier for the task type

  • description (str): a human-readable description of what the task does

  • backend_name (str): the name of the task in the job-scheduling backend

  • default_interval (datetime.timedelta): the default interval between two task runs

  • min_interval (datetime.timedelta): the minimum interval between two task runs

  • max_interval (datetime.timedelta): the maximum interval between two task runs

  • backoff_factor (float): the factor by which the interval changes at each run

  • max_queue_length (int): the maximum length of the task queue for this task type
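As an illustration of the keys listed above, a task type dictionary could be built as sketched below. Every value (the `example-task` identifier, the backend name, the intervals) is a made-up placeholder, and the `scheduler` object in the final comment is assumed to implement SchedulerInterface; it is not constructed here.

```python
import datetime

# Hypothetical task type: all values are illustrative placeholders.
task_type = {
    "type": "example-task",
    "description": "Example task used to illustrate the expected keys",
    "backend_name": "example.module.tasks.ExampleTask",
    "default_interval": datetime.timedelta(days=64),
    "min_interval": datetime.timedelta(hours=12),
    "max_interval": datetime.timedelta(days=64),
    "backoff_factor": 2.0,
    "max_queue_length": 1000,
}

# Sanity check chosen for this sketch (an assumption, not a documented rule):
# the default interval sits between the minimum and the maximum.
assert (
    task_type["min_interval"]
    <= task_type["default_interval"]
    <= task_type["max_interval"]
)

# With an object implementing SchedulerInterface, the call would be:
# scheduler.create_task_type(task_type)
```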

get_task_type(task_type_name)[source]

Retrieve the task type with id task_type_name

get_task_types()[source]

Retrieve all registered task types

create_tasks(tasks, policy='recurring')[source]

Create new tasks.

Parameters

tasks (list) –

each task is a dictionary with the following keys:

  • type (str): the task type

  • arguments (dict): the arguments for the task runner, keys:

    • args (list of str): arguments

    • kwargs (dict str -> str): keyword arguments

  • next_run (datetime.datetime): the next scheduled run for the task

Returns

a list of created tasks.
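The task dictionaries described above can be assembled with a small helper. This is a sketch only: `make_task` is a hypothetical helper, the task type and URLs are placeholders, and using timezone-aware datetimes for next_run is an assumption of this example.

```python
import datetime


def make_task(task_type, *args, next_run=None, **kwargs):
    """Build a task dictionary with the documented keys (hypothetical helper)."""
    if next_run is None:
        # Assumption: schedule the first run immediately, in UTC.
        next_run = datetime.datetime.now(tz=datetime.timezone.utc)
    return {
        "type": task_type,
        "arguments": {"args": list(args), "kwargs": dict(kwargs)},
        "next_run": next_run,
    }


tasks = [
    make_task("example-task", url="https://example.org/repo-a"),
    make_task("example-task", url="https://example.org/repo-b"),
]

# With an object implementing SchedulerInterface, the call would be:
# created = scheduler.create_tasks(tasks, policy="recurring")
```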

set_status_tasks(task_ids: List[int], status: str = 'disabled', next_run: Optional[datetime.datetime] = None)[source]

Set the status of the tasks whose ids are listed.

If given, also set the next_run date.

disable_tasks(task_ids)[source]

Disable the tasks whose ids are listed.

search_tasks(task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None)[source]

Search for tasks matching the selected criteria.

get_tasks(task_ids)[source]

Retrieve the info of tasks whose ids are listed.

peek_ready_tasks(task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None) List[Dict][source]

Fetch the list of tasks (with no priority) to be scheduled.

Parameters
  • task_type – only consider tasks of this type

  • timestamp – peek tasks that need to be executed before that timestamp

  • num_tasks – only peek at num_tasks tasks (with no priority)

Returns

the list of tasks which would be scheduled

grab_ready_tasks(task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None) List[Dict][source]

Fetch and schedule the list of tasks (with no priority) ready to be scheduled.

Parameters
  • task_type – only consider tasks of this type

  • timestamp – grab tasks that need to be executed before that timestamp

  • num_tasks – only grab num_tasks tasks (with no priority)

Returns

the list of scheduled tasks
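The difference between peeking and grabbing can be sketched with a toy in-memory queue. `ToyQueue` is a hypothetical stand-in, not the swh.scheduler backend; the `pending` and `scheduled` status names, the ordering by next_run, and the inclusive timestamp comparison are assumptions made for the example.

```python
import datetime


class ToyQueue:
    """Hypothetical in-memory stand-in used only to contrast peek and grab."""

    def __init__(self, tasks):
        self.tasks = tasks  # dicts with "id", "type", "next_run", "status"

    def _ready(self, task_type, timestamp, num_tasks):
        ready = [
            t for t in self.tasks
            if t["type"] == task_type
            and t["status"] == "pending"       # placeholder status name
            and t["next_run"] <= timestamp     # assumption: inclusive bound
        ]
        ready.sort(key=lambda t: t["next_run"])
        return ready if num_tasks is None else ready[:num_tasks]

    def peek_ready_tasks(self, task_type, timestamp, num_tasks=None):
        # Read-only: report what would be scheduled, change nothing.
        return [dict(t) for t in self._ready(task_type, timestamp, num_tasks)]

    def grab_ready_tasks(self, task_type, timestamp, num_tasks=None):
        # Same selection, but the selected tasks are marked as scheduled.
        grabbed = self._ready(task_type, timestamp, num_tasks)
        for t in grabbed:
            t["status"] = "scheduled"          # placeholder status name
        return [dict(t) for t in grabbed]


now = datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc)
hour = datetime.timedelta(hours=1)
queue = ToyQueue([
    {"id": 1, "type": "example-task", "next_run": now - hour, "status": "pending"},
    {"id": 2, "type": "example-task", "next_run": now + hour, "status": "pending"},
])

peeked = queue.peek_ready_tasks("example-task", now)
grabbed = queue.grab_ready_tasks("example-task", now)
grabbed_again = queue.grab_ready_tasks("example-task", now)
```

In this toy model, peeking twice returns the same task, while a second grab returns nothing because the first grab already marked the task as scheduled.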

peek_ready_priority_tasks(task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None) List[Dict][source]

Fetch list of tasks (with any priority) ready to be scheduled.

Parameters
  • task_type – only consider tasks of this type

  • timestamp – peek tasks that need to be executed before that timestamp

  • num_tasks – only peek at num_tasks tasks (with any priority)

Returns

a list of tasks

grab_ready_priority_tasks(task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None) List[Dict][source]

Fetch and schedule the list of tasks (with any priority) ready to be scheduled.

Parameters
  • task_type – only consider tasks of this type

  • timestamp – grab tasks that need to be executed before that timestamp

  • num_tasks – only grab num_tasks tasks (with any priority)

Returns

a list of tasks

schedule_task_run(task_id, backend_id, metadata=None, timestamp=None)[source]

Mark a given task as scheduled, adding a task_run entry in the database.

Parameters
  • task_id (int) – the identifier for the task being scheduled

  • backend_id (str) – the identifier of the job in the backend

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

a fresh task_run entry

mass_schedule_task_runs(task_runs)[source]

Schedule a bunch of task runs.

Parameters

task_runs (list) –

a list of dicts with keys:

  • task (int): the identifier for the task being scheduled

  • backend_id (str): the identifier of the job in the backend

  • metadata (dict): metadata to add to the task_run entry

  • scheduled (datetime.datetime): the instant the event occurred

Returns

None

start_task_run(backend_id, metadata=None, timestamp=None)[source]

Mark a given task as started, updating the corresponding task_run entry in the database.

Parameters
  • backend_id (str) – the identifier of the job in the backend

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

the updated task_run entry

end_task_run(backend_id, status, metadata=None, timestamp=None, result=None)[source]

Mark a given task as ended, updating the corresponding task_run entry in the database.

Parameters
  • backend_id (str) – the identifier of the job in the backend

  • status (str) – how the task ended; one of: ‘eventful’, ‘uneventful’, ‘failed’

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns

the updated task_run entry
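The task run lifecycle (scheduled, then started, then ended) can be illustrated with a toy in-memory model. `ToyTaskRuns` is a hypothetical stand-in, not the real backend: the `ended` field, the intermediate `scheduled` and `started` status values, and the rejection of unknown end statuses are assumptions of this sketch.

```python
import datetime

# End statuses listed in the text above.
VALID_END_STATUSES = {"eventful", "uneventful", "failed"}


def _now():
    return datetime.datetime.now(tz=datetime.timezone.utc)


class ToyTaskRuns:
    """Hypothetical in-memory stand-in; not the swh.scheduler backend."""

    def __init__(self):
        self.runs = {}  # backend_id -> task_run dict

    def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None):
        run = {
            "task": task_id,
            "backend_id": backend_id,
            "metadata": dict(metadata or {}),
            "scheduled": timestamp or _now(),
            "started": None,
            "ended": None,          # field name is an assumption
            "status": "scheduled",  # placeholder status name
        }
        self.runs[backend_id] = run
        return dict(run)

    def start_task_run(self, backend_id, metadata=None, timestamp=None):
        run = self.runs[backend_id]
        run["metadata"].update(metadata or {})
        run["started"] = timestamp or _now()
        run["status"] = "started"   # placeholder status name
        return dict(run)

    def end_task_run(self, backend_id, status, metadata=None, timestamp=None,
                     result=None):
        # `result` mirrors the signature above; its meaning is not documented
        # there, so this sketch accepts it and ignores it.
        if status not in VALID_END_STATUSES:
            raise ValueError(f"unexpected end status: {status!r}")
        run = self.runs[backend_id]
        run["metadata"].update(metadata or {})
        run["ended"] = timestamp or _now()
        run["status"] = status
        return dict(run)


runs = ToyTaskRuns()
runs.schedule_task_run(task_id=42, backend_id="job-1")
runs.start_task_run("job-1", metadata={"worker": "worker-01"})
final = runs.end_task_run("job-1", status="eventful")
```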

filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None) Dict[str, Any][source]

Compute the tasks to archive within the half-open datetime interval [after_ts, before_ts), i.e. after_ts included and before_ts excluded. The method returns a paginated result.

Returns

a dict with the following keys:

  • next_page_token: opaque token to be used as page_token to retrieve the next page of results. If absent, there are no more pages to gather.

  • tasks: list of task dictionaries with the following keys:

    • id (str): origin task id

    • started (Optional[datetime]): started date

    • scheduled (datetime): scheduled date

    • arguments (json dict): task’s arguments

    • …
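A caller typically loops until next_page_token is absent. The sketch below shows that loop against a fake page function; `iter_tasks_to_archive` and `fake_filter_task_to_archive` are hypothetical names, and the token used by the fake (a stringified offset) is an arbitrary choice, since the real token is opaque.

```python
def iter_tasks_to_archive(filter_page, after_ts, before_ts, limit=10):
    """Yield every task, following next_page_token until it is absent.

    `filter_page` is any callable with the same parameters as
    filter_task_to_archive (for example a bound method of a scheduler).
    """
    page_token = None
    while True:
        page = filter_page(after_ts, before_ts, limit=limit, page_token=page_token)
        yield from page["tasks"]
        page_token = page.get("next_page_token")
        if page_token is None:
            break


def fake_filter_task_to_archive(after_ts, before_ts, limit=10, page_token=None):
    """Fake backend with five tasks; the token is a stringified offset."""
    all_ids = [str(i) for i in range(1, 6)]
    start = int(page_token) if page_token is not None else 0
    end = start + limit
    page = {"tasks": [{"id": task_id} for task_id in all_ids[start:end]]}
    if end < len(all_ids):
        page["next_page_token"] = str(end)
    return page


task_ids = [
    task["id"]
    for task in iter_tasks_to_archive(
        fake_filter_task_to_archive, "2021-01-01", "2021-02-01", limit=2
    )
]
```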

delete_archived_tasks(task_ids)[source]

Delete as many of the archived tasks as possible. A task is deleted only once all of its associated task_run entries have been cleaned up.

get_task_runs(task_ids, limit=None)[source]

Retrieve the task runs for the given task ids.

get_listers() List[swh.scheduler.model.Lister][source]

Retrieve information about all listers from the database.

get_lister(name: str, instance_name: Optional[str] = None) Optional[swh.scheduler.model.Lister][source]

Retrieve information about the given instance of the lister from the database.

get_or_create_lister(name: str, instance_name: Optional[str] = None) swh.scheduler.model.Lister[source]

Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist.

update_lister(lister: swh.scheduler.model.Lister) swh.scheduler.model.Lister[source]

Update the state for the given lister instance in the database.

Returns

a new Lister object, with all fields updated from the database

Raises

StaleData if the updated timestamp for the lister instance in the database doesn’t match the one passed by the user.
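The StaleData check described above resembles optimistic concurrency control: an update is accepted only if the caller's copy carries the same updated timestamp as the stored one. The following toy model illustrates the idea under that assumption; `ToyLister`, `ToyListerStore`, the `state` field and the local `StaleData` class are all hypothetical stand-ins, not the swh.scheduler classes.

```python
import datetime
from dataclasses import dataclass, replace


class StaleData(Exception):
    """Local stand-in for the StaleData exception named above."""


@dataclass(frozen=True)
class ToyLister:
    name: str
    state: str                  # hypothetical payload field
    updated: datetime.datetime


class ToyListerStore:
    """Hypothetical single-row store used to illustrate the check."""

    def __init__(self, lister):
        self._row = lister

    def get_lister(self):
        return self._row

    def update_lister(self, lister, now):
        # Reject the update if the row changed since the caller read it.
        if lister.updated != self._row.updated:
            raise StaleData(f"lister {lister.name!r} changed since it was read")
        self._row = replace(lister, updated=now)
        return self._row


t0 = datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc)
minute = datetime.timedelta(minutes=1)
store = ToyListerStore(ToyLister("example-lister", "page=1", t0))

first_reader = store.get_lister()
second_reader = store.get_lister()

# The first writer wins and gets back a refreshed object.
fresh = store.update_lister(replace(first_reader, state="page=2"), t0 + minute)

# The second writer still holds the old timestamp and is rejected.
try:
    store.update_lister(replace(second_reader, state="page=9"), t0 + 2 * minute)
    conflict_detected = False
except StaleData:
    conflict_detected = True
```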

record_listed_origins(listed_origins: Iterable[swh.scheduler.model.ListedOrigin]) List[swh.scheduler.model.ListedOrigin][source]

Record a set of origins that a lister has listed.

This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.
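The upsert behaviour can be sketched with a dictionary keyed on (lister_id, url, visit_type). This is a toy model over plain dicts rather than ListedOrigin objects; the `first_seen` field is a hypothetical extra column included only to show that fields outside the three updated ones are left alone.

```python
UPDATED_FIELDS = ("extra_loader_arguments", "last_update", "last_seen")


def record_listed_origins(table, listed_origins):
    """Toy upsert keyed on (lister_id, url, visit_type)."""
    recorded = []
    for origin in listed_origins:
        key = (origin["lister_id"], origin["url"], origin["visit_type"])
        # Insert the row if the key is new, otherwise reuse the existing row.
        row = table.setdefault(key, dict(origin))
        # Only these fields are refreshed on conflict.
        for field in UPDATED_FIELDS:
            row[field] = origin.get(field)
        recorded.append(dict(row))
    return recorded


table = {}
base = {
    "lister_id": "lister-1",
    "url": "https://example.org/repo",
    "visit_type": "git",
    "extra_loader_arguments": {},
    "last_update": None,
}
record_listed_origins(
    table, [{**base, "last_seen": "2022-01-01", "first_seen": "2022-01-01"}]
)
second = record_listed_origins(
    table, [{**base, "last_seen": "2022-02-01", "first_seen": "2022-02-01"}]
)
```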

get_listed_origins(lister_id: Optional[uuid.UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[Tuple[str, str]] = None) swh.scheduler.interface.PaginatedListedOriginList[source]

Get information on the listed origins matching either the url or lister_id, or both arguments.

Use the limit and page_token arguments for continuation. The next page token, if any, is returned in the PaginatedListedOriginList object.
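Continuation with limit and page_token might look like the sketch below. `Page` is a local stand-in exposing the same `results` and `next_page_token` attributes as PaginatedListedOriginList, and the fake backend builds its token from (lister_id, url); that token layout is an assumption of the example, and callers should treat the real token as opaque.

```python
from typing import List, NamedTuple, Optional, Tuple


class Page(NamedTuple):
    """Local stand-in for PaginatedListedOriginList."""
    results: List[dict]
    next_page_token: Optional[Tuple[str, str]]


def iter_listed_origins(get_page, **filters):
    """Yield all origins by passing each next_page_token back as page_token."""
    page_token = None
    while True:
        page = get_page(page_token=page_token, **filters)
        yield from page.results
        page_token = page.next_page_token
        if page_token is None:
            return


ORIGINS = [
    {"lister_id": "lister-1", "url": f"https://example.org/{i}"} for i in range(5)
]


def fake_get_listed_origins(lister_id=None, url=None, limit=1000, page_token=None):
    """Fake backend: the filters are ignored; pages follow (lister_id, url) order."""
    rows = sorted(ORIGINS, key=lambda o: (o["lister_id"], o["url"]))
    if page_token is not None:
        rows = [o for o in rows if (o["lister_id"], o["url"]) > tuple(page_token)]
    page = rows[:limit]
    token = (page[-1]["lister_id"], page[-1]["url"]) if len(rows) > limit else None
    return Page(page, token)


urls = [o["url"] for o in iter_listed_origins(fake_get_listed_origins, limit=2)]
```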

grab_next_visits(visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: Optional[str] = None, timestamp: Optional[datetime.datetime] = None, scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), tablesample: Optional[float] = None) List[swh.scheduler.model.ListedOrigin][source]

Get at most count origins that next need to be visited with the visit_type loader, according to the given scheduling policy.

This will mark the origins as scheduled in the origin_visit_stats table, to avoid scheduling multiple visits to the same origin.

Parameters
  • visit_type – type of visits to schedule

  • count – number of visits to schedule

  • policy – the scheduling policy used to select which visits to schedule

  • enabled – whether to select enabled or disabled origins. By default only enabled origins are selected; selecting disabled ones is only useful in edge cases.

  • lister_uuid – if set, only consider origins listed by the lister with this uuid

  • timestamp – the mocked timestamp at which we’re recording that the visits are being scheduled (defaults to the current time)

  • scheduled_cooldown – the minimal interval before which we can schedule the same origin again

  • failed_cooldown – the minimal interval before which we can reschedule a failed origin

  • not_found_cooldown – the minimal interval before which we can reschedule a not_found origin

  • tablesample – the percentage of the table on which we run the query (None: no sampling)
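The three cooldown parameters all express the same kind of rule: an origin is skipped until a minimal interval has elapsed since the relevant event. A minimal sketch of such a check follows; `is_outside_cooldown` is a hypothetical helper, and both the inclusive comparison and the treatment of a None cooldown as "no restriction" are assumptions, not documented behaviour.

```python
import datetime


def is_outside_cooldown(last_event, now, cooldown):
    """Return True when enough time has elapsed since `last_event`.

    Assumptions of this sketch: an origin with no recorded event is always
    eligible, and a cooldown of None disables the check.
    """
    if last_event is None or cooldown is None:
        return True
    return last_event + cooldown <= now


now = datetime.datetime(2022, 1, 15, tzinfo=datetime.timezone.utc)
scheduled_cooldown = datetime.timedelta(days=7)  # default from the signature above

scheduled_recently = now - datetime.timedelta(days=3)
scheduled_long_ago = now - datetime.timedelta(days=8)

recent_ok = is_outside_cooldown(scheduled_recently, now, scheduled_cooldown)
old_ok = is_outside_cooldown(scheduled_long_ago, now, scheduled_cooldown)
```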

origin_visit_stats_upsert(origin_visit_stats: Iterable[swh.scheduler.model.OriginVisitStats]) None[source]

Create or update the given origin visit stats.

origin_visit_stats_get(ids: Iterable[Tuple[str, str]]) List[swh.scheduler.model.OriginVisitStats][source]

Retrieve the stats for an origin with a given visit type

If some visit_stats are not found, they are filtered out of the result, so the output list may be shorter than the input list.
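The filtering behaviour can be illustrated with a toy lookup table. This sketch assumes each id is a (url, visit_type) pair, which the signature above does not spell out, and it uses plain dicts in place of OriginVisitStats objects.

```python
def origin_visit_stats_get(table, ids):
    """Return the stats found for `ids`, silently skipping unknown pairs."""
    return [table[key] for key in ids if key in table]


# Toy table keyed on (url, visit_type); the pair layout is an assumption.
table = {
    ("https://example.org/a", "git"): {
        "url": "https://example.org/a",
        "visit_type": "git",
    },
}

requested = [
    ("https://example.org/a", "git"),
    ("https://example.org/b", "git"),  # not in the table, so it is dropped
]
found = origin_visit_stats_get(table, requested)
```

Because missing entries are dropped rather than returned as placeholders, callers cannot match results to inputs by position and should key on the fields of each returned entry instead.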

visit_scheduler_queue_position_get() Dict[str, datetime.datetime][source]

Retrieve all current queue positions for the recurrent visit scheduler.

Returns

Mapping of visit type to their current queue position

visit_scheduler_queue_position_set(visit_type: str, position: datetime.datetime) None[source]

Set the current queue position of the recurrent visit scheduler for visit_type.

update_metrics(lister_id: Optional[uuid.UUID] = None, timestamp: Optional[datetime.datetime] = None) List[swh.scheduler.model.SchedulerMetrics][source]

Update the performance metrics of this scheduler instance.

Returns the updated metrics.

Parameters
  • lister_id – if passed, update the metrics only for this lister instance

  • timestamp – if passed, the date at which we’re updating the metrics; defaults to the database NOW()

get_metrics(lister_id: Optional[uuid.UUID] = None, visit_type: Optional[str] = None) List[swh.scheduler.model.SchedulerMetrics][source]

Retrieve the performance metrics of this scheduler instance.

Parameters
  • lister_id – filter the metrics for this lister instance only

  • visit_type – filter the metrics for this visit type only