Source code for swh.lister.npm.lister

# Copyright (C) 2018-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from dataclasses import asdict, dataclass
import logging
from typing import Any, Dict, Iterator, List, Optional

import iso8601
import requests
from tenacity.before_sleep import before_sleep_log

from swh.lister import USER_AGENT
from swh.lister.pattern import CredentialsType, Lister
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin

logger = logging.getLogger(__name__)


[docs]@dataclass class NpmListerState: """State of npm lister""" last_seq: Optional[int] = None
[docs]class NpmLister(Lister[NpmListerState, List[Dict[str, Any]]]): """ List all packages hosted on the npm registry. The lister is based on the npm replication API powered by a CouchDB database (https://docs.couchdb.org/en/stable/api/database/). Args: scheduler: a scheduler instance page_size: number of packages info to return per page when querying npm API incremental: defines if incremental listing should be used, in that case only modified or new packages since last incremental listing operation will be returned, otherwise all packages will be listed in lexicographical order """ LISTER_NAME = "npm" INSTANCE = "npm" API_BASE_URL = "https://replicate.npmjs.com" API_INCREMENTAL_LISTING_URL = f"{API_BASE_URL}/_changes" API_FULL_LISTING_URL = f"{API_BASE_URL}/_all_docs" PACKAGE_URL_TEMPLATE = "https://www.npmjs.com/package/{package_name}" def __init__( self, scheduler: SchedulerInterface, page_size: int = 1000, incremental: bool = False, credentials: CredentialsType = None, ): super().__init__( scheduler=scheduler, credentials=credentials, url=self.API_INCREMENTAL_LISTING_URL if incremental else self.API_FULL_LISTING_URL, instance=self.INSTANCE, ) self.page_size = page_size if not incremental: # in full listing mode, first package in each page corresponds to the one # provided as the startkey query parameter value, so we increment the page # size by one to avoid double package processing self.page_size += 1 self.incremental = incremental self.session = requests.Session() self.session.headers.update( {"Accept": "application/json", "User-Agent": USER_AGENT} )
[docs] def state_from_dict(self, d: Dict[str, Any]) -> NpmListerState: return NpmListerState(**d)
[docs] def state_to_dict(self, state: NpmListerState) -> Dict[str, Any]: return asdict(state)
[docs] def request_params(self, last_package_id: str) -> Dict[str, Any]: # include package JSON document to get its last update date params = {"limit": self.page_size, "include_docs": "true"} if self.incremental: params["since"] = last_package_id else: params["startkey"] = last_package_id return params
[docs] @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) def page_request(self, last_package_id: str) -> requests.Response: params = self.request_params(last_package_id) logger.debug("Fetching URL %s with params %s", self.url, params) response = self.session.get(self.url, params=params) if response.status_code != 200: logger.warning( "Unexpected HTTP status code %s on %s: %s", response.status_code, response.url, response.content, ) response.raise_for_status() return response
[docs] def get_pages(self) -> Iterator[List[Dict[str, Any]]]: last_package_id: str = "0" if self.incremental else '""' if ( self.incremental and self.state is not None and self.state.last_seq is not None ): last_package_id = str(self.state.last_seq) while True: response = self.page_request(last_package_id) data = response.json() page = data["results"] if self.incremental else data["rows"] if not page: break if self.incremental or len(page) < self.page_size: yield page else: yield page[:-1] if len(page) < self.page_size: break last_package_id = ( str(page[-1]["seq"]) if self.incremental else f'"{page[-1]["id"]}"' )
[docs] def get_origins_from_page( self, page: List[Dict[str, Any]] ) -> Iterator[ListedOrigin]: """Convert a page of Npm repositories into a list of ListedOrigin.""" assert self.lister_obj.id is not None for package in page: # no source code to archive here if not package["doc"].get("versions", {}): continue package_name = package["doc"]["name"] package_latest_version = ( package["doc"].get("dist-tags", {}).get("latest", "") ) last_update = None if package_latest_version in package["doc"].get("time", {}): last_update = iso8601.parse_date( package["doc"]["time"][package_latest_version] ) yield ListedOrigin( lister_id=self.lister_obj.id, url=self.PACKAGE_URL_TEMPLATE.format(package_name=package_name), visit_type="npm", last_update=last_update, )
[docs] def commit_page(self, page: List[Dict[str, Any]]): """Update the currently stored state using the latest listed page.""" if self.incremental: last_package = page[-1] last_seq = last_package["seq"] if self.state.last_seq is None or last_seq > self.state.last_seq: self.state.last_seq = last_seq
[docs] def finalize(self): if self.incremental and self.state.last_seq is not None: scheduler_state = self.get_state_from_scheduler() if ( scheduler_state.last_seq is None or self.state.last_seq > scheduler_state.last_seq ): self.updated = True