swh.scheduler.cli.utils module

swh.scheduler.cli.utils.parse_time_interval(time_str: str) → timedelta

Parse a basic time interval, e.g. ‘1 day’ or ‘2 hours’, into a timedelta object.

Parameters:

time_str – A string representing a basic time interval in days or hours.

Returns:

An equivalent representation of the string as a datetime.timedelta object.

Raises:

ValueError if the time interval could not be parsed.
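The contract described above can be illustrated with a minimal sketch. It is not the library's implementation: the function name `parse_interval_sketch` and the regular expression are assumptions made for illustration, and the real parser may accept other spellings.

```python
import re
from datetime import timedelta

# Assumed pattern: an integer, whitespace, then "day(s)" or "hour(s)".
_INTERVAL_RE = re.compile(r"^\s*(\d+)\s+(day|hour)s?\s*$")


def parse_interval_sketch(time_str: str) -> timedelta:
    """Hypothetical stand-in showing the documented input/output contract."""
    match = _INTERVAL_RE.match(time_str)
    if match is None:
        # Mirrors the documented behaviour: unparseable input raises ValueError.
        raise ValueError(f"Could not parse time interval: {time_str!r}")
    amount, unit = int(match.group(1)), match.group(2)
    return timedelta(days=amount) if unit == "day" else timedelta(hours=amount)
```

Under these assumptions, `parse_interval_sketch("2 hours")` yields `timedelta(hours=2)`, and a string such as `"soon"` raises `ValueError`.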

swh.scheduler.cli.utils.schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs)

swh.scheduler.cli.utils.parse_argument(option)

swh.scheduler.cli.utils.parse_options(options: List[str]) → Tuple[List[str], Dict]

Parse CLI options as YAML and turn them into Python args and kwargs.

>>> parse_options([])
([], {})
>>> parse_options(['foo', 'bar'])
(['foo', 'bar'], {})
>>> parse_options(['[foo, bar]'])
([['foo', 'bar']], {})
>>> parse_options(['"foo"', '"bar"'])
(['foo', 'bar'], {})
>>> parse_options(['foo="bar"'])
([], {'foo': 'bar'})
>>> parse_options(['"foo"', 'bar="baz"'])
(['foo'], {'bar': 'baz'})
>>> parse_options(['42', 'bar=False'])
([42], {'bar': False})
>>> parse_options(['42', 'bar=false'])
([42], {'bar': False})
>>> parse_options(['foo', ''])
(['foo', ''], {})
>>> parse_options(['foo', 'bar='])
(['foo'], {'bar': ''})
>>> parse_options(['foo', 'null'])
(['foo', None], {})
>>> parse_options(['foo', 'bar=null'])
(['foo'], {'bar': None})
>>> parse_options(['42', '"foo'])
Traceback (most recent call last):
  ...
click.exceptions.ClickException: Invalid argument: "foo

swh.scheduler.cli.utils.get_task_type(scheduler: SchedulerInterface, visit_type: str) → Dict | None

Given a visit type, return its associated task type.

swh.scheduler.cli.utils.send_to_celery(scheduler: SchedulerInterface, visit_type_to_queue: Dict[str, str], enabled: bool = True, lister_name: str | None = None, lister_instance_name: str | None = None, policy: str = 'oldest_scheduled_first', tablesample: float | None = None, absolute_cooldown: timedelta | None = None, scheduled_cooldown: timedelta | None = None, failed_cooldown: timedelta | None = None, not_found_cooldown: timedelta | None = None)

Utility function to read tasks from the scheduler and send them directly to Celery.

Parameters:
  • visit_type_to_queue – Optional mapping of visit/loader type (e.g. git, svn, …) to the queue to send tasks to.

  • enabled – Whether to list enabled or disabled origins. By default, only enabled origins are listed; disabled ones may be wanted in some edge cases.

  • lister_name – Restrict the selection to origins listed by the lister with this name

  • lister_instance_name – Restrict the selection to origins listed by the lister with this instance name

  • policy – the scheduling policy used to select which visits to schedule

  • tablesample – the percentage of the table on which we run the query (None: no sampling)

  • absolute_cooldown – the minimal interval between two visits of the same origin

  • scheduled_cooldown – the minimal interval before which we can schedule the same origin again if it has not been visited

  • failed_cooldown – the minimal interval before which we can reschedule a failed origin

  • not_found_cooldown – the minimal interval before which we can reschedule a not_found origin
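As a rough illustration of how the parameters above fit together, the following sketch builds the keyword arguments for a call. Every value here is a placeholder assumption (the queue name, the lister name, and the cooldown durations are hypothetical, not library defaults); only the parameter names and the default policy string come from the signature.

```python
from datetime import timedelta

# Hypothetical values throughout: chosen for illustration only.
send_kwargs = {
    "visit_type_to_queue": {"git": "example.queue.for.git"},  # assumed queue name
    "enabled": True,
    "lister_name": "example-lister",  # assumed lister name
    "policy": "oldest_scheduled_first",  # default shown in the signature
    "tablesample": None,  # None: no sampling
    "absolute_cooldown": timedelta(hours=12),
    "scheduled_cooldown": timedelta(days=7),
    "failed_cooldown": timedelta(days=14),
    "not_found_cooldown": timedelta(days=31),
}

# With a real SchedulerInterface instance named `scheduler`, the call would be:
# send_to_celery(scheduler, **send_kwargs)
```

The cooldowns are plain `datetime.timedelta` objects, so on the command line they would typically come from a helper such as `parse_time_interval`.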

swh.scheduler.cli.utils.pretty_print_list(list, indent=0)

Pretty-print a list

swh.scheduler.cli.utils.pretty_print_dict(dict, indent=0)

Pretty-print a dict

swh.scheduler.cli.utils.format_dict(d)

Recursively format date objects in the dict passed as argument
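The idea of recursive date formatting can be sketched as follows. This is an assumption-laden illustration, not the library's code: the name `format_dates_sketch` is hypothetical, and the choice of ISO 8601 strings for dates and `str()` for timedeltas is a guess at a reasonable output format.

```python
import datetime


def format_dates_sketch(value):
    """Hypothetical helper: walk nested dicts and stringify date-like values."""
    if isinstance(value, dict):
        # Recurse into nested dictionaries, leaving keys untouched.
        return {key: format_dates_sketch(item) for key, item in value.items()}
    if isinstance(value, datetime.date):
        # datetime.datetime is a subclass of datetime.date, so both land here.
        return value.isoformat()
    if isinstance(value, datetime.timedelta):
        return str(value)
    return value
```

Applied to a dict such as `{"next_run": datetime.datetime(2019, 2, 21, 13, 52, 35, 407818)}`, the sketch returns `{"next_run": "2019-02-21T13:52:35.407818"}`.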

swh.scheduler.cli.utils.pretty_print_run(run, indent=4)

swh.scheduler.cli.utils.pretty_print_task(task, full=False)

Pretty-print a task

If ‘full’ is True, also print the status and priority fields.

>>> import datetime
>>> task = {
...     'id': 1234,
...     'arguments': {
...         'args': ['foo', 'bar', True],
...         'kwargs': {'key': 'value', 'key2': 42},
...     },
...     'current_interval': datetime.timedelta(hours=1),
...     'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
...     'policy': 'oneshot',
...     'priority': None,
...     'status': 'next_run_not_scheduled',
...     'type': 'test_task',
... }
>>> print(click.unstyle(pretty_print_task(task)))
Task 1234
  Next run: ... (2019-02-21T13:52:35.407818)
  Interval: 1:00:00
  Type: test_task
  Policy: oneshot
  Args:
    'foo'
    'bar'
    True
  Keyword args:
    key: 'value'
    key2: 42

>>> print(click.unstyle(pretty_print_task(task, full=True)))
Task 1234
  Next run: ... (2019-02-21T13:52:35.407818)
  Interval: 1:00:00
  Type: test_task
  Policy: oneshot
  Status: next_run_not_scheduled
  Priority:
  Args:
    'foo'
    'bar'
    True
  Keyword args:
    key: 'value'
    key2: 42

swh.scheduler.cli.utils.task_add(scheduler: SchedulerInterface, task_type_name: str, args: List[str], kw: Dict, policy: str, priority: str | None = None, next_run: str | None = None)

Add a task of type task_type_name to the scheduler.

swh.scheduler.cli.utils.lister_task_type(lister_name: str, lister_type: str | None = None) → str

Compute expected scheduler task type from the lister name and its optional listing type (full, incremental).

swh.scheduler.cli.utils.check_listed_origins(scheduler: SchedulerInterface, lister_name: str, instance_name: str, limit: int = 100000)

swh.scheduler.cli.utils.count_ingested_origins(scheduler: SchedulerInterface, ids: Iterable[Tuple[str, str]], instance_name: str, with_listing: bool | None = False) → Tuple[Dict[str, int], List]

Count number of ingested origins grouped by status.
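Grouping by status amounts to a tally keyed on each origin's last visit status. The sketch below shows only that counting step, under stated assumptions: the helper name `count_by_status_sketch` and the status strings are hypothetical, and the real function obtains its data from the scheduler rather than from a plain list.

```python
from collections import Counter
from typing import Dict, Iterable, Optional


def count_by_status_sketch(statuses: Iterable[Optional[str]]) -> Dict[str, int]:
    """Hypothetical helper: tally status values, mapping None to the string 'None'."""
    return dict(Counter(str(status) for status in statuses))


# Hypothetical status values, for illustration only.
example_statuses = ["successful", "failed", "successful", None]
status_counts = count_by_status_sketch(example_statuses)
```

With the example input, `status_counts` holds two entries for `"successful"` and one each for `"failed"` and `"None"`.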