# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Defines the <swh scheduler origin send-origins-from-file-to-celery> cli utility
functions. They take a list of origins read from the standard input or a file and
massage them into scheduler tasks to send directly to a celery queue (according to the
specified task type).
The list of origins has been extracted by other means (e.g. sentry extract, combination
of various shell scripts, ...). Then, a human operator provides the list to the cli so
it's consumed by standard swh queues (i.e. the scheduler's configured backend).
"""
from __future__ import annotations
from functools import partial
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import TaskType
def get_scheduler_task_type(
    scheduler: SchedulerInterface, task_type_name: str
) -> TaskType:
    """Resolve a task type name to the task type known by the scheduler.

    The name is looked up as given first (e.g. load-git, load-hg, ...). When the
    scheduler does not know it, the trailing "-<suffix>" is dropped and the lookup
    is retried, so that derivative names (e.g. load-git-large, load-hg-bitbucket,
    ...) fall back to their parent task type.

    Args:
        scheduler: Scheduler instance to lookup data from
        task_type_name: The task type name to lookup

    Raises:
        ValueError when neither task_type_name nor any of its shortened fallbacks
        is found.

    Returns:
        Information about the task type

    """
    candidate = task_type_name
    while True:
        task_type = scheduler.get_task_type(candidate)
        if task_type:
            return task_type

        # Strip the last "-<suffix>" to try the parent task type name next.
        parent, dash, _ = candidate.rpartition("-")
        if not dash:
            # No "-" left: nothing shorter to try, report the name originally asked.
            raise ValueError(
                f"Could not find scheduler <{task_type_name}> task type"
            )
        candidate = parent
def lines_to_task_args(
    lines: Iterable[str],
    columns: Optional[List[str]] = None,
    postprocess: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
    **kwargs,
) -> Iterator[Dict[str, Any]]:
    """Iterate over the lines and convert them into celery tasks ready to be sent.

    Each line is split on whitespace and its values are paired, in order, with
    ``columns``. Extra values (or extra columns) are silently ignored, as the
    pairing stops at the shorter of the two. Blank or whitespace-only lines are
    skipped.

    Args:
        lines: Line read from a file or stdin
        columns: structure of the lines to be read (usually only the url column).
            Defaults to ``["url"]`` when not provided.
        postprocess: An optional callable to enrich the task with; it receives the
            task kwargs and its return value replaces them
        **kwargs: extra static arguments to enrich the task with (they override
            same-named values read from the line)

    Yield:
        task ready to be sent to celery

    """
    if columns is None:
        # Build the default per call instead of sharing one mutable default list.
        columns = ["url"]

    for line in lines:
        # str.split() without argument also discards leading/trailing whitespace.
        values = line.split()
        if not values:
            # Blank line (e.g. trailing newline in the input): there is no origin
            # to schedule, and postprocess callables expect the columns to be set.
            continue
        task_kwargs = dict(zip(columns, values))
        task_kwargs.update(kwargs)
        if postprocess:
            task_kwargs = postprocess(task_kwargs)
        yield {"args": [], "kwargs": task_kwargs}
# Task arguments generator for lines shaped "<origin_url> <archive_path>": the
# whitespace-separated values of each line are mapped to these two keys, in order.
_to_mercurial_tasks = partial(
    lines_to_task_args, columns=["origin_url", "archive_path"]
)
# Task arguments generator for lines shaped "<url> <directory> <visit_date>": the
# whitespace-separated values of each line are mapped to these three keys, in
# order (so a value cannot itself contain whitespace).
_to_bitbucket_mercurial_tasks = partial(
    lines_to_task_args, columns=["url", "directory", "visit_date"]
)
def _to_svn_tasks(lines: Iterable[str], type: str = "svn", **kwargs) -> Iterator[Dict]:
    """Generates proper task argument for the loader-svn worker.

    With ``type`` set to "svn", each line only provides the "url". With any other
    value, each line provides the "origin_url" and the "archive_path", in that
    order. Remaining keyword arguments are passed along as static task arguments.

    Yields:
        svn task

    """
    # Only the expected line structure depends on the type; the rest is shared.
    columns = ["url"] if type == "svn" else ["origin_url", "archive_path"]
    yield from lines_to_task_args(lines=lines, columns=columns, **kwargs)
def _update_git_task_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Create simple git task kwargs from an url.

    Returns a copy of ``kwargs`` (the input is left untouched). When the network
    location of ``kwargs["url"]`` is exactly "github.com", the copy also carries
    the "lister_name" and "lister_instance_name" keys, both set to "github".
    """
    from urllib.parse import urlparse

    task_kwargs = kwargs.copy()
    if urlparse(task_kwargs["url"]).netloc != "github.com":
        return task_kwargs

    # extra information for the loader-metadata
    task_kwargs["lister_name"] = "github"
    task_kwargs["lister_instance_name"] = "github"
    return task_kwargs
# Generates task argument for loader-git for usual origins: each line provides the
# "url", then _update_git_task_kwargs post-processes the task kwargs (adding the
# lister keys for github.com urls).
_to_git_normal_tasks = partial(
    lines_to_task_args,
    columns=["url"],
    postprocess=_update_git_task_kwargs,
)
# Generates task argument for loader-git for origins whose packfile is very large.
# Same as the normal git tasks, with two extra static task kwargs: pack_size_bytes
# and verify_certs.
_to_git_large_tasks = partial(
    _to_git_normal_tasks,
    pack_size_bytes=32 * 1024**3,  # 34359738368, i.e. 32 GiB
    verify_certs=False,
)
# Default task arguments generator for previously scheduled kind of origins.
# It lifts the other callables into a simple dict, keyed by task type name, to ease
# picking the generator matching a task type.
# NOTE(review): the lookup itself is not visible in this part of the file -- confirm
# against the cli code using this mapping.
TASK_ARGS_GENERATOR_CALLABLES: Dict[str, Callable] = {
    "load-svn": _to_svn_tasks,
    # Every task generated for this type carries the same static visit_date.
    "load-hg-from-archive-mercurial": partial(
        _to_mercurial_tasks,
        visit_date="Tue, 3 May 2016 17:16:32 +0200",
    ),
    "load-hg-bitbucket": _to_bitbucket_mercurial_tasks,
    "load-git-normal": _to_git_normal_tasks,
    "load-git-large": _to_git_large_tasks,
}