# Source code for swh.scheduler.cli.task_type

# Copyright (C) 2016-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from __future__ import annotations

from datetime import timedelta
from importlib import import_module
import logging
from typing import TYPE_CHECKING, Any, Dict

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click
from pkg_resources import iter_entry_points

from swh.scheduler.model import TaskType

if TYPE_CHECKING:
    from swh.scheduler.task import SWHTask

from . import cli

logger = logging.getLogger(__name__)


# Default scheduling parameters for newly registered task types, keyed by the
# suffix of the task type name: a task type named "<something>-<suffix>" gets
# the values of the matching entry, and "*" is the fallback used when no other
# suffix matches.
DEFAULT_TASK_TYPE_PARAMETERS = {
    "full": dict(  # for tasks like 'list_xxx_full()'
        default_interval=timedelta(days=90),
        min_interval=timedelta(days=90),
        max_interval=timedelta(days=90),
        backoff_factor=1.0,
    ),
    "*": dict(  # fallback value when no other suffix matches
        default_interval=timedelta(days=1),
        min_interval=timedelta(days=1),
        max_interval=timedelta(days=1),
        backoff_factor=1.0,
    ),
}


def _plugin_worker_descriptions():
    """Index the ``swh.workers`` entry points by their name.

    Returns:
        a dict mapping each entry point name to the entry point object itself;
        if several entry points share a name, the last one enumerated wins.
    """
    descriptions = {}
    for worker_ep in iter_entry_points("swh.workers"):
        descriptions[worker_ep.name] = worker_ep
    return descriptions


@cli.group("task-type")
@click.pass_context
def task_type(ctx):
    """Manipulate task types.

    Expected configuration:

    \b
    * :ref:`cli-config-scheduler`
    """
    # Every sub-command talks to ctx.obj["scheduler"]; bail out early with a
    # usage error when no scheduler instance was set up by the parent command.
    if ctx.obj["scheduler"]:
        return
    ctx.fail("Scheduler class (local/remote) must be instantiated")


@task_type.command("list")
@click.option("--verbose", "-v", is_flag=True, default=False, help="Verbose mode")
@click.option(
    "--task_type",
    "-t",
    multiple=True,
    default=None,
    help="List task types of given type",
)
@click.option(
    "--task_name",
    "-n",
    multiple=True,
    default=None,
    help="List task types of given backend task name",
)
@click.pass_context
def list_task_types(ctx, verbose, task_type, task_name):
    """List the task types known to the scheduler.

    Results can be restricted by type (--task_type) and/or by backend task
    name (--task_name); both options may be repeated.
    """
    from operator import attrgetter

    click.echo("Known task types:")
    # The verbose template shows every scheduling parameter of the task type;
    # the short one only shows its type and description.
    if verbose:
        tmpl = (
            click.style("{type}: ", bold=True)
            + """{backend_name}
  {description}
  interval: {default_interval} [{min_interval}, {max_interval}]
  backoff_factor: {backoff_factor}
  max_queue_length: {max_queue_length}
  num_retries: {num_retries}
  retry_delay: {retry_delay}
"""
        )
    else:
        tmpl = "{type}:\n  {description}"
    for tasktype in sorted(
        ctx.obj["scheduler"].get_task_types(), key=attrgetter("type")
    ):
        # Empty filter tuples (option not given) mean "no filtering".
        if task_type and tasktype.type not in task_type:
            continue
        if task_name and tasktype.backend_name not in task_name:
            continue
        click.echo(tmpl.format(**tasktype.to_dict()))


