# Source code for swh.loader.git.converters

# Copyright (C) 2015-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Convert dulwich objects to dictionaries suitable for swh.storage"""

from typing import Any, Dict, Optional

from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes
from swh.model.model import (
    BaseContent,
    Content,
    Directory,
    DirectoryEntry,
    ObjectType,
    Person,
    Release,
    Revision,
    RevisionType,
    SkippedContent,
    TargetType,
    Timestamp,
    TimestampWithTimezone,
)

# Algorithms computed with MultiHash over the blob data. "sha1_git" is left
# out here: dulwich_blob_to_content_id fills it in from the blob's own sha().
HASH_ALGORITHMS = DEFAULT_ALGORITHMS - {"sha1_git"}


def dulwich_blob_to_content_id(blob) -> Dict[str, Any]:
    """Compute the Software Heritage content identifiers of a dulwich blob.

    Args:
        blob: dulwich object whose ``type_name`` is ``b"blob"``.

    Returns:
        A dict with one digest per algorithm in ``HASH_ALGORITHMS``, plus
        ``sha1_git`` (the digest of the blob's own ``sha()``) and ``length``
        (the blob's ``raw_length()``).

    Raises:
        ValueError: if ``blob`` is not a blob object.
    """
    if blob.type_name != b"blob":
        raise ValueError("Argument is not a blob.")

    length = blob.raw_length()
    raw = blob.as_raw_string()

    content_id = MultiHash.from_data(raw, HASH_ALGORITHMS).digest()
    # sha1_git is not recomputed: it is the blob's git object id.
    content_id.update(sha1_git=blob.sha().digest(), length=length)
    return content_id
def dulwich_blob_to_content(blob, max_content_size=None) -> BaseContent:
    """Turn a dulwich blob into a Software Heritage content object.

    Args:
        blob: dulwich object whose ``type_name`` is ``b"blob"``.
        max_content_size: optional size threshold; ``None`` disables it.

    Returns:
        A ``SkippedContent`` (status ``"absent"``) when a threshold is set and
        the blob length is greater than or equal to it; otherwise a
        ``Content`` (status ``"visible"``) carrying the blob data.

    Raises:
        ValueError: if ``blob`` is not a blob object.
    """
    if blob.type_name != b"blob":
        raise ValueError("Argument is not a blob.")

    content_id = dulwich_blob_to_content_id(blob)

    too_large = (
        max_content_size is not None and content_id["length"] >= max_content_size
    )
    if not too_large:
        return Content(data=blob.as_raw_string(), status="visible", **content_id)

    # Oversized blobs are recorded by their hashes only, without data.
    return SkippedContent(status="absent", reason="Content too large", **content_id)
def dulwich_tree_to_directory(tree, log=None) -> Directory:
    """Build a Software Heritage directory from a dulwich tree.

    Args:
        tree: dulwich object whose ``type_name`` is ``b"tree"``.
        log: not used by this function.

    Returns:
        A ``Directory`` whose id is the tree's sha digest and whose entries
        mirror the tree's items, in iteration order.

    Raises:
        ValueError: if ``tree`` is not a tree object.
    """
    if tree.type_name != b"tree":
        raise ValueError("Argument is not a tree.")

    # Entry type for each known mode; any mode not listed is treated as "file".
    type_by_mode = {
        0o040000: "dir",
        0o160000: "rev",
        0o100644: "file",
        0o100755: "file",
        0o120000: "file",
    }

    entries = tuple(
        DirectoryEntry(
            type=type_by_mode.get(item.mode, "file"),
            perms=item.mode,
            name=item.path,
            # item.sha is an ASCII hex id; the model stores raw bytes.
            target=hash_to_bytes(item.sha.decode("ascii")),
        )
        for item in tree.iteritems()
    )
    return Directory(id=tree.sha().digest(), entries=entries)
def parse_author(name_email: bytes) -> Person:
    """Build a ``Person`` from a raw author line.

    This simply delegates to ``Person.from_fullname``.
    """
    person = Person.from_fullname(name_email)
    return person
def dulwich_tsinfo_to_timestamp(
    timestamp, timezone, timezone_neg_utc
) -> TimestampWithTimezone:
    """Wrap dulwich timestamp information in a Software Heritage structure.

    Args:
        timestamp: time value; truncated to whole seconds with ``int()``.
        timezone: offset as reported by dulwich; floor-divided by 60 to get
            the model's offset (presumably seconds to minutes; TODO confirm).
        timezone_neg_utc: flag forwarded as ``negative_utc``, but only when
            ``timezone`` is exactly 0.

    Returns:
        The corresponding ``TimestampWithTimezone``, with microseconds at 0.
    """
    seconds = Timestamp(seconds=int(timestamp), microseconds=0)
    offset = timezone // 60

    # The negative-UTC flag is only kept for a zero offset; otherwise False.
    if timezone == 0:
        negative_utc = timezone_neg_utc
    else:
        negative_utc = False

    return TimestampWithTimezone(
        timestamp=seconds, offset=offset, negative_utc=negative_utc
    )
