# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
This module implements the marking stage of the
:ref:`removal algorithm <alter_removal_algorithm>`.
"""
from enum import Enum, auto
from itertools import chain
import logging
from typing import Iterator, List
from igraph import Vertex
from swh.graph.http_client import GraphArgumentException, RemoteGraphClient
from swh.model.swhids import ExtendedObjectType as ObjectType
from swh.model.swhids import ExtendedSWHID
from swh.storage.interface import StorageInterface
from .inventory import InventorySubgraph
from .subgraph import Subgraph
logger = logging.getLogger(__name__)
class MarkingState(Enum):
"""Represents the different state of vertices while the marking algorithm is on-going.
All vertices start UNMARKED. After examination of their inbound references, they are
either marked as REMOVABLE or UNREMOVABLE.
:meta private:
"""
UNMARKED = auto()
REMOVABLE = auto()
UNREMOVABLE = auto()
[docs]
class RemovableSubgraph(Subgraph):
"""A class representing the subgraph of objects that can be safely removed
from the archive
This subgraph should only be created from an :py:class:`InventorySubgraph`
using :py:meth:`from_inventory_subgraph`. No new vertices or edges can be
added at this point.
Getting the list of SWHID that can be removed is done by using
py:meth:`removable_swhids`.
After marking happened, this subgraph will contain all candidates until
:py:meth:`delete_unremovable` is called.
"""
def __init__(self, *args, **kwargs):
""" """
super().__init__(*args, **kwargs)
self["name"] = "Removable"
self.vs["state"] = MarkingState.UNMARKED
[docs]
@classmethod
def from_inventory_subgraph(cls, subgraph: InventorySubgraph):
return cls.copy(subgraph)
def add_vertex(self, name: str, **kwargs):
"""Not available for RemovableSubgraph.
:meta private:"""
raise NotImplementedError("No new vertex should be added at this stage.")
def add_edge(self, src: Vertex, dst: Vertex, skip_duplicates=False, **kwargs):
"""Not available for RemovableSubgraph.
:meta private:"""
raise NotImplementedError("No new edge should be added at this stage.")
[docs]
def delete_unremovable(self):
"""Delete all vertices for unremovable objects from this subgraph."""
self.delete_vertices(self.vs.select(state_eq=MarkingState.UNREMOVABLE))
[docs]
def removable_swhids(self) -> List[ExtendedSWHID]:
"""Returns a list of SWHIDs that can safely be removed from the archive."""
return [
v["swhid"] for v in self.select_ordered(state_eq=MarkingState.REMOVABLE)
]
def dot_node_attributes(self, v: Vertex) -> List[str]:
"""Get a list of attributes in DOT format for the given vertex.
On top of default attributes, color the background of each node if
they can be removed or not.
:meta private:
"""
attrs = super().dot_node_attributes(v)
# Unset the shape so we can better spot ExtendedSWHID that can be removed
if v["state"] == MarkingState.UNMARKED:
attrs.append('style="filled,dotted"')
elif v["state"] == MarkingState.UNREMOVABLE:
attrs.append("fillcolor=white")
else:
attrs.append("fillcolor=red")
return attrs
class Marker:
"""A class encapsulating our algorithm marking nodes as removable or not.
:meta private:
"""
def __init__(
self,
storage: StorageInterface,
graph_client: RemoteGraphClient,
subgraph: RemovableSubgraph,
):
self._storage = storage
self._graph_client = graph_client
self._subgraph = subgraph
if logger.isEnabledFor(logging.INFO):
self._total_nodes = len(subgraph.vs)
self._marked_count = 0
@property
def subgraph(self):
return self._subgraph
def has_unknown_inbound_edges(self, vertex: Vertex):
# Origins never have any inbound edges
if vertex["swhid"].object_type == ObjectType.ORIGIN:
return False
# We use a set of str as an optimization because
# swh.graph API will return SWHIDs as str
known_predecessors = {str(pred["swhid"]) for pred in vertex.predecessors()}
# We only need to find one extra edge from the ones we already know
search_limit = len(known_predecessors) + 1
for pred in chain(
self.inbound_edges_from_graph(vertex, search_limit),
self.inbound_edges_from_storage(vertex, search_limit),
):
# Removing a revision used as a submodule should be possible.
# (We do not care that it will make submodule users miss part
# of their source code.)
# We use string matching here because the graph API returns
# str objects and not ExtendedSWHID objects.
if vertex["swhid"].object_type == ObjectType.REVISION and pred.startswith(
"swh:1:dir:"
):
logger.info(
"Skipping predecessor %s of %s as its a submodule",
pred,
vertex["swhid"],
)
return False
logger.debug("Is %s an extra predecessor of %s?", pred, vertex["swhid"])
if pred not in known_predecessors:
return True
return False
def inbound_edges_from_graph(
self, vertex: Vertex, search_limit: int
) -> Iterator[str]:
try:
yield from self._graph_client.neighbors(
str(vertex["swhid"]), direction="backward", max_edges=search_limit
)
except GraphArgumentException:
yield from ()
def inbound_edges_from_storage(
self, vertex: Vertex, search_limit: int
) -> Iterator[str]:
yield from map(
str,
self._storage.object_find_recent_references(
vertex["swhid"], limit=search_limit
),
)
def mark_candidates(self):
for index, vid in enumerate(self._subgraph.topological_sorting()):
vertex = self._subgraph.vs[vid]
assert all(
pred["state"] != MarkingState.UNMARKED for pred in vertex.predecessors()
), "topological sort broken: one predecessor is still in unmarked state"
# If any predecessor is unremovable, this makes this object unremovable too
if any(
pred["state"] == MarkingState.UNREMOVABLE
for pred in vertex.predecessors()
):
vertex["state"] = MarkingState.UNREMOVABLE
continue
vertex["state"] = (
MarkingState.UNREMOVABLE
if self.has_unknown_inbound_edges(vertex)
else MarkingState.REMOVABLE
)
self.log_marking_progress()
def log_marking_progress(self):
if not logger.isEnabledFor(logging.INFO):
return
self._marked_count += 1
if self._total_nodes == self._marked_count or self._marked_count % 100 == 0:
logger.info(
"Checking inbound edges for node %s/%s",
self._marked_count,
self._total_nodes,
)
[docs]
def mark_removable(
storage: StorageInterface,
graph_client: RemoteGraphClient,
inventory_subgraph: InventorySubgraph,
) -> RemovableSubgraph:
"""Find which candidates can be safely removed from the archive.
Using the previously populated ``inventory_subgraph``, query the given
``storage`` and ``graph_client`` to assess inbound references for each
candidate and see which one can be safely removed from the archive.
"""
subgraph = RemovableSubgraph.from_inventory_subgraph(inventory_subgraph)
marker = Marker(storage, graph_client, subgraph)
marker.mark_candidates()
return marker.subgraph