# Copyright (C) 2018-2024 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from pathlib import Path
import re
from typing import Any, Iterator, List, Optional, Tuple
from urllib.parse import parse_qsl, urlparse
from requests.exceptions import ConnectionError, InvalidSchema, SSLError
from swh.core.tarball import MIMETYPE_TO_ARCHIVE_FORMAT
from swh.lister import TARBALL_EXTENSIONS
logger = logging.getLogger(__name__)
def split_range(total_pages: int, nb_pages: int) -> Iterator[Tuple[int, int]]:
"""Split `total_pages` into mostly `nb_pages` ranges. In some cases, the last range can
have one more element.
>>> list(split_range(19, 10))
[(0, 9), (10, 19)]
>>> list(split_range(20, 3))
[(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 20)]
>>> list(split_range(21, 3))
[(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 21)]
"""
prev_index = None
for index in range(0, total_pages, nb_pages):
if prev_index is not None:
yield prev_index, index - 1
prev_index = index
if index != total_pages:
yield index, total_pages
def is_valid_origin_url(url: Optional[str]) -> bool:
"""Returns whether the given string is a valid origin URL.
This excludes Git SSH URLs and pseudo-URLs (e.g. ``ssh://git@example.org:foo``
and ``git@example.org:foo``), as they are not supported by the Git loader
and usually require authentication.
All HTTP URLs are allowed:
>>> is_valid_origin_url("http://example.org/repo.git")
True
>>> is_valid_origin_url("http://example.org/repo")
True
>>> is_valid_origin_url("https://example.org/repo")
True
>>> is_valid_origin_url("https://foo:bar@example.org/repo")
True
Scheme-less URLs are rejected:
>>> is_valid_origin_url("example.org/repo")
False
>>> is_valid_origin_url("example.org:repo")
False
Git SSH URLs and pseudo-URLs are rejected:
>>> is_valid_origin_url("git@example.org:repo")
False
>>> is_valid_origin_url("ssh://git@example.org:repo")
False
"""
if not url:
# Empty or None
return False
parsed = urlparse(url)
if not parsed.netloc:
# Is parsed as a relative URL
return False
if parsed.scheme == "ssh":
# Git SSH URL
return False
return True
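# A minimal usage sketch (hypothetical candidate list, not real origins): listers
# can use this predicate to drop origins the Git loader would not be able to fetch.
#   candidates = ["https://example.org/repo.git", "git@example.org:repo"]
#   [url for url in candidates if is_valid_origin_url(url)]
#   # -> ["https://example.org/repo.git"]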
class ArtifactNatureUndetected(ValueError):
"""Raised when a remote artifact's nature (tarball, file) cannot be detected."""
pass
class ArtifactNatureMistyped(ValueError):
"""Raised when a remote artifact is neither a tarball nor a file.
Errors of this type are probably caused by a misconfiguration in the manifest
generation that badly typed a VCS repository.
"""
pass
class ArtifactWithoutExtension(ValueError):
"""Raised when an artifact nature cannot be determined by its name."""
pass
# Rough approximation of the mimetypes we can find for tarballs "out there"
POSSIBLE_TARBALL_MIMETYPES = tuple(MIMETYPE_TO_ARCHIVE_FORMAT.keys())
PATTERN_VERSION = re.compile(r"(v*[0-9]+[.])([0-9]+[.]*)+")
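# For illustration, PATTERN_VERSION matches names that are just a version string,
# with an optional leading "v":
#   PATTERN_VERSION.match("0.1.5")    # -> <re.Match ...>
#   PATTERN_VERSION.match("v2.10.3")  # -> <re.Match ...>
#   PATTERN_VERSION.match("foo.tar")  # -> None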
def url_contains_tarball_filename(
urlparsed, extensions: List[str], raise_when_no_extension: bool = True
) -> bool:
"""Determine whether urlparsed contains a tarball filename ending with one of the
extensions passed as parameter, path parts and query parameters are checked.
This also account for the edge case of a filename with only a version as name (so no
extension in the end.)
Raises:
ArtifactWithoutExtension in case no extension is available and
raise_when_no_extension is True (the default)
"""
paths = [Path(p) for (_, p) in [("_", urlparsed.path)] + parse_qsl(urlparsed.query)]
match = any(
path_part.endswith(tuple(extensions))
for path in paths
for path_part in path.parts
)
if match:
return match
if raise_when_no_extension and not any(path.suffix != "" for path in paths):
raise ArtifactWithoutExtension
# Some false negatives can happen (e.g. https://<netloc>/path/0.1.5), so make sure
# to catch those
name = Path(urlparsed.path).name
if not PATTERN_VERSION.match(name):
return match
if raise_when_no_extension:
raise ArtifactWithoutExtension
return False
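# A minimal usage sketch (illustrative URL, not a real origin): the tarball name
# here is carried by a query parameter rather than by the path, one of the cases
# handled above.
#   url_contains_tarball_filename(
#       urlparse("https://example.org/download?file=foo-1.0.tar.gz"),
#       TARBALL_EXTENSIONS,
#   )  # -> True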
def is_tarball(
urls: List[str],
request: Optional[Any] = None,
) -> Tuple[bool, str]:
"""Determine whether a list of files actually are tarball or simple files.
This iterates over the list of urls provided to detect the artifact's nature. When
this cannot be answered simply out of the url and ``request`` is provided, this
executes a HTTP `HEAD` query on the url to determine the information. If request is
not provided, this raises an ArtifactNatureUndetected exception.
If, at the end of the iteration on the urls, no detection could be deduced, this
raises an ArtifactNatureUndetected.
Args:
urls: name of the remote files to check for artifact nature.
request: (Optional) Request object allowing http calls. If not provided and
naive check cannot detect anything, this raises ArtifactNatureUndetected.
Raises:
ArtifactNatureUndetected when the artifact's nature cannot be detected out
of its urls
ArtifactNatureMistyped when the artifact is not a tarball nor a file. It's up to
the caller to do what's right with it.
Returns: A tuple (bool, url). The boolean represents whether the url is an archive
or not. The second parameter is the actual url once the head request is issued
as a fallback of not finding out whether the urls are tarballs or not.
"""
def _is_tarball(url):
"""Determine out of an extension whether url is a tarball.
Raises:
ArtifactWithoutExtension in case no extension is available
"""
urlparsed = urlparse(url)
if urlparsed.scheme not in ("http", "https", "ftp"):
raise ArtifactNatureMistyped(f"Mistyped artifact '{url}'")
return url_contains_tarball_filename(urlparsed, TARBALL_EXTENSIONS)
# Check all urls; as soon as one url allows the nature detection, this stops.
exceptions_to_raise = []
for url in urls:
try:
return _is_tarball(url), urls[0]
except ArtifactWithoutExtension:
if request is None:
exc = ArtifactNatureUndetected(
f"Cannot determine artifact type from url <{url}>"
)
exceptions_to_raise.append(exc)
continue
logger.warning(
"Cannot detect extension for <%s>. Fallback to http head query",
url,
)
try:
response = request.head(url)
except (InvalidSchema, SSLError, ConnectionError):
exc = ArtifactNatureUndetected(
f"Cannot determine artifact type from url <{url}>"
)
exceptions_to_raise.append(exc)
continue
if not response.ok or response.status_code == 404:
exc = ArtifactNatureUndetected(
f"Cannot determine artifact type from url <{url}>"
)
exceptions_to_raise.append(exc)
continue
location = response.headers.get("Location")
if location: # It's not always present
logger.debug("Location: %s", location)
try:
return _is_tarball(location), url
except ArtifactWithoutExtension:
logger.warning(
"Still cannot detect extension through location <%s>...",
url,
)
origin = urls[0]
content_type = response.headers.get("Content-Type")
if content_type:
logger.debug("Content-Type: %s", content_type)
if content_type == "application/json":
return False, origin
return content_type.startswith(POSSIBLE_TARBALL_MIMETYPES), origin
content_disposition = response.headers.get("Content-Disposition")
if content_disposition:
logger.debug("Content-Disposition: %s", content_disposition)
if "filename=" in content_disposition:
fields = content_disposition.split("; ")
for field in fields:
if "filename=" in field:
_, filename = field.split("filename=")
break
return (
url_contains_tarball_filename(
urlparse(filename),
TARBALL_EXTENSIONS,
raise_when_no_extension=False,
),
origin,
)
if len(exceptions_to_raise) > 0:
raise exceptions_to_raise[0]
raise ArtifactNatureUndetected(
f"Cannot determine artifact type from url <{urls[0]}>"
)
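if __name__ == "__main__":
    # Minimal usage sketch (illustrative URLs, not real origins). With a
    # recognizable extension the nature is detected from the url alone, without
    # any HTTP call:
    print(is_tarball(["https://example.org/foo-1.0.tar.gz"]))
    # -> (True, 'https://example.org/foo-1.0.tar.gz')

    # Without an extension and without a `request` object, the nature cannot be
    # determined and ArtifactNatureUndetected is raised; passing e.g. a
    # `requests.Session()` as the second argument would enable the HTTP HEAD
    # fallback instead.
    try:
        is_tarball(["https://example.org/download/1.2.3"])
    except ArtifactNatureUndetected as exc:
        print(exc)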