Source code for swh.loader.package.debian.loader

# Copyright (C) 2017-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from os import path
import re
import subprocess
from typing import Any, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple

import attr
from dateutil.parser import parse as parse_date
from debian.changelog import Changelog
from debian.deb822 import Dsc

from swh.loader.package.loader import BasePackageInfo, PackageLoader, PartialExtID
from swh.loader.package.utils import download, release_name
from swh.model.hashutil import hash_to_bytes
from swh.model.model import ObjectType, Person, Release, Sha1Git, TimestampWithTimezone
from swh.storage.interface import StorageInterface

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Separator for a comma-separated list of uids of the form "Name <email>".
# The look-behind makes the pattern match only right after a closing ">", so
# a comma that does not follow an e-mail address is not treated as a separator.
UPLOADERS_SPLIT = re.compile(r"(?<=\>)\s*,\s*")

# First element of the tuple returned by DebianPackageInfo.extid().
EXTID_TYPE = "dsc-sha256"

class DscCountError(ValueError):
    """Raised when an unexpected number of .dsc files is seen.

    Being a ``ValueError`` subclass, it is also caught by handlers written
    for ``ValueError``.
    """

    pass
@attr.s
class DebianFileMetadata:
    """Description of a single file belonging to a Debian source package.

    Only ``name``, ``size`` and ``uri`` are mandatory; every checksum
    defaults to the empty string, meaning "not provided".
    """

    name = attr.ib(type=str)
    """Filename"""
    size = attr.ib(type=int)
    uri = attr.ib(type=str)
    """URL of this specific file"""

    # all checksums are not always available, make them optional
    sha256 = attr.ib(type=str, default="")
    md5sum = attr.ib(type=str, default="")
    sha1 = attr.ib(type=str, default="")

    # Some of the DSC files imported in swh apparently had a Checksums-SHA512
    # field which got recorded in the archive. Current versions of dpkg-source
    # don't seem to generate them, but keep the field available for
    # future-proofing.
    sha512 = attr.ib(type=str, default="")
@attr.s
class DebianPackageChangelog:
    """Information taken from the changelog of a Debian source package."""

    person = attr.ib(type=Dict[str, str])
    """A dict with fields like, model.Person, except they are str instead
    of bytes, and 'email' is optional."""
    date = attr.ib(type=str)
    """Date of the changelog entry."""
    history = attr.ib(type=List[Tuple[str, str]])
    """List of tuples (package_name, version)"""
@attr.s
class DebianPackageInfo(BasePackageInfo):
    """Package information for one version of a Debian source package."""

    raw_info = attr.ib(type=Dict[str, Any])
    files = attr.ib(type=Dict[str, DebianFileMetadata])
    """Metadata of the files (.deb, .dsc, ...) of the package."""
    name = attr.ib(type=str)
    intrinsic_version = attr.ib(type=str)
    """eg. ``0.7.2-3``, while :attr:`version` would be
    ``stretch/contrib/0.7.2-3``"""

    @classmethod
    def from_metadata(
        cls, a_metadata: Dict[str, Any], url: str, version: str
    ) -> "DebianPackageInfo":
        """Build a package info object out of a raw metadata dict.

        Args:
            a_metadata: raw package description; the ``version`` and ``name``
              keys are required, ``files`` is optional and maps a file name
              to the keyword arguments of :class:`DebianFileMetadata`
            url: value stored as the ``url`` attribute
            version: full version string; it must contain a ``/`` whereas
              ``a_metadata["version"]`` must not

        Returns:
            The new :class:`DebianPackageInfo`; ``a_metadata`` itself is kept
            as ``raw_info``.

        Raises:
            AssertionError: when the two version strings do not have the
              expected shape
            KeyError: when ``version`` or ``name`` is missing from
              ``a_metadata``
        """
        intrinsic_version = a_metadata["version"]
        assert "/" in version and "/" not in intrinsic_version, (
            version,
            intrinsic_version,
        )
        return cls(
            url=url,
            filename=None,
            version=version,
            raw_info=a_metadata,
            files={
                file_name: DebianFileMetadata(**file_metadata)
                for (file_name, file_metadata) in a_metadata.get("files", {}).items()
            },
            name=a_metadata["name"],
            intrinsic_version=intrinsic_version,
        )

    def extid(self) -> Optional[PartialExtID]:
        """Compute the extid of the package from the sha256 of its .dsc file.

        Returns:
            A tuple ``(EXTID_TYPE, EXTID_VERSION, <sha256 as bytes>)``.

        Raises:
            DscCountError: when the package does not reference exactly one
              file whose name ends with ``.dsc``
        """
        dsc_files = [
            file for (name, file) in self.files.items() if name.endswith(".dsc")
        ]

        if len(dsc_files) != 1:
            raise DscCountError(
                f"Expected exactly one .dsc file for package {self.name}, "
                f"got {len(dsc_files)}"
            )

        # NOTE(review): EXTID_VERSION is not defined in the visible part of
        # this module -- confirm where it is declared.
        return (EXTID_TYPE, EXTID_VERSION, hash_to_bytes(dsc_files[0].sha256))
