# Source code for swh.indexer.origin_head

# Copyright (C) 2018-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import re
from typing import List, Optional, Tuple, Union

from swh.model.model import Snapshot, SnapshotBranch, SnapshotTargetType
from swh.model.swhids import CoreSWHID, ObjectType
from swh.storage.algos.origin import origin_get_latest_visit_status
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.storage.interface import PartialBranches, StorageInterface


def get_head_swhid(storage: StorageInterface, origin_url: str) -> Optional[CoreSWHID]:
    """Return the SWHID of the head revision or release of an origin.

    The latest visit status of ``origin_url`` that is ``"full"`` and has a
    snapshot is looked up; the head is then searched in that snapshot.

    Args:
        storage: storage backend queried for the visit status and the snapshot.
        origin_url: URL of the origin whose head is wanted.

    Returns:
        The SWHID of the head, or ``None`` when there is no suitable visit,
        the snapshot cannot be fetched, or no head could be resolved.
    """
    latest_visit = origin_get_latest_visit_status(
        storage, origin_url, allowed_statuses=["full"], require_snapshot=True
    )
    if not latest_visit:
        return None

    snapshot_id = latest_visit.snapshot
    assert snapshot_id is not None

    if latest_visit.type != "ftp":
        # Peek into the snapshot, without fetching too many refs.
        # A small snapshot is retrieved entirely by this single request; for a
        # large one, specific branches are queried later, as they are needed.
        first_branches = storage.snapshot_get_branches(snapshot_id, branches_count=100)
        if first_branches is None:
            # Snapshot does not exist
            return None
        return _try_get_head_generic(storage, first_branches)

    # For FTP visits, every branch is needed in order to find the largest one.
    full_snapshot = snapshot_get_all_branches(storage, snapshot_id)
    if full_snapshot is None:
        return None
    return _try_get_ftp_head(storage, full_snapshot)
# Splits an archive filename into package name, dotted numeric version,
# optional "-xxx"/"+xxx" pre/post-version marker, and file extension(s).
# Every dotted version component may have several digits ("2.10", "2.6.32").
_archive_filename_re = re.compile(
    rb"^"
    rb"(?P<pkgname>.*)[-_]"
    rb"(?P<version>[0-9]+(\.[0-9]+)*)"
    rb"(?P<preversion>[-+][a-zA-Z0-9.~]+?)?"
    rb"(?P<extension>(\.[a-zA-Z0-9]+)+)"
    rb"$"
)


def _parse_version(filename: bytes) -> Tuple[Union[float, int, str], ...]:
    """Extracts the release version from an archive filename,
    to get an ordering whose maximum is likely to be the last
    version of the software

    The result is the numeric version components, followed by ``0`` when there
    is no pre/post-version marker, by ``-1`` and the marker text for a ``-xxx``
    marker, or by ``1`` and the marker text for a ``+xxx`` marker. Filenames
    that do not match the expected pattern yield ``(-inf,)``.

    >>> _parse_version(b'foo')
    (-inf,)
    >>> _parse_version(b'foo.tar.gz')
    (-inf,)
    >>> _parse_version(b'gnu-hello-0.0.1.tar.gz')
    (0, 0, 1, 0)
    >>> _parse_version(b'gnu-hello-2.10.tar.gz')
    (2, 10, 0)
    >>> _parse_version(b'gnu-hello-0.0.1-beta2.tar.gz')
    (0, 0, 1, -1, 'beta2')
    >>> _parse_version(b'gnu-hello-0.0.1+foobar.tar.gz')
    (0, 0, 1, 1, 'foobar')
    """
    # NOTE(review): keys of different filenames are not always comparable, e.g.
    # b'p-1+x.tgz' -> (1, 1, 'x') and b'p-1.1.tgz' -> (1, 1, 0) compare a str
    # with an int and raise TypeError. Left as is to keep the tuple shape.
    res = _archive_filename_re.match(filename)
    if res is None:
        return (float("-infinity"),)
    version: List[Union[float, int, str]] = [
        int(n) for n in res.group("version").decode().split(".")
    ]
    if res.group("preversion") is None:
        version.append(0)
    else:
        preversion = res.group("preversion").decode()
        if preversion.startswith("-"):
            version.append(-1)
            version.append(preversion[1:])
        elif preversion.startswith("+"):
            version.append(1)
            version.append(preversion[1:])
        else:
            # The regex only lets "-" or "+" start the group.
            assert False, res.group("preversion")
    return tuple(version)


