Source code for swh.lister.rubygems.lister

# Copyright (C) 2022-2023  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import base64
from datetime import timezone
import gzip
import logging
import os
import shutil
import subprocess
import tarfile
import tempfile
from typing import Any, Dict, Iterator, Optional, Tuple

from bs4 import BeautifulSoup
import psycopg2
from testing.postgresql import Postgresql

from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin

from ..pattern import CredentialsType, StatelessLister

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# A page is a plain dict describing one gem; get_pages() builds it with a
# "name" key (the gem name) and a "versions" key (a list of per-version dicts).
RubyGemsListerPage = Dict[str, Any]


class RubyGemsLister(StatelessLister[RubyGemsListerPage]):
    """Lister for RubyGems.org, the Ruby community's gem hosting service.

    Instead of querying rubygems.org Web API, it uses gems data from the
    daily PostgreSQL database dump of rubygems. This makes it possible to
    gather all interesting info about a gem and its release artifacts
    (version number, download URL, checksums, release date) in an efficient
    way and without flooding rubygems Web API with numerous HTTP requests
    (as there were more than 187000 gems available on 07/10/2022).
    """

    LISTER_NAME = "rubygems"
    VISIT_TYPE = "rubygems"
    INSTANCE = "rubygems"

    RUBY_GEMS_POSTGRES_DUMP_BASE_URL = (
        "https://s3-us-west-2.amazonaws.com/rubygems-dumps"
    )
    RUBY_GEMS_POSTGRES_DUMP_LIST_URL = (
        f"{RUBY_GEMS_POSTGRES_DUMP_BASE_URL}?prefix=production/public_postgresql"
    )

    RUBY_GEM_DOWNLOAD_URL_PATTERN = "https://rubygems.org/downloads/{gem}-{version}.gem"
    RUBY_GEM_ORIGIN_URL_PATTERN = "https://rubygems.org/gems/{gem}"
    RUBY_GEM_EXTRINSIC_METADATA_URL_PATTERN = (
        "https://rubygems.org/api/v2/rubygems/{gem}/versions/{version}.json"
    )

    DB_NAME = "rubygems"
    DUMP_SQL_PATH = "public_postgresql/databases/PostgreSQL.sql.gz"

    def __init__(
        self,
        scheduler: SchedulerInterface,
        url: str = RUBY_GEMS_POSTGRES_DUMP_BASE_URL,
        instance: str = INSTANCE,
        credentials: Optional[CredentialsType] = None,
        max_origins_per_page: Optional[int] = None,
        max_pages: Optional[int] = None,
        enable_origins: bool = True,
    ):
        """Initialize the lister.

        All arguments are forwarded unchanged to the parent lister
        constructor; ``url`` defaults to the base URL of the bucket hosting
        the database dumps and is the prefix used to download a dump.
        """
        super().__init__(
            scheduler=scheduler,
            credentials=credentials,
            instance=instance,
            url=url,
            max_origins_per_page=max_origins_per_page,
            max_pages=max_pages,
            enable_origins=enable_origins,
        )

    def get_latest_dump_file(self) -> str:
        """Return the bucket key of the latest database dump.

        The XML listing at ``RUBY_GEMS_POSTGRES_DUMP_LIST_URL`` is fetched and
        the ``Key`` of its last ``Contents`` entry is returned.

        Raises:
            ValueError: if the listing contains no ``Contents`` entry.
        """
        response = self.http_request(self.RUBY_GEMS_POSTGRES_DUMP_LIST_URL)
        xml = BeautifulSoup(response.content, "xml")
        contents = xml.find_all("Contents")
        if not contents:
            raise ValueError(
                "No rubygems database dump listed at "
                f"{self.RUBY_GEMS_POSTGRES_DUMP_LIST_URL}"
            )
        # NOTE(review): relies on the listing being ordered oldest to newest
        # (presumably keys embed a timestamp and are sorted) -- confirm.
        return contents[-1].find("Key").text

    def create_rubygems_db(
        self, postgresql: Postgresql
    ) -> Tuple[str, psycopg2.extensions.connection]:
        """Create the ``DB_NAME`` database in the given postgres instance.

        Args:
            postgresql: running temporary postgres instance.

        Returns:
            A tuple ``(db_url, db)`` where ``db_url`` is the connection URL of
            the newly created database and ``db`` an open autocommit
            connection to it, with the ``hstore`` extension enabled. The
            caller is responsible for closing ``db``.
        """
        logger.debug("Creating rubygems database")

        db_dsn = postgresql.dsn()
        db_url = postgresql.url().replace(db_dsn["database"], self.DB_NAME)

        # First connect to the default database only to issue CREATE DATABASE,
        # then release that connection instead of leaking it.
        admin_db = psycopg2.connect(**db_dsn)
        try:
            admin_db.set_isolation_level(
                psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
            )
            with admin_db.cursor() as cursor:
                cursor.execute(f"CREATE DATABASE {self.DB_NAME}")
        finally:
            admin_db.close()

        db_dsn["database"] = self.DB_NAME

        db = psycopg2.connect(**db_dsn)
        try:
            db.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            with db.cursor() as cursor:
                cursor.execute("CREATE EXTENSION IF NOT EXISTS hstore")
        except Exception:
            # do not leak the connection if its setup fails
            db.close()
            raise

        return db_url, db

    def populate_rubygems_db(self, db_url: str):
        """Download the latest database dump and load it with ``psql``.

        The dump archive is downloaded and extracted into a temporary
        directory, then the gzipped SQL file at ``DUMP_SQL_PATH`` is
        decompressed on the fly and piped into ``psql`` connected to
        ``db_url``.

        Args:
            db_url: connection URL of the database to populate.

        Raises:
            ValueError: if ``psql`` exits with a non-zero status (its output
                is logged as warnings beforehand).
        """
        dump_file = self.get_latest_dump_file()
        # the third path component of the key is used as dump identifier
        dump_id = dump_file.split("/")[2]

        response = self.http_request(f"{self.url}/{dump_file}", stream=True)

        with tempfile.TemporaryDirectory() as temp_dir:
            logger.debug(
                "Downloading latest rubygems database dump: %s (%s bytes)",
                dump_id,
                # the header is only informative, do not fail when it is absent
                response.headers.get("content-length", "unknown"),
            )
            dump_path = os.path.join(temp_dir, "rubygems_dump.tar")
            with open(dump_path, "wb") as dump:
                for chunk in response.iter_content(chunk_size=1024):
                    dump.write(chunk)

            # NOTE(review): extractall() trusts member paths of the downloaded
            # archive; consider an extraction filter if the dump source is not
            # fully trusted.
            with tarfile.open(dump_path) as dump_tar:
                dump_tar.extractall(temp_dir)

            logger.debug("Populating rubygems database with dump %s", dump_id)

            # psql output is sent to temporary files rather than pipes: nobody
            # drains the pipes while the dump is being written to stdin, so a
            # chatty psql could fill them up and deadlock both processes.
            with tempfile.TemporaryFile(
                dir=temp_dir
            ) as psql_out, tempfile.TemporaryFile(dir=temp_dir) as psql_err:
                # FIXME: make this work with -v ON_ERROR_STOP=1
                # Leaving the Popen context closes stdin (which denotes the end
                # of the SQL input) and waits for psql to terminate, even when
                # feeding the dump raised an exception.
                with subprocess.Popen(
                    ["psql", "--no-psqlrc", "-q", db_url],
                    stdin=subprocess.PIPE,
                    stdout=psql_out,
                    stderr=psql_err,
                ) as psql:
                    # passing value of gzip.open as stdin of subprocess.run
                    # makes the process read raw data instead of decompressed
                    # data so we have to use a pipe
                    with gzip.open(
                        os.path.join(temp_dir, self.DUMP_SQL_PATH), "rb"
                    ) as sql:
                        shutil.copyfileobj(sql, psql.stdin)  # type: ignore

                if psql.returncode != 0:
                    psql_out.seek(0)
                    for line in psql_out:
                        logger.warning(
                            "psql out: %s", line.decode(errors="replace").strip()
                        )
                    psql_err.seek(0)
                    for line in psql_err:
                        logger.warning(
                            "psql err: %s", line.decode(errors="replace").strip()
                        )
                    raise ValueError(
                        "Loading rubygems dump failed with exit code "
                        f"{psql.returncode}."
                    )

    def get_pages(self) -> Iterator[RubyGemsListerPage]:
        """Yield one page per gem having at least one version.

        A temporary postgres instance is spawned and populated with the latest
        rubygems dump, then queried for gems and their versions.

        Yields:
            Dicts with a ``name`` key (gem name) and a ``versions`` key, a list
            of dicts with keys ``number``, ``url``, ``date`` (flagged as UTC),
            ``authors``, ``sha256`` (hex digest or None) and ``size``.
        """
        # spawn a temporary postgres instance (require initdb executable in environment)
        with Postgresql() as postgresql:
            db_url, db = self.create_rubygems_db(postgresql)
            try:
                self.populate_rubygems_db(db_url)
                with db.cursor() as cursor:
                    cursor.execute("SELECT id, name from rubygems")
                    for gem_id, gem_name in cursor.fetchall():
                        logger.debug("Processing gem named %s", gem_name)
                        with db.cursor() as cursor_v:
                            cursor_v.execute(
                                "SELECT authors, built_at, number, sha256, size "
                                "from versions "
                                "where rubygem_id = %s",
                                (gem_id,),
                            )
                            versions = [
                                {
                                    "number": number,
                                    "url": self.RUBY_GEM_DOWNLOAD_URL_PATTERN.format(
                                        gem=gem_name, version=number
                                    ),
                                    "date": built_at.replace(tzinfo=timezone.utc),
                                    "authors": authors,
                                    # checksums are stored base64-encoded in
                                    # the dump, expose them as hex digests
                                    "sha256": (
                                        base64.decodebytes(sha256.encode()).hex()
                                        if sha256
                                        else None
                                    ),
                                    "size": size,
                                }
                                for authors, built_at, number, sha256, size in (
                                    cursor_v.fetchall()
                                )
                            ]
                        if versions:
                            yield {
                                "name": gem_name,
                                "versions": versions,
                            }
            finally:
                # release the connection before the postgres instance is
                # stopped, including on error or early generator close
                db.close()

    def get_origins_from_page(self, page: RubyGemsListerPage) -> Iterator[ListedOrigin]:
        """Convert a page, as yielded by :meth:`get_pages`, into a listed origin.

        A single origin is yielded for the gem; its loader arguments hold one
        artifact entry and one metadata entry per version, and its last update
        is the most recent version date of the page.
        """
        assert self.lister_obj.id is not None

        versions = page["versions"]

        artifacts = [
            {
                "version": version["number"],
                "filename": version["url"].split("/")[-1],
                "url": version["url"],
                "checksums": (
                    {"sha256": version["sha256"]} if version["sha256"] else {}
                ),
                "length": version["size"],
            }
            for version in versions
        ]

        rubygem_metadata = [
            {
                "version": version["number"],
                "date": version["date"].isoformat(),
                "authors": version["authors"],
                "extrinsic_metadata_url": (
                    self.RUBY_GEM_EXTRINSIC_METADATA_URL_PATTERN.format(
                        gem=page["name"], version=version["number"]
                    )
                ),
            }
            for version in versions
        ]

        yield ListedOrigin(
            lister_id=self.lister_obj.id,
            visit_type=self.VISIT_TYPE,
            url=self.RUBY_GEM_ORIGIN_URL_PATTERN.format(gem=page["name"]),
            last_update=max(version["date"] for version in versions),
            extra_loader_arguments={
                "artifacts": artifacts,
                "rubygem_metadata": rubygem_metadata,
            },
        )