Source code for swh.scheduler.cli.utils

# Copyright (C) 2019-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# WARNING: do not import unnecessary things here to keep cli startup time under
# control

from __future__ import annotations

from datetime import timedelta
import re
from typing import TYPE_CHECKING, Iterable

import click

if TYPE_CHECKING:
    from typing import Dict, List, Optional, Tuple

    from swh.scheduler.interface import SchedulerInterface


TASK_BATCH_SIZE = 1000  # Number of tasks per query to the scheduler


# Basic time interval made of optional day, hour, minute and second
# components, in that order, e.g. "1 day", "2h 30m", "1.5 hours" or "45 secs".
# Every group (and every separator space) is optional, so the pattern also
# fully matches "" and lone spaces: callers must check that at least one of the
# named groups actually matched.
TIME_INTERVAL_REGEXP = re.compile(
    r"""
    # optional group for days
    (?:
        (?P<days>\d+\.?|\d*\.\d+) # floating point number, e.g. "1", "2.", "3.4" or ".5".
        \x20*
        (?:day|days|d)
    )?
    \x20? # optional space
    # optional group for hours
    (?:
        (?P<hours>\d+\.?|\d*\.\d+)
        \x20*
        (?:h|hr|hrs|hour|hours)
    )?
    \x20? # optional space
    # optional group for minutes
    (?:
        (?P<minutes>\d+\.?|\d*\.\d+)
        \x20*
        (?:m|min|mins|minute|minutes)
    )?
    \x20? # optional space
    # optional group for seconds
    (?:
        (?P<seconds>\d+\.?|\d*\.\d+)
        \x20*
        (?:s|sec|secs|second|seconds)
    )?
    """,
    re.VERBOSE,
)


def parse_time_interval(time_str: str) -> timedelta:
    """Convert a basic human-readable time interval into a timedelta.

    The string combines optional day, hour, minute and second components, in
    that order, e.g. ``'1 day'``, ``'2 hours'`` or ``'1d 12h'``. Each component
    value may be a decimal number such as ``'1.5'`` or ``'.5'``.

    Args:
        time_str: the textual time interval to parse.

    Returns:
        The equivalent datetime.timedelta.

    Raises:
        ValueError: the string could not be parsed as a time interval.
    """
    match = TIME_INTERVAL_REGEXP.fullmatch(time_str)
    components = (
        {unit: float(amount) for unit, amount in match.groupdict().items() if amount}
        if match
        else {}
    )
    if not components:
        # Either the regexp did not match at all, or it only matched separator
        # spaces (all of its groups are optional).
        raise ValueError(f"{time_str!r} could not be parsed as a time interval")
    return timedelta(**components)
def schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs):
    """Create oneshot tasks of type ``task_type``, each handling a batch of origins.

    Origins are grouped ``origin_batch_size`` at a time; each group becomes the
    single positional argument of one new task. Tasks are submitted to the
    scheduler ``TASK_BATCH_SIZE`` at a time, and progress is echoed after each
    submission.

    Args:
        scheduler: the scheduler in which tasks are created; when falsy, tasks
            are built and counted but not submitted (dry run).
        task_type: task type of the created tasks.
        origins: iterable of origins to schedule. Any iterable is accepted
            (list, generator, ...); it is consumed exactly once.
        origin_batch_size: maximum number of origins handled by a single task.
        kwargs: keyword arguments passed to every created task.
    """
    from itertools import islice

    from swh.scheduler.utils import create_task_dict

    # islice() restarts from the beginning of a non-iterator iterable (e.g. a
    # list) on every call, which would schedule the same first origins forever.
    # Working on a single iterator makes successive slices advance.
    origins = iter(origins)

    nb_origins = 0
    nb_tasks = 0
    exhausted = False
    while not exhausted:
        task_batch = []
        while len(task_batch) < TASK_BATCH_SIZE:
            origin_batch = list(islice(origins, origin_batch_size))
            if not origin_batch:
                exhausted = True
                break
            nb_origins += len(origin_batch)
            # The whole group of origins is the task's only positional argument.
            task_batch.append(
                create_task_dict(task_type, "oneshot", origin_batch, **kwargs)
            )

        if not task_batch:
            break
        nb_tasks += len(task_batch)
        if scheduler:
            scheduler.create_tasks(task_batch)
        click.echo("Scheduled %d tasks (%d origins)." % (nb_tasks, nb_origins))

    # Print final status.
    if nb_tasks:
        click.echo("Done.")
    else:
        click.echo("Nothing to do (no origin metadata matched the criteria).")
