Source code for swh.lister.bioconductor.lister

# Copyright (C) 2023  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from dataclasses import dataclass, field
import json
import logging
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple
from urllib.parse import urljoin

from bs4 import BeautifulSoup
from debian.deb822 import Sources
import iso8601
from packaging import version
from requests import HTTPError

from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin

from ..pattern import CredentialsType, Lister

logger = logging.getLogger(__name__)

Release = str
Category = str
BioconductorListerPage = Optional[Tuple[Release, Category, Dict[str, Any]]]


@dataclass
class BioconductorListerState:
    """State of the Bioconductor lister"""

    package_versions: Dict[str, Set[str]] = field(default_factory=dict)
    """Dictionary mapping a package name to all the versions found during
    the last listing"""
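
# A sketch of a deserialized state (the package name is a real Bioconductor
# package; the version keys are illustrative). Keys follow the
# "{release}/{category}/{version}" format built in get_origins_from_page():
#
#     BioconductorListerState(
#         package_versions={
#             "BiocGenerics": {"3.16/bioc/0.44.0", "3.17/bioc/0.46.0"},
#         }
#     )
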
class BioconductorLister(Lister[BioconductorListerState, BioconductorListerPage]):
    """List origins from Bioconductor, a collection of open source software
    for bioinformatics based on the R statistical programming language."""

    LISTER_NAME = "bioconductor"
    VISIT_TYPE = "bioconductor"
    INSTANCE = "bioconductor"

    BIOCONDUCTOR_HOMEPAGE = "https://www.bioconductor.org"

    def __init__(
        self,
        scheduler: SchedulerInterface,
        url: str = BIOCONDUCTOR_HOMEPAGE,
        instance: str = INSTANCE,
        credentials: Optional[CredentialsType] = None,
        releases: Optional[List[Release]] = None,
        categories: Optional[List[Category]] = None,
        incremental: bool = False,
        max_origins_per_page: Optional[int] = None,
        max_pages: Optional[int] = None,
        enable_origins: bool = True,
        record_batch_size: int = 1000,
    ):
        super().__init__(
            scheduler=scheduler,
            url=url,
            instance=instance,
            credentials=credentials,
            max_origins_per_page=max_origins_per_page,
            max_pages=max_pages,
            enable_origins=enable_origins,
            record_batch_size=record_batch_size,
        )

        if releases is None:
            self.releases = self.fetch_versions()
        else:
            self.releases = releases

        self.categories = categories or [
            "bioc",
            "workflows",
            "data/annotation",
            "data/experiment",
        ]
        self.incremental = incremental

        self.listed_origins: Dict[str, ListedOrigin] = {}
        self.origins_to_send: Set[str] = set()
        self.package_versions: Dict[str, Set[str]] = {}
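
    # A minimal usage sketch (assuming `scheduler` is a SchedulerInterface
    # instance obtained elsewhere); restricting releases and categories keeps
    # a trial run small:
    #
    #     lister = BioconductorLister(
    #         scheduler=scheduler,
    #         releases=["3.17"],
    #         categories=["bioc"],
    #     )
    #     lister.run()
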
    def state_from_dict(self, d: Dict[str, Any]) -> BioconductorListerState:
        return BioconductorListerState(
            package_versions={k: set(v) for k, v in d.items()}
        )

    def state_to_dict(self, state: BioconductorListerState) -> Dict[str, Any]:
        return {k: list(v) for k, v in state.package_versions.items()}
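
    # Round-trip sketch: the scheduler persists state as JSON, so the sets in
    # package_versions become lists on the way out and sets again on the way
    # in (values illustrative):
    #
    #     lister.state_to_dict(
    #         BioconductorListerState(package_versions={"pkg": {"3.17/bioc/1.0.0"}})
    #     )
    #       -> {"pkg": ["3.17/bioc/1.0.0"]}
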
    def origin_url_for_package(self, package_name: str) -> str:
        return f"{self.BIOCONDUCTOR_HOMEPAGE}/packages/{package_name}"
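
    # Derivable from the f-string above, e.g. (package name illustrative):
    #
    #     lister.origin_url_for_package("BiocGenerics")
    #       -> "https://www.bioconductor.org/packages/BiocGenerics"
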
    def get_pages(self) -> Iterator[BioconductorListerPage]:
        """Return an iterator for each page. Every page is a
        (release, category) pair."""
        for release in self.releases:
            if version.parse(release) < version.parse("1.8"):
                # only the "bioc" category existed before 1.8
                url_template = urljoin(
                    self.url, "/packages/{category}/{release}/src/contrib/PACKAGES"
                )
                categories = {"bioc"}
            elif version.parse(release) < version.parse("2.5"):
                # the "workflows" category does not exist for these releases
                url_template = urljoin(
                    self.url, "/packages/{release}/{category}/src/contrib/PACKAGES"
                )
                categories = {"bioc", "data/annotation", "data/experiment"}
            else:
                url_template = urljoin(
                    self.url, "/packages/json/{release}/{category}/packages.json"
                )
                categories = set(self.categories)

            for category in categories:
                url = url_template.format(release=release, category=category)
                try:
                    packages_txt = self.http_request(url).text
                    packages = self.parse_packages(packages_txt)
                except HTTPError as e:
                    assert e.response is not None
                    logger.debug(
                        "Skipping page since got %s response for %s",
                        e.response.status_code,
                        url,
                    )
                    continue
                yield (release, category, packages)

        # Yield an extra None to signal get_origins_from_page()
        # to stop iterating and yield the extracted origins
        yield None
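
    # For reference, the three index-URL shapes produced above, one per
    # release era (1.7, 2.0 and 3.17 are real Bioconductor releases; the
    # "bioc" category is used for illustration):
    #
    #     1.7  -> https://www.bioconductor.org/packages/bioc/1.7/src/contrib/PACKAGES
    #     2.0  -> https://www.bioconductor.org/packages/2.0/bioc/src/contrib/PACKAGES
    #     3.17 -> https://www.bioconductor.org/packages/json/3.17/bioc/packages.json
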
    def fetch_versions(self) -> List[str]:
        html = self.http_request(
            f"{self.BIOCONDUCTOR_HOMEPAGE}/about/release-announcements"
        ).text
        bs = BeautifulSoup(html, "html.parser")
        # Keep only table rows whose third cell contains a link,
        # returning release numbers oldest first.
        return [
            tr.find_all("td")[0].text
            for tr in reversed(bs.find("table").find("tbody").find_all("tr"))
            if tr.find_all("td")[2].find("a")
        ]
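
    # Illustrative result shape (actual values come from the live
    # release-announcements page, oldest first): ["1.0", "1.1", ..., "3.17"]
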
    def parse_packages(self, text: str) -> Dict[str, Any]:
        """Parses packages.json and PACKAGES files"""
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass

        sources = Sources.iter_paragraphs(text)
        return {s["Package"]: dict(s) for s in sources}
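
    # A sketch of the two accepted input formats (sample records
    # illustrative): JSON first, then Debian-control-style PACKAGES
    # paragraphs parsed via deb822:
    #
    #     lister.parse_packages('{"BiocGenerics": {"Version": "0.46.0"}}')
    #       -> {"BiocGenerics": {"Version": "0.46.0"}}
    #
    #     lister.parse_packages("Package: BiocGenerics\nVersion: 0.46.0\n")
    #       -> {"BiocGenerics": {"Package": "BiocGenerics", "Version": "0.46.0"}}
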
    def get_origins_from_page(
        self, page: BioconductorListerPage
    ) -> Iterator[ListedOrigin]:
        """Convert a page of BioconductorLister PACKAGES/packages.json
        metadata into a list of ListedOrigins"""
        assert self.lister_obj.id is not None

        if page is None:
            for origin_url in self.origins_to_send:
                yield self.listed_origins[origin_url]
            return

        release, category, packages = page

        origins_to_send = set()

        for pkg_name, pkg_metadata in packages.items():
            pkg_version = pkg_metadata["Version"]
            last_update_date = None
            last_update_str = ""

            if version.parse(release) < version.parse("1.8"):
                tar_url = urljoin(
                    self.url,
                    f"/packages/{category}/{release}/src/contrib/Source/"
                    f"{pkg_name}_{pkg_metadata['Version']}.tar.gz",
                )
            elif version.parse(release) < version.parse("2.5"):
                tar_url = urljoin(
                    self.url,
                    f"/packages/{release}/{category}/src/contrib/"
                    f"{pkg_name}_{pkg_metadata['Version']}.tar.gz",
                )
            else:
                # Some packages don't have a download URL (based on
                # source.ver) and hence can't be archived. For example, see
                # the maEndToEnd package at the end of
                # https://bioconductor.org/packages/json/3.17/workflows/packages.json
                # Even guessing the tar URL from the expected URL format
                # doesn't work, i.e.
                # https://bioconductor.org/packages/3.17/workflows/src/contrib/maEndToEnd_2.20.0.tar.gz
                # doesn't respond with a tar file. Plus, the mirror clearly
                # shows that the maEndToEnd tar is missing:
                # https://ftp.gwdg.de/pub/misc/bioconductor/packages/3.17/workflows/src/contrib/
                # So skip such packages.
                if "source.ver" not in pkg_metadata:
                    logger.info(
                        (
                            "Skipping package %s listed in release %s "
                            "category %s since it doesn't have a download URL"
                        ),
                        pkg_name,
                        release,
                        category,
                    )
                    continue

                if "git_url" in pkg_metadata:
                    # Along with the .tar.gz files, grab the git repo as well
                    git_origin_url = pkg_metadata["git_url"]
                    git_last_update_str = pkg_metadata.get("git_last_commit_date")
                    self.listed_origins[git_origin_url] = ListedOrigin(
                        lister_id=self.lister_obj.id,
                        visit_type="git",
                        url=git_origin_url,
                        last_update=(
                            iso8601.parse_date(git_last_update_str)
                            if git_last_update_str
                            else None
                        ),
                    )
                    origins_to_send.add(git_origin_url)

                tar_url = urljoin(
                    self.url,
                    f"/packages/{release}/{category}/{pkg_metadata['source.ver']}",
                )
                last_update_str = pkg_metadata.get(
                    "Date/Publication", pkg_metadata.get("git_last_commit_date")
                )
                last_update_date = (
                    iso8601.parse_date(last_update_str) if last_update_str else None
                )
                # For some packages in releases >= 2.5, last_update can still
                # remain None. Example: see the "adme16cod.db" entry in
                # https://bioconductor.org/packages/json/3.17/data/annotation/packages.json

            origin_url = self.origin_url_for_package(pkg_name)
            package_version_key = f"{release}/{category}/{pkg_version}"

            if origin_url not in self.listed_origins:
                self.listed_origins[origin_url] = ListedOrigin(
                    lister_id=self.lister_obj.id,
                    visit_type=self.VISIT_TYPE,
                    url=origin_url,
                    last_update=last_update_date,
                    extra_loader_arguments={"packages": {}},
                )
                self.package_versions[pkg_name] = set()

            origins_to_send.add(origin_url)

            optional_fields: Dict[str, Any] = {}
            if "MD5sum" in pkg_metadata:
                optional_fields["checksums"] = {"md5": pkg_metadata["MD5sum"]}
            if last_update_str:
                optional_fields["last_update_date"] = last_update_str

            self.listed_origins[origin_url].extra_loader_arguments["packages"][
                package_version_key
            ] = {
                "release": release,
                "version": pkg_version,
                "category": category,
                "package": pkg_name,
                "tar_url": tar_url,
            }
            self.listed_origins[origin_url].extra_loader_arguments["packages"][
                package_version_key
            ].update(optional_fields)

            last_update = self.listed_origins[origin_url].last_update
            if (
                last_update is not None
                and last_update_date is not None
                and last_update_date > last_update
            ):
                self.listed_origins[origin_url].last_update = last_update_date

            self.package_versions[pkg_name].add(package_version_key)

            # package has been listed during a previous listing
            if self.incremental and pkg_name in self.state.package_versions:
                new_versions = (
                    self.package_versions[pkg_name]
                    - self.state.package_versions[pkg_name]
                )
                # no new versions, so no need to send the origin to the scheduler
                if not new_versions:
                    origins_to_send.remove(origin_url)

        self.origins_to_send.update(origins_to_send)
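
    # A hedged sketch of the extra_loader_arguments accumulated above for a
    # single origin (values illustrative; keys are as built in the code, with
    # tar_url following the >= 2.5 source.ver shape):
    #
    #     {
    #         "packages": {
    #             "3.17/bioc/0.46.0": {
    #                 "release": "3.17",
    #                 "version": "0.46.0",
    #                 "category": "bioc",
    #                 "package": "BiocGenerics",
    #                 "tar_url": "https://www.bioconductor.org/packages/"
    #                            "3.17/bioc/src/contrib/BiocGenerics_0.46.0.tar.gz",
    #                 "checksums": {"md5": "<md5 hex digest>"},
    #                 "last_update_date": "2023-04-25",
    #             },
    #         },
    #     }
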
    def finalize(self) -> None:
        if self.incremental:
            self.state.package_versions = self.package_versions

        self.updated = len(self.listed_origins) > 0
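
# An end-to-end incremental-run sketch (illustrative; `make_scheduler()` is a
# hypothetical stand-in for real scheduler setup):
#
#     scheduler = make_scheduler()
#     lister = BioconductorLister(scheduler=scheduler, incremental=True)
#     lister.run()  # finalize() persists package_versions into the state
#
#     # On a later run, origins whose release/category/version key set did
#     # not grow since the saved state are not sent to the scheduler again.
#     BioconductorLister(scheduler=scheduler, incremental=True).run()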