Source code for swh.web.utils.identifiers

# Copyright (C) 2020-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from typing import Any, Dict, Iterable, List, Mapping, Optional, TypedDict
from urllib.parse import quote, unquote

from swh.model.exceptions import ValidationError
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.swhids import CoreSWHID, ObjectType, QualifiedSWHID
from swh.web.utils import archive, reverse
from swh.web.utils.exc import BadInputExc
from swh.web.utils.typing import SnapshotContext, SWHIDContext, SWHIDInfo, SWHObjectInfo


def parse_object_type(object_type: str) -> ObjectType:
    """
    Convert an object type name into the matching ``ObjectType`` member.

    The lookup is performed on the upper-cased name, so the letter case of
    the input does not matter.

    Args:
        object_type: name of a swh object type

    Returns:
        the ``ObjectType`` member whose name matches the provided one

    Raises:
        BadInputExc: if no ``ObjectType`` member has that name
    """
    member_name = object_type.upper()
    try:
        return ObjectType[member_name]
    except KeyError:
        # list the accepted names in the error message, lower-cased
        valid_types = ", ".join([variant.name.lower() for variant in ObjectType])
        raise BadInputExc(
            f"Invalid swh object type! Valid types are {valid_types}; not {object_type}"
        )
def gen_swhid(
    object_type: ObjectType,
    object_id: str,
    scheme_version: int = 1,
    metadata: Optional[SWHIDContext] = None,
) -> str:
    """
    Returns the SoftWare Hash IDentifier for a swh object based on:

        * the object type
        * the object id
        * the SWHID scheme version

    Args:
        object_type: the swh object type
            (content/directory/release/revision/snapshot)
        object_id: the swh object id (hexadecimal representation
            of its hash value)
        scheme_version: the scheme version of the SWHIDs
        metadata: optional mapping of contextual qualifiers, forwarded as
            keyword arguments to ``QualifiedSWHID``; ``None`` (the default)
            means no qualifiers

    Returns:
        the SWHID of the object

    Raises:
        BadInputExc: if the provided parameters do not enable to
            generate a valid identifier
    """
    # A None default avoids sharing one mutable dict between all calls.
    qualifiers = metadata or {}
    try:
        decoded_object_id = hash_to_bytes(object_id)
        obj_swhid = str(
            QualifiedSWHID(
                object_type=object_type,
                object_id=decoded_object_id,
                scheme_version=scheme_version,
                **qualifiers,
            )
        )
    except (ValidationError, KeyError, ValueError) as e:
        # chain explicitly so the original failure stays visible as the cause
        raise BadInputExc("Invalid object (%s) for SWHID. %s" % (object_id, e)) from e
    else:
        return obj_swhid
class ResolvedSWHID(TypedDict):
    """Result of a SWHID resolution, as returned by :func:`resolve_swhid`."""

    #: parsed SWHID with context
    swhid_parsed: QualifiedSWHID
    #: URL to browse object according to SWHID context
    #: (``None`` when no browse URL could be built)
    browse_url: Optional[str]
