# Copyright (C) 2022-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import json
import logging
from os import path, walk
import string
import subprocess
import tempfile
from typing import Any, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple
import attr
from swh.core.tarball import uncompress
from swh.loader.core.utils import EMPTY_AUTHOR, cached_method, release_name
from swh.loader.package.loader import BasePackageInfo, PackageLoader
from swh.model import from_disk
from swh.model.model import ObjectType, Release, Sha1Git, TimestampWithTimezone
from swh.storage.interface import StorageInterface
logger = logging.getLogger(__name__)
[docs]
@attr.s
class RpmPackageInfo(BasePackageInfo):
    """Per-version description of an RPM package handled by the RPM loader.

    Adds the package name, intrinsic version, build time and a checksum
    string on top of the fields inherited from ``BasePackageInfo``.
    """

    #: Name of the package.
    name = attr.ib(type=str)

    #: Intrinsic version of the package, independent from the distribution
    #: (e.g. 1.18.0-5).
    intrinsic_version = attr.ib(type=str)

    #: Build time of the package in iso format
    #: (e.g. 2017-02-10T04:59:31+00:00). Defaults to ``None``.
    build_time = attr.ib(default=None, type=str)

    #: String substituted for ``$checksums_str`` in ``MANIFEST_FORMAT``.
    #: Defaults to ``None``.
    checksums_str = attr.ib(default=None, type=str)

    # NOTE(review): presumably both constants are consumed by
    # BasePackageInfo when computing the package extid -- confirm there.
    EXTID_TYPE = "rpm-sha256"
    MANIFEST_FORMAT = string.Template("$name $intrinsic_version $checksums_str")
[docs]
class RpmLoader(PackageLoader[RpmPackageInfo]):
    """Package loader for RPM source packages (visit type ``rpm``)."""

    visit_type = "rpm"

    def __init__(
        self,
        storage: StorageInterface,
        url: str,
        packages: Dict[str, Dict[str, Any]],
        **kwargs: Any,
    ):
        """Set up the loader for the RPM packages of one origin.

        Args:
            storage: storage instance, forwarded to the parent constructor
            url: origin url (e.g. rpm://Fedora/packages/nginx)
            packages: mapping from a version key to the description of the
                corresponding package artifact, for example::

                    {
                        '34/Everything/1.18.0-5': {
                            'name': 'nginx',
                            'version': '1.18.0-5',
                            'release': 34,
                            'edition': 'Everything',
                            'build_time': '2022-11-01T12:00:55+00:00',
                            'url': 'https://archives.fedoraproject.org/nginx-1.18.0-5.fc34.src.rpm',
                            'checksums': {
                                'sha256': 'ac68fa26886c661b77bfb97bbe234a6c37d36a16c1eca126eabafbfc7fcb',
                            }
                        },
                        # ...
                    }
            kwargs: extra keyword arguments passed untouched to the parent
                constructor
        """
        super().__init__(storage=storage, url=url, **kwargs)
        self.url = url
        # Branch name -> branch target; populated by build_release() for
        # every top-level file of a package that could be uncompressed.
        self.tarball_branches: Dict[bytes, Mapping[str, Any]] = {}
        self.packages = packages
[docs]
@cached_method
def get_versions(self) -> Sequence[str]:
    """Return the keys of ``self.packages`` ordered by ascending build time.

    The ordering compares the ``"build_time"`` value of each package entry
    as-is; the sort is stable, so keys sharing the same build time keep the
    order they have in ``self.packages``.
    """
    # NOTE(review): cached_method presumably memoizes the result per
    # instance -- confirm in swh.loader.core.utils.

    def build_time_of(version):
        return self.packages[version]["build_time"]

    ordered_versions = list(self.packages)
    ordered_versions.sort(key=build_time_of)
    return ordered_versions
[docs]
def get_default_version(self) -> str:
    """Return the last entry of ``get_versions()``.

    As ``get_versions()`` orders versions by build time, this is the most
    recently built one.

    Raises:
        IndexError: if ``get_versions()`` returns an empty sequence
    """
    known_versions = self.get_versions()
    latest_version = known_versions[-1]
    return latest_version
[docs]
def get_package_info(self, version: str) -> Iterator[Tuple[str, RpmPackageInfo]]:
    """Yield the single ``(name, package info)`` pair for ``version``.

    Args:
        version: a key of ``self.packages``

    Yields:
        a tuple made of ``release_name(version)`` and the
        ``RpmPackageInfo`` built from the metadata stored for ``version``

    Raises:
        KeyError: if ``version`` is not a key of ``self.packages``
    """
    branch_name = release_name(version)
    package_info = RpmPackageInfo.from_metadata(self.packages[version], version)
    yield branch_name, package_info
[docs]
def uncompress(
    self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str
) -> str:
    """Extract the downloaded RPM package into ``dest``.

    Only the first entry of ``dl_artifacts`` is considered; the mapping
    that comes with its path is ignored.

    Args:
        dl_artifacts: ``(path, info)`` pairs of the downloaded artifacts
        dest: destination passed to ``extract_rpm_package``

    Returns:
        the value returned by ``extract_rpm_package``
    """
    first_artifact = dl_artifacts[0]
    rpm_path, _artifact_info = first_artifact
    return extract_rpm_package(rpm_path, dest=dest)
[docs]
def build_release(
    self, p_info: RpmPackageInfo, uncompressed_path: str, directory: Sha1Git
) -> Optional[Release]:
    """Build the synthetic release of a package and load its tarballs.

    Every file found directly under ``uncompressed_path`` is tentatively
    uncompressed into a temporary directory. When that works, the
    extracted tree is sent to ``self.storage`` and a ``directory`` branch
    named after the file is recorded in ``self.tarball_branches``.

    Args:
        p_info: package info providing the name, intrinsic version and
            build time used to fill in the release
        uncompressed_path: directory whose top-level files are inspected
        directory: identifier of the directory targeted by the release

    Returns:
        a synthetic release targeting ``directory``, dated with the
        package build time
    """
    # Only the first level of the extracted package is scanned.
    top_dir, _subdirs, top_level_files = next(walk(uncompressed_path))
    for filename in top_level_files:
        candidate_path = path.join(top_dir, filename)
        with tempfile.TemporaryDirectory() as extraction_dir:
            try:
                uncompress(candidate_path, extraction_dir)
            except Exception:
                # Any extraction failure is taken to mean "not a tarball".
                continue
            else:
                # The temporary directory must still exist while its
                # content is read and sent to the storage.
                extracted_tree = from_disk.Directory.from_disk(
                    path=extraction_dir.encode("utf-8"),
                    max_content_length=self.max_content_size,
                )
                contents, skipped_contents, directories = from_disk.iter_directory(
                    extracted_tree
                )
                self.storage.skipped_content_add(skipped_contents)
                self.storage.content_add(contents)
                self.storage.directory_add(directories)
                branch_target = {
                    "target_type": "directory",
                    "target": extracted_tree.hash,
                }
                self.tarball_branches[filename.encode()] = branch_target

    release_message = (
        f"Synthetic release for RPM source package {p_info.name} "
        f"version {p_info.intrinsic_version}\n"
    )
    return Release(
        name=p_info.intrinsic_version.encode(),
        message=release_message.encode(),
        author=EMPTY_AUTHOR,
        date=TimestampWithTimezone.from_iso8601(p_info.build_time),
        target=directory,
        target_type=ObjectType.DIRECTORY,
        synthetic=True,
    )