# Source code for swh.lister.utils

# Copyright (C) 2018-2024 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information


import logging
from pathlib import Path
import re
from typing import Any, Iterator, List, Optional, Tuple
from urllib.parse import parse_qsl, urlparse

from requests.exceptions import ConnectionError, InvalidSchema, SSLError

from swh.core.tarball import MIMETYPE_TO_ARCHIVE_FORMAT
from swh.lister import TARBALL_EXTENSIONS

logger = logging.getLogger(__name__)


def split_range(total_pages: int, nb_pages: int) -> Iterator[Tuple[int, int]]:
    """Split `total_pages` into mostly `nb_pages` ranges. In some cases, the last
    range can have one more element.

    Args:
        total_pages: upper bound of the page space to split
        nb_pages: nominal size of each range

    Yields:
        ``(first, last)`` inclusive index pairs covering ``0..total_pages``.
        Yields nothing when ``total_pages <= 0``.

    >>> list(split_range(19, 10))
    [(0, 9), (10, 19)]

    >>> list(split_range(20, 3))
    [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 20)]

    >>> list(split_range(21, 3))
    [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 21)]

    """
    if total_pages <= 0:
        # Nothing to split; previously this path crashed with a NameError
        # because `index` was never bound by the empty loop below.
        return
    index = 0
    prev_index: Optional[int] = None
    for index in range(0, total_pages, nb_pages):
        # Emit the previous range once we know where the next one starts.
        if prev_index is not None:
            yield prev_index, index - 1
        prev_index = index
    if index != total_pages:
        # Last (possibly larger) range, closed on total_pages itself.
        yield index, total_pages
def is_valid_origin_url(url: Optional[str]) -> bool:
    """Returns whether the given string is a valid origin URL.
    This excludes Git SSH URLs and pseudo-URLs (eg. ``ssh://git@example.org:foo``
    and ``git@example.org:foo``), as they are not supported by the Git loader
    and usually require authentication.

    All HTTP URLs are allowed:

    >>> is_valid_origin_url("http://example.org/repo.git")
    True
    >>> is_valid_origin_url("http://example.org/repo")
    True
    >>> is_valid_origin_url("https://example.org/repo")
    True
    >>> is_valid_origin_url("https://foo:bar@example.org/repo")
    True

    Scheme-less URLs are rejected;

    >>> is_valid_origin_url("example.org/repo")
    False
    >>> is_valid_origin_url("example.org:repo")
    False

    Git SSH URLs and pseudo-URLs are rejected:

    >>> is_valid_origin_url("git@example.org:repo")
    False
    >>> is_valid_origin_url("ssh://git@example.org:repo")
    False
    """
    # None or empty string: not a URL at all.
    if not url:
        return False
    parsed = urlparse(url)
    # A missing netloc means the string parsed as a relative URL (scheme-less
    # or a Git pseudo-URL); an "ssh" scheme is a Git SSH URL. Both rejected.
    return bool(parsed.netloc) and parsed.scheme != "ssh"
class ArtifactNatureUndetected(ValueError):
    """Raised when a remote artifact's nature (tarball, file) cannot be
    detected."""

    pass
class ArtifactNatureMistyped(ValueError):
    """Raised when a remote artifact is neither a tarball nor a file.

    Errors of this type are probably a misconfiguration in the manifest
    generation that badly typed a vcs repository.

    """

    pass
class ArtifactWithoutExtension(ValueError):
    """Raised when an artifact nature cannot be determined by its name."""

    pass
# Rough approximation of what we can find of mimetypes for tarballs "out there" POSSIBLE_TARBALL_MIMETYPES = tuple(MIMETYPE_TO_ARCHIVE_FORMAT.keys()) PATTERN_VERSION = re.compile(r"(v*[0-9]+[.])([0-9]+[.]*)+")
def url_contains_tarball_filename(
    urlparsed, extensions: List[str], raise_when_no_extension: bool = True
) -> bool:
    """Determine whether ``urlparsed`` contains a tarball filename ending with
    one of the ``extensions``; both path components and query-string values are
    checked. This also accounts for the edge case of a filename that is only a
    version number (so no extension at the end).

    Raises:
        ArtifactWithoutExtension in case no extension is available and
        raise_when_no_extension is True (the default)

    """
    suffixes = tuple(extensions)
    # Candidate filenames: the URL path itself plus every query-string value
    # (e.g. ?file=foo.tar.gz).
    candidates = [Path(urlparsed.path)]
    candidates.extend(Path(value) for _, value in parse_qsl(urlparsed.query))

    for candidate in candidates:
        if any(part.endswith(suffixes) for part in candidate.parts):
            return True

    # No candidate carries any extension at all.
    if raise_when_no_extension and all(not candidate.suffix for candidate in candidates):
        raise ArtifactWithoutExtension

    # Some false negatives can happen (e.g. https://<netloc>/path/0.1.5), where
    # the final path segment is only a version, so check that shape explicitly.
    last_segment = Path(urlparsed.path).name
    if not PATTERN_VERSION.match(last_segment):
        return False
    if raise_when_no_extension:
        raise ArtifactWithoutExtension
    return False