@attr.s
class IntrinsicPackageMetadata:
    """Metadata extracted from a package's .dsc file."""

    name = attr.ib(type=str)
    version = attr.ib(type=str)
    changelog = attr.ib(type=DebianPackageChangelog)
    maintainers = attr.ib(type=List[Dict[str, str]])
    """A list of dicts with fields like, model.Person, except they are str
    instead of bytes, and 'email' is optional."""
class DebianLoader(PackageLoader[DebianPackageInfo]):
    """Load debian origins into swh archive."""

    visit_type = "deb"

    def __init__(
        self,
        storage: StorageInterface,
        url: str,
        packages: Mapping[str, Any],
        **kwargs: Any,
    ):
        """Debian Loader implementation.

        Args:
            storage: passed on to the parent constructor
            url: Origin url (e.g. deb://Debian/packages/cicero)
            packages: versioned packages and associated artifacts, example::

              {
                'stretch/contrib/0.7.2-3': {
                  'name': 'cicero',
                  'version': '0.7.2-3',
                  'files': {
                    'cicero_0.7.2-3.diff.gz': {
                       'md5sum': 'a93661b6a48db48d59ba7d26796fc9ce',
                       'name': 'cicero_0.7.2-3.diff.gz',
                       'sha256': 'f039c9642fe15c75bed5254315e2a29f...',
                       'size': 3964,
                       'uri': 'http://d.d.o/cicero_0.7.2-3.diff.gz',
                    },
                    'cicero_0.7.2-3.dsc': {
                      'md5sum': 'd5dac83eb9cfc9bb52a15eb618b4670a',
                      'name': 'cicero_0.7.2-3.dsc',
                      'sha256': '35b7f1048010c67adfd8d70e4961aefb...',
                      'size': 1864,
                      'uri': 'http://d.d.o/cicero_0.7.2-3.dsc',
                    },
                    'cicero_0.7.2.orig.tar.gz': {
                      'md5sum': '4353dede07c5728319ba7f5595a7230a',
                      'name': 'cicero_0.7.2.orig.tar.gz',
                      'sha256': '63f40f2436ea9f67b44e2d4bd669dbab...',
                      'size': 96527,
                      'uri': 'http://d.d.o/cicero_0.7.2.orig.tar.gz',
                    }
                  },
                },
                # ...
              }

            **kwargs: forwarded unchanged to the parent constructor

        """
        super().__init__(storage=storage, url=url, **kwargs)
        self.packages = packages

    def get_versions(self) -> Sequence[str]:
        """Returns the keys of the packages input (e.g.
        stretch/contrib/0.7.2-3, etc...)

        """
        return list(self.packages)

    def get_package_info(self, version: str) -> Iterator[Tuple[str, DebianPackageInfo]]:
        """Yield the single (release name, package info) pair of ``version``.

        Args:
            version: one of the keys of the ``packages`` mapping

        Raises:
            KeyError: when ``version`` is not a key of ``packages``
        """
        meta = self.packages[version]
        p_info = DebianPackageInfo.from_metadata(
            meta, url=self.origin.url, version=version
        )
        yield release_name(version), p_info

    def download_package(
        self, p_info: DebianPackageInfo, tmpdir: str
    ) -> List[Tuple[str, Mapping]]:
        """Contrary to other package loaders (1 package, 1 artifact),
        `p_info.files` represents the package's datafiles set to fetch:
        - <package-version>.orig.tar.gz
        - <package-version>.dsc
        - <package-version>.diff.gz

        This is delegated to the `download_package` function.

        Returns:
            One ``(tmpdir, hashes)`` pair per downloaded file.
        """
        # Inside a method body the bare name resolves to the module-level
        # function, not to this method.
        all_hashes = download_package(p_info, tmpdir)
        logger.debug("all_hashes: %s", all_hashes)
        res = [(tmpdir, hashes) for hashes in all_hashes.values()]
        logger.debug("res: %s", res)
        return res

    def uncompress(
        self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str
    ) -> str:
        """Extract the downloaded source package below ``dest``.

        Delegates to :func:`extract_package` and returns its result.
        """
        logger.debug("dl_artifacts: %s", dl_artifacts)
        return extract_package(dl_artifacts, dest=dest)

    def build_release(
        self,
        p_info: DebianPackageInfo,
        uncompressed_path: str,
        directory: Sha1Git,
    ) -> Optional[Release]:
        """Build the synthetic release object pointing at ``directory``.

        Args:
            p_info: information on the package being loaded
            uncompressed_path: where the package got extracted; the .dsc file
              is looked up in the parent directory of this path
            directory: identifier of the directory the release targets

        Returns:
            A synthetic :class:`Release` whose author and date come from the
            package changelog.

        Raises:
            ValueError: when the package does not reference any .dsc file
        """
        dsc_url, dsc_name = dsc_information(p_info)
        if not dsc_name:
            raise ValueError("dsc name for url %s should not be None" % dsc_url)
        dsc_path = path.join(path.dirname(uncompressed_path), dsc_name)
        intrinsic_metadata = get_intrinsic_package_metadata(
            p_info, dsc_path, uncompressed_path
        )

        logger.debug("intrinsic_metadata: %s", intrinsic_metadata)
        logger.debug("p_info: %s", p_info)

        msg = (
            f"Synthetic release for Debian source package {p_info.name} "
            f"version {p_info.intrinsic_version}\n"
        )

        author = prepare_person(intrinsic_metadata.changelog.person)
        date = TimestampWithTimezone.from_iso8601(intrinsic_metadata.changelog.date)

        # inspired from swh.loader.debian.converters.package_metadata_to_revision
        return Release(
            name=p_info.intrinsic_version.encode(),
            message=msg.encode(),
            author=author,
            date=date,
            target=directory,
            target_type=ObjectType.DIRECTORY,
            synthetic=True,
        )