def _try_get_ftp_head(
    storage: StorageInterface, snapshot: Snapshot
) -> Optional[CoreSWHID]:
    """Resolves the branch of ``snapshot`` whose name is the greatest according
    to :func:`_parse_version`.

    Returns ``None`` when the snapshot has no branch at all, or when the
    selected branch cannot be resolved.
    """
    # `default=None` avoids the ValueError max() raises on an empty iterable.
    max_archive_name = max(snapshot.branches, key=_parse_version, default=None)
    if max_archive_name is None:
        return None
    return _try_resolve_target(
        storage,
        # All branches are already known, so present them as a complete
        # (not partial) listing: no further page to fetch.
        {"id": snapshot.id, "branches": dict(snapshot.branches), "next_branch": None},
        branch_name=max_archive_name,
    )


def _try_get_head_generic(
    storage: StorageInterface, partial_branches: PartialBranches
) -> Optional[CoreSWHID]:
    """Resolves the ``HEAD`` branch, falling back to ``master`` when ``HEAD``
    does not resolve to anything."""
    # Works on 'deposit', 'pypi', and VCSs.
    return _try_resolve_target(
        storage, partial_branches, branch_name=b"HEAD"
    ) or _try_resolve_target(storage, partial_branches, branch_name=b"master")


def _get_branch(
    storage: StorageInterface, partial_branches: PartialBranches, branch_name: bytes
) -> Optional[SnapshotBranch]:
    """Given a ``branch_name``, gets it from ``partial_branches`` if present,
    and fetches it from the storage otherwise.

    Returns ``None`` when the branch is not found."""
    if branch_name in partial_branches["branches"]:
        return partial_branches["branches"][branch_name]
    elif partial_branches["next_branch"] is not None:
        # Branch is not in `partial_branches`, and `partial_branches` indeed partial
        res = storage.snapshot_get_branches(
            partial_branches["id"], branches_from=branch_name, branches_count=1
        )
        assert res is not None, "Snapshot does not exist anymore"
        return res["branches"].get(branch_name)
    else:
        # Branch is not in `partial_branches`, but `partial_branches` is the full
        # list of branches, which means it is a dangling reference.
        return None


def _try_resolve_target(
    storage: StorageInterface, partial_branches: PartialBranches, branch_name: bytes
) -> Optional[CoreSWHID]:
    """Follows aliases starting from ``branch_name`` and returns the SWHID of
    the revision or release eventually pointed to.

    Returns ``None`` when a branch on the way is missing, when the aliases form
    a cycle, when the final target is a content or a directory (not supported
    yet), or when a ``KeyError`` is raised during the lookup.
    """
    try:
        branch = _get_branch(storage, partial_branches, branch_name)
        if branch is None:
            return None

        # Remember every name already followed, so that a cycle of aliases
        # (e.g. HEAD -> HEAD) ends the search instead of looping forever.
        seen_names = {branch_name}
        while branch.target_type == SnapshotTargetType.ALIAS:
            alias_target = branch.target
            if alias_target in seen_names:
                return None
            seen_names.add(alias_target)
            branch = _get_branch(storage, partial_branches, alias_target)
            if branch is None:
                return None

        if branch.target_type == SnapshotTargetType.REVISION:
            return CoreSWHID(object_type=ObjectType.REVISION, object_id=branch.target)
        elif branch.target_type == SnapshotTargetType.CONTENT:
            return None  # TODO
        elif branch.target_type == SnapshotTargetType.DIRECTORY:
            return None  # TODO
        elif branch.target_type == SnapshotTargetType.RELEASE:
            return CoreSWHID(object_type=ObjectType.RELEASE, object_id=branch.target)
        else:
            assert False, branch
    except KeyError:
        return None