Source code for swh.lister.npm.lister

# Copyright (C) 2018-2025 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from dataclasses import asdict, dataclass
import logging
from typing import Any, Dict, Iterator, List, Optional

from swh.lister.pattern import CredentialsType, Lister
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from swh.scheduler.utils import utcnow

logger = logging.getLogger(__name__)


[docs] @dataclass class NpmListerState: """State of npm lister""" last_seq: Optional[int] = None
[docs] class NpmLister(Lister[NpmListerState, List[Dict[str, Any]]]): """ List packages referenced by the changes API of the npm registry by last modification order. The lister is based on the npm replication API powered by a CouchDB database (https://docs.couchdb.org/en/stable/api/database/). Args: scheduler: a scheduler instance page_size: number of packages info to return per page when querying npm API incremental: defines if incremental listing should be used, in that case only modified or new packages since last incremental listing operation will be returned, otherwise all packages referenced by the NPM changes API will be listed in last modification order """ LISTER_NAME = "npm" INSTANCE = "npm" NPM_API_BASE_URL = "https://replicate.npmjs.com" NPM_API_CHANGES_URL = f"{NPM_API_BASE_URL}/_changes" PACKAGE_URL_TEMPLATE = "https://www.npmjs.com/package/{package_name}" def __init__( self, scheduler: SchedulerInterface, url: str = NPM_API_CHANGES_URL, instance: str = INSTANCE, page_size: int = 10000, incremental: bool = False, credentials: CredentialsType = None, max_origins_per_page: Optional[int] = None, max_pages: Optional[int] = None, enable_origins: bool = True, ): super().__init__( scheduler=scheduler, credentials=credentials, url=url, instance=instance, max_origins_per_page=max_origins_per_page, max_pages=max_pages, enable_origins=enable_origins, ) self.page_size = page_size self.incremental = incremental self.listing_date = utcnow()
[docs] def state_from_dict(self, d: Dict[str, Any]) -> NpmListerState: return NpmListerState(**d)
[docs] def state_to_dict(self, state: NpmListerState) -> Dict[str, Any]: return asdict(state)
[docs] def request_params(self, last_seq: str) -> Dict[str, Any]: # include package JSON document to get its last update date params: Dict[str, Any] = {"limit": self.page_size} params["since"] = last_seq return params
[docs] def get_pages(self) -> Iterator[List[Dict[str, Any]]]: last_seq: str = "0" if ( self.incremental and self.state is not None and self.state.last_seq is not None ): last_seq = str(self.state.last_seq) while True: response = self.http_request(self.url, params=self.request_params(last_seq)) data = response.json() page = data["results"] if not page: break yield page if len(page) < self.page_size: break last_seq = str(page[-1]["seq"])
[docs] def get_origins_from_page( self, page: List[Dict[str, Any]] ) -> Iterator[ListedOrigin]: """Convert a page of Npm repositories into a list of ListedOrigin.""" assert self.lister_obj.id is not None for package in page: if package.get("deleted"): continue yield ListedOrigin( lister_id=self.lister_obj.id, url=self.PACKAGE_URL_TEMPLATE.format(package_name=package["id"]), visit_type="npm", last_update=self.listing_date, )
[docs] def commit_page(self, page: List[Dict[str, Any]]): """Update the currently stored state using the latest listed page.""" if self.incremental: last_package = page[-1] last_seq = last_package["seq"] if self.state.last_seq is None or last_seq > self.state.last_seq: self.state.last_seq = last_seq
[docs] def finalize(self): if self.incremental and self.state.last_seq is not None: scheduler_state = self.get_state_from_scheduler() if ( scheduler_state.last_seq is None or self.state.last_seq > scheduler_state.last_seq ): self.updated = True