# Source code for swh.provenance.backend.graph

# Copyright (C) 2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from time import monotonic
from typing import List, Optional

from google.protobuf.field_mask_pb2 import FieldMask
import grpc

from swh.graph.grpc.swhgraph_pb2 import GraphDirection, NodeFilter, TraversalRequest
from swh.graph.grpc.swhgraph_pb2_grpc import TraversalServiceStub
from swh.model.swhids import CoreSWHID
from swh.model.swhids import ObjectType as SWHIDType
from swh.model.swhids import QualifiedSWHID

logger = logging.getLogger(__name__)


class GraphProvenance:
    """Provenance backend answering queries through a swh-graph GRPC server."""

    def __init__(self, url: str, max_edges: int = 10000) -> None:
        """Provenance instance using a swh-graph GRPC backend

        Args:
            url: the location of the GRPC server; should be of the form:
                "<host>:<port>"
            max_edges: maximum number of edges that can be fetched by a
                traversal query; for more details, see:
                https://docs.softwareheritage.org/devel/swh-graph/grpc-api.html#limiting-the-traversal
        """
        self.graph_url = url
        self._channel = grpc.insecure_channel(self.graph_url)
        self._stub = TraversalServiceStub(self._channel)
        self._max_edges = max_edges

    def check_config(self) -> bool:
        """Report the configuration as valid (no actual check is performed)."""
        return True

    def _traverse(
        self, request: TraversalRequest, swhid: CoreSWHID, label: str
    ) -> Optional[list]:
        """Run a traversal request and collect the nodes it yields.

        Args:
            request: the traversal request sent to the GRPC server
            swhid: the object the query is about; only used in log messages
            label: short name of the query kind ("anchor" or "origin"); only
                used in log messages

        Returns:
            The list of returned nodes (possibly empty), or None when the
            server reports that the source SWHID is unknown.

        Raises:
            grpc.RpcError: for any GRPC failure other than an unknown SWHID.
        """
        # Start the clock before entering the ``try`` so that the ``finally``
        # clause can always compute the elapsed time.
        t0 = monotonic()
        try:
            return list(self._stub.Traverse(request))
        except grpc.RpcError as exc:
            if exc.code() == grpc.StatusCode.NOT_FOUND:
                logger.debug("SWHID %s %s: not found", swhid, label)
                return None
            details = exc.details()
            if (
                exc.code() == grpc.StatusCode.INVALID_ARGUMENT
                and details is not None
                and details.startswith("Unknown SWHID:")
            ):
                # The original comment here reads "for java…"; presumably the
                # Java implementation of the graph server reports unknown
                # SWHIDs this way instead of NOT_FOUND -- TODO confirm.
                return None
            logger.debug("SWHID %s %s: GRPC error %s", swhid, label, exc)
            raise
        finally:
            # Logged on every path: success, "unknown SWHID" and re-raise.
            logger.debug(
                "SWHID %s %s query took %.2fms",
                swhid,
                label,
                (monotonic() - t0) * 1000.0,
            )

    def _get_anchor(self, swhid: CoreSWHID, leaf_type: str) -> Optional[CoreSWHID]:
        """Find some top level object that contains the argument

        The search focuses on `leaf_type`, which can be either "rel" or
        "rev". However, if you pass a `swhid` for a higher level object, you
        will get it back as is.

        Return a SWHID, or None if nothing of the requested type is found.

        Raises:
            ValueError: if a traversal is needed and `leaf_type` is neither
                "rel" nor "rev".
        """
        if swhid.object_type in (SWHIDType.RELEASE, SWHIDType.SNAPSHOT):
            # We won't find anything better than the object already passed
            return swhid
        if swhid.object_type == SWHIDType.REVISION and leaf_type == "rev":
            # We are requesting a revision but we already have a revision, so
            # return that.
            return swhid

        if leaf_type == "rel":
            edges = "dir:dir,cnt:dir,dir:rev,rev:rel,dir:rel,cnt:rel"
        elif leaf_type == "rev":
            edges = "dir:dir,cnt:dir,dir:rev"
        else:
            raise ValueError(leaf_type)

        anchor_search = TraversalRequest(
            src=[str(swhid)],
            edges=edges,
            direction=GraphDirection.BACKWARD,
            max_edges=self._max_edges,
            return_nodes=NodeFilter(types=leaf_type),
            mask=FieldMask(paths=["swhid"]),
            max_matching_nodes=1,
        )
        resp = self._traverse(anchor_search, swhid, "anchor")
        if resp is None:
            # The graph does not know this SWHID (already logged if relevant).
            return None
        if resp:
            # The request asked for at most one matching node.
            assert len(resp) == 1
            node = resp[0]
            logger.debug("SWHID %s anchor: %s", swhid, node)
            return CoreSWHID.from_string(node.swhid)
        logger.debug("SWHID %s anchor: no result", swhid)
        return None

    def _get_origin(self, anchor_swhid: CoreSWHID) -> Optional[str]:
        """Find the url of an origin associated with an anchor object.

        If no origin is found, return None.

        Raises:
            ValueError: if `anchor_swhid` is not a revision, release or
                snapshot.
        """
        if anchor_swhid.object_type not in (
            SWHIDType.REVISION,
            SWHIDType.RELEASE,
            SWHIDType.SNAPSHOT,
        ):
            # we need a revision, or higher
            raise ValueError(anchor_swhid.object_type)

        origin_search = TraversalRequest(
            src=[str(anchor_swhid)],
            edges="rev:rev,rev:rel,*:snp,*:ori",
            direction=GraphDirection.BACKWARD,
            max_edges=self._max_edges,
            return_nodes=NodeFilter(types="ori"),
            max_matching_nodes=1,
        )
        resp = self._traverse(origin_search, anchor_swhid, "origin")
        if resp is None:
            # The graph does not know this SWHID (already logged if relevant).
            return None
        if resp:
            # The request asked for at most one matching node.
            assert len(resp) == 1
            url = resp[0].ori.url
            logger.debug("SWHID %s origin: %s", anchor_swhid, url)
            return url
        logger.debug("SWHID %s origin: no result", anchor_swhid)
        return None

    def whereis(self, *, swhid: CoreSWHID) -> Optional[QualifiedSWHID]:
        """Given a SWHID return a QualifiedSWHID with some provenance info:

        - the release or revision containing that content or directory
        - the url of the origin containing that content or directory

        This can also be called for revision, release or snapshot to retrieve
        origin url information if any. When called with a revision, the
        anchor will be an associated release, if any.

        Return None when no anchor can be found for `swhid`.
        """
        # Prefer a release as anchor; fall back to a revision.
        anchor = self._get_anchor(swhid, "rel")
        if anchor is None:
            anchor = self._get_anchor(swhid, "rev")
        if anchor is None:
            return None

        origin = self._get_origin(anchor)
        if anchor == swhid:
            # don't anchor releases (and revisions) on themselves
            anchor = None
        return QualifiedSWHID(
            object_type=swhid.object_type,
            object_id=swhid.object_id,
            anchor=anchor,
            origin=origin,
        )

    def whereare(self, *, swhids: List[CoreSWHID]) -> List[Optional[QualifiedSWHID]]:
        """Given a SWHID list return a list of provenance info:

        The result has one entry per input SWHID, in the same order. See
        `whereis` documentation for details on the provenance info.
        """
        return [self.whereis(swhid=si) for si in swhids]