
# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
This module implements the inventory stage of the
:ref:`removal algorithm <alter_removal_algorithm>`.
"""

from contextlib import suppress
import itertools
import logging
from typing import Any, Callable, Collection, Dict, Iterator, List

from igraph import Vertex

from swh.core.api.classes import stream_results_optional
from swh.graph.http_client import GraphArgumentException, RemoteGraphClient
from swh.model.model import Revision
from swh.model.swhids import ExtendedObjectType as ObjectType
from swh.model.swhids import ExtendedSWHID
from swh.storage.algos.origin import iter_origin_visit_statuses, iter_origin_visits
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.storage.interface import StorageInterface

from .subgraph import Subgraph

logger = logging.getLogger(__name__)


class InventorySubgraph(Subgraph):
    """A subgraph holding an inventory of all candidates for removal.

    When all references from a given node have been accounted for,
    the ``complete`` attribute is set to True.
    """

    default_vertex_attributes: Dict[str, Any] = {"complete": False}

    def __init__(self, *args, **kwargs):
        """See :py:class:`Subgraph`"""
        super().__init__(*args, **kwargs)
        self["name"] = "Inventory"
        self["root_swhids"] = []
        if "complete" not in self.vs.attributes():
            self.vs["complete"] = [False for _ in self.vs]

    def select_incomplete(self) -> List[Vertex]:
        """Return vertices known to be incomplete, ordered by object type
        from origins to contents."""
        # We want to order incomplete vertices from origin to content in order to
        # increase the chance to complete as many vertices as possible using
        # swh.graph traversal, which follows the same direction.
        return self.select_ordered(complete_eq=False)

    def dot_node_attributes(self, v: Vertex) -> List[str]:
        """Get a list of attributes in DOT format for the given vertex.

        On top of the default attributes, use a bold font for root objects,
        and color the vertex if it is known to be incomplete.

        :meta private:
        """
        attrs = super().dot_node_attributes(v)
        if v["swhid"] in self["root_swhids"]:
            attrs.append('fontname="times bold"')
        if not v["complete"]:
            attrs.append("color=gray")
        return attrs

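
# Illustration only (not part of the original module): a minimal sketch of how
# an InventorySubgraph is seeded. It relies solely on ``add_swhid``,
# ``select_incomplete`` and the ``swhid``/``complete`` vertex attributes used
# throughout this module; the helper name itself is hypothetical.
def _example_seed_inventory(root: ExtendedSWHID) -> InventorySubgraph:
    """Create an inventory subgraph holding a single, not-yet-looked-up root."""
    subgraph = InventorySubgraph()
    subgraph["root_swhids"].append(root)
    # The root starts as incomplete: its references still have to be listed
    subgraph.add_swhid(root, complete=False)
    for vertex in subgraph.select_incomplete():
        logger.debug("vertex %s still needs to be looked up", vertex["swhid"])
    return subgraph
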
class Lister:
    """A class encapsulating our inventory algorithm.

    :meta private:
    """

    def __init__(
        self,
        storage: StorageInterface,
        graph_client: RemoteGraphClient,
        subgraph: InventorySubgraph,
    ):
        self._subgraph = subgraph
        self._storage = storage
        self._graph_client = graph_client

    @property
    def subgraph(self):
        return self._subgraph

    def inventory_candidates(self, root: ExtendedSWHID) -> None:
        self._subgraph["root_swhids"].append(root)
        # Add our starting point as incomplete
        self._subgraph.add_swhid(root, complete=False)
        logger.debug("inventory_candidates: added %s", root)
        # Iterate until everything has been fetched
        for remaining in self._iter_inventory_candidates():
            logger.debug(
                "inventory_candidates: %4d SWHIDs known, %4d need to be looked up.",
                len(self._subgraph.vs),
                remaining,
            )

    def _fetch_candidates_using_graph(self, vertex) -> None:
        # We don’t expect all SWHIDs to be present in swh.graph
        with suppress(GraphArgumentException):
            self.add_edges_traversing_graph(vertex["swhid"])

    def _fetch_candidates_using_storage(self, vertex) -> None:
        self.add_edges_using_storage(vertex["swhid"])

    def _iter_inventory_candidates(self) -> Iterator[int]:
        # We alternate between retrieving from swh.graph (fast but potentially
        # incomplete) and swh.storage (complete, but slow and one level at a
        # time). A distilled sketch of this loop is given after the class.
        # Note: SWHIDs discovered from swh.graph will not require further
        # fetching. Because objects are immutable, nothing could have been
        # added after the graph was exported. The sole exception is origin
        # objects. References from SWHIDs discovered from swh.storage have to
        # be looked up though. As there is a chance they can be found in
        # swh.graph, we’ll do that at our next iteration.
        fetchers = itertools.cycle(
            [self._fetch_candidates_using_graph, self._fetch_candidates_using_storage]
        )
        to_fetch = self._subgraph.select_incomplete()
        while to_fetch:
            fetch = next(fetchers)
            for vertex in to_fetch:
                # A fetcher might complete more vertices than the one given, so
                # this one might have been completed since we got the list of
                # incomplete vertices
                if vertex["complete"]:
                    continue
                fetch(vertex)
            to_fetch = self._subgraph.select_incomplete()
            yield len(to_fetch)

    def add_edges_traversing_graph(self, start: ExtendedSWHID) -> None:
        # We want everything except dir→rev edges which represent submodules.
        # See the relevant comment in `_add_edges_using_storage_for_directory` below.
        edge_restriction = "ori:*,snp:*,rel:*,rev:*,dir:dir,dir:cnt"
        # XXX: We should be able to pass a SWHID object to visit_edges()
        for src, dst in self._graph_client.visit_edges(
            str(start), edges=edge_restriction
        ):
            # Origins are the only objects in the graph that are not identified
            # over their content. New destinations can be added over time with
            # new visits. Therefore, we need to always query storage for them.
            complete = not src.startswith("swh:1:ori:")
            v_src = self._subgraph.add_swhid(src, complete=complete)
            # Anything pointed to by an origin is known to be complete if it
            # was present at the time the swh.graph export was made.
            v_dst = self._subgraph.add_swhid(dst, complete=True)
            self._subgraph.add_edge(v_src, v_dst)

    def add_edges_using_storage(self, source: ExtendedSWHID) -> None:
        _ADD_EDGES_USING_STORAGE_METHODS_PER_OBJECT_TYPE[source.object_type](
            self, source
        )

    def _add_edges_using_storage_for_content(self, source: ExtendedSWHID) -> None:
        _ = self._subgraph.add_swhid(source, complete=True)
        # Nothing is referenced by content objects so we have no edges to add

    def _add_edges_using_storage_for_directory(self, source: ExtendedSWHID) -> None:
        v_directory = self._subgraph.add_swhid(source, complete=True)
        entries = stream_results_optional(
            self._storage.directory_get_entries, source.object_id
        )
        if not entries:
            logger.warning("Directory %s not found", source)
            return
        for entry in entries:
            target = entry.swhid().to_extended()
            # References from directories to revisions represent submodules. As
            # of April 2023, loaders will record the reference to a submodule,
            # and that’s it. They don’t handle loading the associated origin.
            # If the revision for a submodule is in the archive, it means that
            # the origin has been retrieved separately. The policy is thus that
            # to remove the code in a submodule, one should remove the
            # associated origin. Long story short, revisions used as submodules
            # (and their references) are not considered as candidates for
            # removal when inventorying a directory.
            if target.object_type == ObjectType.REVISION:
                logger.info(
                    "Ignored submodule %s (%s), listed in directory %s",
                    entry.name,
                    entry.swhid(),
                    source,
                )
                continue
            # Content objects are our leaves. Therefore we already know all
            # about them as soon as we know they exist, and we can consider
            # them complete.
            if target.object_type == ObjectType.CONTENT:
                v_target = self._subgraph.add_swhid(target, complete=True)
            else:
                v_target = self._subgraph.add_swhid(target)
            # It is possible to get the same target multiple times for a single
            # directory, for example if it contains multiple files with the
            # same content.
            self._subgraph.add_edge(v_directory, v_target, skip_duplicates=True)

    def _add_edges_using_storage_for_revision(self, source: ExtendedSWHID) -> None:
        # We limit the search to avoid retrieving too much at once from storage.
        # The oldest parent revisions will be retrieved in later iterations if
        # needed, in any case.
        for rev_d in self._storage.revision_log([source.object_id], limit=100):
            revision = Revision.from_dict(rev_d)
            revision_swhid = revision.swhid().to_extended()
            # We might know about this revision already from a previous revision log.
            # In that case there is no need to go any further up the log.
            if self._subgraph.vs.select(name=str(revision_swhid), complete=True):
                break
            # We can say these don’t need to be fetched as we know that
            # we are getting everything about this revision here:
            # its directory and parents.
            v_revision = self._subgraph.add_swhid(revision_swhid, complete=True)
            v_directory = self._subgraph.add_swhid(revision.directory_swhid())
            self._subgraph.add_edge(v_revision, v_directory)
            # We create a set here because some revisions point to the same
            # parent multiple times.
            for parent_swhid in set(revision.parent_swhids()):
                v_parent = self._subgraph.add_swhid(parent_swhid)
                self._subgraph.add_edge(v_revision, v_parent)

    def _add_edges_using_storage_for_release(self, source: ExtendedSWHID) -> None:
        v_release = self._subgraph.add_swhid(source, complete=True)
        [release] = self._storage.release_get([source.object_id])
        if not release:
            logger.warning("Release %s not found", source)
            return
        v_target = self._subgraph.add_swhid(release.target_swhid())
        self._subgraph.add_edge(v_release, v_target)

    def _add_edges_using_storage_for_snapshot(self, source: ExtendedSWHID) -> None:
        v_snapshot = self._subgraph.add_swhid(source, complete=True)
        snapshot = snapshot_get_all_branches(self._storage, source.object_id)
        if not snapshot:
            logger.warning("Snapshot %s not found", source)
            return
        # We need to deduplicate targets as multiple branches can point to the
        # same head and we only need one edge for each target.
        # TODO: Better document aliases in swh-model. I had to look at the
        # mercurial loader to understand what they were useful for.
        # We also skip aliases here as they refer to a branch name inside the
        # snapshot. As we inventory the references for every branch, the branch
        # pointed to by an alias will always be dealt with.
        target_swhids = {
            branch.swhid()
            for branch in snapshot.branches.values()
            if branch and branch.target_type.value != "alias"
        }
        for swhid in target_swhids:
            v_branch = self._subgraph.add_swhid(swhid)
            self._subgraph.add_edge(v_snapshot, v_branch)

    def _add_edges_using_storage_for_origin(self, source: ExtendedSWHID) -> None:
        v_source = self._subgraph.add_swhid(source, complete=True)
        [origin_d] = self._storage.origin_get_by_sha1([source.object_id])
        if not origin_d:
            logger.warning("Origin %s not found in storage", source)
            return
        for visit in iter_origin_visits(self._storage, origin_d["url"]):
            assert visit.visit is not None  # make mypy happy
            for status in iter_origin_visit_statuses(
                self._storage, visit.origin, visit.visit
            ):
                snapshot_swhid = status.snapshot_swhid()
                if snapshot_swhid:
                    v_snapshot = self._subgraph.add_swhid(snapshot_swhid)
                    self._subgraph.add_edge(v_source, v_snapshot, skip_duplicates=True)


_ADD_EDGES_USING_STORAGE_METHODS_PER_OBJECT_TYPE: Dict[
    ObjectType, Callable[[Lister, ExtendedSWHID], None]
] = {
    ObjectType.CONTENT: Lister._add_edges_using_storage_for_content,
    ObjectType.DIRECTORY: Lister._add_edges_using_storage_for_directory,
    ObjectType.REVISION: Lister._add_edges_using_storage_for_revision,
    ObjectType.RELEASE: Lister._add_edges_using_storage_for_release,
    ObjectType.SNAPSHOT: Lister._add_edges_using_storage_for_snapshot,
    ObjectType.ORIGIN: Lister._add_edges_using_storage_for_origin,
}

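
# Distilled sketch (illustration only) of the alternation loop implemented in
# ``Lister._iter_inventory_candidates`` above: keep switching between a fast
# but potentially incomplete fetcher and a slow but exhaustive one until no
# incomplete vertex remains. ``fast_fetch`` and ``slow_fetch`` are hypothetical
# stand-ins for the graph and storage fetchers.
def _example_alternate_until_complete(
    subgraph: InventorySubgraph,
    fast_fetch: Callable[[Vertex], None],
    slow_fetch: Callable[[Vertex], None],
) -> None:
    fetchers = itertools.cycle([fast_fetch, slow_fetch])
    to_fetch = subgraph.select_incomplete()
    while to_fetch:
        fetch = next(fetchers)
        for vertex in to_fetch:
            # A fetcher may have completed this vertex in the meantime
            if not vertex["complete"]:
                fetch(vertex)
        to_fetch = subgraph.select_incomplete()
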
def make_inventory(
    storage, graph_client, swhids: Collection[ExtendedSWHID]
) -> InventorySubgraph:
    """Inventory candidates for removal from the given set of SWHIDs.

    By querying the given `storage` and `graph_client`, create a subgraph
    with all objects reachable from the given set of SWHIDs (typically
    origins). The result should then be used to verify which candidates can
    safely be removed.
    """
    subgraph = InventorySubgraph()
    lister = Lister(storage, graph_client, subgraph)
    for swhid in swhids:
        lister.inventory_candidates(swhid)
    return lister.subgraph
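
# Minimal usage sketch, not part of the original module. It assumes locally
# running storage and swh.graph services; the URLs and the origin SWHID below
# are placeholders to adapt to your setup.
if __name__ == "__main__":
    from swh.storage import get_storage

    example_storage = get_storage("remote", url="http://localhost:5002/")
    example_graph_client = RemoteGraphClient("http://localhost:5009/graph/")
    example_origin = ExtendedSWHID.from_string(
        "swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165"
    )
    inventory = make_inventory(example_storage, example_graph_client, [example_origin])
    # Any vertex still marked incomplete could not be fully inventoried
    for vertex in inventory.select_incomplete():
        print("incomplete:", vertex["swhid"])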