def resolve_swhid(
    swhid: str, query_params: Optional[Mapping[str, str]] = None
) -> ResolvedSWHID:
    """
    Try to resolve a SoftWare Hash IDentifier into an url for
    browsing the targeted object.

    The SWHID qualifiers (origin, path, anchor, visit, lines) are turned
    into query parameters, URL arguments and a line fragment for the
    browse view of the targeted object.

    Args:
        swhid: a SoftWare Hash IDentifier
        query_params: optional dict filled with
            query parameters to append to the browse url

    Returns:
        a dict with the following keys:

            * **swhid_parsed**: the parsed identifier
            * **browse_url**: the url for browsing the targeted object,
              or ``None`` if no URL argument could be derived from the
              object type

    Raises:
        BadInputExc: if the SWHID cannot be parsed (raised by
            ``get_qualified_swhid``) or if its ``visit`` qualifier is not
            a snapshot SWHID
    """
    swhid_parsed = get_qualified_swhid(swhid)
    object_type = swhid_parsed.object_type
    object_id = swhid_parsed.object_id
    browse_url = None
    url_args = {}
    fragment = ""
    # decided before object_type may be rebound to REVISION further down,
    # so a lines fragment is only emitted for SWHIDs targeting a content
    process_lines = object_type == ObjectType.CONTENT

    # work on a copy so the caller's mapping is never modified
    query_dict: Dict[str, str] = dict(query_params or {})

    if swhid_parsed.origin:
        origin_url = unquote(swhid_parsed.origin)
        # use the URL as returned by the archive lookup
        origin_url = archive.lookup_origin(origin_url)["url"]
        query_dict["origin_url"] = origin_url

    if swhid_parsed.path and swhid_parsed.path != b"/":
        query_dict["path"] = swhid_parsed.path.decode("utf8", errors="replace")
        if swhid_parsed.anchor:
            # find the root directory the path is relative to; it stays
            # empty when the anchor type is not handled below
            # NOTE(review): lookups presumably return hex strings while
            # anchor.object_id is bytes; hash_to_hex is applied to the
            # result later on - confirm it accepts both
            directory = b""
            if swhid_parsed.anchor.object_type == ObjectType.DIRECTORY:
                directory = swhid_parsed.anchor.object_id
            elif swhid_parsed.anchor.object_type == ObjectType.REVISION:
                revision = archive.lookup_revision(
                    hash_to_hex(swhid_parsed.anchor.object_id)
                )
                directory = revision["directory"]
            elif swhid_parsed.anchor.object_type == ObjectType.RELEASE:
                release = archive.lookup_release(
                    hash_to_hex(swhid_parsed.anchor.object_id)
                )
                # a release may target a revision or directly a directory
                if release["target_type"] == ObjectType.REVISION.name.lower():
                    revision = archive.lookup_revision(release["target"])
                    directory = revision["directory"]
                elif release["target_type"] == ObjectType.DIRECTORY.name.lower():
                    directory = release["target"]
            if object_type == ObjectType.CONTENT:
                if (
                    not swhid_parsed.origin
                    and swhid_parsed.anchor.object_type != ObjectType.REVISION
                ):
                    # when no origin or revision context, content objects need to have
                    # their path prefixed by root directory id for breadcrumbs display
                    query_dict["path"] = hash_to_hex(directory) + query_dict["path"]
                elif query_dict["path"] is not None:
                    # remove leading slash from SWHID content path
                    query_dict["path"] = query_dict["path"].lstrip("/")
            elif object_type == ObjectType.DIRECTORY and query_dict["path"] is not None:
                # the directory is browsed from the root directory found above
                object_id = directory
                # remove leading and trailing slashes from SWHID directory path
                query_dict["path"] = query_dict["path"].strip("/")

    # snapshot context
    if swhid_parsed.visit:
        if swhid_parsed.visit.object_type != ObjectType.SNAPSHOT:
            raise BadInputExc("Visit must be a snapshot SWHID.")
        query_dict["snapshot"] = hash_to_hex(swhid_parsed.visit.object_id)

        if swhid_parsed.anchor:
            if (
                swhid_parsed.anchor.object_type == ObjectType.REVISION
                and object_type != ObjectType.REVISION
            ):
                query_dict["revision"] = hash_to_hex(swhid_parsed.anchor.object_id)

            elif swhid_parsed.anchor.object_type == ObjectType.RELEASE:
                release = archive.lookup_release(
                    hash_to_hex(swhid_parsed.anchor.object_id)
                )
                # the release is passed by name, not by identifier
                if release:
                    query_dict["release"] = release["name"]

    # browsing content or directory without snapshot context
    elif (
        object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY)
        and swhid_parsed.anchor
    ):
        if swhid_parsed.anchor.object_type == ObjectType.REVISION:
            # anchor revision, objects are browsed from its view
            object_type = ObjectType.REVISION
            object_id = swhid_parsed.anchor.object_id
        elif (
            object_type == ObjectType.DIRECTORY
            and swhid_parsed.anchor.object_type == ObjectType.DIRECTORY
        ):
            # a directory is browsed from its root
            object_id = swhid_parsed.anchor.object_id

    # URL argument name depends on the (possibly rebound) object type
    if object_type == ObjectType.CONTENT:
        url_args["query_string"] = f"sha1_git:{hash_to_hex(object_id)}"
    elif object_type in (ObjectType.DIRECTORY, ObjectType.RELEASE, ObjectType.REVISION):
        url_args["sha1_git"] = hash_to_hex(object_id)
    elif object_type == ObjectType.SNAPSHOT:
        url_args["snapshot_id"] = hash_to_hex(object_id)

    if swhid_parsed.lines and process_lines:
        lines = swhid_parsed.lines
        # "#L<start>" fragment, extended to "#L<start>-L<end>" when the
        # second element of the lines qualifier is set
        fragment += "#L" + str(lines[0])
        if lines[1]:
            fragment += "-L" + str(lines[1])

    # the view name is derived from the (possibly rebound) object type
    if url_args:
        browse_url = (
            reverse(
                f"browse-{object_type.name.lower()}",
                url_args=url_args,
                query_params=query_dict,
            )
            + fragment
        )

    return ResolvedSWHID(swhid_parsed=swhid_parsed, browse_url=browse_url)
