Source code for swh.web.client.client

# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Python client for the Software Heritage Web API

Light wrapper around requests for the archive API, taking care of data
conversions and pagination.

.. code-block:: python

   from swh.web.client.client import WebAPIClient
   cli = WebAPIClient()

   # retrieve any archived object via its SWHID
   cli.get('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6')

   # same, but for specific object types
   cli.revision('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6')

   # get() always retrieves entire objects, following pagination
   # WARNING: this might *not* be what you want for large objects
   cli.get('swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a')

   # type-specific methods support explicit iteration through pages
   next(cli.snapshot('swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764'))

"""
import concurrent.futures
from datetime import datetime
import heapq
import logging
import queue
import threading
import time
from typing import (
    Any,
    Callable,
    Collection,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
)
from urllib.parse import urlparse
import weakref

import attr
import dateutil.parser
import requests
import requests.status_codes

from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.swhids import CoreSWHID, ObjectType
from swh.web.client.cli import DEFAULT_CONFIG

logger = logging.getLogger(__name__)

SWHIDish = Union[CoreSWHID, str]


CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
RELEASE = "release"
SNAPSHOT = "snapshot"
ORIGIN_VISIT = "origin_visit"
ORIGIN = "origin"


# how many nanoseconds is one second:
#
# We use nanoseconds for the time arithmetic because they do not suffer from
# the same precision and drift issues as floats.
_1_SECOND = 1_000_000_000


class _RateLimitInfo:
    """Object that holds rate limit information and can compute delay

    The rate limiting information always comes from an HTTP request.

    It holds the following attributes:

    start:         date of the request start (seconds since epoch)
    end:           date of the request end (seconds since epoch)
    limit:         maximum number of requests in the rate limit window
    remaining:     number of remaining requests
    reset_date:    date of rate limit window reset (seconds since epoch)

    reset_date_ns: date of rate limit window reset (nanoseconds since epoch)
    wait_ns:       ideal number of nanoseconds to wait between each request.
    free_token:    the number of tokens available initially
                   (see the `setup_free_token` method for details)

    >>> reset = 150
    >>> ### test replacement logic
    >>> # some old information to replace
    >>> old = _RateLimitInfo(42, 44, 1000, 800, reset)
    >>> # some information with the same date but lower budget
    >>> new = _RateLimitInfo(42, 44, 1000, 700, reset)
    >>> # some information with a later window
    >>> newer = _RateLimitInfo(42, 44, 1000, 100, reset * 2)
    >>> # the old one is replaced by one with a lower budget
    >>> assert new.replacing(old)
    >>> # the later window replaces the older window
    >>> assert newer.replacing(old)
    >>> assert newer.replacing(new)
    >>> ### test delay logic
    >>> # with a full budget
    >>> full = _RateLimitInfo(42, 50, 1000, 1000, reset)
    >>> assert (0.0999 * _1_SECOND) < full.wait_ns < (0.1001 * _1_SECOND)
    >>> # with a half full budget
    >>> half = _RateLimitInfo(42, 50, 1000, 500, reset)
    >>> assert (0.1999 * _1_SECOND) < half.wait_ns < (0.2001 * _1_SECOND)
    >>> # with an empty budget
    >>> empty = _RateLimitInfo(42, 50, 1000, 0, reset)
    >>> assert (99.001 * _1_SECOND) < empty.wait_ns < (100.001 * _1_SECOND)
    """

    # amount of the available budget that we should give for free
    INITIAL_FREE_TOKEN_RATIO = 0.1
    # budget ratio below which we stop giving away free tokens
    FREE_TOKENS_CUTOFF_RATIO = 0.25

    def __init__(
        self,
        start: float,
        end: float,
        limit: int,
        remaining: int,
        reset: float,
    ):
        self.start = start
        self.end = end
        self.limit = limit
        self.remaining = remaining
        self.reset_date = reset

        self.free_token = 0

    def __lt__(self, other) -> bool:
        return (
            self.reset_date,
            self.start,
            self.end,
            self.remaining,
            self.limit,
            self.free_token,
        ) < (
            other.reset_date,
            other.start,
            other.end,
            other.remaining,
            other.limit,
            other.free_token,
        )

    def __repr__(self) -> str:
        r = "<RateLimitInfo start=%s, end=%s, budget=%d/%d, reset_date=%d>"
        r %= (self.start, self.end, self.remaining, self.limit, self.reset_date)
        return r

    @property
    def reset_date_ns(self) -> int:
        reset_date_ns = int(self.reset_date * _1_SECOND)
        # cache the value for future calls
        self.__dict__["reset_date_ns"] = reset_date_ns
        return reset_date_ns

    @property
    def wait_ns(self) -> int:
        duration = (self.reset_date - self.end) * _1_SECOND
        limited = self.remaining
        limited -= self.free_token
        if limited <= 0:
            wait_ns = duration
        else:
            wait_ns = duration // limited
        wait_ns = int(wait_ns)
        wait_ns = max(wait_ns, 1)  # make sure wait_ns > 0
        # cache the value for future calls
        self.__dict__["wait_ns"] = wait_ns
        return wait_ns

    def replacing(self, other: "_RateLimitInfo") -> bool:
        """return True if `self` as "better" information that "other"o

        The two criteria for deciding that `self` is better are:
        - `self` is about a later rate limiting window than `other`
        - `self` is about the same window but requires a significantly longer
          wait time.
        """
        if other.reset_date != self.reset_date:
            # the one with a later reset date is likely more up to date.
            return other.reset_date < self.reset_date

        # replace if significantly stricter
        return other.wait_ns < (self.wait_ns / 0.9)

    def setup_free_token(self) -> None:
        """setup the count of free token, adjusting the wait time

        This is a simple way to provide "progressive" rate limiting that allows
        fast-paced initial requests while slowing down larger batches of
        requests.

        This is achieved by initially granting a fraction of the initial budget
        without delay. The remaining budget is then paced on the remaining
        time.

        Example:

            We start with:
            - remaining request 100
            - remaining window: 60 seconds
            - free budget: 10%

            So `setup_free_token` results in:
            - 10 tokens available,
            - 90 requests paced over 60s, i.e. one request every ⅔ second.

            This gives us a basic "progressive" rate in practice:
            - issuing   1 request  requires  0.00 seconds (0.00s / request)
            - issuing  10 requests requires  0.00 seconds (0.00s / request)
            - issuing  11 requests requires  0.66 seconds (0.06s / request)
            - issuing  15 requests requires  3.33 seconds (0.22s / request)
            - issuing  20 requests requires  6.66 seconds (0.33s / request)
            - issuing  50 requests requires 26.66 seconds (0.53s / request)
            - issuing 100 requests requires 60.00 seconds (0.60s / request)

        The progressive effect is still less good than having a fancy curve for
        the token generation, but it is currently "good enough" for a fraction
        of the complexity.
        """
        if self.remaining / self.limit <= self.FREE_TOKENS_CUTOFF_RATIO:
            # we don't grant free tokens if the budget is running low, as this
            # is a sign that many requests are being issued in general.
            self.free_token = 0
        else:
            free = self.remaining * self.INITIAL_FREE_TOKEN_RATIO
            self.free_token = int(free)
        # clear `wait_ns` cache
        self.__dict__.pop("wait_ns", None)


@attr.s(slots=True, order=True)
class _RateLimitEvent:
    """Represent a date at which a rate limiting action is needed for a client

    This is used by _RateLimitEnforcer to schedule actions.
    """

    # when is this event due, (from time.monotonic_ns())
    date = attr.ib(type=int)
    # the rate limit this try to enforce
    rate_limit = attr.ib(type=_RateLimitInfo)
    # the client it applies to
    client_ref = attr.ib(type=weakref.ref)


_ALL_CLIENT_TYPE = weakref.WeakKeyDictionary["WebAPIClient", _RateLimitInfo]


# a pair of Semaphore: `(available, waiting)`
#
# `available`:
#   holds one token for each request we can currently make while respecting
#   the rate limit. It is filled in the background by the `_RateLimitEnforcer`
#   thread.
# `waiting`:
#   holds one token for each request currently trying to acquire an "available"
#   token. These tokens are added and removed by the code doing the request
#   (and ultimately by the `_RateLimitEnforcer` thread through
#   WebAPIClient._clear_rate_limit_tokens)
_RateLimitTokens = Tuple[threading.Semaphore, threading.Semaphore]


class _RateLimitEnforcer:
    """process rate limiting information into rate limiting token

    This object is meant to be run forever in a daemon thread. That daemon
    thread is started by the `_RateLimitEnforcer._get_limiter` class method when
    needed.

    WebAPIClient instances send the rate limiting information they receive from
    the server into the `feed` Queue they get from that same `_get_limiter`
    class method, using the `new_info` class method.

    This object processes that rate limiting information and slowly issues
    "available request" tokens to a `threading.Semaphore` instance at the
    appropriate rate. These "available request" tokens are consumed by
    requests, effectively reducing the request rate.
    """

    _queue: Optional[queue.Queue] = None
    _limiter: Optional["_RateLimitEnforcer"] = None
    _limiter_thread = None  # not really needed, but let's keep it around.
    _limiter_lock = threading.Lock()

    @classmethod
    def new_info(cls, client: "WebAPIClient", info: _RateLimitInfo) -> None:
        """pass new _RateLimitInfo from client to the _RateLimitEnforcer"""
        feed = cls._get_limiter()
        feed.put((client, info))

    @classmethod
    def current_rate_limit_delay(cls, client: "WebAPIClient") -> float:
        """return the current rate limit delay for this Client (in second)"""
        limiter = cls._limiter
        if limiter is None:
            return 0.0
        rate_limit = limiter._all_clients.get(client)
        if rate_limit is None:
            return 0.0
        wait_ns = rate_limit.wait_ns
        if wait_ns == 1:
            return 0.0
        return max(rate_limit.wait_ns / _1_SECOND, 0.0)

    @classmethod
    def _get_limiter(cls) -> queue.Queue:
        """return the current queue that gather rate limit information

        That function will initialize that Queue and the associated daemon thread
        the first time it is called.
        """
        if cls._queue is None:
            with cls._limiter_lock:
                if cls._queue is None:
                    cls._queue = queue.Queue()
                    cls._limiter = cls(cls._queue)
                    cls._limiter_thread = threading.Thread(
                        target=cls._limiter._run_forever,
                        name=f"{__name__}._RateLimitEnforcer.run",
                        daemon=True,
                    )
                    cls._limiter_thread.start()
        return cls._queue

    def __init__(self, feed: queue.Queue):
        self._feed: queue.Queue = feed
        # a heap of _RateLimitEvent
        #
        # contains a date-ordered list of the future _RateLimitEvent to process.
        #
        self._events: list[_RateLimitEvent] = []
        # a mapping for most up-to-date information for each WebAPIClient
        self._all_clients: _ALL_CLIENT_TYPE = weakref.WeakKeyDictionary()

    def _run_forever(self):
        """main entry points loop forever.

        Proceed new incoming information from Client and managing the
        _RateLimitTokens of the associated client.

        This must be run in a daemonized thread.
        """
        while True:
            self._consume_ready_events()
            self._process_infos()

    def _add_event(self, event: _RateLimitEvent) -> None:
        """schedule a new event"""
        heapq.heappush(self._events, event)

    def _consume_ready_events(self) -> None:
        """Consume Rate Limit event that are ready

        This find all events whose time is up, and process them.
        """

        current = time.monotonic_ns()
        for client, this_event in self._next_events(current):
            client._add_one_rate_limit_token()

            rate_limit = this_event.rate_limit

            # schedule the next event
            if rate_limit.reset_date_ns >= current:
                this_event.date += rate_limit.wait_ns
                self._add_event(this_event)
            else:
                # The window closed; we should not reschedule an event.
                # The first request in the new window will rearm the logic.
                self._all_clients.pop(client, None)
                client._clear_rate_limit_tokens()

    def _next_events(
        self,
        current: int,
    ) -> Iterator[Tuple["WebAPIClient", _RateLimitEvent]]:
        """iterate over the (client, event) pair that is both ready and valid

        Readiness is computed compared to "current".

        Stops iterating when no such events remain.
        """
        while self._events and self._events[0].date <= current:
            event = heapq.heappop(self._events)

            # determine if that event is still valid
            client = event.client_ref()
            if client is None:
                # that client is no longer active
                continue
            latest_rate_limit = self._all_clients.get(client)
            if latest_rate_limit is None:
                # that client is no longer active (narrow race)
                continue
            if latest_rate_limit is not event.rate_limit:
                # that event was superseded by a more recent one, let's ignore it
                continue
            yield (client, event)

    def _process_infos(self):
        """process incoming _RateLimitInfo

        This process all available _RateLimitInfo, then wait until the next
        _RateLimitEvent is due for newer _RateLimitInfo.

        So if no new _RateLimitInfo come, this is equivalent to a sleep until
        the next _RateLimitEvent.
        """
        for current, client, rate_limit in self._next_infos():
            old = self._all_clients.get(client)
            if old is None or rate_limit.replacing(old):
                # We consider the time between the generation of this limit
                # server side and its processing to be negligible
                current = time.monotonic_ns()
                event = _RateLimitEvent(
                    date=current + rate_limit.wait_ns,
                    client_ref=weakref.ref(client),
                    rate_limit=rate_limit,
                )
                self._all_clients[client] = rate_limit
                if old is None:
                    # If this is the initial request, we give the user a small
                    # free budget
                    #
                    # We do not do this when renewing the window because we
                    # assume that if some rate limit information was still in
                    # place from the previous window, the connection is
                    # somewhat heavily used.
                    rate_limit.setup_free_token()
                if old is None or old.reset_date != rate_limit.reset_date:
                    client._refresh_rate_limit_tokens(rate_limit.free_token)
                self._add_event(event)

    def _next_infos(
        self,
    ) -> Iterator[Tuple[int, "WebAPIClient", _RateLimitInfo]]:
        """iterate over the available (client, _RateLimitInfo) pairs

        If no new information are currently available, this wait until then
        wait until the next _RateLimitEvent is due. If no new information
        arrive during that time. The iteration is over.
        """
        while True:
            current = time.monotonic_ns()
            wait_ns = _1_SECOND  # default wait if there is nothing to do
            if self._events:
                wait_ns = self._events[0].date - current
            try:
                # passing a timeout of 0 or a negative timeout would create
                # issues, so we set the minimum to one nanosecond.
                wait_ns = max(wait_ns, 1)
                wait = wait_ns / _1_SECOND
                client, rate_limit = self._feed.get(timeout=wait)
            except queue.Empty:
                # No external information received.
                #
                # However, we have reached the due date of the next items in
                # `self._events` and need to process them.
                break
            else:
                yield (current, client, rate_limit)


def _free_existing_request(tokens: _RateLimitTokens) -> None:
    r"""Internal Rate Limiting Method. Do not call directly.

    Unlock all waiting requests for a _RateLimitTokens. This is used when
    the rate limit window to which the tokens apply concludes.

    The _RateLimitTokens consist of two Semaphores, "available" and
    "waiting".

    When this method is called, no other code should be able to get access to
    these tokens. Some clients might still hold a reference to them, but the
    `_RateLimitEnforcer` will no longer add "available" tokens to them.

    The "waiting" semaphore contains one token for each request currently
    waiting for an available token. The goal is to unlock all of them. So we
    add one "available" token for each "waiting-token" we can find. We
    acquire them before issuing the "available" tokens, otherwise the threads
    doing requests would concurrently acquire their "waiting-token" again,
    making our count wrong.

    /!\ This is an internal method related to rate-limiting management.  /!\
    /!\ It should only be called by the `_RateLimitEnforcer` function. /!\
    """
    available, waiting = tokens
    need_unlock = 0
    while waiting.acquire(blocking=False):
        need_unlock += 1
    for r in range(need_unlock):
        available.release()


def _get_object_id_hex(swhidish: SWHIDish) -> str:
    """Parse string or SWHID and return the hex value of the object_id"""
    if isinstance(swhidish, str):
        swhid = CoreSWHID.from_string(swhidish)
    else:
        swhid = swhidish

    return hash_to_hex(swhid.object_id)
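
# Illustrative sketch (not part of the module): `_get_object_id_hex` accepts
# either a SWHID string or an already-parsed CoreSWHID and returns only the
# hex object id, e.g.:
#
#   _get_object_id_hex("swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6")
#   # -> "aafb16d69fd30ff58afdd69036a26047f3aebdc6"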


def typify_json(data: Any, obj_type: str) -> Any:
    """Type API responses using pythonic types where appropriate

    The following conversions are performed:

    - identifiers are converted from strings to SWHID instances
    - timestamps are converted from strings to datetime.datetime objects

    """

    def to_swhid(object_type: Union[str, ObjectType], s: Any) -> CoreSWHID:
        if isinstance(object_type, str):
            parsed_object_type = ObjectType[object_type.upper()]
        else:
            parsed_object_type = object_type
        return CoreSWHID(object_type=parsed_object_type, object_id=hash_to_bytes(s))

    def to_date(date: str) -> datetime:
        return dateutil.parser.parse(date)

    def to_optional_date(date: Optional[str]) -> Optional[datetime]:
        return None if date is None else to_date(date)

    # The date attribute is optional for Revision and Release object

    def obj_type_of_entry_type(s):
        if s == "file":
            return ObjectType.CONTENT
        elif s == "dir":
            return ObjectType.DIRECTORY
        elif s == "rev":
            return ObjectType.REVISION
        else:
            raise ValueError(f"invalid directory entry type: {s}")

    if obj_type == SNAPSHOT:
        for name, target in data.items():
            if target["target_type"] != "alias":
                # alias targets do not point to objects via SWHIDs; others do
                target["target"] = to_swhid(target["target_type"], target["target"])
    elif obj_type == REVISION:
        data["id"] = to_swhid(obj_type, data["id"])
        data["directory"] = to_swhid(DIRECTORY, data["directory"])
        for key in ("date", "committer_date"):
            data[key] = to_optional_date(data[key])
        for parent in data["parents"]:
            parent["id"] = to_swhid(REVISION, parent["id"])
    elif obj_type == RELEASE:
        data["id"] = to_swhid(obj_type, data["id"])
        data["date"] = to_optional_date(data["date"])
        data["target"] = to_swhid(data["target_type"], data["target"])
    elif obj_type == DIRECTORY:
        dir_swhid = None
        for entry in data:
            dir_swhid = dir_swhid or to_swhid(obj_type, entry["dir_id"])
            entry["dir_id"] = dir_swhid
            entry["target"] = to_swhid(
                obj_type_of_entry_type(entry["type"]), entry["target"]
            )
    elif obj_type == CONTENT:
        pass  # nothing to do for contents
    elif obj_type == ORIGIN_VISIT:
        data["date"] = to_date(data["date"])
        if data["snapshot"] is not None:
            data["snapshot"] = to_swhid("snapshot", data["snapshot"])
    else:
        raise ValueError(f"invalid object type: {obj_type}")
    return data

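
# Illustrative sketch (not part of the module): `typify_json` mutates the raw
# JSON mapping in place and returns it. For instance, a raw release payload
# (field values below are made up) would have its "id" and "target" converted
# to CoreSWHID instances and its "date" parsed into a datetime:
#
#   raw = {
#       "id": "aafb16d69fd30ff58afdd69036a26047f3aebdc6",
#       "target": "aafb16d69fd30ff58afdd69036a26047f3aebdc6",
#       "target_type": "revision",
#       "date": "2020-01-01T12:00:00+00:00",
#       "name": "v1.0",
#   }
#   typed = typify_json(raw, RELEASE)
#   assert isinstance(typed["id"], CoreSWHID)
#   assert isinstance(typed["date"], datetime)
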
def _parse_limit_header(response) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """Parse the X-RateLimit headers, if any, and return a `(Limit, Remaining,
    Reset)` tuple

    Limit:     the requests quota in the time window;
    Remaining: the remaining requests quota in the current window;
    Reset:     date of the current window reset, as UTC seconds.
    """
    limit = response.headers.get("X-RateLimit-Limit")
    if limit is not None:
        limit = int(limit)
    remaining = response.headers.get("X-RateLimit-Remaining")
    if remaining is not None:
        remaining = int(remaining)
    reset = response.headers.get("X-RateLimit-Reset")
    if reset is not None:
        reset = int(reset)
    return (limit, remaining, reset)


# The maximum number of SWHIDs that one can request in a single `known` request
KNOWN_QUERY_LIMIT = 1000


def _get_known_chunk(swhids):
    """Slice a list of `swhids` into smaller lists of size KNOWN_QUERY_LIMIT"""
    for i in range(0, len(swhids), KNOWN_QUERY_LIMIT):
        yield swhids[i : i + KNOWN_QUERY_LIMIT]


MAX_RETRY = 10

DEFAULT_RETRY_REASONS = {
    requests.status_codes.codes.TOO_MANY_REQUESTS,
}

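
# Illustrative sketch (not part of the module): `_get_known_chunk` only slices,
# so a list of 2500 SWHIDs is split into chunks of 1000, 1000 and 500:
#
#   chunks = list(_get_known_chunk(list(range(2500))))
#   assert [len(c) for c in chunks] == [1000, 1000, 500]
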
class WebAPIClient:
    """Client for the Software Heritage archive Web API, see :swh_web:`api/`"""

    DEFAULT_AUTOMATIC_CONCURENCY = 20

    def __init__(
        self,
        api_url: str = DEFAULT_CONFIG["api_url"],
        bearer_token: Optional[str] = DEFAULT_CONFIG["bearer_token"],
        request_retry=MAX_RETRY,
        retry_status=DEFAULT_RETRY_REASONS,
        use_rate_limit: bool = True,
        automatic_concurrent_queries: bool = True,
        max_automatic_concurrency: Optional[int] = None,
    ):
        """Create a client for the Software Heritage Web API

        See: :swh_web:`api/`

        Args:
            api_url: base URL for API calls
            bearer_token: optional bearer token to do authenticated API calls
            use_rate_limit: enable or disable request pacing according to
                server rate limit information.
            automatic_concurrent_queries: if :const:`True`, some large requests
                that need to be chunked might automatically be issued in
                parallel
            max_automatic_concurrency: maximum number of concurrent requests
                when ``automatic_concurrent_queries`` is set

        With rate limiting enabled (the default), the client will adjust its
        request rate if the server provides rate limiting headers. The rate
        limiting will pace out the available requests evenly over the rate
        limit window (except for a small initial budget, as explained below).

        For example, if there are 600 requests remaining for a window that
        resets in 5 minutes (300 seconds), a request will be issuable every
        0.5 seconds.

        This pace will be enforced overall, allowing for periods of inactivity
        between faster spikes. For example (using the same numbers as above):

        - A client that tries to issue requests continuously will have to wait
          0.5 seconds between requests.

        - A client that did not issue requests for 1 minute (60 seconds) will
          be able to issue 120 requests right away (60 / 0.5) before having to
          wait 0.5 seconds between requests.

        The above is true regardless of the number of threads using the same
        WebAPIClient.

        In practice, to avoid slowing down small applications doing few
        requests, 10% of the available budget is available immediately, the
        other 90% of the requests being spread out over the rate limit window.
        This initial "immediate" budget is only granted if at least 25% of the
        total request budget is available.
        """
        api_url = api_url.rstrip("/")
        u = urlparse(api_url)

        self.api_url = api_url
        self.api_path = u.path
        self.bearer_token = bearer_token
        self._max_retry = request_retry
        self._retry_status = retry_status
        self._getters: Dict[ObjectType, Callable[[SWHIDish, bool], Any]] = {
            ObjectType.CONTENT: self.content,
            ObjectType.DIRECTORY: self.directory,
            ObjectType.RELEASE: self.release,
            ObjectType.REVISION: self.revision,
            ObjectType.SNAPSHOT: self._get_snapshot,
        }
        # assume we will do multiple calls and keep the connection alive
        self._session = requests.Session()
        self._use_rate_limit: bool = use_rate_limit
        self._rate_tokens: Optional[_RateLimitTokens] = None
        self._automatic_concurrent_queries: bool = automatic_concurrent_queries
        if max_automatic_concurrency is None:
            max_automatic_concurrency = self.DEFAULT_AUTOMATIC_CONCURENCY
        self._max_automatic_concurrency: int = max_automatic_concurrency
        # used for automatic concurrent queries
        self._thread_pool = None

    @property
    def rate_limit_delay(self):
        """current rate limit delay in seconds"""
        return _RateLimitEnforcer.current_rate_limit_delay(self)

    def _add_one_rate_limit_token(self) -> None:
        r"""Internal Rate Limiting Method. Do not call directly.

        This method is called when one extra request can be issued.

        /!\ This is an internal method related to rate-limiting management. /!\
        /!\ It should only be called by the `_RateLimitEnforcer` function.  /!\
        """
        assert self._rate_tokens is not None
        self._rate_tokens[0].release()

    def _clear_rate_limit_tokens(self) -> None:
        r"""Internal Rate Limiting Method. Do not call directly.

        This is called when a rate limit window concludes because we reached
        its end date. At that point we disable rate limiting and free any
        waiting requests.

        In some cases, information about a new window will be received before
        we detect this window's expiration and `_refresh_rate_limit_tokens`
        will be called instead.

        /!\ This is an internal method related to rate-limiting management. /!\
        /!\ It should only be called by the `_RateLimitEnforcer` function.  /!\
        """
        tokens = self._rate_tokens
        self._rate_tokens = None
        if tokens is not None:
            _free_existing_request(tokens)

    def _refresh_rate_limit_tokens(self, free_token=0) -> None:
        r"""Internal Rate Limiting Method. Do not call directly.

        Set up a new rate limit window by resetting the rate limit Semaphores.
        This is used when a RateLimitInfo for a newer window is received.

        When a rate limit window concludes, there are two possible situations:

        1) There are no waiting requests: the number of available tokens is ≥ 0.
        2) There are waiting requests: the number of available tokens is 0.

        In case (1) we need to discard the available tokens. A new rate
        limiting window will start, and it needs a blank slate. The requests
        we did not do earlier are irrelevant for that new window.

        For example, let's say a rate limit window is 1 minute long with 100
        requests. If the client issued only one request in each of the past
        two minutes, it used only 2 requests out of a 200 total budget.
        However, server side, the budget resets for each window, so at the
        start of the new window the client can only issue 100 requests over
        the next minute. If we preserved tokens from one window to the next,
        at that point the client would already have 198 available tokens
        (+ 100 to accumulate over the next minute), a number totally
        disconnected from the server state. So, we have to reset the Semaphore
        tokens for each window.

        However, some requests might still be waiting for tokens on the
        Semaphore of the old window. If we do nothing they would be stuck
        forever. We could do some fancy logic to transfer these waiting
        requests to the new semaphore, but it is significantly simpler to just
        unlock them. They'll consume some of the budget of the new window, and
        the rate limiting will adjust to the remaining budget.

        If the number of waiting requests exceeds (or even gets close to) the
        total budget of the next window, it means requests are being made at
        an unreasonable parallelism level and there will be trouble anyway.

        /!\ This is an internal method related to rate-limiting management. /!\
        /!\ It should only be called by the `_RateLimitEnforcer` function.  /!\
        """
        if not self._use_rate_limit:
            return
        tokens = self._rate_tokens
        self._rate_tokens = (
            threading.Semaphore(),  # available request
            threading.Semaphore(),  # waiting request
        )
        for i in range(free_token):
            self._rate_tokens[0].release()
        if tokens is not None:
            _free_existing_request(tokens)

    def _call(
        self, query: str, http_method: str = "get", **req_args
    ) -> requests.models.Response:
        """Dispatcher for archive API invocation

        Args:
            query: API method to be invoked, rooted at api_url
            http_method: HTTP method to be invoked, one of: 'get', 'post',
                'head'
            req_args: extra keyword arguments for requests.get()/.head()

        Raises:
            requests.HTTPError: if HTTP request fails and http_method is 'get'

        """
        url = None
        if urlparse(query).scheme:  # absolute URL
            url = query
        else:  # relative URL; prepend base API URL
            url = "/".join([self.api_url, query])

        headers = {}
        if self.bearer_token is not None:
            headers = {"Authorization": f"Bearer {self.bearer_token}"}

        if http_method not in ("get", "post", "head"):
            raise ValueError(f"unsupported HTTP method: {http_method}")
        return self._retryable_call(http_method, url, headers, req_args)

    def _retryable_call(self, http_method, url, headers, req_args):
        assert http_method in ("get", "post", "head"), http_method
        retry = self._max_retry
        delay = 0.1
        while retry > 0:
            retry -= 1
            r = self._one_call(http_method, url, headers, req_args)
            if r.status_code not in self._retry_status:
                r.raise_for_status()
                break
            if logger.isEnabledFor(logging.DEBUG):
                msg = (
                    f"HTTP RETRY {http_method} {url}"
                    f" delay={delay:.6f} remaining-tries={retry}"
                )
                logger.debug(msg)
            time.sleep(delay)
            delay *= 2
        return r

    def _one_call(self, http_method, url, headers, req_args):
        """Do one request and update rate limit info if available"""
        assert http_method in ("get", "post", "head"), http_method
        is_dbg = logger.isEnabledFor(logging.DEBUG)
        delay = 0
        pre_grab = time.monotonic()
        tokens = self._rate_tokens
        if tokens is not None:
            available, waiting = tokens
            # signal we wait for a token, to ensure a refresh of the rate_token
            # does not leave us hanging forever.
            #
            # See `_free_existing_request(…)` for details.
            waiting.release()
            try:
                # If the `rate_token` tuple changed since we read it, this
                # means the tokens Semaphores were refreshed, and it might have
                # happened before our "waiting-token" was registered. Therefore
                # we cannot 100% rely on the "waiting-token" to ensure we will
                # eventually get an "available-request-token" for us.
                #
                # We ignore the rate limiting logic in that case. The race is
                # narrow enough that it is unlikely to create issues in
                # practice.
                #
                # If the `rate_token` tuple did not change, we are certain the
                # "waiting-token" will be taken into account in case a refresh
                # happens while waiting for an "available-request-token".
                if tokens is self._rate_tokens:
                    # respect the rate limit enforced globally
                    #
                    # the `available` Semaphore is filled by the code in
                    # _RateLimitEnforcer
                    available.acquire()
            finally:
                # signal we no longer need to be saved from an infinite hang
                #
                # We do a non-blocking acquire, because if the _RateLimitTokens
                # is being discarded, the `_RateLimitEnforcer` might have
                # acquired *our* "waiting-token" in the process of unlocking
                # this thread.
                waiting.acquire(blocking=False)
            delay = time.monotonic() - pre_grab
        if is_dbg:
            dbg_msg = f"HTTP CALL {http_method} {url}"
            if delay:
                dbg_msg += f" delay={delay:.6f}"
            logger.debug(dbg_msg)
        start = time.time()
        if http_method == "get":
            r = self._session.get(url, **req_args, headers=headers)
        elif http_method == "post":
            r = self._session.post(url, **req_args, headers=headers)
        elif http_method == "head":
            r = self._session.head(url, **req_args, headers=headers)
        end = time.time()
        if is_dbg:
            dbg_msg = f"HTTP REPLY {r.status_code} {http_method} {url}"
        rate_limit_header = _parse_limit_header(r)
        if None not in rate_limit_header:
            new = _RateLimitInfo(start, end, *rate_limit_header)
            if is_dbg:
                dbg_msg += " rate-limit-info=%r" % new
            _RateLimitEnforcer.new_info(self, new)
        if is_dbg:
            logger.debug(dbg_msg)
        return r

    def _call_groups(
        self,
        query: str,
        args_groups: Collection[Dict[str, Any]],
        **req_args,
    ) -> Iterator[requests.models.Response]:
        """Call the same endpoint multiple times with a series of arguments

        The responses are yielded in any order.

        Requests might be issued in parallel according to the value of
        ``self._automatic_concurrent_queries``.

        .. note::

            Through ``self._rate_tokens``, the actual pace of requests will
            comply with rate limit information provided by the server.
        """
        if len(args_groups) <= 1 or not self._automatic_concurrent_queries:
            for args in args_groups:
                loop_args = req_args.copy()
                loop_args.update(args)
                yield self._call(query, **loop_args)
        else:
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=self._max_automatic_concurrency
            ) as executor:
                pending = []
                for args in args_groups:
                    loop_args = req_args.copy()
                    loop_args.update(args)
                    f = executor.submit(self._call, query, **loop_args)
                    pending.append(f)
                for future in concurrent.futures.as_completed(pending):
                    yield future.result()

    def _get_snapshot(self, swhid: SWHIDish, typify: bool = True) -> Dict[str, Any]:
        """Analogous to self.snapshot(), but zipping through partial snapshots,
        merging them together before returning
        """
        snapshot = {}
        for snp in self.snapshot(swhid, typify):
            snapshot.update(snp)
        return snapshot

    def get(self, swhid: SWHIDish, typify: bool = True, **req_args) -> Any:
        """Retrieve information about an object of any kind

        Dispatcher method over the more specific methods content(),
        directory(), etc.

        Note that this method will buffer the entire output in case of long,
        iterable output (e.g., for snapshot()), see the iter() method for
        streaming.

        """
        if isinstance(swhid, str):
            obj_type = CoreSWHID.from_string(swhid).object_type
        else:
            obj_type = swhid.object_type
        return self._getters[obj_type](swhid, typify)

    def iter(
        self, swhid: SWHIDish, typify: bool = True, **req_args
    ) -> Iterator[Dict[str, Any]]:
        """Stream over the information about an object of any kind

        Streaming variant of get()

        """
        if isinstance(swhid, str):
            obj_type = CoreSWHID.from_string(swhid).object_type
        else:
            obj_type = swhid.object_type
        if obj_type == ObjectType.SNAPSHOT:
            yield from self.snapshot(swhid, typify)
        elif obj_type == ObjectType.REVISION:
            yield from [self.revision(swhid, typify)]
        elif obj_type == ObjectType.RELEASE:
            yield from [self.release(swhid, typify)]
        elif obj_type == ObjectType.DIRECTORY:
            yield from self.directory(swhid, typify)
        elif obj_type == ObjectType.CONTENT:
            yield from [self.content(swhid, typify)]
        else:
            raise ValueError(f"invalid object type: {obj_type}")

    def content(
        self, swhid: SWHIDish, typify: bool = True, **req_args
    ) -> Dict[str, Any]:
        """Retrieve information about a content object

        Args:
            swhid: object persistent identifier
            typify: if True, convert return value to pythonic types wherever
                possible, otherwise return raw JSON types (default: True)
            req_args: extra keyword arguments for requests.get()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        json = self._call(
            f"content/sha1_git:{_get_object_id_hex(swhid)}/", **req_args
        ).json()
        return typify_json(json, CONTENT) if typify else json

    def directory(
        self, swhid: SWHIDish, typify: bool = True, **req_args
    ) -> List[Dict[str, Any]]:
        """Retrieve information about a directory object

        Args:
            swhid: object persistent identifier
            typify: if True, convert return value to pythonic types wherever
                possible, otherwise return raw JSON types (default: True)
            req_args: extra keyword arguments for requests.get()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        json = self._call(f"directory/{_get_object_id_hex(swhid)}/", **req_args).json()
        return typify_json(json, DIRECTORY) if typify else json

    def revision(
        self, swhid: SWHIDish, typify: bool = True, **req_args
    ) -> Dict[str, Any]:
        """Retrieve information about a revision object

        Args:
            swhid: object persistent identifier
            typify: if True, convert return value to pythonic types wherever
                possible, otherwise return raw JSON types (default: True)
            req_args: extra keyword arguments for requests.get()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        json = self._call(f"revision/{_get_object_id_hex(swhid)}/", **req_args).json()
        return typify_json(json, REVISION) if typify else json

    def release(
        self, swhid: SWHIDish, typify: bool = True, **req_args
    ) -> Dict[str, Any]:
        """Retrieve information about a release object

        Args:
            swhid: object persistent identifier
            typify: if True, convert return value to pythonic types wherever
                possible, otherwise return raw JSON types (default: True)
            req_args: extra keyword arguments for requests.get()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        json = self._call(f"release/{_get_object_id_hex(swhid)}/", **req_args).json()
        return typify_json(json, RELEASE) if typify else json

    def snapshot(
        self, swhid: SWHIDish, typify: bool = True, **req_args
    ) -> Iterator[Dict[str, Any]]:
        """Retrieve information about a snapshot object

        Args:
            swhid: object persistent identifier
            typify: if True, convert return value to pythonic types wherever
                possible, otherwise return raw JSON types (default: True)
            req_args: extra keyword arguments for requests.get()

        Returns:
            an iterator over partial snapshots (dictionaries mapping branch
            names to information about where they point to), each containing a
            subset of available branches

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        done = False
        r = None
        query = f"snapshot/{_get_object_id_hex(swhid)}/"

        while not done:
            r = self._call(query, http_method="get", **req_args)
            json = r.json()["branches"]
            yield typify_json(json, SNAPSHOT) if typify else json
            if "next" in r.links and "url" in r.links["next"]:
                query = r.links["next"]["url"]
            else:
                done = True

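    # Illustrative sketch (not part of the module), assuming `cli` is a
    # WebAPIClient instance: large snapshots are paginated by the server, so
    # `snapshot()` yields one mapping of branches per page; callers that want
    # the whole thing can merge them (this is what `get()` does through
    # `_get_snapshot()`):
    #
    #   branches = {}
    #   for partial in cli.snapshot("swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a"):
    #       branches.update(partial)
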
    def visits(
        self,
        origin: str,
        per_page: Optional[int] = None,
        last_visit: Optional[int] = None,
        typify: bool = True,
        **req_args,
    ) -> Iterator[Dict[str, Any]]:
        """List visits of an origin

        Args:
            origin: the URL of a software origin
            per_page: the number of visits to list
            last_visit: visit to start listing from
            typify: if True, convert return value to pythonic types wherever
                possible, otherwise return raw JSON types (default: True)
            req_args: extra keyword arguments for requests.get()

        Returns:
            an iterator over visits of the origin

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        done = False
        r = None
        params = []
        if last_visit is not None:
            params.append(("last_visit", last_visit))
        if per_page is not None:
            params.append(("per_page", per_page))
        query = f"origin/{origin}/visits/"

        while not done:
            r = self._call(query, http_method="get", params=params, **req_args)
            yield from [
                typify_json(v, ORIGIN_VISIT) if typify else v for v in r.json()
            ]
            if "next" in r.links and "url" in r.links["next"]:
                params = []
                query = r.links["next"]["url"]
            else:
                done = True

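    # Illustrative sketch (not part of the module), assuming `cli` is a
    # WebAPIClient instance: `visits()` follows the server's pagination links,
    # so iterating it yields every recorded visit of the origin:
    #
    #   for visit in cli.visits("https://github.com/python/cpython", per_page=100):
    #       print(visit["visit"], visit["date"], visit["status"])
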
    def last_visit(self, origin: str, typify: bool = True) -> Dict[str, Any]:
        """Return the last visit of an origin.

        Args:
            origin: the URL of a software origin
            typify: if True, convert return value to pythonic types wherever
                possible, otherwise return raw JSON types (default: True)

        Returns:
            The last visit for that origin

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        query = f"origin/{origin}/visit/latest/"
        r = self._call(query, http_method="get")
        visit = r.json()
        return typify_json(visit, ORIGIN_VISIT) if typify else visit

    def known(
        self, swhids: Iterable[SWHIDish], **req_args
    ) -> Dict[CoreSWHID, Dict[Any, Any]]:
        """Verify the presence in the archive of several objects at once

        Args:
            swhids: SWHIDs of the objects to verify

        Returns:
            a dictionary mapping object SWHIDs to archive information about
            them; the dictionary includes a "known" key associated to a
            boolean value that is true if and only if the object is known to
            the archive

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        all_swh_ids = list(swhids)
        chunks = [list(map(str, c)) for c in _get_known_chunk(all_swh_ids)]
        args_group = [{"json": ids} for ids in chunks]
        req_args["http_method"] = "post"
        responses = self._call_groups("known/", args_group, **req_args)
        replies = (i for r in responses for i in r.json().items())
        return {CoreSWHID.from_string(k): v for k, v in replies}

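    # Illustrative sketch (not part of the module), assuming `cli` is a
    # WebAPIClient instance: `known()` transparently chunks the query into
    # batches of KNOWN_QUERY_LIMIT SWHIDs and merges the per-chunk answers:
    #
    #   swhids = [
    #       "swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6",
    #       "swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a",
    #   ]
    #   presence = cli.known(swhids)
    #   archived = [s for s, info in presence.items() if info["known"]]
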
    def content_exists(self, swhid: SWHIDish, **req_args) -> bool:
        """Check if a content object exists in the archive

        Args:
            swhid: object persistent identifier
            req_args: extra keyword arguments for requests.head()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        return bool(
            self._call(
                f"content/sha1_git:{_get_object_id_hex(swhid)}/",
                http_method="head",
                **req_args,
            )
        )

    def directory_exists(self, swhid: SWHIDish, **req_args) -> bool:
        """Check if a directory object exists in the archive

        Args:
            swhid: object persistent identifier
            req_args: extra keyword arguments for requests.head()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        return bool(
            self._call(
                f"directory/{_get_object_id_hex(swhid)}/",
                http_method="head",
                **req_args,
            )
        )

    def revision_exists(self, swhid: SWHIDish, **req_args) -> bool:
        """Check if a revision object exists in the archive

        Args:
            swhid: object persistent identifier
            req_args: extra keyword arguments for requests.head()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        return bool(
            self._call(
                f"revision/{_get_object_id_hex(swhid)}/",
                http_method="head",
                **req_args,
            )
        )

    def release_exists(self, swhid: SWHIDish, **req_args) -> bool:
        """Check if a release object exists in the archive

        Args:
            swhid: object persistent identifier
            req_args: extra keyword arguments for requests.head()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        return bool(
            self._call(
                f"release/{_get_object_id_hex(swhid)}/",
                http_method="head",
                **req_args,
            )
        )

    def snapshot_exists(self, swhid: SWHIDish, **req_args) -> bool:
        """Check if a snapshot object exists in the archive

        Args:
            swhid: object persistent identifier
            req_args: extra keyword arguments for requests.head()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        return bool(
            self._call(
                f"snapshot/{_get_object_id_hex(swhid)}/",
                http_method="head",
                **req_args,
            )
        )

    def origin_exists(self, origin: str, **req_args) -> bool:
        """Check if an origin object exists in the archive

        Args:
            origin: the URL of a software origin
            req_args: extra keyword arguments for requests.head()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        return bool(
            self._call(
                f"origin/{origin}/get/",
                http_method="head",
                **req_args,
            )
        )

    def content_raw(self, swhid: SWHIDish, **req_args) -> Iterator[bytes]:
        """Iterate over the raw content of a content object

        Args:
            swhid: object persistent identifier
            req_args: extra keyword arguments for requests.get()

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        r = self._call(
            f"content/sha1_git:{_get_object_id_hex(swhid)}/raw/",
            stream=True,
            **req_args,
        )
        r.raise_for_status()
        yield from r.iter_content(chunk_size=None, decode_unicode=False)

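    # Illustrative sketch (not part of the module), assuming `cli` is a
    # WebAPIClient instance (the SWHID below is a placeholder): `content_raw()`
    # streams the bytes, so they can be written out chunk by chunk without
    # loading the whole blob in memory:
    #
    #   with open("blob.bin", "wb") as f:
    #       for chunk in cli.content_raw("swh:1:cnt:..."):
    #           f.write(chunk)
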
    def origin_save(self, visit_type: str, origin: str) -> Dict:
        """Save code now query for the origin with visit_type.

        Args:
            visit_type: Type of the visit
            origin: the origin to save

        Returns:
            The resulting dict of the visit saved

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        q = f"origin/save/{visit_type}/url/{origin}/"
        r = self._call(q, http_method="post")
        return r.json()

    def get_origin(self, swhid: CoreSWHID) -> Optional[Any]:
        """Walk the compressed graph to discover the origin of a given swhid

        This method exists for the swh-scanner and is likely to change
        significantly and/or be replaced; we do not recommend using it.

        """
        key = str(swhid)
        q = (
            f"graph/randomwalk/{key}/ori/"
            f"?direction=backward&limit=-1&resolve_origins=true"
        )
        with self._call(q, http_method="get") as r:
            return r.text

    def cooking_request(
        self, bundle_type: str, swhid: SWHIDish, email: Optional[str] = None, **req_args
    ) -> Dict[str, Any]:
        """Request a cooking of a bundle

        Args:
            bundle_type: Type of the bundle
            swhid: object persistent identifier
            email: e-mail to notify when the archive is ready
            req_args: extra keyword arguments for requests.post()

        Returns:
            an object containing the following keys:

                fetch_url (string): the url from which to download the archive
                progress_message (string): message describing the cooking task
                    progress
                id (number): the cooking task id
                status (string): the cooking task status
                    (new/pending/done/failed)
                swhid (string): the identifier of the object to cook

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        q = f"vault/{bundle_type}/{swhid}/"
        r = self._call(
            q,
            http_method="post",
            json={"email": email},
            **req_args,
        )
        r.raise_for_status()
        return r.json()

    def cooking_check(
        self, bundle_type: str, swhid: SWHIDish, **req_args
    ) -> Dict[str, Any]:
        """Check the status of a cooking task

        Args:
            bundle_type: Type of the bundle
            swhid: object persistent identifier
            req_args: extra keyword arguments for requests.get()

        Returns:
            an object containing the following keys:

                fetch_url (string): the url from which to download the archive
                progress_message (string): message describing the cooking task
                    progress
                id (number): the cooking task id
                status (string): the cooking task status
                    (new/pending/done/failed)
                swhid (string): the identifier of the object to cook

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        q = f"vault/{bundle_type}/{swhid}/"
        r = self._call(
            q,
            http_method="get",
            **req_args,
        )
        r.raise_for_status()
        return r.json()

    def cooking_fetch(
        self, bundle_type: str, swhid: SWHIDish, **req_args
    ) -> requests.models.Response:
        """Fetch the archive of a cooking task

        Args:
            bundle_type: Type of the bundle
            swhid: object persistent identifier
            req_args: extra keyword arguments for requests.get()

        Returns:
            a requests.models.Response object containing a stream of the
            archive

        Raises:
            requests.HTTPError: if HTTP request fails

        """
        q = f"vault/{bundle_type}/{swhid}/raw"
        r = self._call(
            q,
            http_method="get",
            stream=True,
            **req_args,
        )
        r.raise_for_status()
        return r
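

# Illustrative sketch (not part of the module), with a placeholder SWHID: a
# typical vault workflow chains the three cooking methods above; poll
# `cooking_check()` until the task is done, then stream the archive from
# `cooking_fetch()`:
#
#   cli = WebAPIClient()
#   swhid = "swh:1:dir:..."
#   cli.cooking_request("flat", swhid)
#   while cli.cooking_check("flat", swhid)["status"] not in ("done", "failed"):
#       time.sleep(10)
#   response = cli.cooking_fetch("flat", swhid)
#   with open("bundle.tar.gz", "wb") as f:
#       for chunk in response.iter_content(chunk_size=None):
#           f.write(chunk)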