def parse_argument(option):
    """Decode a single command-line value written in YAML.

    The empty string is returned unchanged; ``yaml.safe_load("")`` would
    otherwise turn it into None. Values are decoded with ``yaml.safe_load``,
    which does not construct arbitrary Python objects.

    Raises:
        click.ClickException: the value could not be decoded.
    """
    import yaml

    if option != "":
        try:
            return yaml.safe_load(option)
        except Exception:
            raise click.ClickException(f"Invalid argument: {option}")
    return ""
def parse_options(options: List[str]) -> Tuple[List[str], Dict]:
    """Parses options from a CLI as YAML and turns it into Python args and kwargs.

    Options containing ``=`` become keyword arguments (split on the first
    ``=``); all other options are positional arguments. Every value is decoded
    as YAML by ``parse_argument``.

    >>> parse_options([])
    ([], {})
    >>> parse_options(['foo', 'bar'])
    (['foo', 'bar'], {})
    >>> parse_options(['[foo, bar]'])
    ([['foo', 'bar']], {})
    >>> parse_options(['"foo"', '"bar"'])
    (['foo', 'bar'], {})
    >>> parse_options(['foo="bar"'])
    ([], {'foo': 'bar'})
    >>> parse_options(['"foo"', 'bar="baz"'])
    (['foo'], {'bar': 'baz'})
    >>> parse_options(['42', 'bar=False'])
    ([42], {'bar': False})
    >>> parse_options(['42', 'bar=false'])
    ([42], {'bar': False})
    >>> parse_options(['foo', ''])
    (['foo', ''], {})
    >>> parse_options(['foo', 'bar='])
    (['foo'], {'bar': ''})
    >>> parse_options(['foo', 'null'])
    (['foo', None], {})
    >>> parse_options(['foo', 'bar=null'])
    (['foo'], {'bar': None})
    >>> parse_options(['42', '"foo'])
    Traceback (most recent call last):
      ...
    click.exceptions.ClickException: Invalid argument: "foo
    """
    # (name, raw value) pairs; partition() keeps any further "=" in the value.
    keyword_items = [opt.partition("=")[::2] for opt in options if "=" in opt]
    # Positional values are decoded before keyword values, so that the first
    # invalid positional value is the one reported.
    positional = [parse_argument(opt) for opt in options if "=" not in opt]
    keyword = {name: parse_argument(raw) for name, raw in keyword_items}
    return (positional, keyword)
def get_task_type(scheduler: SchedulerInterface, visit_type: str) -> Optional[Dict]:
    """Return the loading task type associated with ``visit_type``.

    Loader task types are looked up under the name ``load-<visit_type>``; the
    result is whatever ``scheduler.get_task_type`` returns for that name.
    """
    task_type_name = f"load-{visit_type}"
    return scheduler.get_task_type(task_type_name)