def get_qualified_swhid(swhid: str) -> QualifiedSWHID:
    """Leniently parse a qualified SWHID.

    The input is normalized before being parsed, which makes this function
    accept a superset of valid SWHIDs: identifiers whose core part is badly
    capitalized or whose qualifiers contain unquoted white spaces.

    Args:
        swhid: a SoftWare Hash IDentifier.

    Raises:
        BadInputExc: if the provided SWHID cannot be parsed.

    Return:
        A parsed SWHID.
    """
    core_part, separator, qualifier_part = swhid.partition(";")
    normalized_swhid = (
        # the core part must be in lower case to avoid a parsing error
        core_part.lower()
        + separator
        # quoted white spaces might have been automatically unquoted when a
        # SWHID is passed as URL argument, so quote them back
        + qualifier_part.replace(" ", "%20")
    )
    try:
        return QualifiedSWHID.from_string(normalized_swhid)
    except ValidationError as ve:
        raise BadInputExc("Error when parsing identifier: %s" % " ".join(ve.messages))
def parse_core_swhid(swhid: str) -> CoreSWHID:
    """Strictly parse a core SWHID (no normalization is applied).

    Args:
        swhid: a SoftWare Hash IDentifier.

    Raises:
        BadInputExc: if the provided SWHID cannot be parsed, i.e. when the
            parser raises ``ValidationError`` or ``ValueError``.

    Return:
        A parsed SWHID.
    """
    try:
        parsed_swhid = CoreSWHID.from_string(swhid)
    except ValidationError as ve:
        raise BadInputExc(f"Error when parsing identifier: {' '.join(ve.messages)}")
    except ValueError as e:
        raise BadInputExc(f"Error when parsing identifier: {e}")
    else:
        return parsed_swhid
def group_swhids(
    swhids: Iterable[CoreSWHID],
) -> Dict[ObjectType, List[bytes]]:
    """
    Sort SoftWare Hash IDentifiers into per-object-type buckets.

    Args:
        swhids: an iterable of SoftWare Hash IDentifier objects

    Returns:
        A dictionary with:
            keys: object types (content, directory, revision, release and
                snapshot are always present, possibly with an empty list)
            values: object hashes
    """
    grouped: Dict[ObjectType, List[bytes]] = {
        kind: []
        for kind in (
            ObjectType.CONTENT,
            ObjectType.DIRECTORY,
            ObjectType.REVISION,
            ObjectType.RELEASE,
            ObjectType.SNAPSHOT,
        )
    }

    for swhid in swhids:
        grouped[swhid.object_type].append(hash_to_bytes(swhid.object_id))

    return grouped
