swh.scheduler.backend module#

swh.scheduler.backend.adapt_LastVisitStatus(v: LastVisitStatus)[source]#
swh.scheduler.backend.format_query(query, keys)[source]#

Format a query with the given keys

class swh.scheduler.backend.SchedulerBackend(db, min_pool_conns=1, max_pool_conns=10)[source]#

Bases: object

Backend for the Software Heritage scheduling database.

Parameters:

db – either a libpq connection string, or a psycopg2 connection

current_version = 35#
get_db()[source]#
put_db(db)[source]#
task_type_keys = ['type', 'description', 'backend_name', 'default_interval', 'min_interval', 'max_interval', 'backoff_factor', 'max_queue_length', 'num_retries', 'retry_delay']#
create_task_type(task_type)[source]#

Create a new task type ready for scheduling.

Parameters:

task_type (dict) –

a dictionary with the following keys:

  • type (str): an identifier for the task type

  • description (str): a human-readable description of what the task does

  • backend_name (str): the name of the task in the job-scheduling backend

  • default_interval (datetime.timedelta): the default interval between two task runs

  • min_interval (datetime.timedelta): the minimum interval between two task runs

  • max_interval (datetime.timedelta): the maximum interval between two task runs

  • backoff_factor (float): the factor by which the interval changes at each run

  • max_queue_length (int): the maximum length of the task queue for this task type

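As a sketch, a task type dictionary with the documented keys might look as follows. The `load-git` type name and loader backend name are illustrative, and the actual `create_task_type` call (commented out) requires a live scheduler database.

```python
from datetime import timedelta

# Illustrative task type description using the documented keys.
task_type = {
    "type": "load-git",
    "description": "Load a git repository",
    "backend_name": "swh.loader.git.tasks.UpdateGitRepository",
    "default_interval": timedelta(days=64),
    "min_interval": timedelta(hours=12),
    "max_interval": timedelta(days=64),
    "backoff_factor": 2.0,
    "max_queue_length": 1000,
}

# backend = SchedulerBackend(db="dbname=softwareheritage-scheduler")
# backend.create_task_type(task_type)
```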
get_task_type(task_type_name)[source]#

Retrieve the task type with id task_type_name

get_task_types()[source]#

Retrieve all registered task types

get_listers() List[Lister][source]#

Retrieve information about all listers from the database.

get_listers_by_id(lister_ids: List[str]) List[Lister][source]#

Retrieve listers in batch, using their UUIDs

get_lister(name: str, instance_name: str | None = None) Lister | None[source]#

Retrieve information about the given instance of the lister from the database.

get_or_create_lister(name: str, instance_name: str | None = None) Lister[source]#

Retrieve information about the given instance of the lister from the database, or create the entry if it did not exist.

update_lister(lister: Lister) Lister[source]#

Update the state for the given lister instance in the database.

Returns:

a new Lister object, with all fields updated from the database

Raises:

StaleData – if the updated timestamp for the lister instance in the database doesn't match the one passed by the user.

record_listed_origins(listed_origins: Iterable[ListedOrigin]) List[ListedOrigin][source]#

Record a set of origins that a lister has listed.

This performs an “upsert”: origins with the same (lister_id, url, visit_type) values are updated with new values for extra_loader_arguments, last_update and last_seen.

get_listed_origins(lister_id: UUID | None = None, url: str | None = None, enabled: bool | None = True, limit: int = 1000, page_token: Tuple[str, str] | None = None) PaginatedListedOriginList[source]#

Get information on the listed origins matching either the url or lister_id, or both arguments.

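A typical use is to drain all pages of listed origins for one lister. The loop below is a sketch that assumes the returned `PaginatedListedOriginList` exposes `results` and `next_page_token` attributes; the stand-in backend exists only to exercise the loop and is not the real class.

```python
from collections import namedtuple

def iter_listed_origins(backend, lister_id, batch_size=1000):
    """Yield all listed origins for a lister, page by page."""
    page_token = None
    while True:
        page = backend.get_listed_origins(
            lister_id=lister_id, limit=batch_size, page_token=page_token
        )
        yield from page.results
        page_token = page.next_page_token
        if page_token is None:  # no more pages
            break

# Minimal stand-in backend returning two pages, to illustrate the loop.
Page = namedtuple("Page", "results next_page_token")

class FakeBackend:
    def get_listed_origins(self, lister_id, limit, page_token):
        pages = {None: Page([1, 2], "tok"), "tok": Page([3], None)}
        return pages[page_token]

assert list(iter_listed_origins(FakeBackend(), lister_id=None)) == [1, 2, 3]
```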
grab_next_visits(visit_type: str, count: int, policy: str, enabled: bool = True, lister_uuid: str | None = None, lister_name: str | None = None, lister_instance_name: str | None = None, timestamp: datetime | None = None, absolute_cooldown: timedelta | None = datetime.timedelta(seconds=43200), scheduled_cooldown: timedelta | None = datetime.timedelta(days=7), failed_cooldown: timedelta | None = datetime.timedelta(days=14), not_found_cooldown: timedelta | None = datetime.timedelta(days=31), tablesample: float | None = None) List[ListedOrigin][source]#
task_create_keys = ['type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority']#
task_keys = ['type', 'arguments', 'next_run', 'policy', 'status', 'retries_left', 'priority', 'id', 'current_interval']#
create_tasks(tasks, policy='recurring')[source]#

Create new tasks.

Parameters:

tasks (list) –

each task is a dictionary with the following keys:

  • type (str): the task type

  • arguments (dict): the arguments for the task runner, keys:

    • args (list of str): arguments

    • kwargs (dict str -> str): keyword arguments

  • next_run (datetime.datetime): the next scheduled run for the task

Returns:

a list of created tasks.
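The task dictionaries passed to `create_tasks` can be sketched as below; the `load-git` type and the repository URL are hypothetical, and the backend call itself (commented out) needs a live database.

```python
from datetime import datetime, timezone

# Illustrative task dictionaries in the shape accepted by create_tasks.
tasks = [
    {
        "type": "load-git",
        "arguments": {
            "args": [],
            "kwargs": {"url": "https://example.org/repo.git"},
        },
        "next_run": datetime.now(tz=timezone.utc),
    }
]

# created = backend.create_tasks(tasks, policy="recurring")
```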

set_status_tasks(task_ids: List[int], status: str = 'disabled', next_run: datetime | None = None)[source]#

Set the status of the tasks whose ids are listed.

If given, also set the next_run date.

disable_tasks(task_ids)[source]#

Disable the tasks whose ids are listed.

search_tasks(task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None)[source]#

Search tasks matching the selected criteria

get_tasks(task_ids)[source]#

Retrieve the info of tasks whose ids are listed.

peek_ready_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Dict][source]#
grab_ready_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Dict][source]#
peek_ready_priority_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Dict][source]#
grab_ready_priority_tasks(task_type: str, timestamp: datetime | None = None, num_tasks: int | None = None) List[Dict][source]#
task_run_create_keys = ['task', 'backend_id', 'scheduled', 'metadata']#
schedule_task_run(task_id, backend_id, metadata=None, timestamp=None)[source]#

Mark a given task as scheduled, adding a task_run entry in the database.

Parameters:
  • task_id (int) – the identifier for the task being scheduled

  • backend_id (str) – the identifier of the job in the backend

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns:

a fresh task_run entry

mass_schedule_task_runs(task_runs)[source]#

Schedule a bunch of task runs.

Parameters:

task_runs (list) –

a list of dicts with keys:

  • task (int): the identifier for the task being scheduled

  • backend_id (str): the identifier of the job in the backend

  • metadata (dict): metadata to add to the task_run entry

  • scheduled (datetime.datetime): the instant the event occurred

Returns:

None

start_task_run(backend_id, metadata=None, timestamp=None)[source]#

Mark a given task as started, updating the corresponding task_run entry in the database.

Parameters:
  • backend_id (str) – the identifier of the job in the backend

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns:

the updated task_run entry

end_task_run(backend_id, status, metadata=None, timestamp=None, result=None)[source]#

Mark a given task as ended, updating the corresponding task_run entry in the database.

Parameters:
  • backend_id (str) – the identifier of the job in the backend

  • status (str) – how the task ended; one of: ‘eventful’, ‘uneventful’, ‘failed’

  • metadata (dict) – metadata to add to the task_run entry

  • timestamp (datetime.datetime) – the instant the event occurred

Returns:

the updated task_run entry
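Taken together, `schedule_task_run`, `start_task_run` and `end_task_run` cover a task run's lifecycle. The sketch below mirrors the documented signatures against a hypothetical connected backend; the recorder class only stands in for the real backend to show the call order.

```python
from datetime import datetime, timezone

def run_task(backend, task_id, backend_id):
    """Drive one task run through its documented lifecycle."""
    def now():
        return datetime.now(tz=timezone.utc)

    # 1. the task is handed to the job-scheduling backend
    backend.schedule_task_run(task_id, backend_id, timestamp=now())
    # 2. a worker picks the job up
    backend.start_task_run(backend_id, timestamp=now())
    # 3. the worker finishes: 'eventful', 'uneventful' or 'failed'
    backend.end_task_run(backend_id, status="eventful", timestamp=now())

# Minimal recorder standing in for the real backend.
class Recorder:
    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        return lambda *a, **kw: self.calls.append(name)

rec = Recorder()
run_task(rec, task_id=42, backend_id="job-1234")
assert rec.calls == ["schedule_task_run", "start_task_run", "end_task_run"]
```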

filter_task_to_archive(after_ts: str, before_ts: str, limit: int = 10, page_token: str | None = None) Dict[str, Any][source]#

Compute the tasks to archive within the half-open datetime interval [after_ts, before_ts). The method returns a paginated result.

Returns:

  • next_page_token: opaque token to be used as page_token to retrieve the next page of results. If absent, there are no more pages to gather.

  • tasks: list of task dictionaries with the following keys:

    • id (str): origin task id

    • started (Optional[datetime]): started date

    • scheduled (datetime): scheduled date

    • arguments (json dict): task's arguments

    • …

Return type:

dict with the following keys
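Since the result is paginated like `get_listed_origins`, draining it follows the same pattern; per the description above, the result dict carries `tasks` and, while more pages remain, `next_page_token`. The stand-in backend below only exercises the loop.

```python
def iter_tasks_to_archive(backend, after_ts, before_ts, limit=10):
    """Yield every task to archive in [after_ts, before_ts), page by page."""
    page_token = None
    while True:
        result = backend.filter_task_to_archive(
            after_ts, before_ts, limit=limit, page_token=page_token
        )
        yield from result["tasks"]
        page_token = result.get("next_page_token")
        if page_token is None:  # last page reached
            break

# Stand-in backend returning two pages, to illustrate the loop.
class FakeBackend:
    def filter_task_to_archive(self, after_ts, before_ts, limit, page_token):
        pages = {
            None: {"tasks": [{"id": 1}], "next_page_token": "t1"},
            "t1": {"tasks": [{"id": 2}]},
        }
        return pages[page_token]

ids = [t["id"] for t in
       iter_tasks_to_archive(FakeBackend(), "2015-01-01", "2016-01-01")]
assert ids == [1, 2]
```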

delete_archived_tasks(task_ids)[source]#

Delete archived tasks as far as possible. Only the tasks whose associated task_run entries have all been cleaned up will be deleted.

task_run_keys = ['id', 'task', 'backend_id', 'scheduled', 'started', 'ended', 'metadata', 'status']#
get_task_runs(task_ids, limit=None)[source]#

Search task runs for the given task ids

origin_visit_stats_upsert(origin_visit_stats: Iterable[OriginVisitStats]) None[source]#
origin_visit_stats_get(ids: Iterable[Tuple[str, str]]) List[OriginVisitStats][source]#
visit_scheduler_queue_position_get() Dict[str, int][source]#
visit_scheduler_queue_position_set(visit_type: str, position: int) None[source]#
update_metrics(lister_id: UUID | None = None, timestamp: datetime | None = None) List[SchedulerMetrics][source]#

Update the performance metrics of this scheduler instance.

Returns the updated metrics.

Parameters:
  • lister_id – if passed, update the metrics only for this lister instance

  • timestamp – if passed, the date at which we’re updating the metrics, defaults to the database NOW()

get_metrics(lister_id: UUID | None = None, visit_type: str | None = None) List[SchedulerMetrics][source]#

Retrieve the performance metrics of this scheduler instance.

Parameters:
  • lister_id – filter the metrics for this lister instance only

  • visit_type – filter the metrics for this visit type only

class swh.scheduler.backend.TemporarySchedulerBackend[source]#

Bases: SchedulerBackend

Temporary postgresql backend for the Software Heritage scheduling database.

A temporary scheduler database is spawned then removed when the backend gets destroyed.

It can be used for testing SWH components that require a scheduler instance (listers for instance).

Parameters:

db – either a libpq connection string, or a psycopg2 connection