Source code for swh.alter.subgraph

# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from typing import Any, Dict, Iterable, List, TextIO

from igraph import Graph, Vertex

from swh.model.swhids import CoreSWHID
from swh.model.swhids import ExtendedObjectType as ObjectType
from swh.model.swhids import ExtendedSWHID

logger = logging.getLogger(__name__)

# Only used when generating graphviz output
def _batched(it, n):
    from itertools import islice

    # TODO: Migrate to the the assignment syntax once we require Python 3.8+
    # while batch := tuple(islice(it, n)):
    #     yield batch
    while True:
        batch = tuple(islice(it, n))
        if not batch:
        yield batch

[docs] class Subgraph(Graph): """A class to hold a small subset of the Software Heritage graph Each vertex corresponds to a single SWHID and vice versa. The graph is directed and each edge represents a reference from an object in Software Heritage storage to another. This is backed by the `igraph <>`_ library for convenience. This class is intended to be subclassed to implement the more specific behaviors needed for the different stage of the :ref:`object removal algorithm <alter_removal_algorithm>`. """ def __init__(self, *args, **kwargs): """See also `igraph.Graph constructor <>`_""" super().__init__(*args, directed=True, **kwargs) if "name" not in self.vs.attributes(): self.vs["name"] = [] if "swhid" not in self.vs.attributes(): self.vs["swhid"] = []
[docs] @classmethod def copy(cls, subgraph): """Create a new instance by copying vertices, edges and respective attributes from the given subgraph.""" g = subgraph return cls( n=len(g.vs), edges=(e.tuple for e in, graph_attrs={attr: g[attr] for attr in g.attributes()}, vertex_attrs={attr: g.vs[attr] for attr in g.vs.attributes()}, edge_attrs={attr:[attr] for attr in}, )
# Used by select_ordered() below _OBJECT_TYPE_ORDER = { t: v for v, t in enumerate( [ ObjectType.ORIGIN, ObjectType.SNAPSHOT, ObjectType.RELEASE, ObjectType.REVISION, ObjectType.DIRECTORY, ObjectType.CONTENT, ] ) } default_vertex_attributes: Dict[str, Any] = {} """Vertex will get the following attributes on creation unless specified otherwise."""
[docs] def add_vertex(self, name: str, **kwargs) -> Vertex: """Add or update a vertex. A vertex with the given `name` will be created if it does not exist already. Attributes for the vertex will be set to the one given as keyword arguments. Returns: a Vertex object corresponding to the added or updated vertex """ try: # The name attribute is indexed. See: # v = self.vs.find(name) v.update_attributes(**kwargs) return v except ValueError: for k, v in type(self).default_vertex_attributes.items(): kwargs[k] = kwargs.get(k, v) return super().add_vertex(name, source=set(), **kwargs)
[docs] def add_swhids(self, swhids: Iterable[str]) -> Dict[str, int]: """Add a set of swhids to the subgraph. Arguments: swhids: a Set of SWHIDs. Returns: a mapping between added SWHID strings and the corresponding vertex index """ swhid_list = list(swhids) super().add_vertices( n=len(swhid_list), attributes={ **{ k: [v] * len(swhid_list) for k, v in type(self).default_vertex_attributes.items() }, "name": swhid_list, "swhid": [ExtendedSWHID.from_string(swhid) for swhid in swhid_list], }, ) return {v["name"]: v.index for v in}
[docs] def add_swhid(self, object_or_swhid, **kwargs) -> Vertex: """Add or update a vertex for the given SWHID or object. This is a convenience method to add vertex from either :py:class:`CoreSWHID`, :py:class:`ExtendedSWHID`, or any objects implementing a ``swhid()`` method returning one of those. """ if hasattr(object_or_swhid, "swhid") and callable(object_or_swhid.swhid): swhid = object_or_swhid.swhid() else: swhid = object_or_swhid if type(swhid) is CoreSWHID: swhid = swhid.to_extended() elif type(swhid) is str: swhid = ExtendedSWHID.from_string(swhid) return self.add_vertex(name=str(swhid), swhid=swhid, **kwargs)
[docs] def swhids(self) -> List[ExtendedSWHID]: """Returns all SWHID in this subgraph""" return self.vs["swhid"]
[docs] def add_edge(self, src: Vertex, dst: Vertex, skip_duplicates=False, **kwargs): """Add an edge with the given attributes. When trying to add an edge that already exists: - if `skip_duplicates` is set to True, nothing will be done, - otherwise (the default), an exception will be raised. Raises: ValueError if the given edge already exists and `skip_duplicates` is False """ if self.get_eid(src, dst, error=False) != -1: if skip_duplicates: return import inspect current_frame = inspect.currentframe() if current_frame and current_frame.f_back: caller = inspect.getframeinfo(current_frame.f_back)[2] else: caller = "<unknown>" raise ValueError( "Duplicate edge %s%s, added from %s" % (src["name"], dst["name"], caller) ) # This will also log edge insertion in the graph, making for a very # verbose output. It should only be useful if you get the # “duplicate edge” above. if self._DEBUG_EDGE_INSERTION: import inspect current_frame = inspect.currentframe() if current_frame and current_frame.f_back: caller = inspect.getframeinfo(current_frame.f_back)[2] else: caller = "<unknown>" logger.debug( "Inserting edge %s%s, added from %s", src["name"], dst["name"], caller, ) super().add_edge(src, dst, **kwargs)
[docs] def select_ordered(self, *args, **kwargs) -> List[Vertex]: """Get vertices ordered by object type from origins to contents""" return sorted(*args, **kwargs), key=lambda v: Subgraph._OBJECT_TYPE_ORDER[v["swhid"].object_type], )
def _format_swhid_label(self, s): """Format SWHID taking only SHA1 and breaking in lines of 8 characters""" return "\\n".join( ["".join(batch) for batch in _batched(iter(s.split(":")[3]), 8)] )
[docs] def dot_node_attributes(self, v: Vertex) -> List[str]: """Get a list of attributes in DOT format for the given vertex. The default implementation defines a label with a formatted SWHID. This method is called by :py:meth:`write_dot` and is meant to be subclassed to produce extra labels to highlight certain graph aspects. """ return [f'label="{self._format_swhid_label(v["name"])}"']
[docs] def write_dot(self, f: TextIO) -> None: """Write a representation of this subgraph in DOT format to `f`. The result can be processed using the `dot` command-line utility provided by the Graphviz package. """ def write_objects(f, object_type): for v in v: v["swhid"].object_type == object_type): f.write(f' {v.index} [{", ".join(self.dot_node_attributes(v))}];\n') for dst in v.successors(): # layout rev -> rev edges horizontally if ( v["swhid"].object_type == ObjectType.REVISION and dst["swhid"].object_type == ObjectType.REVISION ): edge_options = " [constraint=false]" else: edge_options = "" f.write(f" {v.index} -> {dst.index}{edge_options};\n") f.write( f'digraph "{self["name"] if "name" in self.attributes() else "Subgraph"}"' "{\n" " ranksep=1; nodesep=0.5;\n" " subgraph cnt {\n" " node [style=filled, fillcolor=pink];\n" ) write_objects(f, ObjectType.CONTENT) f.write( " }\n" " subgraph cluster_dir {\n" ' label="File contents";\n' " node [shape=folder, style=filled, fillcolor=lightblue];\n" ) write_objects(f, ObjectType.DIRECTORY) f.write( " }\n" " subgraph cluster_rev {\n" ' label="Revisions";\n' " node [shape=diamond, style=filled, fillcolor=orchid];\n" ) write_objects(f, ObjectType.REVISION) f.write( " }\n" " subgraph cluster_rel {\n" ' label="Releases";\n' " node [shape=octagon, style=filled, fillcolor=sandybrown];\n" ) write_objects(f, ObjectType.RELEASE) f.write( " }\n" " subgraph cluster_snp {\n" ' label="Snapshot";\n' " node [shape=doubleoctagon, style=filled, fillcolor=aqua];\n" ) write_objects(f, ObjectType.SNAPSHOT) f.write( " }\n" " subgraph cluster_ori {\n" ' label="Origins";\n' " node [shape=egg, style=filled, fillcolor=khaki];\n" ) write_objects(f, ObjectType.ORIGIN) f.write(" }\n}\n")