def get_swhids_info(
    swh_objects: Iterable[SWHObjectInfo],
    snapshot_context: Optional[SnapshotContext] = None,
    extra_context: Optional[Mapping[str, Any]] = None,
) -> List[SWHIDInfo]:
    """
    Returns a list of dict containing info related to SWHIDs of objects.

    For each object, a core SWHID and its browse URL are computed. When
    contextual information is available, a SWHID with qualifiers (origin,
    visit, anchor, path) and its browse URL are computed too.

    Args:
        swh_objects: an iterable of dict describing archived objects
        snapshot_context: optional dict parameter describing the snapshot in
            which the objects have been found
        extra_context: optional dict filled with extra contextual info about
            the objects; the keys read here are ``revision``,
            ``root_directory``, ``path`` and ``filename``

    Returns:
        a list of dict containing SWHIDs info, one entry per input object
        and in the same order
    """
    swhids_info = []
    for swh_object in swh_objects:
        # objects without an identifier get a placeholder entry with empty
        # SWHID fields, so the output stays aligned with the input
        if not swh_object["object_id"]:
            swhids_info.append(
                SWHIDInfo(
                    object_type=swh_object["object_type"],
                    object_id="",
                    swhid="",
                    swhid_url="",
                    context={},
                    swhid_with_context=None,
                    swhid_with_context_url=None,
                )
            )
            continue
        object_type = swh_object["object_type"]
        object_id = swh_object["object_id"]
        # qualifiers collected for the contextual SWHID of this object
        swhid_context: SWHIDContext = {}
        if snapshot_context:
            if snapshot_context["origin_info"] is not None:
                swhid_context["origin"] = quote(
                    snapshot_context["origin_info"]["url"], safe="/?:@&"
                )
            # a snapshot is not qualified with itself as visit
            if object_type != ObjectType.SNAPSHOT:
                swhid_context["visit"] = gen_swhid(
                    ObjectType.SNAPSHOT, snapshot_context["snapshot_id"]
                )
            # only contents and directories get an anchor; a release takes
            # precedence over a revision
            if object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY):
                if snapshot_context["release_id"] is not None:
                    swhid_context["anchor"] = gen_swhid(
                        ObjectType.RELEASE, snapshot_context["release_id"]
                    )
                elif snapshot_context["revision_id"] is not None:
                    swhid_context["anchor"] = gen_swhid(
                        ObjectType.REVISION, snapshot_context["revision_id"]
                    )

        if object_type in (ObjectType.CONTENT, ObjectType.DIRECTORY):
            # fall back on extra_context only when the snapshot context did
            # not already provide an anchor
            if (
                extra_context
                and "revision" in extra_context
                and extra_context["revision"]
                and "anchor" not in swhid_context
            ):
                swhid_context["anchor"] = gen_swhid(
                    ObjectType.REVISION, extra_context["revision"]
                )
            elif (
                extra_context
                and "root_directory" in extra_context
                and extra_context["root_directory"]
                and "anchor" not in swhid_context
                and (
                    # a directory is not anchored to itself
                    object_type != ObjectType.DIRECTORY
                    or extra_context["root_directory"] != object_id
                )
            ):
                swhid_context["anchor"] = gen_swhid(
                    ObjectType.DIRECTORY, extra_context["root_directory"]
                )
            path = None
            if extra_context and "path" in extra_context:
                # an empty or missing path value is treated as the root "/"
                path = extra_context["path"] or "/"
                if "filename" in extra_context and object_type == ObjectType.CONTENT:
                    path += extra_context["filename"]
                # no path qualifier for a directory located at the root
                if object_type == ObjectType.DIRECTORY and path == "/":
                    path = None
            if path:
                swhid_context["path"] = quote(path, safe="/?:@&")

        swhid = gen_swhid(object_type, object_id)
        swhid_url = reverse("browse-swhid", url_args={"swhid": swhid})

        swhid_with_context = None
        swhid_with_context_url = None
        # the contextual SWHID is only generated when at least one
        # qualifier has been collected
        if swhid_context:
            swhid_with_context = gen_swhid(
                object_type, object_id, metadata=swhid_context
            )
            swhid_with_context_url = reverse(
                "browse-swhid",
                url_args={"swhid": quote(swhid_with_context, safe=":;=/%")},
            )

        swhids_info.append(
            SWHIDInfo(
                object_type=object_type,
                object_id=object_id,
                swhid=swhid,
                swhid_url=swhid_url,
                context=swhid_context,
                swhid_with_context=swhid_with_context,
                swhid_with_context_url=swhid_with_context_url,
            )
        )
    return swhids_info