# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from typing import Any, Dict, Iterable, List, TextIO
from igraph import Graph, Vertex
from swh.model.swhids import CoreSWHID
from swh.model.swhids import ExtendedObjectType as ObjectType
from swh.model.swhids import ExtendedSWHID
logger = logging.getLogger(__name__)
# Only used when generating graphviz output
def _batched(it, n):
from itertools import islice
# TODO: Migrate to the the assignment syntax once we require Python 3.8+
# while batch := tuple(islice(it, n)):
# yield batch
while True:
batch = tuple(islice(it, n))
if not batch:
return
yield batch
[docs]
class Subgraph(Graph):
"""A class to hold a small subset of the Software Heritage graph
Each vertex corresponds to a single SWHID and vice versa. The graph is
directed and each edge represents a reference from an object in Software
Heritage storage to another.
This is backed by the `igraph <https://igraph.org/>`_ library for
convenience.
This class is intended to be subclassed to implement the more specific
behaviors needed for the different stage of the :ref:`object removal
algorithm <alter_removal_algorithm>`.
"""
def __init__(self, *args, **kwargs):
"""See also `igraph.Graph constructor
<https://igraph.org/python/api/latest/igraph.Graph.html#__init__>`_"""
super().__init__(*args, directed=True, **kwargs)
if "name" not in self.vs.attributes():
self.vs["name"] = []
if "swhid" not in self.vs.attributes():
self.vs["swhid"] = []
[docs]
@classmethod
def copy(cls, subgraph):
"""Create a new instance by copying vertices, edges and respective
attributes from the given subgraph."""
g = subgraph
return cls(
n=len(g.vs),
edges=(e.tuple for e in g.es),
graph_attrs={attr: g[attr] for attr in g.attributes()},
vertex_attrs={attr: g.vs[attr] for attr in g.vs.attributes()},
edge_attrs={attr: g.es[attr] for attr in g.es.attributes()},
)
# Used by select_ordered() below
_OBJECT_TYPE_ORDER = {
t: v
for v, t in enumerate(
[
ObjectType.ORIGIN,
ObjectType.SNAPSHOT,
ObjectType.RELEASE,
ObjectType.REVISION,
ObjectType.DIRECTORY,
ObjectType.CONTENT,
]
)
}
default_vertex_attributes: Dict[str, Any] = {}
"""Vertex will get the following attributes on creation
unless specified otherwise."""
[docs]
def add_vertex(self, name: str, **kwargs) -> Vertex:
"""Add or update a vertex.
A vertex with the given `name` will be created if it does not exist already.
Attributes for the vertex will be set to the one given as keyword arguments.
Returns:
a Vertex object corresponding to the added or updated vertex
"""
try:
# The name attribute is indexed. See:
# https://lists.nongnu.org/archive/html/igraph-help/2014-05/msg00069.html
v = self.vs.find(name)
v.update_attributes(**kwargs)
return v
except ValueError:
for k, v in type(self).default_vertex_attributes.items():
kwargs[k] = kwargs.get(k, v)
return super().add_vertex(name, source=set(), **kwargs)
[docs]
def add_swhids(self, swhids: Iterable[str]) -> Dict[str, int]:
"""Add a set of swhids to the subgraph.
Arguments:
swhids: a Set of SWHIDs.
Returns: a mapping between added SWHID strings and the corresponding vertex index
"""
swhid_list = list(swhids)
super().add_vertices(
n=len(swhid_list),
attributes={
**{
k: [v] * len(swhid_list)
for k, v in type(self).default_vertex_attributes.items()
},
"name": swhid_list,
"swhid": [ExtendedSWHID.from_string(swhid) for swhid in swhid_list],
},
)
return {v["name"]: v.index for v in self.vs.select(name_in=swhids)}
[docs]
def add_swhid(self, object_or_swhid, **kwargs) -> Vertex:
"""Add or update a vertex for the given SWHID or object.
This is a convenience method to add vertex from either :py:class:`CoreSWHID`,
:py:class:`ExtendedSWHID`, or any objects implementing a ``swhid()`` method
returning one of those.
"""
if hasattr(object_or_swhid, "swhid") and callable(object_or_swhid.swhid):
swhid = object_or_swhid.swhid()
else:
swhid = object_or_swhid
if type(swhid) is CoreSWHID:
swhid = swhid.to_extended()
elif type(swhid) is str:
swhid = ExtendedSWHID.from_string(swhid)
return self.add_vertex(name=str(swhid), swhid=swhid, **kwargs)
[docs]
def swhids(self) -> List[ExtendedSWHID]:
"""Returns all SWHID in this subgraph"""
return self.vs["swhid"]
_DEBUG_EDGE_INSERTION = False
[docs]
def add_edge(self, src: Vertex, dst: Vertex, skip_duplicates=False, **kwargs):
"""Add an edge with the given attributes.
When trying to add an edge that already exists:
- if `skip_duplicates` is set to True, nothing will be done,
- otherwise (the default), an exception will be raised.
Raises:
ValueError if the given edge already exists and `skip_duplicates` is False
"""
if self.get_eid(src, dst, error=False) != -1:
if skip_duplicates:
return
import inspect
current_frame = inspect.currentframe()
if current_frame and current_frame.f_back:
caller = inspect.getframeinfo(current_frame.f_back)[2]
else:
caller = "<unknown>"
raise ValueError(
"Duplicate edge %s → %s, added from %s"
% (src["name"], dst["name"], caller)
)
# This will also log edge insertion in the graph, making for a very
# verbose output. It should only be useful if you get the
# “duplicate edge” above.
if self._DEBUG_EDGE_INSERTION:
import inspect
current_frame = inspect.currentframe()
if current_frame and current_frame.f_back:
caller = inspect.getframeinfo(current_frame.f_back)[2]
else:
caller = "<unknown>"
logger.debug(
"Inserting edge %s → %s, added from %s",
src["name"],
dst["name"],
caller,
)
super().add_edge(src, dst, **kwargs)
[docs]
def select_ordered(self, *args, **kwargs) -> List[Vertex]:
"""Get vertices ordered by object type from origins to contents"""
return sorted(
self.vs.select(*args, **kwargs),
key=lambda v: Subgraph._OBJECT_TYPE_ORDER[v["swhid"].object_type],
)
def _format_swhid_label(self, s):
"""Format SWHID taking only SHA1 and breaking in lines of 8 characters"""
return "\\n".join(
["".join(batch) for batch in _batched(iter(s.split(":")[3]), 8)]
)
[docs]
def dot_node_attributes(self, v: Vertex) -> List[str]:
"""Get a list of attributes in DOT format for the given vertex.
The default implementation defines a label with a formatted SWHID.
This method is called by :py:meth:`write_dot` and is meant to be
subclassed to produce extra labels to highlight certain graph aspects.
"""
return [f'label="{self._format_swhid_label(v["name"])}"']
[docs]
def write_dot(self, f: TextIO) -> None:
"""Write a representation of this subgraph in DOT format to `f`.
The result can be processed using the `dot` command-line utility provided
by the Graphviz package.
"""
def write_objects(f, object_type):
for v in self.vs.select(lambda v: v["swhid"].object_type == object_type):
f.write(f' {v.index} [{", ".join(self.dot_node_attributes(v))}];\n')
for dst in v.successors():
# layout rev -> rev edges horizontally
if (
v["swhid"].object_type == ObjectType.REVISION
and dst["swhid"].object_type == ObjectType.REVISION
):
edge_options = " [constraint=false]"
else:
edge_options = ""
f.write(f" {v.index} -> {dst.index}{edge_options};\n")
f.write(
f'digraph "{self["name"] if "name" in self.attributes() else "Subgraph"}"'
"{\n"
" ranksep=1; nodesep=0.5;\n"
" subgraph cnt {\n"
" node [style=filled, fillcolor=pink];\n"
)
write_objects(f, ObjectType.CONTENT)
f.write(
" }\n"
" subgraph cluster_dir {\n"
' label="File contents";\n'
" node [shape=folder, style=filled, fillcolor=lightblue];\n"
)
write_objects(f, ObjectType.DIRECTORY)
f.write(
" }\n"
" subgraph cluster_rev {\n"
' label="Revisions";\n'
" node [shape=diamond, style=filled, fillcolor=orchid];\n"
)
write_objects(f, ObjectType.REVISION)
f.write(
" }\n"
" subgraph cluster_rel {\n"
' label="Releases";\n'
" node [shape=octagon, style=filled, fillcolor=sandybrown];\n"
)
write_objects(f, ObjectType.RELEASE)
f.write(
" }\n"
" subgraph cluster_snp {\n"
' label="Snapshot";\n'
" node [shape=doubleoctagon, style=filled, fillcolor=aqua];\n"
)
write_objects(f, ObjectType.SNAPSHOT)
f.write(
" }\n"
" subgraph cluster_ori {\n"
' label="Origins";\n'
" node [shape=egg, style=filled, fillcolor=khaki];\n"
)
write_objects(f, ObjectType.ORIGIN)
f.write(" }\n}\n")