Source code for swh.scheduler.cli.origin_utils

# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Defines the <swh scheduler origin send-origins-from-file-to-celery> cli utility
functions. These read a list of origins from the standard input or a file and turn
them into scheduler tasks sent directly to a celery queue (chosen according to the
specified task type).

The list of origins is extracted by other means (e.g. a sentry extract, a combination
of various shell scripts, ...). A human operator then provides the list to the cli so
it is consumed by the standard swh queues (i.e. the scheduler's configured backend).

"""

from __future__ import annotations

from functools import partial
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional

    from swh.scheduler.interface import SchedulerInterface
    from swh.scheduler.model import TaskType


def get_scheduler_task_type(
    scheduler: SchedulerInterface, task_type_name: str
) -> TaskType:
    """Resolve a task type name to the matching TaskType known by the scheduler.

    The name is looked up as-is first (e.g. load-git, load-hg, ...). When the
    scheduler returns nothing for it, the trailing ``-<suffix>`` is dropped and the
    shorter name is tried (e.g. load-git-large -> load-git), repeatedly, until a
    task type is found or there is no dash left to strip.

    Args:
        scheduler: Scheduler instance to lookup data from
        task_type_name: The task type name to lookup

    Raises:
        ValueError when neither task_type_name nor any of its shortened fallbacks
        is found.

    Returns:
        Information about the task type

    """
    candidate = task_type_name
    while True:
        task_type = scheduler.get_task_type(candidate)
        if task_type:
            return task_type

        # Fall back on the parent name of a derivative task type
        # (e.g. load-git-large, load-hg-bitbucket, ...)
        parent, dash, _ = candidate.rpartition("-")
        if not dash:
            # No dash left: every fallback has been tried. The error reports the
            # name the caller asked for, not the last shortened candidate.
            raise ValueError(
                f"Could not find scheduler <{task_type_name}> task type"
            )
        candidate = parent
def lines_to_task_args(
    lines: Iterable[str],
    columns: Optional[List[str]] = None,
    postprocess: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
    **kwargs,
) -> Iterator[Dict[str, Any]]:
    """Iterate over the lines and convert them into celery tasks ready to be sent.

    Each line is split on whitespace and its values are paired, in order, with the
    names in ``columns`` to build the task keyword arguments. Blank (empty or
    whitespace-only) lines carry no origin and are skipped.

    Args:
        lines: Lines read from a file or stdin
        columns: structure of the lines to be read (usually only the url column).
          Defaults to ``["url"]``.
        postprocess: An optional callable receiving the task keyword arguments and
          returning the (possibly enriched) keyword arguments to use instead
        **kwargs: extra static arguments to enrich the task with

    Yields:
        task ready to be sent to celery, as a dict with an empty ``args`` list and
        the computed ``kwargs`` dict

    """
    # A None sentinel avoids sharing one mutable default list across calls.
    if columns is None:
        columns = ["url"]

    for line in lines:
        # split() without argument already ignores leading/trailing whitespace
        values = line.split()
        if not values:
            # Blank line (e.g. trailing newline in the input): nothing to schedule
            continue
        task_kwargs = dict(zip(columns, values))
        # Static extra arguments override values read from the line
        task_kwargs.update(kwargs)
        if postprocess:
            task_kwargs = postprocess(task_kwargs)
        yield {"args": [], "kwargs": task_kwargs}
# Lines are read as "<origin_url> <archive_path>"
_to_mercurial_tasks = partial(
    lines_to_task_args,
    columns=["origin_url", "archive_path"],
)

# Lines are read as "<url> <directory> <visit_date>"
_to_bitbucket_mercurial_tasks = partial(
    lines_to_task_args,
    columns=["url", "directory", "visit_date"],
)


def _to_svn_tasks(lines: Iterable[str], type: str = "svn", **kwargs) -> Iterator[Dict]:
    """Generates proper task arguments for the loader-svn worker.

    Args:
        lines: Lines to convert into tasks
        type: When "svn", each line is read as a single url column; for any other
          value, each line is read as origin_url and archive_path columns
        **kwargs: extra static arguments forwarded to lines_to_task_args

    Yields:
        svn task

    """
    columns = ["url"] if type == "svn" else ["origin_url", "archive_path"]
    yield from lines_to_task_args(lines=lines, columns=columns, **kwargs)


def _update_git_task_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the git task kwargs, enriched for github.com urls.

    The input dict is left untouched. When the network location of the "url" entry
    is exactly "github.com", lister information is added to the copy.

    """
    from urllib.parse import urlparse

    enriched = kwargs.copy()
    if urlparse(enriched["url"]).netloc == "github.com":
        # extra information for the loader-metadata
        enriched["lister_name"] = "github"
        enriched["lister_instance_name"] = "github"
    return enriched


# Generates task argument for loader-git for usual origins
_to_git_normal_tasks = partial(
    lines_to_task_args,
    columns=["url"],
    postprocess=_update_git_task_kwargs,
)

# Generates task argument for loader-git for origins whose packfile is very large
_to_git_large_tasks = partial(
    _to_git_normal_tasks,
    pack_size_bytes=32 * 1024**3,  # 34359738368 bytes, i.e. 32 GiB
    verify_certs=False,
)

# Default task arguments generators for previously scheduled kinds of origins.
# Maps a task type name to the callable turning input lines into task arguments.
TASK_ARGS_GENERATOR_CALLABLES: Dict[str, Callable] = {
    "load-svn": _to_svn_tasks,
    "load-hg-from-archive-mercurial": partial(
        _to_mercurial_tasks,
        visit_date="Tue, 3 May 2016 17:16:32 +0200",
    ),
    "load-hg-bitbucket": _to_bitbucket_mercurial_tasks,
    "load-git-normal": _to_git_normal_tasks,
    "load-git-large": _to_git_large_tasks,
}