# Copyright (C) 2018-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
import logging
from time import sleep
from typing import Any, Dict, Iterator, List, Optional, Tuple
from xmlrpc.client import Fault, ServerProxy

from tenacity.before_sleep import before_sleep_log

from swh.core.retry import http_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin

from ..pattern import CredentialsType, Lister

logger = logging.getLogger(__name__)
# Type returned by the XML-RPC changelog call:
# package, version, release timestamp, description, serial
ChangelogEntry = Tuple[str, str, int, str, int]
# Package update type manipulated by the lister, a subset of the information
# in ChangelogEntry: package name, max release date
PackageUpdate = Tuple[str, datetime]
# Type returned by listing a page of results
PackageListPage = List[PackageUpdate]
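# For illustration only, a ChangelogEntry roughly looks like the following
# (hypothetical values; the timestamp is epoch seconds):
#   ("requests", "2.31.0", 1684846800, "new release", 17900000)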
@dataclass
class PyPIListerState:
"""State of PyPI lister"""
last_serial: Optional[int] = None
"""Last seen serial when visiting the pypi instance"""
def _if_rate_limited(retry_state) -> bool:
"""Custom tenacity retry predicate to handle xmlrpc client error:
.. code::
xmlrpc.client.Fault: <Fault -32500: 'HTTPTooManyRequests: The action could not
be performed because there were too many requests by the client. Limit may reset
in 1 seconds.'>
"""
attempt = retry_state.outcome
return attempt.failed and isinstance(attempt.exception(), Fault)
def pypi_url(package_name: str) -> str:
"""Build pypi url out of a package name."""
return PyPILister.PACKAGE_URL.format(package_name=package_name)
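# For illustration (arbitrary example package name), with the default
# PACKAGE_URL template:
#   pypi_url("requests") == "https://pypi.org/project/requests/"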
class PyPILister(Lister[PyPIListerState, PackageListPage]):
"""List origins from PyPI."""
LISTER_NAME = "pypi"
INSTANCE = "pypi" # As of today only the main pypi.org is used
PACKAGE_LIST_URL = "https://pypi.org/pypi" # XML-RPC url
PACKAGE_URL = "https://pypi.org/project/{package_name}/"
def __init__(
self,
scheduler: SchedulerInterface,
url: str = PACKAGE_LIST_URL,
instance: str = INSTANCE,
credentials: Optional[CredentialsType] = None,
max_origins_per_page: Optional[int] = None,
max_pages: Optional[int] = None,
enable_origins: bool = True,
):
super().__init__(
scheduler=scheduler,
url=url,
instance=instance,
credentials=credentials,
max_origins_per_page=max_origins_per_page,
max_pages=max_pages,
enable_origins=enable_origins,
)
        # Used as the termination condition and, when relevant, becomes the new
        # state when the visit is done
self.last_processed_serial: Optional[int] = None
def state_from_dict(self, d: Dict[str, Any]) -> PyPIListerState:
return PyPIListerState(last_serial=d.get("last_serial"))
def state_to_dict(self, state: PyPIListerState) -> Dict[str, Any]:
return asdict(state)
@http_retry(
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
)
def _changelog_last_serial(self, client: ServerProxy) -> int:
"""Internal detail to allow throttling when calling the changelog last entry"""
serial = client.changelog_last_serial()
assert isinstance(serial, int)
return serial
@http_retry(
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
)
def _changelog_since_serial(
self, client: ServerProxy, serial: int
) -> List[ChangelogEntry]:
"""Internal detail to allow throttling when calling the changelog listing"""
sleep(1) # to avoid the initial warning about throttling
return client.changelog_since_serial(serial) # type: ignore
def get_pages(self) -> Iterator[PackageListPage]:
"""Iterate other changelog events per package, determine the max release date for that
package and use that max release date as last_update. When the execution is
done, this will also set the self.last_processed_serial attribute so we can
finalize the state of the lister for the next visit.
Yields:
List of Tuple of (package-name, max release-date)
"""
client = ServerProxy(self.url)
last_processed_serial = -1
if self.state.last_serial is not None:
last_processed_serial = self.state.last_serial
upstream_last_serial = self._changelog_last_serial(client)
        # Paginate through the pypi results until we have read everything
while last_processed_serial < upstream_last_serial:
updated_packages = defaultdict(list)
for package, _, release_date, _, serial in self._changelog_since_serial(
client, last_processed_serial
):
updated_packages[package].append(release_date)
# Compute the max serial so we can stop when done
last_processed_serial = max(last_processed_serial, serial)
            # Yield pages of results so they are flushed regularly
yield [
(
pypi_url(package),
                    # Interpret the epoch timestamp directly as UTC
                    datetime.fromtimestamp(max(release_dates), tz=timezone.utc),
)
for package, release_dates in updated_packages.items()
]
self.last_processed_serial = upstream_last_serial
def get_origins_from_page(
self, packages: PackageListPage
) -> Iterator[ListedOrigin]:
"""Convert a page of PyPI repositories into a list of ListedOrigins."""
assert self.lister_obj.id is not None
for origin, last_update in packages:
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=origin,
visit_type="pypi",
last_update=last_update,
)
def finalize(self):
"""Finalize the visit state by updating with the new last_serial if updates
actually happened.
"""
self.updated = (
self.state
and self.state.last_serial
and self.last_processed_serial
and self.state.last_serial < self.last_processed_serial
) or (not self.state.last_serial and self.last_processed_serial)
if self.updated:
self.state.last_serial = self.last_processed_serial
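# A minimal usage sketch (assuming `scheduler` is an already configured
# SchedulerInterface instance; the run() entry point is provided by the Lister
# base class in ..pattern and drives get_pages(), get_origins_from_page() and
# finalize()):
#
#   lister = PyPILister(scheduler=scheduler)
#   lister.run()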