def send_to_celery(
    scheduler: SchedulerInterface,
    visit_type_to_queue: Dict[str, str],
    enabled: bool = True,
    lister_name: Optional[str] = None,
    lister_instance_name: Optional[str] = None,
    policy: str = "oldest_scheduled_first",
    tablesample: Optional[float] = None,
    absolute_cooldown: Optional[timedelta] = None,
    scheduled_cooldown: Optional[timedelta] = None,
    failed_cooldown: Optional[timedelta] = None,
    not_found_cooldown: Optional[timedelta] = None,
):
    """Utility function to read tasks from the scheduler and send those directly to
    celery.

    For each visit type, as many visits as there are available slots in the
    target celery queue are grabbed from the scheduler and sent as tasks.

    Args:
        scheduler: the scheduler to read task types and next visits from
        visit_type_to_queue: mapping of visit/loader type (e.g. git, svn, ...)
          to the celery queue its tasks are sent to
        enabled: Determine whether we want to list enabled or disabled origins.
          As default, we want reasonably enabled origins. For some edge case, we
          might want the others.
        lister_name: Determine the list of origins listed from the lister with
          name
        lister_instance_name: Determine the list of origins listed from the
          lister with instance name
        policy: the scheduling policy used to select which visits to schedule
        tablesample: the percentage of the table on which we run the query
          (None: no sampling)
        absolute_cooldown: the minimal interval between two visits of the same
          origin
        scheduled_cooldown: the minimal interval before which we can schedule
          the same origin again if it's not been visited
        failed_cooldown: the minimal interval before which we can reschedule a
          failed origin
        not_found_cooldown: the minimal interval before which we can reschedule
          a not_found origin

    Raises:
        click.ClickException: no task type is registered for one of the visit
          types of ``visit_type_to_queue``
    """
    from kombu.utils.uuid import uuid

    from swh.scheduler.celery_backend.config import app, get_available_slots

    from ..utils import create_origin_task_dicts

    for visit_type_name, queue_name in visit_type_to_queue.items():
        task_type = get_task_type(scheduler, visit_type_name)
        if task_type is None:
            # Explicit check rather than an assert, which `python -O` strips.
            raise click.ClickException(
                f"No task type registered for visit type {visit_type_name!r}"
            )
        task_name = task_type["backend_name"]
        num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])

        click.echo(f"{num_tasks} slots available in celery queue")

        origins = scheduler.grab_next_visits(
            visit_type_name,
            num_tasks,
            policy=policy,
            tablesample=tablesample,
            enabled=enabled,
            lister_name=lister_name,
            lister_instance_name=lister_instance_name,
            absolute_cooldown=absolute_cooldown,
            scheduled_cooldown=scheduled_cooldown,
            failed_cooldown=failed_cooldown,
            not_found_cooldown=not_found_cooldown,
        )

        click.echo(f"{len(origins)} visits to send to celery")
        for task_dict in create_origin_task_dicts(origins, scheduler):
            app.send_task(
                task_name,
                task_id=uuid(),
                args=task_dict["arguments"]["args"],
                kwargs=task_dict["arguments"]["kwargs"],
                queue=queue_name,
            )
def pretty_print_list(list, indent=0):
    """Pretty-print a list: one ``repr()`` per line, indented by ``indent`` spaces.

    Every line, including the last one, ends with a newline.
    """
    prefix = " " * indent
    return "".join(f"{prefix}{item!r}\n" for item in list)
def pretty_print_dict(dict, indent=0):
    """Pretty-print a dict: one ``key: repr(value)`` per line, sorted by key.

    Keys are rendered in bold with ``click.style``. Each line is indented by
    ``indent`` spaces and ends with a newline.
    """
    prefix = " " * indent
    rendered = []
    for key, value in sorted(dict.items()):
        styled_key = click.style(key, bold=True)
        rendered.append(f"{prefix}{styled_key}: {value!r}\n")
    return "".join(rendered)
def format_dict(d):
    """Return a copy of ``d`` with date/datetime values turned into ISO 8601 strings.

    Nested dicts are converted recursively; any other value is kept unchanged.
    """
    import datetime

    def _convert(value):
        # datetime.datetime is a subclass of datetime.date, so this covers both.
        if isinstance(value, datetime.date):
            return value.isoformat()
        if isinstance(value, dict):
            return format_dict(value)
        return value

    return {key: _convert(value) for key, value in d.items()}
