swh.scheduler.cli.utils module
- swh.scheduler.cli.utils.schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs)
- swh.scheduler.cli.utils.parse_options(options: List[str]) -> Tuple[List[str], Dict]
Parses CLI options as YAML and turns them into Python args and kwargs.
>>> parse_options([])
([], {})
>>> parse_options(['foo', 'bar'])
(['foo', 'bar'], {})
>>> parse_options(['[foo, bar]'])
([['foo', 'bar']], {})
>>> parse_options(['"foo"', '"bar"'])
(['foo', 'bar'], {})
>>> parse_options(['foo="bar"'])
([], {'foo': 'bar'})
>>> parse_options(['"foo"', 'bar="baz"'])
(['foo'], {'bar': 'baz'})
>>> parse_options(['42', 'bar=False'])
([42], {'bar': False})
>>> parse_options(['42', 'bar=false'])
([42], {'bar': False})
>>> parse_options(['foo', ''])
(['foo', ''], {})
>>> parse_options(['foo', 'bar='])
(['foo'], {'bar': ''})
>>> parse_options(['foo', 'null'])
(['foo', None], {})
>>> parse_options(['foo', 'bar=null'])
(['foo'], {'bar': None})
>>> parse_options(['42', '"foo'])
Traceback (most recent call last):
  ...
click.exceptions.ClickException: Invalid argument: "foo
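The doctests above suggest that options containing `=` become keyword arguments while the others stay positional. The following is a minimal sketch of that split step only, not the library's implementation: `split_options` and `parse_value` are hypothetical names, and the YAML parsing is deliberately replaced by a pluggable callable (identity by default) so the example runs with the standard library alone.

```python
from typing import Any, Callable, Dict, List, Tuple


def split_options(
    options: List[str],
    parse_value: Callable[[str], Any] = lambda raw: raw,
) -> Tuple[List[Any], Dict[str, Any]]:
    """Hypothetical helper: split CLI options into (args, kwargs).

    Assumption: an option containing '=' is a keyword argument, split on
    the first '='; anything else is positional. Values go through
    parse_value, which stands in for the YAML parsing described above.
    """
    args: List[Any] = []
    kwargs: Dict[str, Any] = {}
    for option in options:
        if "=" in option:
            # Split on the first '=' only, so values may contain '='.
            key, _, raw = option.partition("=")
            kwargs[key] = parse_value(raw)
        else:
            args.append(parse_value(option))
    return args, kwargs


positional, keyword = split_options(["foo", "bar=baz", "empty="])
```

With the identity parser, values stay raw strings; passing a YAML loader as `parse_value` would be one way to approximate the typed results shown in the doctests.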
- swh.scheduler.cli.utils.get_task_type(scheduler: SchedulerInterface, visit_type: str) -> Optional[Dict]
Given a visit type, return its associated task type.
- swh.scheduler.cli.utils.send_to_celery(scheduler: SchedulerInterface, visit_type_to_queue: Dict[str, str], enabled: bool = True, lister_name: Optional[str] = None, lister_instance_name: Optional[str] = None, policy: str = 'oldest_scheduled_first', tablesample: Optional[float] = None)
Utility function to read tasks from the scheduler and send them directly to Celery.
- Parameters:
visit_type_to_queue – Optional mapping of visit/loader type (e.g. git, svn, …) to the queue to send tasks to.
enabled – Whether to list enabled or disabled origins. Defaults to True (enabled origins); set to False for the edge cases that need the disabled ones.
lister_name – Restrict the selection to origins listed by the lister with this name.
lister_instance_name – Restrict the selection to origins listed by the lister with this instance name.
policy – The scheduling policy used to select which visits to schedule.
tablesample – The percentage of the table on which the query runs (None: no sampling).
- swh.scheduler.cli.utils.format_dict(d)
Recursively format the date objects in the dict passed as argument.
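To illustrate what recursive date formatting can look like, here is a minimal sketch under stated assumptions. `format_dates` is a hypothetical stand-in, not the library function, and rendering dates with `isoformat()` is an assumption: the output format actually used by `format_dict` is not specified here.

```python
import datetime
from typing import Any, Dict


def format_dates(d: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in: copy d, rendering dates as ISO 8601 strings."""
    formatted: Dict[str, Any] = {}
    for key, value in d.items():
        if isinstance(value, (datetime.date, datetime.datetime)):
            # Assumption: ISO 8601 is used as the textual form.
            value = value.isoformat()
        elif isinstance(value, dict):
            # Recurse into nested dicts so dates at any depth are handled.
            value = format_dates(value)
        formatted[key] = value
    return formatted


example = format_dates({
    "id": 1234,
    "next_run": datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
    "nested": {"day": datetime.date(2019, 2, 21)},
})
```

The sketch returns a new dict rather than mutating its argument, and leaves non-date, non-dict values (including lists) untouched.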
- swh.scheduler.cli.utils.pretty_print_task(task, full=False)
Pretty-print a task
If ‘full’ is True, also print the status and priority fields.
>>> import datetime
>>> task = {
...     'id': 1234,
...     'arguments': {
...         'args': ['foo', 'bar', True],
...         'kwargs': {'key': 'value', 'key2': 42},
...     },
...     'current_interval': datetime.timedelta(hours=1),
...     'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
...     'policy': 'oneshot',
...     'priority': None,
...     'status': 'next_run_not_scheduled',
...     'type': 'test_task',
... }
>>> print(click.unstyle(pretty_print_task(task)))
Task 1234
  Next run: ... (2019-02-21T13:52:35.407818)
  Interval: 1:00:00
  Type: test_task
  Policy: oneshot
  Args:
    'foo'
    'bar'
    True
  Keyword args:
    key: 'value'
    key2: 42
>>> print(click.unstyle(pretty_print_task(task, full=True)))
Task 1234
  Next run: ... (2019-02-21T13:52:35.407818)
  Interval: 1:00:00
  Type: test_task
  Policy: oneshot
  Status: next_run_not_scheduled
  Priority:
  Args:
    'foo'
    'bar'
    True
  Keyword args:
    key: 'value'
    key2: 42