# Copyright (C) 2022-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass, field
from datetime import datetime, timezone
from itertools import product
import logging
from string import Template
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple
from urllib.parse import urljoin
import repomd
from typing_extensions import TypedDict
from swh.lister.pattern import CredentialsType
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from ..pattern import Lister
logger = logging.getLogger(__name__)
Release = str
Component = str
PkgName = str
PkgVersion = str
RPMOrigin = str
RPMPageType = Optional[Tuple[Release, Component, repomd.Repo]]
"""Each page is a list of packages for a given (release, component) pair
from a Red Hat based distribution."""
[docs]
class RPMSourceData(TypedDict):
"""Dictionary holding relevant data for listing RPM source packages.
See content of the lister config directory to get examples of RPM
source data for famous RedHat based distributions.
"""
base_url: str
"""Base URL of a RPM repository"""
releases: List[Release]
"""List of release identifiers for a Red Hat based distribution"""
components: List[Component]
"""List of components for a Red Hat based distribution"""
index_url_templates: List[str]
"""List of URL templates to discover source packages metadata, the
following variables can be substituted in them: ``base_url``, ``release``
and ``edition``, see :class:`string.Template` for more details about the
format. The generated URLs must target directories containing a sub-directory
named ``repodata``, which contains packages metadata, in order to be
successfully processed by the lister."""
def _get_last_modified(pkg: repomd.Package) -> datetime:
"""Get timezone aware last modified time in UTC from RPM package metadata."""
ts = pkg._element.find("common:time", namespaces=repomd._ns).get("build")
return datetime.utcfromtimestamp(int(ts)).replace(tzinfo=timezone.utc)
def _get_checksums(pkg: repomd.Package) -> Dict[str, str]:
"""Get checksums associated to rpm archive."""
cs = pkg._element.find("common:checksum", namespaces=repomd._ns)
cs_type = cs.get("type")
if cs_type == "sha":
cs_type = "sha1"
return {cs_type: cs.text}
[docs]
@dataclass
class RPMListerState:
"""State of RPM lister"""
package_versions: Dict[PkgName, Set[PkgVersion]] = field(default_factory=dict)
"""Dictionary mapping a package name to all the versions found during
last listing"""
[docs]
class RPMLister(Lister[RPMListerState, RPMPageType]):
"""
List source packages for a Red Hat based linux distribution.
The lister creates a snapshot for each package from all its available versions.
In incremental mode, only packages with different snapshot since the last listing
operation will be sent to the scheduler that will create loading tasks to archive
newly found source code.
Args:
scheduler: instance of SchedulerInterface
url: Red Hat based distribution info URL
instance: name of Red Hat based distribution
rpm_src_data: list of dictionaries holding data required to list RPM source packages,
see examples in the config directory.
incremental: if :const:`True`, only packages with new versions are sent to the
scheduler when relisting
"""
LISTER_NAME = "rpm"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
instance: str,
rpm_src_data: List[RPMSourceData],
incremental: bool = False,
max_origins_per_page: Optional[int] = None,
max_pages: Optional[int] = None,
enable_origins: bool = True,
credentials: Optional[CredentialsType] = None,
):
super().__init__(
scheduler=scheduler,
url=url,
instance=instance,
credentials=credentials,
max_origins_per_page=max_origins_per_page,
max_pages=max_pages,
enable_origins=enable_origins,
)
self.rpm_src_data = rpm_src_data
self.incremental = incremental
self.listed_origins: Dict[RPMOrigin, ListedOrigin] = {}
self.origins_to_send: Set[RPMOrigin] = set()
self.package_versions: Dict[PkgName, Set[PkgVersion]] = {}
[docs]
def state_from_dict(self, d: Dict[str, Any]) -> RPMListerState:
return RPMListerState(package_versions={k: set(v) for k, v in d.items()})
[docs]
def state_to_dict(self, state: RPMListerState) -> Dict[str, Any]:
return {k: list(v) for k, v in state.package_versions.items()}
[docs]
def repo_request(
self,
index_url_template: Template,
base_url: str,
release: Release,
component: Component,
) -> Optional[RPMPageType]:
"""Return parsed packages for a given distribution release and component."""
index_url = index_url_template.substitute(
base_url=base_url.rstrip("/"), release=release, component=component
)
try:
repo = repomd.load(index_url) # throws error if no repomd.xml is not found
except Exception:
logger.debug("Repository metadata not found at URL %s", index_url)
return None
else:
logger.debug(
"Fetched metadata from url: %s, found %d packages", index_url, len(repo)
)
return repo
[docs]
def get_pages(self) -> Iterator[RPMPageType]:
"""Return an iterator on parsed rpm packages, one page per (release, component) pair."""
for rpm_src_data in self.rpm_src_data:
index_url_templates = [
Template(index_url_template)
for index_url_template in rpm_src_data["index_url_templates"]
]
# try all possible package repository URLs for each (release, component) pair
for release, component, index_url_template in product(
rpm_src_data["releases"],
rpm_src_data["components"],
index_url_templates,
):
repo = self.repo_request(
index_url_template,
rpm_src_data["base_url"],
release,
component,
)
if repo is not None:
# valid package repository found, yield page
yield (release, component, repo)
yield None
[docs]
def origin_url_for_package(self, package_name: PkgName) -> RPMOrigin:
"""Return the origin url for the given package."""
# TODO: Use a better origin URL before deploying the lister to production
# https://gitlab.softwareheritage.org/swh/devel/swh-model/-/issues/4632
return f"rpm://{self.instance}/packages/{package_name}"
[docs]
def get_origins_from_page(self, page: RPMPageType) -> Iterator[ListedOrigin]:
"""Convert a page of rpm package sources into an iterator of ListedOrigin."""
assert self.lister_obj.id is not None
if page is None:
# all pages processed, yield listed origins
for origin_url in self.origins_to_send:
yield self.listed_origins[origin_url]
return
release, component, repo = page
logger.debug(
"Listing %s release %s component %s from repository metadata located at %s",
self.instance,
release,
component,
repo.baseurl,
)
origins_to_send = set()
new_origins_count = 0
# iterate on each package's metadata
for pkg_metadata in repo:
if pkg_metadata.arch != "src":
# not a source package, skip it
continue
# extract package metadata
package_name = pkg_metadata.name
# we extract the intrinsic version of the package for the rpm loader
# to avoid creating different releases targeting the same directory
# 2.12-10.el8 => 2.12-10
package_version_split = pkg_metadata.vr.rsplit("-", maxsplit=1)
package_version = "-".join(
[
package_version_split[0],
package_version_split[1].split(".", maxsplit=1)[0],
]
)
# create package version key as expected by the rpm loader
package_version_key = f"{release}/{component}/{package_version}"
package_build_time = _get_last_modified(pkg_metadata)
package_download_url = urljoin(
repo.baseurl.rstrip("/") + "/", pkg_metadata.location
)
checksums = _get_checksums(pkg_metadata)
# build origin url
origin_url = self.origin_url_for_package(package_name)
# this is the first time a package is listed
if origin_url not in self.listed_origins:
# create a ListedOrigin object for it that can be later
# updated with new package versions info
self.listed_origins[origin_url] = ListedOrigin(
lister_id=self.lister_obj.id,
url=origin_url,
visit_type="rpm",
extra_loader_arguments={"packages": {}},
last_update=package_build_time,
)
# init set that will contain all listed package versions
self.package_versions[package_name] = set()
new_origins_count += 1
# origins will be yielded when all pages processed
origins_to_send.add(origin_url)
# update package metadata in parameter that will be provided
# to the rpm loader
self.listed_origins[origin_url].extra_loader_arguments["packages"][
package_version_key
] = {
"name": package_name,
"version": package_version,
"url": package_download_url,
"build_time": package_build_time.isoformat(),
"checksums": checksums,
}
last_update = self.listed_origins[origin_url].last_update
if last_update is not None and package_build_time > last_update:
self.listed_origins[origin_url].last_update = package_build_time
# add package version key to the set of found versions
self.package_versions[package_name].add(package_version_key)
# package has already been listed during a previous listing process
if self.incremental and package_name in self.state.package_versions:
new_versions = (
self.package_versions[package_name]
- self.state.package_versions[package_name]
)
# no new versions so far, no need to send the origin to the scheduler
if not new_versions:
origins_to_send.remove(origin_url)
logger.debug(
"Found %s packages to update (%s new ones and %s packages with new versions).",
len(origins_to_send),
new_origins_count,
len(origins_to_send) - new_origins_count,
)
logger.debug(
"Current total number of listed source packages is equal to %s.",
len(self.listed_origins),
)
self.origins_to_send.update(origins_to_send)
[docs]
def finalize(self):
if self.incremental:
# set mapping between listed package names and versions as lister state
self.state.package_versions = self.package_versions
self.updated = len(self.listed_origins) > 0