def is_tarball(
    urls: List[str],
    request: Optional[Any] = None,
) -> Tuple[bool, str]:
    """Determine whether a list of files actually are tarball or simple files.

    This iterates over the list of urls provided to detect the artifact's nature.
    When this cannot be answered simply out of the url and ``request`` is
    provided, this executes a HTTP `HEAD` query on the url to determine the
    information. If request is not provided, this raises an
    ArtifactNatureUndetected exception.

    If, at the end of the iteration on the urls, no detection could be deduced,
    this raises an ArtifactNatureUndetected.

    Args:
        urls: name of the remote files to check for artifact nature.
        request: (Optional) Request object allowing http calls. If not provided
            and naive check cannot detect anything, this raises
            ArtifactNatureUndetected.

    Raises:
        ArtifactNatureUndetected when the artifact's nature cannot be detected
            out of its urls
        ArtifactNatureMistyped when the artifact is not a tarball nor a file.
            It's up to the caller to do what's right with it.

    Returns: A tuple (bool, url). The boolean represents whether the url is an
        archive or not. The second parameter is the actual url once the head
        request is issued as a fallback of not finding out whether the urls are
        tarballs or not.

    """

    def _is_tarball(url):
        """Determine out of an extension whether url is a tarball.

        Raises:
            ArtifactWithoutExtension in case no extension is available

        """
        urlparsed = urlparse(url)
        # Anything that is not plain http/https/ftp is probably a vcs
        # pseudo-URL, so reject it outright.
        if urlparsed.scheme not in ("http", "https", "ftp"):
            raise ArtifactNatureMistyped(f"Mistyped artifact '{url}'")
        return url_contains_tarball_filename(urlparsed, TARBALL_EXTENSIONS)

    # Check all urls and as soon as an url allows the nature detection, this stops.
    # Failures are accumulated so the first one can be re-raised if nothing works.
    exceptions_to_raise = []
    for url in urls:
        try:
            # NOTE: the returned origin is always urls[0], whichever url
            # allowed the detection.
            return _is_tarball(url), urls[0]
        except ArtifactWithoutExtension:
            # Naive check failed; without a request object there is no
            # fallback, record the failure and try the next url.
            if request is None:
                exc = ArtifactNatureUndetected(
                    f"Cannot determine artifact type from url <{url}>"
                )
                exceptions_to_raise.append(exc)
                continue

            logger.warning(
                "Cannot detect extension for <%s>. Fallback to http head query",
                url,
            )

            try:
                response = request.head(url)
            except (InvalidSchema, SSLError, ConnectionError):
                exc = ArtifactNatureUndetected(
                    f"Cannot determine artifact type from url <{url}>"
                )
                exceptions_to_raise.append(exc)
                continue

            # NOTE(review): 404 is already covered by `not response.ok`;
            # the extra check looks redundant but is kept as-is.
            if not response.ok or response.status_code == 404:
                exc = ArtifactNatureUndetected(
                    f"Cannot determine artifact type from url <{url}>"
                )
                exceptions_to_raise.append(exc)
                continue

            # First fallback: a redirect target may carry the extension.
            location = response.headers.get("Location")
            if location:  # It's not always present
                logger.debug("Location: %s", location)
                try:
                    # NOTE(review): here the second tuple element is `url`,
                    # not `urls[0]` as in the other return paths — confirm
                    # this asymmetry is intended.
                    return _is_tarball(location), url
                except ArtifactWithoutExtension:
                    logger.warning(
                        "Still cannot detect extension through location <%s>...",
                        url,
                    )

            # origin is the first url provided in the list
            origin = urls[0]

            # Second fallback: decide from the Content-Type header.
            content_type = response.headers.get("Content-Type")
            if content_type:
                logger.debug("Content-Type: %s", content_type)
                if content_type == "application/json":
                    return False, origin
                # NOTE(review): a Content-Type with parameters (e.g.
                # "application/json; charset=utf-8") bypasses the json check
                # above and falls through to the prefix match — confirm.
                return content_type.startswith(POSSIBLE_TARBALL_MIMETYPES), origin

            # Third fallback: decide from the Content-Disposition filename.
            content_disposition = response.headers.get("Content-Disposition")
            if content_disposition:
                logger.debug("Content-Disposition: %s", content_disposition)
                if "filename=" in content_disposition:
                    fields = content_disposition.split("; ")
                    for field in fields:
                        if "filename=" in field:
                            _, filename = field.split("filename=")
                            break

                    # NOTE(review): a quoted filename ("foo.tar.gz") keeps its
                    # quotes here, which would defeat the extension match —
                    # verify against real Content-Disposition values.
                    return (
                        url_contains_tarball_filename(
                            urlparse(filename),
                            TARBALL_EXTENSIONS,
                            # we consider the filename as generic enough to not raise
                            raise_when_no_extension=False,
                        ),
                        origin,
                    )

    # Every url failed: re-raise the first recorded failure, or a generic one.
    if len(exceptions_to_raise) > 0:
        raise exceptions_to_raise[0]
    raise ArtifactNatureUndetected(
        f"Cannot determine artifact type from url <{urls[0]}>"
    )