@task_type.command("register")
@click.option(
    "--plugins",
    "-p",
    "plugins",
    multiple=True,
    help="Registers task-types for provided plugins. Defaults to all",
)
@click.pass_context
def register_task_types(ctx, plugins):
    """Register missing task-type entries in the scheduler.

    According to declared tasks in each loaded worker (e.g. lister, loader,
    ...) plugins.

    """
    # Imported here rather than at module level to keep CLI startup time
    # under control.
    import celery.app.task

    scheduler = ctx.obj["scheduler"]

    plugin_worker_descriptions = _plugin_worker_descriptions()
    plugin_names = sorted(plugin_worker_descriptions)

    # No --plugins option (or the explicit "all") selects every plugin.
    if not plugins or plugins == ("all",):
        plugins = plugin_names
    else:
        unknown_plugins = [
            plugin for plugin in plugins if plugin not in plugin_worker_descriptions
        ]
        if unknown_plugins:
            if len(unknown_plugins) == 1:
                error_msg = f"That provided plugin is unknown: {unknown_plugins[0]}."
            else:
                error_msg = (
                    f"Those provided plugins are unknown: {', '.join(unknown_plugins)}."
                )
            ctx.fail(f"{error_msg}\nAvailable ones are: {', '.join(plugin_names)}.")

    for plugin in plugins:
        entrypoint = plugin_worker_descriptions[plugin]
        logger.info("Loading entrypoint for plugin %s", plugin)
        # The entry point resolves to a callable returning the plugin's
        # registry dict (its task modules and optional task-type overrides).
        registry_entry = entrypoint.load()()
        # Per-task-type overrides declared by the plugin, keyed by task type
        # name; looked up once per plugin instead of once per task.
        declared_task_types = registry_entry.get("task_types", {})

        for task_module in registry_entry["task_modules"]:
            mod = import_module(task_module)
            for task_name in (x for x in dir(mod) if not x.startswith("_")):
                logger.debug("Loading task name %s", task_name)
                taskobj = getattr(mod, task_name)
                # Only celery task objects are turned into task types.
                if isinstance(taskobj, celery.app.task.Task):
                    # Python attribute names use "_", task type names use "-".
                    tt_name = task_name.replace("_", "-")
                    task_cfg = declared_task_types.get(tt_name, {})
                    ensure_task_type(task_module, tt_name, taskobj, task_cfg, scheduler)


def ensure_task_type(
    task_module: str,
    task_type_name: str,
    swhtask: SWHTask,
    task_config: Dict[str, Any],
    scheduler,
):
    """Ensure a given task-type (for the task_module) exists in the scheduler.

    If the task type does not exist yet it is created; if it exists with a
    different backend name, a warning is logged and it is left untouched.

    Args:
        task_module: task module we are currently checking for task type
            consistency
        task_type_name: the type of the task to check/insert (corresponds to
            the 'type' field in the db)
        swhtask: the SWHTask instance the task-type corresponds to
        task_config: a dict with specific/overloaded values for the task-type
            to be created
        scheduler: the scheduler object used to access the scheduler db

    """
    task_type_params: Dict[str, Any] = dict(
        type=task_type_name,
        backend_name=swhtask.name,
        description=swhtask.__doc__,
    )
    # Use the defaults of the first "-<suffix>" entry matching the task type
    # name, or the catch-all "*" entry when no suffix matches.
    for suffix, defaults in DEFAULT_TASK_TYPE_PARAMETERS.items():
        if suffix != "*" and task_type_name.endswith("-" + suffix):
            task_type_params.update(defaults)
            break
    else:
        task_type_params.update(DEFAULT_TASK_TYPE_PARAMETERS["*"])
    # Values explicitly declared by the plugin override the defaults.
    task_type_params.update(task_config)
    task_type = TaskType(**task_type_params)

    current_task_type = scheduler.get_task_type(task_type_name)
    if current_task_type:
        # Ensure the existing task_type is consistent in the scheduler
        if current_task_type.backend_name != task_type.backend_name:
            logger.warning(
                "Existing task type %s for module %s has a "
                "different backend name than current "
                "code version provides (%s vs. %s)",
                task_type_name,
                task_module,
                current_task_type.backend_name,
                task_type.backend_name,
            )
    else:
        logger.info("Create task type %s in scheduler", task_type_name)
        logger.debug(" %s", task_type)
        scheduler.create_task_type(task_type)
@task_type.command("add")
@click.argument("type", required=True)
@click.argument("task-name", required=True)
@click.argument("description", required=True)
@click.option(
    "--default-interval",
    "-i",
    default="90 days",
    help='Default interval ("90 days" by default)',
)
@click.option(
    "--min-interval",
    default=None,
    help="Minimum interval (default interval if not set)",
)
@click.option(
    # No short flag: "-i" already belongs to --default-interval, and reusing it
    # here made "-i" silently set the max interval instead.
    "--max-interval",
    default=None,
    help="Maximal interval (default interval if not set)",
)
@click.option("--backoff-factor", "-f", type=float, default=1, help="Backoff factor")
@click.pass_context
def add_task_type(
    ctx,
    type,
    task_name,
    description,
    default_interval,
    min_interval,
    max_interval,
    backoff_factor,
):
    """Create a new task type"""
    # NOTE(review): interval options are forwarded as the raw strings typed on
    # the command line (e.g. "90 days"), and unset min/max intervals are passed
    # as None rather than replaced by the default interval as the help text
    # says -- confirm the scheduler backend handles both.
    task_type = TaskType(
        type=type,
        backend_name=task_name,
        description=description,
        default_interval=default_interval,
        min_interval=min_interval,
        max_interval=max_interval,
        backoff_factor=backoff_factor,
    )
    ctx.obj["scheduler"].create_task_type(task_type)
    click.echo("OK")