def dulwich_commit_to_revision(commit, log=None) -> Revision:
    """Convert a dulwich commit to a Software Heritage revision.

    Args:
        commit: dulwich object whose ``type_name`` is ``b"commit"``.
        log: not used by this function.

    Returns:
        A non-synthetic ``Revision`` of type ``RevisionType.GIT`` with
        ``metadata`` set to ``None``. Its ``extra_headers`` hold, in order:
        the encoding (if set), each mergetag, the commit's extra headers and
        the gpg signature (if any).

    Raises:
        ValueError: if ``commit`` is not a commit object.
    """
    if commit.type_name != b"commit":
        raise ValueError("Argument is not a commit.")

    extra_headers = []

    if commit.encoding is not None:
        extra_headers.append((b"encoding", commit.encoding))

    for tag in commit.mergetag or ():
        serialized = tag.as_raw_string()
        # The serialized mergetag is stored without its trailing newline.
        assert serialized.endswith(b"\n")
        extra_headers.append((b"mergetag", serialized[:-1]))

    if commit.extra:
        extra_headers += [(key, value) for key, value in commit.extra]

    if commit.gpgsig:
        extra_headers.append((b"gpgsig", commit.gpgsig))

    revision_id = commit.sha().digest()
    author = parse_author(commit.author)
    author_date = dulwich_tsinfo_to_timestamp(
        commit.author_time,
        commit.author_timezone,
        commit._author_timezone_neg_utc,
    )
    committer = parse_author(commit.committer)
    committer_date = dulwich_tsinfo_to_timestamp(
        commit.commit_time,
        commit.commit_timezone,
        commit._commit_timezone_neg_utc,
    )
    # Tree and parent ids are hex-encoded bytes; convert them to raw bytes.
    directory = bytes.fromhex(commit.tree.decode())
    message = commit.message
    parents = tuple(bytes.fromhex(parent.decode()) for parent in commit.parents)

    return Revision(
        id=revision_id,
        author=author,
        date=author_date,
        committer=committer,
        committer_date=committer_date,
        type=RevisionType.GIT,
        directory=directory,
        message=message,
        metadata=None,
        extra_headers=tuple(extra_headers),
        synthetic=False,
        parents=parents,
    )
# dulwich ``type_name`` -> swh.model TargetType.
DULWICH_TARGET_TYPES = {
    b"blob": TargetType.CONTENT,
    b"tree": TargetType.DIRECTORY,
    b"commit": TargetType.REVISION,
    b"tag": TargetType.RELEASE,
}

# dulwich ``type_name`` -> swh.model ObjectType (used for release targets).
DULWICH_OBJECT_TYPES = {
    b"blob": ObjectType.CONTENT,
    b"tree": ObjectType.DIRECTORY,
    b"commit": ObjectType.REVISION,
    b"tag": ObjectType.RELEASE,
}
def dulwich_tag_to_release(tag, log=None) -> Release:
    """Convert a dulwich tag to a Software Heritage release.

    Args:
        tag: dulwich object whose ``type_name`` is ``b"tag"``.
        log: not used by this function.

    Returns:
        A non-synthetic ``Release`` with ``metadata`` set to ``None``. Its
        ``author`` and ``date`` are ``None`` when the tag has no tagger, and
        ``date`` is ``None`` when the tagger line carries no time.

    Raises:
        ValueError: if ``tag`` is not a tag object.
        KeyError: if the tagged object's ``type_name`` is not a key of
            ``DULWICH_OBJECT_TYPES``.
    """
    if tag.type_name != b"tag":
        raise ValueError("Argument is not a tag.")

    target_type, target = tag.object

    author: Optional[Person] = None
    date: Optional[TimestampWithTimezone] = None
    if tag.tagger:
        author = parse_author(tag.tagger)
        # Test for None explicitly: a tag time of 0 (the Unix epoch) is a
        # real date and must not be dropped by a truthiness check.
        if tag.tag_time is not None:
            date = dulwich_tsinfo_to_timestamp(
                tag.tag_time,
                tag.tag_timezone,
                tag._tag_timezone_neg_utc,
            )

    return Release(
        id=tag.sha().digest(),
        author=author,
        date=date,
        name=tag.name,
        # The target id is hex-encoded bytes; the model stores raw bytes.
        target=bytes.fromhex(target.decode()),
        target_type=DULWICH_OBJECT_TYPES[target_type.type_name],
        message=tag._message,
        metadata=None,
        synthetic=False,
    )