def pretty_print_run(run, indent=4):
    """Render a task run as two lines: its id and status, then its timestamps.

    Date values of ``run`` are converted to ISO 8601 strings (by ``format_dict``)
    before formatting; both lines are prefixed with ``indent`` spaces.
    """
    template = (
        "{indent}{backend_id} [{status}]\n"
        "{indent} scheduled: {scheduled} [{started}:{ended}]"
    )
    padding = " " * indent
    return template.format(indent=padding, **format_dict(run))
def pretty_print_task(task, full=False):
    """Pretty-print a task

    If 'full' is True, also print the status and priority fields.

    >>> import datetime
    >>> task = {
    ...     'id': 1234,
    ...     'arguments': {
    ...         'args': ['foo', 'bar', True],
    ...         'kwargs': {'key': 'value', 'key2': 42},
    ...     },
    ...     'current_interval': datetime.timedelta(hours=1),
    ...     'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
    ...     'policy': 'oneshot',
    ...     'priority': None,
    ...     'status': 'next_run_not_scheduled',
    ...     'type': 'test_task',
    ... }
    >>> print(click.unstyle(pretty_print_task(task)))
    Task 1234
     Next run: ... (2019-02-21T13:52:35.407818)
     Interval: 1:00:00
     Type: test_task
     Policy: oneshot
     Args:
        'foo'
        'bar'
        True
     Keyword args:
        key: 'value'
        key2: 42
    <BLANKLINE>

    >>> print(click.unstyle(pretty_print_task(task, full=True)))
    Task 1234
     Next run: ... (2019-02-21T13:52:35.407818)
     Interval: 1:00:00
     Type: test_task
     Policy: oneshot
     Status: next_run_not_scheduled
     Priority:\x20
     Args:
        'foo'
        'bar'
        True
     Keyword args:
        key: 'value'
        key2: 42
    <BLANKLINE>
    """
    import humanize

    next_run = task["next_run"]
    header = "%s %s\n" % (click.style("Task", bold=True), task["id"])

    # (bold label, value) pairs, each rendered on its own line.
    fields = [
        (
            " Next run: ",
            "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()),
        ),
        (" Interval: ", str(task["current_interval"])),
        (" Type: ", task["type"] or ""),
        (" Policy: ", task["policy"] or ""),
    ]
    if full:
        fields += [
            (" Status: ", task["status"] or ""),
            (" Priority: ", task["priority"] or ""),
        ]

    chunks = [header]
    for label, value in fields:
        chunks += [click.style(label, bold=True), value, "\n"]
    chunks += [
        click.style(" Args:\n", bold=True),
        pretty_print_list(task["arguments"]["args"], indent=4),
        click.style(" Keyword args:\n", bold=True),
        pretty_print_dict(task["arguments"]["kwargs"], indent=4),
    ]
    return "".join(chunks)
def task_add(
    scheduler: SchedulerInterface,
    task_type_name: str,
    args: List[str],
    kw: Dict,
    policy: str,
    priority: Optional[str] = None,
    next_run: Optional[str] = None,
):
    """Create one task of type ``task_type_name`` and echo the created tasks.

    Args:
        scheduler: the scheduler in which the task is created
        task_type_name: type of the new task
        args: positional arguments of the task
        kw: keyword arguments of the task
        policy: scheduling policy of the task
        priority: optional priority of the task
        next_run: first run of the task; ``swh.scheduler.utils.utcnow()`` when
          not given (or falsy)
    """
    from swh.scheduler.utils import utcnow

    new_task = {
        "type": task_type_name,
        "policy": policy,
        "priority": priority,
        "arguments": {"args": args, "kwargs": kw},
        "next_run": next_run if next_run else utcnow(),
    }
    created = scheduler.create_tasks([new_task])

    report = [f"Created {len(created)} tasks\n"]
    report.extend(pretty_print_task(created_task) for created_task in created)
    click.echo("\n".join(report))
