Source code for swh.graph.find_context

# Copyright (C) 2022-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from hashlib import sha1

from google.protobuf.field_mask_pb2 import FieldMask
import grpc

from swh.graph.grpc.swhgraph_pb2 import (
    FindPathBetweenRequest,
    FindPathToRequest,
    NodeFilter,
    TraversalRequest,
)
from swh.graph.grpc.swhgraph_pb2_grpc import TraversalServiceStub
from swh.model.cli import swhid_of_file
from swh.model.exceptions import ValidationError

# documentation: https://docs.softwareheritage.org/devel/apidoc/swh.model.swhids.html
from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID

# Module-level state shared between the functions of this module.
#
# ``headers`` and ``swhcli`` are empty dictionaries that are neither read nor
# assigned by the functions visible in this module.
# NOTE(review): presumably leftovers from an HTTP/Web-API based variant of
# this tool -- confirm before relying on them or removing them.
headers: dict = {}
swhcli: dict = {}
# Tracing switch: ``main`` sets it from its ``trace`` argument and
# ``fqswhid_of_traversal`` prints every examined node when it is true.
verbose = False

# Address ("host:port") of a graph gRPC server.
# NOTE(review): not referenced by the functions visible here; presumably the
# default for ``main``'s ``graph_grpc_server`` argument -- confirm against the
# command-line entry point.
GRAPH_GRPC_SERVER = "localhost:50091"


def _successor_label_names(node):
    """Return the decoded label names found on the edges to ``node``'s successors."""
    return [
        label.name.decode()
        for successor in node.successor
        for label in successor.label
    ]


def fqswhid_of_traversal(response):
    """Build a fully qualified SWHID string from a path returned by the graph.

    The nodes of ``response.node`` are expected in the order
    content -> directory -> revision -> release -> snapshot -> origin.
    The first content, directory, revision, release or snapshot node met
    becomes the core SWHID; the nodes that follow it provide the qualifiers:

    * ``path``: label names collected from the content/directory nodes,
    * ``anchor``: the first revision met after the core SWHID, or else the
      first release,
    * ``visit``: the last snapshot met after the core SWHID,
    * ``origin``: the first non-empty ``node.ori.url``.

    Args:
        response: object exposing a ``node`` iterable; each node must provide
            ``swhid``, ``ori.url`` and ``successor`` (each successor carrying
            ``label`` entries with a bytes ``name``).

    Returns:
        The core SWHID and its qualifiers joined with ``";"``. The string is
        empty when the path contains no node that can act as a core SWHID
        and no origin URL.
    """
    fqswhid = []  # stays empty until the core SWHID has been found
    coreswhidtype = ""
    path = []
    url = ""
    # Qualifier candidates. They are initialised up front so that a path that
    # lacks one of them (e.g. no snapshot node) no longer triggers an
    # UnboundLocalError when the qualifiers are assembled below.
    revision = None
    release = None
    snapshot = None

    for node in response.node:
        if verbose:
            print(" examining node: " + node.swhid)
        if url == "":
            url = node.ori.url
        object_type = ExtendedSWHID.from_string(node.swhid).object_type

        if object_type == ExtendedObjectType.CONTENT:
            names = _successor_label_names(node)
            # An edge without labels contributes no path component instead of
            # raising an IndexError.
            if names:
                path.insert(0, names[0])
            fqswhid.append(node.swhid)
            coreswhidtype = "cnt"
        elif object_type == ExtendedObjectType.DIRECTORY:
            if fqswhid:
                # Core SWHID already known: this directory only adds a
                # component in front of the path.
                names = _successor_label_names(node)
                if names:
                    path.insert(0, names[0])
            else:
                fqswhid.append(node.swhid)
                coreswhidtype = "dir"
        elif object_type == ExtendedObjectType.REVISION:
            if fqswhid:
                # Only the first revision after the core SWHID is kept.
                if revision is None:
                    revision = node.swhid
            else:
                fqswhid.append(node.swhid)
                coreswhidtype = "rev"
        elif object_type == ExtendedObjectType.RELEASE:
            if fqswhid:
                # Only the first release after the core SWHID is kept.
                if release is None:
                    release = node.swhid
            else:
                fqswhid.append(node.swhid)
                coreswhidtype = "rel"
        elif object_type == ExtendedObjectType.SNAPSHOT:
            if fqswhid:
                snapshot = node.swhid
            else:
                fqswhid.append(node.swhid)
                coreswhidtype = "snp"

    # Now we have all the elements to print a FQ SWHID.
    # We could also build and return a swh.model.swhids.QualifiedSWHID.
    if coreswhidtype in ("cnt", "dir"):
        fqswhid.append("path=/" + "/".join(path))
        if revision is not None:
            fqswhid.append("anchor=" + revision)
        elif release is not None:
            fqswhid.append("anchor=" + release)
        if snapshot:
            fqswhid.append("visit=" + snapshot)
    elif coreswhidtype in ("rev", "rel"):
        if snapshot:
            fqswhid.append("visit=" + snapshot)
    if url:
        fqswhid.append("origin=" + url)
    return ";".join(fqswhid)
