swh.scheduler.backend module#
- swh.scheduler.backend.adapt_LastVisitStatus(v: LastVisitStatus)[source]#
- swh.scheduler.backend.format_query(query: str, keys: Sequence[str]) str [source]#
Format a query with the given keys
- class swh.scheduler.backend.SchedulerBackend(db, min_pool_conns=1, max_pool_conns=10)[source]#
Bases:
object
Backend for the Software Heritage scheduling database.
- Parameters:
db – either a libpq connection string, or a psycopg2 connection
- current_version = 38#
- task_type_keys = ['type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', 'num_retries', 'retry_delay']#
- create_task_type(task_type: TaskType) None [source]#
Create a new task type ready for scheduling.
- Parameters:
task_type – a TaskType dictionary
- get_task_type(task_type_name: str) TaskType | None [source]#
Retrieve the task type with id task_type_name
- get_listers(with_first_visits_to_schedule: bool = False) List[Lister] [source]#
Retrieve information about all listers from the database.
- get_listers_by_id(lister_ids: List[str]) List[Lister] [source]#
Retrieve listers in batch, using their UUID
- get_lister(name: str, instance_name: str | None = None) Lister | None [source]#
Retrieve information about the given instance of the lister from the database.
- get_or_create_lister(name: str, instance_name: str | None = None, first_visits_queue_prefix: str | None = None) Lister [source]#
Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist.
- update_lister(lister: Lister) Lister [source]#
Update the state for the given lister instance in the database.
- Returns:
a new Lister object, with all fields updated from the database
- Raises:
StaleData if the updated timestamp for the lister instance in –
database doesn’t match the one passed by the user.
- record_listed_origins(listed_origins: Iterable[ListedOrigin]) List[ListedOrigin] [source]#
Record a set of origins that a lister has listed.
This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.
- get_listed_origins(lister_id: UUID | None = None, url: str | None = None, urls: List[str] | None = None, enabled: bool | None = True, limit: int = 1000, page_token: Tuple[str, str] | None = None) PaginatedListedOriginList [source]#
Get information on the listed origins matching either the url or lister_id, or both arguments.
- grab_next_visits(visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: str | None = None, lister_name: str | None = None, lister_instance_name: str | None = None, timestamp: datetime | None = None, absolute_cooldown: timedelta | None = datetime.timedelta(seconds=43200), scheduled_cooldown: timedelta | None = datetime.timedelta(days=7), failed_cooldown: timedelta | None = datetime.timedelta(days=14), not_found_cooldown: timedelta | None = datetime.timedelta(days=31), tablesample: float | None = None) List[ListedOrigin] [source]#
- task_keys = ['type', 'arguments', 'next_run', 'status', 'policy', 'retries_left', 'id', 'current_interval', 'priority']#
- task_create_keys = ['type', 'arguments', 'next_run', 'status', 'policy', 'retries_left', 'priority']#
- create_tasks(tasks: List[Task], policy: Literal['recurring', 'oneshot'] = 'recurring') List[Task] [source]#
Create new tasks.
- Parameters:
tasks –
each task is a Task object created with at least the following parameters:
type
arguments
next_run
policy – default task policy (either recurring or oneshot) to use if not set in input task objects
- Returns:
a list of created tasks with database ids filled
- set_status_tasks(task_ids: List[int], status: Literal['next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'] = 'disabled', next_run: datetime | None = None) None [source]#
Set the tasks’ status whose ids are listed.
- Parameters:
task_ids – list of tasks’ identifiers
status – the status to set for the tasks
next_run – if provided, also set the next_run date
- disable_tasks(task_ids: List[int]) None [source]#
Disable the tasks whose ids are listed.
- Parameters:
task_ids – list of tasks’ identifiers
- search_tasks(task_id: int | None = None, task_type: str | None = None, status: Literal['next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'] | None = None, priority: Literal['high', 'normal', 'low'] | None = None, policy: Literal['recurring', 'oneshot'] | None = None, before: datetime | None = None, after: datetime | None = None, limit: int | None = None) List[Task] [source]#
Search tasks from selected criterions
- Parameters:
task_id – search a task with given identifier
task_type – search tasks with given type
status – search tasks with given status
priority – search tasks with given priority
policy – search tasks with given policy
before – search tasks created before given date
after – search tasks created after given date
limit – maximum number of tasks to return
- Returns:
a list of found tasks
- get_tasks(task_ids: List[int]) List[Task] [source]#
Retrieve the info of tasks whose ids are listed.
- Parameters:
task_ids – list of tasks’ identifiers
- Returns:
a list of Task objects
- peek_ready_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Task] [source]#
Fetch the list of tasks (with no priority) to be scheduled.
- Parameters:
task_type – filtering task per their type
timestamp – peek tasks that need to be executed before that timestamp
num_tasks – only peek at num_tasks tasks (with no priority)
- Returns:
the list of tasks which would be scheduled
- grab_ready_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Task] [source]#
Fetch and schedule the list of tasks (with no priority) ready to be scheduled.
- Parameters:
task_type – filtering task per their type
timestamp – grab tasks that need to be executed before that timestamp
num_tasks – only grab num_tasks tasks (with no priority)
- Returns:
the list of scheduled tasks
- peek_ready_priority_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Task] [source]#
Fetch list of tasks (with any priority) ready to be scheduled.
- Parameters:
task_type – filtering task per their type
timestamp – peek tasks that need to be executed before that timestamp
num_tasks – only peek at num_tasks tasks (with no priority)
- Returns:
the list of tasks which would be scheduled
- grab_ready_priority_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Task] [source]#
Fetch and schedule the list of tasks (with any priority) ready to be scheduled.
- Parameters:
task_type – filtering task per their type
timestamp – grab tasks that need to be executed before that timestamp
num_tasks – only grab num_tasks tasks (with no priority)
- Returns:
the list of scheduled tasks
- task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata']#
- schedule_task_run(task_id: int, backend_id: str, metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) TaskRun [source]#
Mark a given task as scheduled, adding a task_run entry in the database.
- Parameters:
task_id – the identifier for the task being scheduled
backend_id – the identifier of the job in the backend
metadata – metadata to add to the task_run entry
timestamp – the instant the event occurred
- Returns:
a TaskRun object
- mass_schedule_task_runs(task_runs: List[TaskRun]) None [source]#
Schedule a bunch of task runs.
- Parameters:
task_runs –
a list of TaskRun objects created at least with the following parameters:
task
backend_id
scheduled
- start_task_run(backend_id: str, metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) TaskRun | None [source]#
- Mark a given task as started, updating the corresponding task_run
entry in the database.
- Parameters:
backend_id – the identifier of the job in the backend
metadata – metadata to add to the task_run entry
timestamp – the instant the event occurred
- Returns:
a TaskRun object with updated fields, or None if there was no TaskRun recorded with a matching backend_id.
- end_task_run(backend_id: str, status: Literal['scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost'], metadata: Dict[str, Any] | None = None, timestamp: datetime | None = None) TaskRun | None [source]#
Mark a given task as ended, updating the corresponding task_run entry in the database.
- Parameters:
backend_id – the identifier of the job in the backend
status – how the task ended
metadata – metadata to add to the task_run entry
timestamp – the instant the event occurred
- Returns:
a TaskRun object with updated fields, or None if there was no TaskRun recorded with a matching backend_id.
- filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: str | None = None) Dict[str, Any] [source]#
Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result.
- Returns:
next_page_token: opaque token to be used as page_token to retrieve the next page of result. If absent, there is no more pages to gather.
tasks: list of task dictionaries with the following keys:
id (str): origin task id started (Optional[datetime]): started date scheduled (datetime): scheduled date arguments (json dict): task’s arguments …
- Return type:
dict with the following keys
- delete_archived_tasks(task_ids)[source]#
Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be.
- task_run_keys = ['id', 'task', 'backend_id', 'scheduled', 'started', 'ended', 'metadata', 'status']#
- get_task_runs(task_ids: List[int], limit: int | None = None) List[TaskRun] [source]#
Search task run for a task id
- origin_visit_stats_upsert(origin_visit_stats: Iterable[OriginVisitStats]) None [source]#
- update_metrics(lister_id: UUID | None = None, timestamp: datetime | None = None) List[SchedulerMetrics] [source]#
Update the performance metrics of this scheduler instance.
Returns the updated metrics.
- Parameters:
lister_id – if passed, update the metrics only for this lister instance
timestamp – if passed, the date at which we’re updating the metrics, defaults to the database NOW()
- class swh.scheduler.backend.TemporarySchedulerBackend[source]#
Bases:
SchedulerBackend
Temporary postgresql backend for the Software Heritage scheduling database.
A temporary scheduler database is spawned then removed when the backend gets destroyed.
It can be used for testing SWH components that require a scheduler instance (listers for instance).
- Parameters:
db – either a libpq connection string, or a psycopg2 connection