swh.scheduler.cli.utils module

swh.scheduler.cli.utils.parse_time_interval(time_str: str) → timedelta

Parse a basic time interval, e.g. ‘1 day’ or ‘2 hours’, into a timedelta object.

Parameters:

time_str – A string representing a basic time interval in days or hours.

Returns:

An equivalent representation of the string as a datetime.timedelta object.

Raises:

ValueError if the time interval could not be parsed.
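
As an illustration only, the sketch below parses the two documented forms (‘<N> day(s)’ and ‘<N> hour(s)’) with a regular expression. The name parse_time_interval_sketch is hypothetical, and the accepted grammar is an assumption; the real function may accept other spellings.

```python
import re
from datetime import timedelta

# Hypothetical grammar: an integer, then "day"/"days" or "hour"/"hours".
_INTERVAL_RE = re.compile(r"^\s*(\d+)\s*(day|hour)s?\s*$")


def parse_time_interval_sketch(time_str: str) -> timedelta:
    """Parse '<N> day(s)' or '<N> hour(s)' into a timedelta (illustrative sketch)."""
    match = _INTERVAL_RE.match(time_str)
    if match is None:
        # Mirror the documented behaviour: unparsable input raises ValueError.
        raise ValueError(f"Cannot parse time interval: {time_str!r}")
    amount, unit = int(match.group(1)), match.group(2)
    # Map the unit onto the matching timedelta keyword argument.
    return timedelta(days=amount) if unit == "day" else timedelta(hours=amount)


print(parse_time_interval_sketch("1 day"))    # 1 day, 0:00:00
print(parse_time_interval_sketch("2 hours"))  # 2:00:00
```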

swh.scheduler.cli.utils.schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs)

swh.scheduler.cli.utils.parse_argument(option)

swh.scheduler.cli.utils.parse_options(options: List[str]) → Tuple[List[str], Dict]

Parse command-line options as YAML and turn them into Python positional arguments (args) and keyword arguments (kwargs).

>>> parse_options([])
([], {})
>>> parse_options(['foo', 'bar'])
(['foo', 'bar'], {})
>>> parse_options(['[foo, bar]'])
([['foo', 'bar']], {})
>>> parse_options(['"foo"', '"bar"'])
(['foo', 'bar'], {})
>>> parse_options(['foo="bar"'])
([], {'foo': 'bar'})
>>> parse_options(['"foo"', 'bar="baz"'])
(['foo'], {'bar': 'baz'})
>>> parse_options(['42', 'bar=False'])
([42], {'bar': False})
>>> parse_options(['42', 'bar=false'])
([42], {'bar': False})
>>> parse_options(['foo', ''])
(['foo', ''], {})
>>> parse_options(['foo', 'bar='])
(['foo'], {'bar': ''})
>>> parse_options(['foo', 'null'])
(['foo', None], {})
>>> parse_options(['foo', 'bar=null'])
(['foo'], {'bar': None})
>>> parse_options(['42', '"foo'])
Traceback (most recent call last):
  ...
click.exceptions.ClickException: Invalid argument: "foo

swh.scheduler.cli.utils.get_task_type(scheduler: SchedulerInterface, visit_type: str) → TaskType | None

Given a visit type, return its associated task type.
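
A minimal sketch, assuming that the task type for a visit type is named ‘load-<visit_type>’ and that the scheduler offers a get_task_type(name) lookup returning None for unknown names. FakeScheduler and get_task_type_sketch are hypothetical stand-ins, not part of swh.scheduler.

```python
from typing import Dict, Optional


class FakeScheduler:
    """Hypothetical in-memory stand-in for a SchedulerInterface backend."""

    def __init__(self, task_types: Dict[str, dict]):
        self._task_types = task_types

    def get_task_type(self, task_type_name: str) -> Optional[dict]:
        # Unknown task type names yield None rather than raising.
        return self._task_types.get(task_type_name)


def get_task_type_sketch(scheduler: FakeScheduler, visit_type: str) -> Optional[dict]:
    # Assumed naming convention: visit type "git" maps to task type "load-git".
    return scheduler.get_task_type(f"load-{visit_type}")


scheduler = FakeScheduler({"load-git": {"type": "load-git"}})
print(get_task_type_sketch(scheduler, "git"))  # {'type': 'load-git'}
print(get_task_type_sketch(scheduler, "svn"))  # None
```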

swh.scheduler.cli.utils.send_to_celery(scheduler: SchedulerInterface, visit_type_to_queue: Dict[str, str], enabled: bool = True, lister_name: str | None = None, lister_instance_name: str | None = None, policy: str = 'oldest_scheduled_first', tablesample: float | None = None, absolute_cooldown: timedelta | None = None, scheduled_cooldown: timedelta | None = None, failed_cooldown: timedelta | None = None, not_found_cooldown: timedelta | None = None)

Read tasks from the scheduler and send them directly to Celery.

Parameters:
  • visit_type_to_queue – Mapping of visit/loader type (e.g. git, svn, …) to the queue to send tasks to.

  • enabled – Whether to list enabled or disabled origins. By default, only enabled origins are listed; some edge cases may call for the disabled ones instead.

  • lister_name – If set, only consider origins listed by the lister with this name

  • lister_instance_name – If set, only consider origins listed by the lister instance with this name

  • policy – the scheduling policy used to select which visits to schedule

  • tablesample – the percentage of the table on which we run the query (None: no sampling)

  • absolute_cooldown – the minimal interval between two visits of the same origin

  • scheduled_cooldown – the minimal interval before which we can schedule the same origin again if it has not been visited

  • failed_cooldown – the minimal interval before which we can reschedule a failed origin

  • not_found_cooldown – the minimal interval before which we can reschedule a not_found origin
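
The sketch below only shows the shape of the arguments: the cooldowns are datetime.timedelta values and the mapping goes from visit type to queue name. The queue names and cooldown values are hypothetical placeholders, not defaults of swh.scheduler; the real call needs a SchedulerInterface instance, so it is left as a comment.

```python
from datetime import timedelta

# Hypothetical queue names; real names depend on the Celery deployment.
visit_type_to_queue = {
    "git": "hypothetical-git-queue",
    "svn": "hypothetical-svn-queue",
}

send_kwargs = dict(
    visit_type_to_queue=visit_type_to_queue,
    enabled=True,                     # only consider enabled origins
    lister_name=None,                 # no restriction on the lister
    policy="oldest_scheduled_first",  # the default policy from the signature
    tablesample=None,                 # query the whole table, no sampling
    # Illustrative cooldown values, not library defaults.
    absolute_cooldown=timedelta(hours=12),
    scheduled_cooldown=timedelta(days=7),
    failed_cooldown=timedelta(days=14),
    not_found_cooldown=timedelta(days=31),
)

# With a real SchedulerInterface instance named `scheduler`:
# send_to_celery(scheduler, **send_kwargs)
```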

swh.scheduler.cli.utils.pretty_print_list(list: List[Any], indent: int = 0)

Pretty-print a list

swh.scheduler.cli.utils.pretty_print_dict(dict: Dict[str, Any], indent: int = 0)

Pretty-print a dict
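
A rough sketch of the one-item-per-line layout these helpers produce, modelled on the Args and Keyword args sections of the pretty_print_task example further down. The *_sketch names are hypothetical, and the sketch ignores any terminal styling or key ordering the real functions may apply.

```python
from typing import Any, Dict, List


def pretty_print_list_sketch(items: List[Any], indent: int = 0) -> str:
    # One repr() per line, each prefixed with `indent` spaces.
    return "".join(" " * indent + repr(item) + "\n" for item in items)


def pretty_print_dict_sketch(d: Dict[str, Any], indent: int = 0) -> str:
    # One "key: repr(value)" pair per line, each prefixed with `indent` spaces.
    return "".join(f"{' ' * indent}{key}: {value!r}\n" for key, value in d.items())


print(pretty_print_list_sketch(["foo", "bar", True], indent=4), end="")
print(pretty_print_dict_sketch({"key": "value", "key2": 42}, indent=4), end="")
```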

swh.scheduler.cli.utils.format_dict(d)

Recursively format date objects in the dict passed as argument
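
A minimal sketch of the recursion, assuming dates are rendered as ISO 8601 strings; the real function may use a different, more human-readable format. format_dict_sketch is a hypothetical name.

```python
import datetime
from typing import Any, Dict


def format_dict_sketch(d: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of d with date/datetime values rendered as ISO 8601 strings."""
    formatted: Dict[str, Any] = {}
    for key, value in d.items():
        # datetime.datetime is a subclass of datetime.date, so this covers both.
        if isinstance(value, datetime.date):
            value = value.isoformat()
        elif isinstance(value, dict):
            value = format_dict_sketch(value)  # recurse into nested dicts
        formatted[key] = value
    return formatted


print(format_dict_sketch({
    "next_run": datetime.datetime(2019, 2, 21, 13, 52),
    "meta": {"day": datetime.date(2019, 2, 21)},
    "id": 1234,
}))
# {'next_run': '2019-02-21T13:52:00', 'meta': {'day': '2019-02-21'}, 'id': 1234}
```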

swh.scheduler.cli.utils.pretty_print_run(run: TaskRun, indent: int = 4)

swh.scheduler.cli.utils.pretty_print_task(task: Task, full: bool = False)

Pretty-print a task

If full is True, also print the status and priority fields.

>>> import datetime
>>> import click
>>> from swh.scheduler.model import Task, TaskArguments
>>> task = Task(
...     id=1234,
...     arguments=TaskArguments(
...         args=["foo", "bar", True],
...         kwargs={"key": "value", "key2": 42},
...     ),
...     current_interval=datetime.timedelta(hours=1),
...     next_run=datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
...     policy="oneshot",
...     priority=None,
...     status="next_run_not_scheduled",
...     type="test_task",
... )
>>> print(click.unstyle(pretty_print_task(task)))
Task 1234
  Next run: ... (2019-02-21T13:52:35.407818)
  Interval: 1:00:00
  Type: test_task
  Policy: oneshot
  Args:
    'foo'
    'bar'
    True
  Keyword args:
    key: 'value'
    key2: 42

>>> print(click.unstyle(pretty_print_task(task, full=True)))
Task 1234
  Next run: ... (2019-02-21T13:52:35.407818)
  Interval: 1:00:00
  Type: test_task
  Policy: oneshot
  Status: next_run_not_scheduled
  Priority:
  Args:
    'foo'
    'bar'
    True
  Keyword args:
    key: 'value'
    key2: 42

swh.scheduler.cli.utils.task_add(scheduler: SchedulerInterface, task_type_name: str, args: List[str], kw: Dict, policy: TaskPolicy, priority: TaskPriority | None = None, next_run: datetime | None = None)

Add a task of type task_type_name to the scheduler.

swh.scheduler.cli.utils.lister_task_type(lister_name: str, lister_type: str | None = None) → str

Compute the expected scheduler task type from the lister name and its optional listing type (full or incremental).
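
Assuming the naming convention list-<lister_name>, optionally suffixed with -<lister_type>, the computation reduces to string formatting. lister_task_type_sketch is a hypothetical name, and the convention itself is an assumption to check against the task types registered in your scheduler.

```python
from typing import Optional


def lister_task_type_sketch(lister_name: str, lister_type: Optional[str] = None) -> str:
    # Assumed convention: "list-<lister_name>" plus "-<lister_type>" when given.
    prefix = f"list-{lister_name}"
    return f"{prefix}-{lister_type}" if lister_type else prefix


print(lister_task_type_sketch("gitlab", "full"))  # list-gitlab-full
print(lister_task_type_sketch("gitlab"))          # list-gitlab
```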

swh.scheduler.cli.utils.check_listed_origins(scheduler: SchedulerInterface, lister_name: str, instance_name: str, limit: int = 100000)

swh.scheduler.cli.utils.count_ingested_origins(scheduler: SchedulerInterface, ids: Iterable[Tuple[str, str]], instance_name: str, with_listing: bool | None = False) → Tuple[Dict[str, int], List]

Count the number of ingested origins, grouped by status.
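
As a rough illustration of grouping by status, the sketch counts hypothetical (origin_url, status) records with collections.Counter. The input shape, the status values, and the second return value (here simply the records themselves) are assumptions; the real function queries the scheduler, and its List element may hold something else.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def count_by_status_sketch(
    visits: Iterable[Tuple[str, str]],
) -> Tuple[Dict[str, int], List[Tuple[str, str]]]:
    """Count hypothetical (origin_url, status) records per status."""
    records = list(visits)
    # Counter keeps statuses in first-seen order.
    counts = Counter(status for _url, status in records)
    return dict(counts), records


counts, records = count_by_status_sketch([
    ("https://example.org/a.git", "successful"),
    ("https://example.org/b.git", "failed"),
    ("https://example.org/c.git", "successful"),
])
print(counts)  # {'successful': 2, 'failed': 1}
```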