Source code for swh.scheduler.interface

# Copyright (C) 2015-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information


import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from uuid import UUID

from typing_extensions import Protocol, runtime_checkable

from swh.core.api import remote_api_endpoint
from swh.core.api.classes import PagedResult
from swh.scheduler.model import (
    ListedOrigin,
    Lister,
    OriginVisitStats,
    SchedulerMetrics,
    Task,
    TaskPolicy,
    TaskPriority,
    TaskRun,
    TaskRunStatus,
    TaskStatus,
    TaskType,
)

ListedOriginPageToken = Tuple[str, str]


[docs] class PaginatedListedOriginList(PagedResult[ListedOrigin, ListedOriginPageToken]): """A list of listed origins, with a continuation token""" def __init__( self, results: List[ListedOrigin], next_page_token: Union[None, ListedOriginPageToken, List[str]], ): parsed_next_page_token: Optional[Tuple[str, str]] = None if next_page_token is not None: if len(next_page_token) != 2: raise TypeError("Expected Tuple[str, str] or list of size 2.") parsed_next_page_token = tuple(next_page_token) # type: ignore super().__init__(results, parsed_next_page_token)
[docs] @runtime_checkable class SchedulerInterface(Protocol):
[docs] @remote_api_endpoint("task_type/create") def create_task_type(self, task_type: TaskType) -> None: """Create a new task type in database ready for scheduling. Args: task_type: a TaskType object """ ...
[docs] @remote_api_endpoint("task_type/get") def get_task_type(self, task_type_name: str) -> Optional[TaskType]: """Retrieve the registered task type with a given name Args: task_type_name: name of the task type to retrieve Returns: a TaskType object or :const:`None` if no such task type exists """ ...
[docs] @remote_api_endpoint("task_type/get_all") def get_task_types(self) -> List[TaskType]: """Retrieve all registered task types Returns: a list of TaskType objects """ ...
[docs] @remote_api_endpoint("task/create") def create_tasks( self, tasks: List[Task], policy: TaskPolicy = "recurring" ) -> List[Task]: """Register new tasks in database. Args: tasks: each task is a Task object created with at least the following parameters: - type - arguments - next_run policy: default task policy (either recurring or oneshot) to use if not set in input task objects Returns: a list of created tasks with database ids filled. """ ...
[docs] @remote_api_endpoint("task/set_status") def set_status_tasks( self, task_ids: List[int], status: TaskStatus = "disabled", next_run: Optional[datetime.datetime] = None, ) -> None: """Set the tasks' status whose ids are listed. Args: task_ids: list of tasks' identifiers status: the status to set for the tasks next_run: if provided, also set the next_run date """ ...
[docs] @remote_api_endpoint("task/disable") def disable_tasks(self, task_ids: List[int]) -> None: """Disable the tasks whose ids are listed. Args: task_ids: list of tasks' identifiers """ ...
[docs] @remote_api_endpoint("task/search") def search_tasks( self, task_id: Optional[int] = None, task_type: Optional[str] = None, status: Optional[TaskStatus] = None, priority: Optional[TaskPriority] = None, policy: Optional[TaskPolicy] = None, before: Optional[datetime.datetime] = None, after: Optional[datetime.datetime] = None, limit: Optional[int] = None, ) -> List[Task]: """Search tasks from selected criterions Args: task_id: search a task with given identifier task_type: search tasks with given type status: search tasks with given status priority: search tasks with given priority policy: search tasks with given policy before: search tasks created before given date after: search tasks created after given date limit: maximum number of tasks to return Returns: a list of found tasksa """ ...
[docs] @remote_api_endpoint("task/get") def get_tasks(self, task_ids: List[int]) -> List[Task]: """Retrieve the info of tasks whose ids are listed. Args: task_ids: list of tasks' identifiers Returns: a list of tasks """ ...
[docs] @remote_api_endpoint("task/peek_ready") def peek_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Task]: """Fetch the list of tasks (with no priority) to be scheduled. Args: task_type: filtering task per their type timestamp: peek tasks that need to be executed before that timestamp num_tasks: only peek at num_tasks tasks (with no priority) Returns: the list of tasks which would be scheduled """ ...
[docs] @remote_api_endpoint("task/grab_ready") def grab_ready_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Task]: """Fetch and schedule the list of tasks (with no priority) ready to be scheduled. Args: task_type: filtering task per their type timestamp: grab tasks that need to be executed before that timestamp num_tasks: only grab num_tasks tasks (with no priority) Returns: the list of scheduled tasks """ ...
[docs] @remote_api_endpoint("task/peek_ready_with_priority") def peek_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Task]: """Fetch list of tasks (with any priority) ready to be scheduled. Args: task_type: filtering task per their type timestamp: peek tasks that need to be executed before that timestamp num_tasks: only peek at num_tasks tasks (with no priority) Returns: the list of tasks which would be scheduled """ ...
[docs] @remote_api_endpoint("task/grab_ready_with_priority") def grab_ready_priority_tasks( self, task_type: str, timestamp: Optional[datetime.datetime] = None, num_tasks: Optional[int] = None, ) -> List[Task]: """Fetch and schedule the list of tasks (with any priority) ready to be scheduled. Args: task_type: filtering task per their type timestamp: grab tasks that need to be executed before that timestamp num_tasks: only grab num_tasks tasks (with no priority) Returns: the list of scheduled tasks """ ...
[docs] @remote_api_endpoint("task_run/schedule_one") def schedule_task_run( self, task_id: int, backend_id: str, metadata: Optional[Dict[str, Any]] = None, timestamp: Optional[datetime.datetime] = None, ) -> TaskRun: """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id: the identifier for the task being scheduled backend_id: the identifier of the job in the backend metadata: metadata to add to the task_run entry timestamp: the instant the event occurred Returns: a TaskRun object """ ...
[docs] @remote_api_endpoint("task_run/schedule") def mass_schedule_task_runs(self, task_runs: List[TaskRun]) -> None: """Schedule a bunch of task runs. Args: task_runs: a list of TaskRun objects created at least with the following parameters: - task - backend_id - scheduled """ ...
[docs] @remote_api_endpoint("task_run/start") def start_task_run( self, backend_id: str, metadata: Optional[Dict[str, Any]] = None, timestamp: Optional[datetime.datetime] = None, ) -> Optional[TaskRun]: """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id: the identifier of the job in the backend metadata: metadata to add to the task_run entry timestamp: the instant the event occurred Returns: a TaskRun object with updated fields """ ...
[docs] @remote_api_endpoint("task_run/end") def end_task_run( self, backend_id: str, status: TaskRunStatus, metadata: Optional[Dict[str, Any]] = None, timestamp: Optional[datetime.datetime] = None, ) -> Optional[TaskRun]: """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id: the identifier of the job in the backend status: how the task ended metadata: metadata to add to the task_run entry timestamp: the instant the event occurred Returns: a TaskRun object with updated fields """ ...
[docs] @remote_api_endpoint("task/filter_for_archive") def filter_task_to_archive( self, after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None, ) -> Dict[str, Any]: """Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result. Returns: dict with the following keys: - **next_page_token**: opaque token to be used as `page_token` to retrieve the next page of result. If absent, there is no more pages to gather. - **tasks**: list of task dictionaries with the following keys: **id** (str): origin task id **started** (Optional[datetime]): started date **scheduled** (datetime): scheduled date **arguments** (json dict): task's arguments ... """ ...
[docs] @remote_api_endpoint("task/delete_archived") def delete_archived_tasks(self, task_ids): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ ...
[docs] @remote_api_endpoint("task_run/get") def get_task_runs( self, task_ids: List[int], limit: Optional[int] = None ) -> List[TaskRun]: """Search task run for a task id""" ...
[docs] @remote_api_endpoint("listers/get") def get_listers(self, with_first_visits_to_schedule: bool = False) -> List[Lister]: """Retrieve information about all listers from the database. Args: with_first_visits_to_schedule: if :const:`True` only retrieve listers whose first visits with high priority of listed origins were not scheduled yet (those type of listers have the first_visits_queue_prefix attribute set). """ ...
[docs] @remote_api_endpoint("listers/get_by_id") def get_listers_by_id(self, lister_ids: List[str]) -> List[Lister]: """Retrieve listers in batch, using their UUID"""
[docs] @remote_api_endpoint("lister/get") def get_lister( self, name: str, instance_name: Optional[str] = None ) -> Optional[Lister]: """Retrieve information about the given instance of the lister from the database. """ ...
[docs] @remote_api_endpoint("lister/get_or_create") def get_or_create_lister( self, name: str, instance_name: Optional[str] = None, first_visits_queue_prefix: Optional[str] = None, ) -> Lister: """Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist. """ ...
[docs] @remote_api_endpoint("lister/update") def update_lister(self, lister: Lister) -> Lister: """Update the state for the given lister instance in the database. Returns: a new Lister object, with all fields updated from the database Raises: StaleData if the `updated` timestamp for the lister instance in database doesn't match the one passed by the user. """ ...
[docs] @remote_api_endpoint("origins/record") def record_listed_origins( self, listed_origins: Iterable[ListedOrigin] ) -> List[ListedOrigin]: """Record a set of origins that a lister has listed. This performs an "upsert": origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen. """ ...
[docs] @remote_api_endpoint("origins/get") def get_listed_origins( self, lister_id: Optional[UUID] = None, url: Optional[str] = None, urls: Optional[List[str]] = None, enabled: Optional[bool] = True, limit: int = 1000, page_token: Optional[ListedOriginPageToken] = None, ) -> PaginatedListedOriginList: """Get information on listed origins, possibly filtered, in a paginated way. Args: lister_id: if provided, return origins discovered with that lister url: (deprecated, use ``urls`` parameter instead) if provided, return origins matching that URL urls: if provided, return origins matching these URLs enabled: If :const:`True` return only enabled origins, if :const:`False` return only disabled origins, if :const:`None` return all origins. limit: maximum number of origins per page page_token: to get the next page of origins, is returned in the :class:`PaginatedListedOriginList` object Returns: A page of listed origins """ ...
[docs] @remote_api_endpoint("lister/visit_types") def get_visit_types_for_listed_origins(self, lister: Lister) -> List[str]: """Return list of distinct visit types for the origins listed by a given lister.""" ...
[docs] @remote_api_endpoint("origins/grab_next") def grab_next_visits( self, visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: Optional[str] = None, lister_name: Optional[str] = None, lister_instance_name: Optional[str] = None, timestamp: Optional[datetime.datetime] = None, absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12), scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7), failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), tablesample: Optional[float] = None, ) -> List[ListedOrigin]: """Get at most the `count` next origins that need to be visited with the `visit_type` loader according to the given scheduling `policy`. This will mark the origins as scheduled in the origin_visit_stats table, to avoid scheduling multiple visits to the same origin. Arguments: visit_type: type of visits to schedule count: number of visits to schedule policy: the scheduling policy used to select which visits to schedule enabled: Determine whether we want to list enabled or disabled origins. As default, we want reasonably enabled origins. For some edge case, we might want the others. lister_uuid: Determine the list of origins listed from the lister with uuid lister_name: Determine the list of origins listed from the lister with name lister_instance_name: Determine the list of origins listed from the lister with instance name timestamp: the mocked timestamp at which we're recording that the visits are being scheduled (defaults to the current time) absolute_cooldown: the minimal interval between two visits of the same origin scheduled_cooldown: the minimal interval before which we can schedule the same origin again if it's not been visited failed_cooldown: the minimal interval before which we can reschedule a failed origin not_found_cooldown: the minimal interval before which we can reschedule a not_found origin tablesample: the percentage of the table on which we run the query (None: no sampling) """ ...
[docs] @remote_api_endpoint("visit_stats/upsert") def origin_visit_stats_upsert( self, origin_visit_stats: Iterable[OriginVisitStats] ) -> None: """Create a new origin visit stats""" ...
[docs] @remote_api_endpoint("visit_stats/get") def origin_visit_stats_get( self, ids: Iterable[Tuple[str, str]] ) -> List[OriginVisitStats]: """Retrieve the visit statistics for an origin with a given visit type. .. warning:: If some visit statistics are not found, they are filtered out of the result. So the output list may be shorter than the input list. Args: ids: an iterable of (origin_url, visit_type) tuples Returns: a list of origin visit statistics """ ...
[docs] @remote_api_endpoint("visit_scheduler/get") def visit_scheduler_queue_position_get( self, ) -> Dict[str, int]: """Retrieve all current queue positions for the recurrent visit scheduler. Returns Mapping of visit type to their current queue position """ ...
[docs] @remote_api_endpoint("visit_scheduler/set") def visit_scheduler_queue_position_set( self, visit_type: str, position: int ) -> None: """Set the current queue position of the recurrent visit scheduler for `visit_type`.""" ...
[docs] @remote_api_endpoint("scheduler_metrics/update") def update_metrics( self, lister_id: Optional[UUID] = None, timestamp: Optional[datetime.datetime] = None, ) -> List[SchedulerMetrics]: """Update the performance metrics of this scheduler instance. Returns the updated metrics. Args: lister_id: if passed, update the metrics only for this lister instance timestamp: if passed, the date at which we're updating the metrics, defaults to the database NOW() """ ...
[docs] @remote_api_endpoint("scheduler_metrics/get") def get_metrics( self, lister_id: Optional[UUID] = None, visit_type: Optional[str] = None ) -> List[SchedulerMetrics]: """Retrieve the performance metrics of this scheduler instance. Args: lister_id: filter the metrics for this lister instance only visit_type: filter the metrics for this visit type only """ ...