swh.scheduler.backend module
-
class swh.scheduler.backend.SchedulerBackend(db, min_pool_conns=1, max_pool_conns=10)
Bases: object
Backend for the Software Heritage scheduling database.
-
task_type_keys = ['type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', 'num_retries', 'retry_delay']
-
create_task_type(task_type)
Create a new task type ready for scheduling.
- Parameters
task_type (dict) – a dictionary with the following keys:
type (str): an identifier for the task type
description (str): a human-readable description of what the task does
backend_name (str): the name of the task in the job-scheduling backend
default_interval (datetime.timedelta): the default interval between two task runs
min_interval (datetime.timedelta): the minimum interval between two task runs
max_interval (datetime.timedelta): the maximum interval between two task runs
backoff_factor (float): the factor by which the interval changes at each run
max_queue_length (int): the maximum length of the task queue for this task type
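As a sketch of the dictionary create_task_type expects, the snippet below builds one from the documented keys (copied from task_type_keys). The helper `make_task_type`, the "load-git" identifier, the backend name and all interval values are hypothetical and chosen only for illustration; the min/max interval check is likewise an assumption, not a documented rule of the backend.

```python
from datetime import timedelta

# Copied from SchedulerBackend.task_type_keys as documented above.
TASK_TYPE_KEYS = [
    "type", "description", "backend_name", "default_interval",
    "min_interval", "max_interval", "backoff_factor",
    "max_queue_length", "num_retries", "retry_delay",
]


def make_task_type(**fields):
    """Hypothetical helper: build a task type dict, rejecting unknown keys."""
    unknown = set(fields) - set(TASK_TYPE_KEYS)
    if unknown:
        raise ValueError(f"unknown task type keys: {sorted(unknown)}")
    # Sanity check on the interval bounds (an assumption, not a documented rule).
    lo, hi = fields.get("min_interval"), fields.get("max_interval")
    if lo is not None and hi is not None and lo > hi:
        raise ValueError("min_interval must not exceed max_interval")
    return dict(fields)


task_type = make_task_type(
    type="load-git",                          # illustrative identifier
    description="Load a git repository",
    backend_name="example.tasks.LoadGit",     # hypothetical job name
    default_interval=timedelta(days=64),
    min_interval=timedelta(hours=12),
    max_interval=timedelta(days=64),
    backoff_factor=2.0,
    max_queue_length=1000,
)
# With a configured backend instance (not shown): backend.create_task_type(task_type)
```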
-
get_lister(name: str, instance_name: Optional[str] = None) → Optional[swh.scheduler.model.Lister]
Retrieve information about the given instance of the lister from the database.
-
get_or_create_lister(name: str, instance_name: Optional[str] = None) → swh.scheduler.model.Lister
Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist.
-
update_lister(lister: swh.scheduler.model.Lister) → swh.scheduler.model.Lister
Update the state for the given lister instance in the database.
- Returns
a new Lister object, with all fields updated from the database
- Raises
StaleData – if the updated timestamp for the lister instance in the database doesn’t match the one passed by the user.
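The StaleData check amounts to optimistic concurrency control: the caller hands back the `updated` timestamp it last read, and the write only succeeds if the stored row still carries that value. The self-contained sketch below models this in memory; `FakeLister`, `InMemoryListerStore` and the local `StaleData` class are hypothetical stand-ins, not the swh.scheduler classes.

```python
from dataclasses import dataclass, replace
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


class StaleData(Exception):
    """Local stand-in for the StaleData exception raised by update_lister."""


@dataclass(frozen=True)
class FakeLister:
    """Hypothetical, simplified lister record (not swh.scheduler.model.Lister)."""
    name: str
    current_state: dict
    updated: Optional[datetime] = None


def _next_timestamp(previous: datetime) -> datetime:
    # Guarantee a strictly increasing timestamp even if the clock is coarse.
    return max(datetime.now(timezone.utc), previous + timedelta(microseconds=1))


class InMemoryListerStore:
    """Hypothetical store that mimics the update_lister contract."""

    def __init__(self) -> None:
        self._rows: Dict[str, FakeLister] = {}

    def create(self, name: str) -> FakeLister:
        row = FakeLister(name=name, current_state={}, updated=datetime.now(timezone.utc))
        self._rows[name] = row
        return row

    def update_lister(self, lister: FakeLister) -> FakeLister:
        stored = self._rows[lister.name]
        # Optimistic check: the caller must hold the latest `updated` value.
        if stored.updated != lister.updated:
            raise StaleData(f"lister {lister.name!r} was modified concurrently")
        fresh = replace(lister, updated=_next_timestamp(stored.updated))
        self._rows[lister.name] = fresh
        # Like update_lister, return a new object reflecting the stored state.
        return fresh
```

A caller holding an outdated object would typically catch StaleData, re-read the lister (for instance with get_lister), and retry its update on the fresh copy.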
-
record_listed_origins(listed_origins: Iterable[swh.scheduler.model.ListedOrigin]) → List[swh.scheduler.model.ListedOrigin]
Record a set of origins that a lister has listed.
This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.
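To make the upsert semantics concrete, here is an in-memory sketch keyed on (lister_id, url, visit_type): new keys are inserted, and existing keys only get extra_loader_arguments, last_update and last_seen overwritten. `FakeListedOrigin` and `upsert_listed_origins` are hypothetical simplifications; the real backend performs this in the database, and its ListedOrigin model may carry additional fields.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple
from uuid import UUID, uuid4

Key = Tuple[UUID, str, str]


@dataclass
class FakeListedOrigin:
    """Hypothetical, simplified stand-in for swh.scheduler.model.ListedOrigin."""
    lister_id: UUID
    url: str
    visit_type: str
    extra_loader_arguments: Dict[str, Any] = field(default_factory=dict)
    last_update: Optional[datetime] = None
    last_seen: Optional[datetime] = None


def upsert_listed_origins(
    table: Dict[Key, FakeListedOrigin], origins: Iterable[FakeListedOrigin]
) -> List[FakeListedOrigin]:
    """Insert new rows; for existing keys, overwrite only the three mutable fields."""
    recorded = []
    for origin in origins:
        key = (origin.lister_id, origin.url, origin.visit_type)
        row = table.get(key)
        if row is None:
            table[key] = row = origin
        else:
            row.extra_loader_arguments = origin.extra_loader_arguments
            row.last_update = origin.last_update
            row.last_seen = origin.last_seen
        recorded.append(row)
    return recorded


table: Dict[Key, FakeListedOrigin] = {}
lister_id = uuid4()
upsert_listed_origins(table, [FakeListedOrigin(lister_id, "https://example.org/repo.git", "git")])
```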
-
get_listed_origins(lister_id: Optional[uuid.UUID] = None, url: Optional[str] = None, limit: int = 1000, page_token: Optional[Tuple[str, str]] = None) → swh.scheduler.interface.PaginatedListedOriginList
Get information on the listed origins matching the url, the lister_id, or both.
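Because results are paginated, retrieving every matching origin means following page tokens until none is returned. The sketch below assumes the paginated result exposes `results` and `next_page_token` attributes (modelled here by the hypothetical `Page` tuple); check the actual PaginatedListedOriginList definition before relying on those names.

```python
from typing import Any, Callable, Iterator, List, NamedTuple, Optional, Tuple

PageToken = Tuple[str, str]


class Page(NamedTuple):
    """Stand-in for a paginated result; attribute names are an assumption."""
    results: List[Any]
    next_page_token: Optional[PageToken]


def iter_all_listed_origins(fetch: Callable[..., Page], **filters: Any) -> Iterator[Any]:
    """Call `fetch` repeatedly, following page tokens until none is returned."""
    page_token: Optional[PageToken] = None
    while True:
        page = fetch(page_token=page_token, **filters)
        yield from page.results
        if page.next_page_token is None:
            break
        page_token = page.next_page_token
```

With a configured backend, one would pass the bound method, e.g. `iter_all_listed_origins(backend.get_listed_origins, lister_id=some_id, limit=500)`, where `some_id` is a placeholder.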
-
grab_next_visits(visit_type: str, count: int, policy: str) → List[swh.scheduler.model.ListedOrigin]
Get at most count origins that should be visited next with the visit_type loader, according to the given scheduling policy.
This will mark the origins as “being visited” in the listed_origins table, to avoid scheduling multiple visits to the same origin.
-
task_create_keys = ['type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority']
-
task_keys = ['type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority', 'id', 'current_interval']
-
create_tasks(tasks, policy='recurring')
Create new tasks.
- Parameters
tasks (list) – each task is a dictionary with the following keys:
type (str): the task type
arguments (dict): the arguments for the task runner, keys:
args (list of str): arguments
kwargs (dict str -> str): keyword arguments
next_run (datetime.datetime): the next scheduled run for the task
- Returns
a list of created tasks.
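The following sketch shows the shape of the `tasks` list, with `arguments` split into `args` and `kwargs` as described above. The helper `make_task`, the "load-git" type and the URLs are hypothetical; the call on a real backend is left as a comment because it needs a configured database.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List


def make_task(task_type: str, *args: str, next_run: datetime, **kwargs: str) -> Dict[str, Any]:
    """Hypothetical helper building one entry of the `tasks` list for create_tasks."""
    return {
        "type": task_type,
        "arguments": {"args": list(args), "kwargs": dict(kwargs)},
        "next_run": next_run,
    }


now = datetime.now(timezone.utc)
tasks: List[Dict[str, Any]] = [
    make_task("load-git", next_run=now, url="https://example.org/repo.git"),
    make_task("load-git", next_run=now + timedelta(hours=1), url="https://example.org/other.git"),
]
# With a configured backend (not shown): created = backend.create_tasks(tasks, policy="oneshot")
```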
-
set_status_tasks(task_ids, status='disabled', next_run=None)
Set the status of the tasks whose ids are listed in task_ids.
If given, also set their next_run date.
-
search_tasks(task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None)
Search tasks matching the selected criteria.
-
peek_ready_tasks(task_type, timestamp=None, num_tasks=None, num_tasks_priority=None)
Fetch the list of ready tasks.
- Parameters
task_type (str) – filter tasks by their type
timestamp (datetime.datetime) – peek tasks that need to be executed before that timestamp
num_tasks (int) – only peek at num_tasks tasks (with no priority)
num_tasks_priority (int) – only peek at num_tasks_priority tasks (with priority)
- Returns
a list of tasks
-
grab_ready_tasks(task_type, timestamp=None, num_tasks=None, num_tasks_priority=None)
Fetch the list of ready tasks and mark them as scheduled.
- Parameters
task_type (str) – filter tasks by their type
timestamp (datetime.datetime) – grab tasks that need to be executed before that timestamp
num_tasks (int) – only grab num_tasks tasks (with no priority)
num_tasks_priority (int) – only grab num_tasks_priority oneshot tasks (with priority)
- Returns
a list of tasks
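The difference between peeking and grabbing can be sketched in memory: both select ready tasks of a type, but only grabbing changes their status, so a second grab returns nothing. `FakeTask`, `peek_ready`, `grab_ready` and the status strings are hypothetical; priority handling (num_tasks_priority) is omitted, and the real backend performs this selection and update in the database.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class FakeTask:
    """Hypothetical, simplified task row; status names are illustrative."""
    id: int
    type: str
    next_run: datetime
    status: str = "not_scheduled"


def peek_ready(tasks: List[FakeTask], task_type: str, timestamp: datetime,
               num_tasks: Optional[int] = None) -> List[FakeTask]:
    """Return ready tasks of the given type without modifying them."""
    ready = sorted(
        (t for t in tasks
         if t.type == task_type and t.status == "not_scheduled"
         # "before that timestamp" is treated as inclusive here (an assumption).
         and t.next_run <= timestamp),
        key=lambda t: t.next_run,
    )
    return ready[:num_tasks]  # a None limit keeps every ready task


def grab_ready(tasks: List[FakeTask], task_type: str, timestamp: datetime,
               num_tasks: Optional[int] = None) -> List[FakeTask]:
    """Same selection as peek_ready, but marks the selected tasks as scheduled."""
    grabbed = peek_ready(tasks, task_type, timestamp, num_tasks)
    for task in grabbed:
        task.status = "scheduled"
    return grabbed
```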
-
task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata']
-
schedule_task_run(task_id, backend_id, metadata=None, timestamp=None)
Mark a given task as scheduled, adding a task_run entry in the database.
- Parameters
task_id (int) – the identifier for the task being scheduled
backend_id (str) – the identifier of the job in the backend
metadata (dict) – metadata to add to the task_run entry
timestamp (datetime.datetime) – the instant the event occurred
- Returns
a fresh task_run entry
-
mass_schedule_task_runs(task_runs)
Schedule a bunch of task runs.
- Parameters
task_runs (list) – a list of dicts with keys:
task (int): the identifier for the task being scheduled
backend_id (str): the identifier of the job in the backend
metadata (dict): metadata to add to the task_run entry
scheduled (datetime.datetime): the instant the event occurred
- Returns
None
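As a sketch, the list passed to mass_schedule_task_runs can be built from a mapping of task ids to backend job ids, using the keys listed in task_run_create_keys. The helper `make_task_runs` and the job ids are hypothetical; in practice the backend_id would be whatever identifier the job-scheduling backend assigned.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

# Copied from SchedulerBackend.task_run_create_keys as documented above.
TASK_RUN_CREATE_KEYS = ["task", "backend_id", "scheduled", "metadata"]


def make_task_runs(
    scheduled: Dict[int, str],
    metadata: Optional[Dict[str, Any]] = None,
    when: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: one task_run dict per (task id -> backend job id) pair."""
    when = when or datetime.now(timezone.utc)
    return [
        {
            "task": task_id,
            "backend_id": backend_id,
            "scheduled": when,
            "metadata": dict(metadata or {}),  # copy so entries don't share a dict
        }
        for task_id, backend_id in scheduled.items()
    ]


# With a configured backend (not shown):
# backend.mass_schedule_task_runs(make_task_runs({42: "job-42"}))
```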
-
start_task_run(backend_id, metadata=None, timestamp=None)
Mark a given task as started, updating the corresponding task_run entry in the database.
- Parameters
backend_id (str) – the identifier of the job in the backend
metadata (dict) – metadata to add to the task_run entry
timestamp (datetime.datetime) – the instant the event occurred
- Returns
the updated task_run entry
-
end_task_run(backend_id, status, metadata=None, timestamp=None, result=None)
Mark a given task as ended, updating the corresponding task_run entry in the database.
- Parameters
backend_id (str) – the identifier of the job in the backend
status (str) – how the task ended; one of: ‘eventful’, ‘uneventful’, ‘failed’
metadata (dict) – metadata to add to the task_run entry
timestamp (datetime.datetime) – the instant the event occurred
- Returns
the updated task_run entry
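Taken together, schedule_task_run, start_task_run and end_task_run move one task_run entry through its lifecycle, keyed by the backend job id after scheduling. The in-memory `TaskRunLog` below is a hypothetical model of that flow: the intermediate "scheduled"/"started" status strings and the merging of metadata are assumptions, while the three end statuses come from the end_task_run documentation.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# End statuses accepted by end_task_run, as documented above.
END_STATUSES = {"eventful", "uneventful", "failed"}


class TaskRunLog:
    """Hypothetical in-memory model of the task_run lifecycle (not the real backend)."""

    def __init__(self) -> None:
        # task_run entries indexed by the job identifier in the backend
        self.runs: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def _at(timestamp: Optional[datetime]) -> datetime:
        # Mirrors the optional `timestamp` argument: default to "now".
        return timestamp or datetime.now(timezone.utc)

    def schedule(self, task_id: int, backend_id: str,
                 metadata: Optional[Dict[str, Any]] = None,
                 timestamp: Optional[datetime] = None) -> Dict[str, Any]:
        run = {
            "task": task_id, "backend_id": backend_id,
            "scheduled": self._at(timestamp), "started": None, "ended": None,
            "metadata": dict(metadata or {}),
            "status": "scheduled",  # illustrative intermediate status
        }
        self.runs[backend_id] = run
        return run

    def start(self, backend_id: str, metadata: Optional[Dict[str, Any]] = None,
              timestamp: Optional[datetime] = None) -> Dict[str, Any]:
        run = self.runs[backend_id]
        run["started"] = self._at(timestamp)
        run["metadata"].update(metadata or {})  # assumption: metadata is merged
        run["status"] = "started"  # illustrative intermediate status
        return run

    def end(self, backend_id: str, status: str,
            metadata: Optional[Dict[str, Any]] = None,
            timestamp: Optional[datetime] = None) -> Dict[str, Any]:
        if status not in END_STATUSES:
            raise ValueError(f"unexpected end status: {status!r}")
        run = self.runs[backend_id]
        run["ended"] = self._at(timestamp)
        run["metadata"].update(metadata or {})
        run["status"] = status
        return run
```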
-
filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None) → Dict[str, Any]
Compute the tasks to archive within the half-open datetime interval [after_ts, before_ts). The method returns a paginated result.
- Returns
a dict with the following keys:
next_page_token: opaque token to be used as page_token to retrieve the next page of results. If absent, there are no more pages to gather.
tasks: list of task dictionaries with the following keys:
id (str): origin task id
started (Optional[datetime]): started date
scheduled (datetime): scheduled date
arguments (json dict): task’s arguments
…
- Return type
Dict[str, Any]
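The sketch below illustrates two points of this contract: the half-open interval (after_ts included, before_ts excluded) and a next_page_token key that is simply absent on the last page. `filter_tasks_page` and `collect_all` are hypothetical; filtering on the "scheduled" date and using a stringified offset as the token are assumptions for illustration, whereas the real token is opaque.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def filter_tasks_page(
    tasks: List[Dict[str, Any]],
    after_ts: str,
    before_ts: str,
    limit: int = 10,
    page_token: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical in-memory analogue of filter_task_to_archive.

    Keeps tasks whose "scheduled" date lies in [after_ts, before_ts) and
    pages through them; the token is simply a stringified offset here.
    """
    lo, hi = datetime.fromisoformat(after_ts), datetime.fromisoformat(before_ts)
    matching = sorted(
        (t for t in tasks if lo <= t["scheduled"] < hi), key=lambda t: t["scheduled"]
    )
    start = int(page_token) if page_token else 0
    result: Dict[str, Any] = {"tasks": matching[start:start + limit]}
    if start + limit < len(matching):
        # Only present when another page exists, mirroring the documented contract.
        result["next_page_token"] = str(start + limit)
    return result


def collect_all(tasks: List[Dict[str, Any]], after_ts: str, before_ts: str,
                limit: int = 10) -> List[Dict[str, Any]]:
    """Follow next_page_token until it is absent, gathering every page."""
    collected: List[Dict[str, Any]] = []
    token: Optional[str] = None
    while True:
        page = filter_tasks_page(tasks, after_ts, before_ts, limit, token)
        collected.extend(page["tasks"])
        token = page.get("next_page_token")
        if token is None:
            return collected
```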
-
delete_archived_tasks(task_ids)
Delete as many of the archived tasks as possible. Only the tasks in task_ids whose associated task_run entries have all been cleaned up will be deleted.
-
task_run_keys = ['id', 'task', 'backend_id', 'scheduled', 'started', 'ended', 'metadata', 'status']
-
origin_visit_stats_upsert(origin_visit_stats: Iterable[swh.scheduler.model.OriginVisitStats]) → None
-
origin_visit_stats_get(ids: Iterable[Tuple[str, str]]) → List[swh.scheduler.model.OriginVisitStats]
-
update_metrics(lister_id: Optional[uuid.UUID] = None, timestamp: Optional[datetime.datetime] = None) → List[swh.scheduler.model.SchedulerMetrics]
Update the performance metrics of this scheduler instance.
Returns the updated metrics.
- Parameters
lister_id – if passed, update the metrics only for this lister instance
timestamp – if passed, the date at which we’re updating the metrics, defaults to the database NOW()
-
get_metrics(lister_id: Optional[uuid.UUID] = None, visit_type: Optional[str] = None) → List[swh.scheduler.model.SchedulerMetrics]
Retrieve the performance metrics of this scheduler instance.
- Parameters
lister_id – filter the metrics for this lister instance only
visit_type – filter the metrics for this visit type only
-