def uid_to_person(uid: str) -> Dict[str, str]:
    """Convert an uid to a person suitable for insertion.

    Args:
        uid: an uid of the form "Name <email@ddress>"

    Returns:
        a dictionary with the following keys:

        - name: the name associated to the uid
        - email: the mail associated to the uid
        - fullname: the actual uid input

        Keys whose value is ``None`` are left out of the dictionary.

    """
    person = Person.from_fullname(uid.encode("utf-8"))
    return {k: v.decode("utf-8") for k, v in person.to_dict().items() if v is not None}
def prepare_person(person: Mapping[str, str]) -> Person:
    """Prepare person for swh serialization...

    Args:
        person: A person dict whose values are all ``str``

    Returns:
        A person ready for storage, built from the same keys with every value
        encoded to UTF-8 bytes

    """
    return Person.from_dict(
        {key: value.encode("utf-8") for (key, value) in person.items()}
    )
def download_package(p_info: DebianPackageInfo, tmpdir: Any) -> Mapping[str, Any]:
    """Fetch every file of a source package into a temporary directory.

    The checksums known for each file (md5, sha256, sha1 -- whichever are
    non-empty) are handed to ``download``, presumably so that it can verify
    them; the sha512 field is not forwarded.

    Args:
        p_info: Information on a package
        tmpdir: Where to download and extract the files to ingest

    Returns:
        Dict of swh hashes per filename key

    """
    all_hashes = {}
    for filename, fileinfo in p_info.files.items():
        logger.debug("fileinfo: %s", fileinfo)
        known_checksums = (
            ("md5", fileinfo.md5sum),
            ("sha256", fileinfo.sha256),
            ("sha1", fileinfo.sha1),
        )
        # An empty string means the checksum was not provided: skip it.
        extrinsic_hashes = {algo: value for algo, value in known_checksums if value}
        logger.debug("extrinsic_hashes(%s): %s", filename, extrinsic_hashes)
        _, hashes = download(
            fileinfo.uri, dest=tmpdir, filename=filename, hashes=extrinsic_hashes
        )
        all_hashes[filename] = hashes

    logger.debug("all_hashes: %s", all_hashes)
    return all_hashes
def dsc_information(p_info: DebianPackageInfo) -> Tuple[Optional[str], Optional[str]]:
    """Retrieve dsc information from a package.

    Args:
        p_info: Package metadata information

    Returns:
        Tuple of dsc file's uri, dsc file's name (the key it has in
        ``p_info.files``). Both are ``None`` when the package references no
        file whose name ends with ``.dsc``.

    Raises:
        DscCountError: when more than one .dsc file is referenced

    """
    dsc_name = None
    dsc_url = None
    for filename, fileinfo in p_info.files.items():
        if not filename.endswith(".dsc"):
            continue
        if dsc_name:
            raise DscCountError(
                "Package %s_%s references several dsc files."
                % (p_info.name, p_info.intrinsic_version)
            )
        dsc_url = fileinfo.uri
        dsc_name = filename

    return dsc_url, dsc_name