def lister_task_type(lister_name: str, lister_type: Optional[str] = None) -> str:
    """Build the scheduler task type name of a lister.

    The name is ``list-<lister_name>``, followed by ``-<lister_type>`` when a
    non-empty listing type (e.g. full, incremental) is given.
    """
    task_type = f"list-{lister_name}"
    if lister_type:
        task_type += f"-{lister_type}"
    return task_type
def check_listed_origins(
    scheduler: SchedulerInterface,
    lister_name: str,
    instance_name: str,
    limit: int = 100000,
):
    """Return the origins listed by a lister instance, exiting if there are none.

    Args:
        scheduler: the scheduler to query
        lister_name: name of the lister
        instance_name: instance name of the lister
        limit: maximum number of listed origins fetched

    Returns:
        The ``results`` of ``scheduler.get_listed_origins`` for that lister,
        queried with ``enabled=None`` (presumably no filtering on the enabled
        flag -- TODO confirm against the scheduler interface).

    Raises:
        SystemExit: the lister is not registered in the scheduler, or has no
          listed origin.
    """
    import sys

    listed_origins_lister = scheduler.get_lister(
        name=lister_name, instance_name=instance_name
    )
    if listed_origins_lister is None:
        # sys.exit rather than the ``exit`` builtin: the latter is added by the
        # ``site`` module for interactive use only (it closes sys.stdin and is
        # missing under ``python -S``).
        sys.exit(
            f"Forge {instance_name} ({lister_name}) isn't registered "
            "in the scheduler database."
        )

    listed_origins = scheduler.get_listed_origins(
        lister_id=listed_origins_lister.id,
        enabled=None,
        limit=limit,
    ).results
    if len(listed_origins) == 0:
        sys.exit(
            f"Forge {instance_name} ({lister_name}) has {len(listed_origins)} "
            "listed origin in the scheduler database."
        )

    return listed_origins
def count_ingested_origins(
    scheduler: SchedulerInterface,
    ids: Iterable[Tuple[str, str]],
    instance_name: str,
    with_listing: Optional[bool] = False,
) -> Tuple[Dict[str, int], List]:
    """Count number of ingested origins grouped by status.

    Args:
        scheduler: the scheduler to query
        ids: the keys passed to ``scheduler.origin_visit_stats_get``
          (presumably (origin url, visit type) pairs -- TODO confirm)
        instance_name: forge instance name, only used in the exit message
        with_listing: when true, also build one ``[url, last visit status,
          last visit]`` row per origin that has a last visit status

    Returns:
        A ``(counters, table)`` pair. ``counters`` maps each last visit status
        value, or ``"None"`` for origins without one, to its number of origins,
        plus ``"total"`` to the number of visit stats found. ``table`` is empty
        unless ``with_listing`` is true.

    Raises:
        SystemExit: no visit stats were found for ``ids``.
    """
    import sys

    ingested_origins = scheduler.origin_visit_stats_get(ids=ids)
    status_counters = {
        "failed": 0,
        "None": 0,
        "not_found": 0,
        "successful": 0,
        "total": len(ingested_origins),
    }

    if status_counters["total"] == 0:
        # sys.exit rather than the interactive-only ``exit`` builtin.
        sys.exit(
            f"Forge {instance_name} has {len(ingested_origins)} "
            "scheduled ingest in the scheduler database."
        )

    ingested_origins_table = []
    for ingested_origin in ingested_origins:
        last_visit_status = ingested_origin.last_visit_status
        if last_visit_status is None:
            counter_key = str(None)
        else:
            counter_key = str(last_visit_status.value)
            if with_listing:
                ingested_origins_table.append(
                    [
                        ingested_origin.url,
                        last_visit_status.value,
                        str(ingested_origin.last_visit),
                    ]
                )
        # .get() so a status value missing from the initial keys is counted
        # instead of raising KeyError.
        status_counters[counter_key] = status_counters.get(counter_key, 0) + 1

    return status_counters, ingested_origins_table