swh.scheduler.cli.utils module#

swh.scheduler.cli.utils.parse_time_interval(time_str: str) timedelta[source]#

Parse a basic time interval e.g. ‘1 day’ or ‘2 hours’ into a timedelta object.

Parameters:

time_str – A string representing a basic time interval in days or hours.

Returns:

An equivalent representation of the string as a datetime.timedelta object.

Raises:

ValueError if the time interval could not be parsed.

swh.scheduler.cli.utils.schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs)[source]#
swh.scheduler.cli.utils.parse_argument(option)[source]#
swh.scheduler.cli.utils.parse_options(options: List[str]) Tuple[List[str], Dict][source]#

Parses options from a CLI as YAML and turns it into Python args and kwargs.

>>> parse_options([])
([], {})
>>> parse_options(['foo', 'bar'])
(['foo', 'bar'], {})
>>> parse_options(['[foo, bar]'])
([['foo', 'bar']], {})
>>> parse_options(['"foo"', '"bar"'])
(['foo', 'bar'], {})
>>> parse_options(['foo="bar"'])
([], {'foo': 'bar'})
>>> parse_options(['"foo"', 'bar="baz"'])
(['foo'], {'bar': 'baz'})
>>> parse_options(['42', 'bar=False'])
([42], {'bar': False})
>>> parse_options(['42', 'bar=false'])
([42], {'bar': False})
>>> parse_options(['foo', ''])
(['foo', ''], {})
>>> parse_options(['foo', 'bar='])
(['foo'], {'bar': ''})
>>> parse_options(['foo', 'null'])
(['foo', None], {})
>>> parse_options(['foo', 'bar=null'])
(['foo'], {'bar': None})
>>> parse_options(['42', '"foo'])
Traceback (most recent call last):
  ...
click.exceptions.ClickException: Invalid argument: "foo
swh.scheduler.cli.utils.pretty_print_list(list: List[Any], indent: int = 0)[source]#

Pretty-print a list

swh.scheduler.cli.utils.pretty_print_dict(dict: Dict[str, Any], indent: int = 0)[source]#

Pretty-print a list

swh.scheduler.cli.utils.format_dict(d)[source]#

Recursively format date objects in the dict passed as argument

swh.scheduler.cli.utils.pretty_print_run(run: TaskRun, indent: int = 4)[source]#
swh.scheduler.cli.utils.pretty_print_task(task: Task, full: bool = False)[source]#

Pretty-print a task

If full is True, also print the status and priority fields.

>>> import datetime
>>> from swh.scheduler.model import Task, TaskArguments
>>> task = Task(
...     id=1234,
...     arguments=TaskArguments(
...         args=["foo", "bar", True],
...         kwargs={"key": "value", "key2": 42},
...     ),
...     current_interval=datetime.timedelta(hours=1),
...     next_run=datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
...     policy="oneshot",
...     priority=None,
...     status="next_run_not_scheduled",
...     type="test_task",
... )
>>> print(click.unstyle(pretty_print_task(task)))
Task 1234
  Next run: ... (2019-02-21T13:52:35.407818)
  Interval: 1:00:00
  Type: test_task
  Policy: oneshot
  Args:
    'foo'
    'bar'
    True
  Keyword args:
    key: 'value'
    key2: 42

>>> print(click.unstyle(pretty_print_task(task, full=True)))
Task 1234
  Next run: ... (2019-02-21T13:52:35.407818)
  Interval: 1:00:00
  Type: test_task
  Policy: oneshot
  Status: next_run_not_scheduled
  Priority:
  Args:
    'foo'
    'bar'
    True
  Keyword args:
    key: 'value'
    key2: 42
swh.scheduler.cli.utils.task_add(scheduler: SchedulerInterface, task_type_name: str, args: List[str], kw: Dict, policy: TaskPolicy, priority: TaskPriority | None = None, next_run: datetime | None = None)[source]#

Add a task task_type_name in the scheduler.

swh.scheduler.cli.utils.lister_task_type(lister_name: str, lister_type: str | None = None) str[source]#

Compute expected scheduler task type from the lister name and its optional listing type (full, incremental).

swh.scheduler.cli.utils.check_listed_origins(scheduler: SchedulerInterface, lister_name: str, instance_name: str, limit: int = 100000)[source]#
swh.scheduler.cli.utils.count_ingested_origins(scheduler: SchedulerInterface, ids: Iterable[Tuple[str, str]], instance_name: str, with_listing: bool | None = False) Tuple[Dict[str, int], List][source]#

Count number of ingested origins grouped by status.