def main(
    content_swhid,
    origin_url,
    all_origins,
    random_origin,
    filename,
    graph_grpc_server,
    fqswhid,
    trace,
):
    """Query the graph gRPC service for origins containing a given object.

    Results and error messages are printed on standard output.

    Args:
        content_swhid: SWHID used as the source of the traversals. It is
            ignored (and therefore not validated) when ``filename`` is given.
        origin_url: when truthy, look for a path from the source to the
            origin with this URL and print the fully qualified SWHID built
            from that path.
        all_origins: when truthy, traverse the graph backward to every
            reachable origin and print one line per origin; this disables
            ``random_origin``.
        random_origin: when truthy, look for a path from the source to any
            origin node and print the result.
        filename: when truthy, the source SWHID is computed from this file
            with ``swhid_of_file`` instead of using ``content_swhid``.
        graph_grpc_server: address of the gRPC server, handed to
            ``grpc.insecure_channel``.
        fqswhid: when truthy, print fully qualified SWHIDs instead of bare
            origin URLs (for ``all_origins`` and ``random_origin``).
        trace: value stored in the module-level ``verbose`` flag, which
            makes ``fqswhid_of_traversal`` print the nodes it examines.

    Returns:
        None.
    """
    global verbose
    verbose = trace

    # Check if content SWHID is valid. When a filename is given the SWHID is
    # recomputed from the file below, so a placeholder/invalid
    # ``content_swhid`` must not abort the request.
    if not filename:
        try:
            CoreSWHID.from_string(content_swhid)
        except ValidationError:
            print(f"Error: '{content_swhid}' is not a valid SWHID")
            return

    with grpc.insecure_channel(graph_grpc_server) as channel:
        client = TraversalServiceStub(channel)

        # Restrict the returned messages to the fields actually used here.
        field_mask = FieldMask(
            paths=[
                "node.swhid",
                "node.ori.url",
                "node.successor.swhid",
                "node.successor.label.name",
            ]
        )

        try:
            if filename:
                content_swhid = str(swhid_of_file(filename))

            # Traversal request: get all origins
            if all_origins:
                random_origin = False
                origins = client.Traverse(
                    TraversalRequest(
                        src=[content_swhid],
                        edges=(
                            "cnt:dir,dir:dir,dir:rev,rev:rev,"
                            "rev:snp,rev:rel,rel:snp,snp:ori"
                        ),
                        direction="BACKWARD",
                        return_nodes=NodeFilter(types="ori"),
                        mask=field_mask,
                    )
                )
                for origin in origins:
                    if fqswhid:
                        # A dedicated name keeps the stream being iterated
                        # distinct from the per-origin path lookup.
                        path_to_origin = client.FindPathBetween(
                            FindPathBetweenRequest(
                                src=[content_swhid],
                                dst=[origin.swhid],
                                direction="BACKWARD",
                                mask=field_mask,
                            )
                        )
                        print(fqswhid_of_traversal(path_to_origin))
                    else:
                        print(origin.ori.url)

            # Traversal request to a (random) origin
            if random_origin:
                response = client.FindPathTo(
                    FindPathToRequest(
                        src=[content_swhid],
                        target=NodeFilter(types="ori"),
                        direction="BACKWARD",
                        mask=field_mask,
                    )
                )
                if fqswhid:
                    print(fqswhid_of_traversal(response))
                else:
                    # NOTE(review): this prints ``ori.url`` for every node of
                    # the path, not only for the origin node -- presumably
                    # yielding empty lines for the non-origin nodes; confirm
                    # whether that output is intended.
                    for node in response.node:
                        print(node.ori.url)

            # Traversal request to a given origin URL
            if origin_url:
                # The origin SWHID is built from the SHA1 digest of the
                # UTF-8 encoded URL.
                origin_swhid = ExtendedSWHID(
                    object_type=ExtendedObjectType.ORIGIN,
                    object_id=sha1(origin_url.encode("utf-8")).digest(),
                )
                response = client.FindPathBetween(
                    FindPathBetweenRequest(
                        src=[content_swhid],
                        dst=[str(origin_swhid)],
                        direction="BACKWARD",
                        mask=field_mask,
                    )
                )
                print(fqswhid_of_traversal(response))

        except grpc.RpcError as e:
            print("Error from the GRPC API call: {}".format(e.details()))
            if filename:
                print(filename + " has SWHID " + content_swhid)
        except Exception as e:
            print("Unexpected error occurred: {}".format(e))