# Copyright (C) 2022-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import csv
from dataclasses import dataclass
from datetime import datetime
import json
import logging
from pathlib import Path
import tarfile
import tempfile
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urlparse
import iso8601
from looseversion import LooseVersion2
from swh.core.utils import grouper
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
# Alias for the page results returned by the lister's `get_pages` method.
CratesListerPage = List[List[Dict[str, Any]]]
@dataclass
class CratesListerState:
"""Store lister state for incremental mode operations.
'index_last_update' represents the UTC time the crates.io database dump was
started
"""
index_last_update: Optional[datetime] = None
class CratesLister(Lister[CratesListerState, CratesListerPage]):
"""List origins from the "crates.io" forge.
It downloads a tar.gz archive which contains crates.io database table content as
csv files which is automatically generated every 24 hours.
Parsing two csv files we can list all Crates.io package names and their related
versions.
In incremental mode, it check each entry comparing their 'last_update' value
with self.state.index_last_update
"""
LISTER_NAME = "crates"
VISIT_TYPE = "crates"
INSTANCE = "crates"
BASE_URL = "https://crates.io"
DB_DUMP_URL = "https://static.crates.io/db-dump.tar.gz"
CRATE_FILE_URL_PATTERN = (
"https://static.crates.io/crates/{crate}/{crate}-{version}.crate"
)
CRATE_URL_PATTERN = "https://crates.io/crates/{crate}"
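# For illustration, with crate "rand" at version "0.8.5" (arbitrary example
# values) the patterns above expand to:
#   CRATE_FILE_URL_PATTERN -> https://static.crates.io/crates/rand/rand-0.8.5.crate
#   CRATE_URL_PATTERN      -> https://crates.io/crates/rand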
def __init__(
self,
scheduler: SchedulerInterface,
url: str = BASE_URL,
instance: str = INSTANCE,
credentials: CredentialsType = None,
max_origins_per_page: Optional[int] = None,
max_pages: Optional[int] = None,
enable_origins: bool = True,
):
super().__init__(
scheduler=scheduler,
credentials=credentials,
url=url,
instance=instance,
max_origins_per_page=max_origins_per_page,
max_pages=max_pages,
enable_origins=enable_origins,
)
self.index_metadata: Dict[str, str] = {}
self.all_crates_processed = False
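# Minimal usage sketch (assumes `scheduler` is an already configured
# SchedulerInterface instance; `run()` comes from the base Lister class):
#
#     lister = CratesLister(scheduler=scheduler)
#     stats = lister.run()  # downloads the db dump, lists origins, saves state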
def state_from_dict(self, d: Dict[str, Any]) -> CratesListerState:
index_last_update = d.get("index_last_update")
if index_last_update is not None:
d["index_last_update"] = iso8601.parse_date(index_last_update)
return CratesListerState(**d)
def state_to_dict(self, state: CratesListerState) -> Dict[str, Any]:
d: Dict[str, Optional[str]] = {"index_last_update": None}
index_last_update = state.index_last_update
if index_last_update is not None:
d["index_last_update"] = index_last_update.isoformat()
return d
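# Round-trip sketch of the state (de)serialization above, using an arbitrary
# dump timestamp:
#
#     state = self.state_from_dict({"index_last_update": "2024-01-01T00:00:00+00:00"})
#     state.index_last_update    # -> datetime with UTC tzinfo
#     self.state_to_dict(state)  # -> {"index_last_update": "2024-01-01T00:00:00+00:00"}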
def is_new(self, dt_str: str):
"""Returns True when dt_str is greater than
self.state.index_last_update
"""
dt = iso8601.parse_date(dt_str)
last = self.state.index_last_update
return last is None or last < dt
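# Example: with index_last_update set to 2024-01-01T00:00:00+00:00,
# is_new("2024-02-01T00:00:00+00:00") is True while
# is_new("2023-12-01T00:00:00+00:00") is False; with no stored state,
# every entry is considered new.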
def get_and_parse_db_dump(self) -> Dict[str, Any]:
"""Download and parse csv files from db_dump_path.
Returns a dict where each entry corresponds to a package name with its related versions.
"""
with tempfile.TemporaryDirectory() as tmpdir:
file_name = self.DB_DUMP_URL.split("/")[-1]
archive_path = Path(tmpdir) / file_name
# Download the db dump
with self.http_request(self.DB_DUMP_URL, stream=True) as res:
with open(archive_path, "wb") as out_file:
for chunk in res.iter_content(chunk_size=1024):
out_file.write(chunk)
# Extract the db dump
db_dump_path = Path(str(archive_path).split(".tar.gz")[0])
members_to_extract = []
with tarfile.open(archive_path) as tf:
for member in tf.getmembers():
if member.name.endswith(
("/data/crates.csv", "/data/versions.csv", "/metadata.json")
):
members_to_extract.append(member)
tf.extractall(members=members_to_extract, path=db_dump_path)
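# Raise the csv field size limit above its default (131072 bytes), as
# some fields of the dump can exceed it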
csv.field_size_limit(10000000)
(crates_csv_path,) = list(db_dump_path.glob("*/data/crates.csv"))
(versions_csv_path,) = list(db_dump_path.glob("*/data/versions.csv"))
(index_metadata_json_path,) = list(db_dump_path.rglob("*/metadata.json"))
with index_metadata_json_path.open("rb") as index_metadata_json:
self.index_metadata = json.load(index_metadata_json)
crates: Dict[str, Any] = {}
with crates_csv_path.open() as crates_fd:
crates_csv = csv.DictReader(crates_fd)
for item in crates_csv:
if self.is_new(item["updated_at"]):
# crate 'id' as key
crates[item["id"]] = {
"name": item["name"],
"updated_at": item["updated_at"],
"versions": {},
}
data: Dict[str, Any] = {}
with versions_csv_path.open() as versions_fd:
versions_csv = csv.DictReader(versions_fd)
for version in versions_csv:
if version["crate_id"] in crates.keys():
crate: Dict[str, Any] = crates[version["crate_id"]]
crate["versions"][version["num"]] = version
# crate 'name' as key
data[crate["name"]] = crate
return data
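# Sketch of the returned structure (illustrative crate name, raw csv rows
# abbreviated):
#
#     {
#         "rand": {
#             "name": "rand",
#             "updated_at": "<updated_at column of crates.csv>",
#             "versions": {
#                 "0.8.5": {...},  # raw row of versions.csv for that version
#             },
#         },
#         ...
#     }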
def page_entry_dict(self, entry: Dict[str, Any]) -> Dict[str, Any]:
"""Transform package version definition dict to a suitable
page entry dict
"""
crate_file = self.CRATE_FILE_URL_PATTERN.format(
crate=entry["name"], version=entry["version"]
)
filename = urlparse(crate_file).path.split("/")[-1]
return dict(
name=entry["name"],
version=entry["version"],
checksum=entry["checksum"],
yanked=(entry["yanked"] == "t"),
crate_file=crate_file,
filename=filename,
last_update=entry["updated_at"],
)
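# Illustrative input/output (hypothetical values):
#
#     entry = {"name": "rand", "version": "0.8.5", "checksum": "<sha256>",
#              "yanked": "f", "updated_at": "<timestamp>"}
#     page_entry_dict(entry) -> {
#         "name": "rand", "version": "0.8.5", "checksum": "<sha256>",
#         "yanked": False,
#         "crate_file": "https://static.crates.io/crates/rand/rand-0.8.5.crate",
#         "filename": "rand-0.8.5.crate",
#         "last_update": "<timestamp>",
#     }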
def get_pages(self) -> Iterator[CratesListerPage]:
"""Each page is a list of crate versions with:
- name: Name of the crate
- version: Version
- checksum: Checksum
- yanked: Whether the package is yanked or not
- crate_file: Url of the crate file
- filename: File name of the crate file
- last_update: Last update for that version
"""
# Fetch the crates.io db dump, then parse the data.
dataset = self.get_and_parse_db_dump()
logger.debug("Found %s crates in crates_index", len(dataset))
# a page contains up to 1000 crates with versions info
for crates in grouper(dataset.items(), 1000):
page = []
for name, item in crates:
crate_versions = []
# sort crate versions
versions = sorted(item["versions"].keys(), key=LooseVersion2)
for version in versions:
v = item["versions"][version]
v["name"] = name
v["version"] = version
crate_versions.append(self.page_entry_dict(v))
page.append(crate_versions)
yield page
self.all_crates_processed = True
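# Consumption sketch: each yielded page holds up to 1000 crates, each crate
# being the list of its version entries sorted from oldest to newest:
#
#     for page in self.get_pages():
#         for crate_versions in page:
#             latest = crate_versions[-1]["version"]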
def get_origins_from_page(self, page: CratesListerPage) -> Iterator[ListedOrigin]:
"""Iterate on all crate pages and yield ListedOrigin instances."""
assert self.lister_obj.id is not None
for crate_versions in page:
url = self.CRATE_URL_PATTERN.format(crate=crate_versions[0]["name"])
last_update = crate_versions[0]["last_update"]
artifacts = []
for entry in crate_versions:
# Build an artifact entry following original-artifacts-json specification
# https://docs.softwareheritage.org/devel/swh-storage/extrinsic-metadata-specification.html#original-artifacts-json # noqa: B950
artifacts.append(
{
"version": entry["version"],
"filename": entry["filename"],
"url": entry["crate_file"],
"checksums": {
"sha256": entry["checksum"],
},
}
)
yield ListedOrigin(
lister_id=self.lister_obj.id,
visit_type=self.VISIT_TYPE,
url=url,
last_update=iso8601.parse_date(last_update),
extra_loader_arguments={
"artifacts": artifacts,
},
)
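# Each yielded ListedOrigin targets the crate page on crates.io and carries
# one artifact per version for the loader, e.g. (illustrative values,
# lister_id omitted):
#
#     ListedOrigin(
#         visit_type="crates",
#         url="https://crates.io/crates/rand",
#         extra_loader_arguments={"artifacts": [
#             {"version": "0.8.5",
#              "filename": "rand-0.8.5.crate",
#              "url": "https://static.crates.io/crates/rand/rand-0.8.5.crate",
#              "checksums": {"sha256": "<sha256>"}},
#         ]},
#     )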
def finalize(self) -> None:
if not self.state.index_last_update and self.all_crates_processed:
last = iso8601.parse_date(self.index_metadata["timestamp"])
self.state.index_last_update = last
self.updated = True