def extract_package(dl_artifacts: List[Tuple[str, Mapping]], dest: str) -> str:
    """Extract a Debian source package to a given directory.

    Note that after extraction the target directory will be the root of the
    extracted package, rather than containing it.

    Args:
        dl_artifacts: ``(directory, hashes)`` pairs of the downloaded files;
          each ``hashes`` mapping must have a ``filename`` key. The .dsc file
          is looked up in the directory of the first pair.
        dest: directory where the package files are stored; the package is
          extracted in its ``extracted`` subdirectory and the output of
          ``dpkg-source`` goes to its ``extract.log`` file

    Returns:
        Package extraction directory

    Raises:
        DscCountError: when no artifact has a name ending with ``.dsc``
        ValueError: when ``dl_artifacts`` is empty, or when ``dpkg-source``
          exits with a non-zero status (the message then embeds its output)

    """
    logger.debug("dl_artifacts: %s", dl_artifacts)
    if not dl_artifacts:
        raise ValueError("No downloaded artifact to extract a package from")
    a_path = dl_artifacts[0][0]

    dsc_name = None
    for _, hashes in dl_artifacts:
        logger.debug("hashes: %s", hashes)
        filename = hashes["filename"]
        if filename.endswith(".dsc"):
            dsc_name = filename
            break
    if dsc_name is None:
        # Without this check the lookup below would fail on an unbound name.
        raise DscCountError("No .dsc file found among the downloaded artifacts")

    dsc_path = path.join(a_path, dsc_name)
    destdir = path.join(dest, "extracted")
    logfile = path.join(dest, "extract.log")
    logger.debug(
        "extract Debian source package %s in %s",
        dsc_path,
        destdir,
        extra={
            "swh_type": "deb_extract",
            "swh_dsc": dsc_path,
            "swh_destdir": destdir,
        },
    )

    cmd = [
        "dpkg-source",
        "--no-copy",
        "--no-check",
        "--ignore-bad-version",
        "-x",
        dsc_path,
        destdir,
    ]

    try:
        with open(logfile, "w") as stdout:
            subprocess.check_call(cmd, stdout=stdout, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        # Read the log back through a context manager so the handle is closed.
        with open(logfile, "r") as log:
            logdata = log.read()
        raise ValueError(
            "dpkg-source exited with code %s: %s" % (e.returncode, logdata)
        ) from None

    return destdir
def get_intrinsic_package_metadata(
    p_info: DebianPackageInfo, dsc_path: str, extracted_path: str
) -> IntrinsicPackageMetadata:
    """Get the package metadata from the source package at dsc_path,
    extracted in extracted_path.

    Args:
        p_info: the package information
        dsc_path: path to the package's dsc file
        extracted_path: the path where the package got extracted; its
          ``debian/changelog`` file is parsed

    Returns:
        An :class:`IntrinsicPackageMetadata` holding the package name and
        intrinsic version, the changelog author, date and history (list of
        (package_name, package_version) tuples parsed from the package
        changelog, first entry excluded), and the maintainers listed in the
        ``Maintainer`` and ``Uploaders`` fields of the dsc file.

    """
    with open(dsc_path, "rb") as dsc:
        parsed_dsc = Dsc(dsc)

    # Parse the changelog to retrieve the rest of the package information
    changelog_path = path.join(extracted_path, "debian/changelog")
    with open(changelog_path, "rb") as changelog_file:
        try:
            parsed_changelog = Changelog(changelog_file)
        except UnicodeDecodeError:
            logger.warning(
                "Unknown encoding for changelog %s, falling back to iso",
                changelog_path,
                extra={
                    "swh_type": "deb_changelog_encoding",
                    "swh_name": p_info.name,
                    "swh_version": str(p_info.version),
                    "swh_changelog": changelog_path,
                },
            )

            # need to reset as Changelog scrolls to the end of the file
            changelog_file.seek(0)
            parsed_changelog = Changelog(changelog_file, encoding="iso-8859-15")

    history: List[Tuple[str, str]] = []

    for block in parsed_changelog:
        assert block.package is not None and block._raw_version is not None
        history.append((block.package, block._raw_version))

    changelog = DebianPackageChangelog(
        person=uid_to_person(parsed_changelog.author),
        # Stored as an ISO 8601 string since the ``date`` field is a str.
        date=parse_date(parsed_changelog.date).isoformat(),
        # The first entry is left out of the history.
        history=history[1:],
    )

    maintainers = [
        uid_to_person(parsed_dsc["Maintainer"]),
    ]
    # NOTE(review): when the Uploaders field is absent, splitting the empty
    # string yields one empty uid, which is still converted and appended --
    # confirm this is intended.
    maintainers.extend(
        uid_to_person(person)
        for person in UPLOADERS_SPLIT.split(parsed_dsc.get("Uploaders", ""))
    )

    return IntrinsicPackageMetadata(
        name=p_info.name,
        version=str(p_info.intrinsic_version),
        changelog=changelog,
        maintainers=maintainers,
    )