# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from abc import ABCMeta, abstractmethod
from collections import deque
import dataclasses
import heapq
from typing import TYPE_CHECKING, Any, Dict, Optional, Set, TypeVar
from swh.model.model import Sha1Git
if TYPE_CHECKING:
from swh.storage.interface import StorageInterface
[docs]
@dataclasses.dataclass
class State:
    """
    Bookkeeping data of a revisions walk.

    An instance is created by a revisions walker when none is provided
    to its constructor and it can be exported from a walker in order to
    resume the iteration with another one (pagination use case).
    """

    # identifiers of the revisions already taken out of ``revs_to_visit``
    # and handled by the iterator
    done: Set[Sha1Git] = dataclasses.field(default_factory=set)

    # revisions still to visit, starts as an empty list; how that container
    # is filled and consumed is up to each walker implementation
    revs_to_visit: Any = dataclasses.field(default_factory=list)

    # last revision returned by the iterator, None when nothing was returned
    last_rev: Optional[Dict] = None

    # count of the revisions returned by the iterator so far
    num_revs: int = 0

    # identifiers of the revisions whose data could not be retrieved
    missing_revs: Set[Sha1Git] = dataclasses.field(default_factory=set)
# Registry of the revisions walker classes, indexed by the value of
# the ``rw_type`` attribute declared in their class body
_revs_walker_classes = {}


class _RevisionsWalkerMetaClass(ABCMeta):
    """
    Metaclass storing each class it creates in the module-level
    ``_revs_walker_classes`` registry, provided the class body declares
    a ``rw_type`` attribute.
    """

    def __new__(cls, clsname, bases, attrs):
        created = super().__new__(cls, clsname, bases, attrs)
        try:
            walker_type = attrs["rw_type"]
        except KeyError:
            # no type declared in the class body itself (an inherited
            # ``rw_type`` does not count): nothing to register
            return created
        _revs_walker_classes[walker_type] = created
        return created
# Type variable bound to RevisionsWalker, used to annotate ``__iter__`` as
# returning the same (sub)class of walker it is called on
TWalker = TypeVar("TWalker", bound="RevisionsWalker")
[docs]
class RevisionsWalker(metaclass=_RevisionsWalkerMetaClass):
    """
    Abstract base class holding the generic logic used to walk across
    the history of a revision, starting from that revision.

    A walker is an iterator: the order in which revisions come out of it
    is decided by the derived classes.

    Each iteration step goes through the following operations:

        1) Check if the iteration is over by calling method
           :meth:`is_finished`, :exc:`StopIteration` is raised if
           it is the case

        2) Get the next unseen revision by calling method
           :meth:`get_next_rev_id`

        3) Process the parents of that revision by calling method
           :meth:`process_parent_revs`, in order to prepare the next
           iteration steps

        4) Check if the revision should be returned by calling method
           :meth:`should_return`, and return it if it is the case

    The recommended way to instantiate a specific type of revisions
    walker is the factory function :func:`get_revisions_walker`.

    Args:
        storage: instance of swh storage (either local or remote)
        rev_start: a revision identifier
        max_revs: maximum number of revisions to return
        state: previous state of that revisions walker
        ignore_displayname: return the original author/committer's full name even if
            it's masked by a displayname.
    """

    def __init__(
        self,
        storage: StorageInterface,
        rev_start: Sha1Git,
        max_revs: Optional[int] = None,
        state: Optional[State] = None,
        ignore_displayname: bool = False,
    ):
        self.storage = storage
        self.ignore_displayname = ignore_displayname
        self._max_revs = max_revs
        # local cache of the revisions data already fetched from storage
        self._revs: Dict[Sha1Git, Dict] = {}
        # start from a blank state unless a previous one is provided
        self._state = state if state else State()
        # seed the walk, this relies on the attributes set above
        self.process_rev(rev_start)

    @abstractmethod
    def process_rev(self, rev_id: Sha1Git) -> None:
        """
        Abstract method in charge of processing a revision newly
        visited during the walk.

        Its implementation in derived classes defines the way the
        revisions history is walked (for instance through a dfs on
        the revisions DAG).

        Args:
            rev_id: the newly visited revision identifier
        """

    @abstractmethod
    def get_next_rev_id(self) -> Sha1Git:
        """
        Abstract method in charge of returning the identifier of the
        next revision to handle during the iteration.

        Its implementation in derived classes must be consistent with
        the way :meth:`process_rev` stores the revisions to visit.
        """

    def process_parent_revs(self, rev: Dict) -> None:
        """
        Process the parents of a revision once it got iterated.

        By default, :meth:`process_rev` is called for every parent
        revision, following their order of declaration.

        Args:
            rev (dict): A dict describing a revision as returned by
                :meth:`swh.storage.interface.StorageInterface.revision_get`
        """
        for parent_rev_id in rev["parents"]:
            self.process_rev(parent_rev_id)

    def should_return(self, rev: Dict) -> bool:
        """
        Tell whether an iterated revision must be returned or filtered out.

        By default, every iterated revision is returned.

        Args:
            rev (dict): A dict describing a revision as returned by
                :meth:`swh.storage.interface.StorageInterface.revision_get`

        Returns:
            bool: Whether to return the revision in the iteration
        """
        return True

    def is_finished(self) -> bool:
        """
        Tell whether the iteration is over, either because the maximum
        number of revisions got returned or because no revision remains
        to visit.

        This method is called at the beginning of each iteration step.

        Returns:
            bool: Whether the iteration is finished
        """
        limit = self._max_revs
        if limit is not None and self._state.num_revs >= limit:
            return True
        return not self._state.revs_to_visit

    def _get_rev(self, rev_id: Sha1Git) -> Optional[Dict]:
        cached_rev = self._revs.get(rev_id)
        if cached_rev is not None:
            return cached_rev
        # cache miss: request a chunk of the revision log (limit=100) rather
        # than a single revision, to reduce the number of requests sent to
        # storage and thus speed up the walk
        fetched_revs = self.storage.revision_log(
            [rev_id], limit=100, ignore_displayname=self.ignore_displayname
        )
        for fetched_rev in fetched_revs:
            # a None entry means revision data is missing,
            # returned history will be truncated
            if fetched_rev is not None:
                self._revs[fetched_rev["id"]] = fetched_rev
        return self._revs.get(rev_id)

    def missing_revisions(self) -> Set[Sha1Git]:
        """
        Return the identifiers of the revisions whose data were found
        missing from the archive content while walking on the revisions
        graph.

        Returns:
            Set[bytes]: a set of revision identifiers
        """
        return self._state.missing_revs

    def is_history_truncated(self) -> bool:
        """
        Tell whether the revisions history generated so far is truncated
        or not. It is the case as soon as the data of some revision were
        found missing from the archive content.

        Returns:
            bool: Whether the history got truncated or not
        """
        return bool(self.missing_revisions())

    def export_state(self) -> State:
        """
        Export the internal state of that revisions walker.

        Its purpose is to continue the iteration in a pagination context.

        Returns:
            The internal state of that revisions walker
        """
        return self._state

    def __next__(self) -> Dict:
        if self.is_finished():
            raise StopIteration

        while self._state.revs_to_visit:
            rev_id = self.get_next_rev_id()
            if rev_id in self._state.done:
                # already handled through another path of the graph
                continue
            self._state.done.add(rev_id)

            rev = self._get_rev(rev_id)
            if rev is None:
                # revision data is missing, returned history will be truncated
                self._state.missing_revs.add(rev_id)
                continue

            self.process_parent_revs(rev)
            if not self.should_return(rev):
                continue

            self._state.num_revs += 1
            self._state.last_rev = rev
            return rev

        # nothing left to visit
        raise StopIteration

    def __iter__(self: TWalker) -> TWalker:
        return self
[docs]
class CommitterDateRevisionsWalker(RevisionsWalker):
    """
    Revisions walker returning revisions in reverse chronological
    order of their committer date (same behaviour as ``git log``)
    """

    rw_type = "committer_date"

    def process_rev(self, rev_id: Sha1Git) -> None:
        """
        Push the revision to a priority queue ordered by committer date.

        A revision whose data cannot be retrieved is recorded as missing
        instead of being queued.

        Args:
            rev_id (bytes): the newly visited revision identifier
        """
        if rev_id in self._state.done:
            return

        rev = self._get_rev(rev_id)
        if rev is None:
            self._state.missing_revs.add(rev_id)
            return

        committer_date = rev["committer_date"]
        if committer_date:
            commit_time = committer_date["timestamp"]["seconds"]
        else:
            # allows to avoid failure with a revision without commit date
            # and iterate on such revision before its parents
            commit_time = len(self._state.revs_to_visit)

        # heapq pops the smallest entry first: negate the time so that the
        # most recent revision comes out first
        heapq.heappush(self._state.revs_to_visit, (-commit_time, rev_id))

    def get_next_rev_id(self) -> Sha1Git:
        """
        Pop the smallest entry of the priority queue, i.e. the revision
        with the highest committer date.

        Returns:
            bytes: the identifier of the next revision to iterate on
        """
        return heapq.heappop(self._state.revs_to_visit)[1]
[docs]
class BFSRevisionsWalker(RevisionsWalker):
    """
    Revisions walker returning revisions in the order obtained when
    performing a breadth-first search on the revisions DAG.
    """

    rw_type = "bfs"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # the revisions to visit must be held by a deque as they are
        # consumed from the left in get_next_rev_id
        pending_revs = self._state.revs_to_visit
        self._state.revs_to_visit = deque(pending_revs)

    def process_rev(self, rev_id: Sha1Git) -> None:
        """
        Enqueue the revision, unless it has already been handled.

        Args:
            rev_id (bytes): the newly visited revision identifier
        """
        if rev_id in self._state.done:
            return
        self._state.revs_to_visit.append(rev_id)

    def get_next_rev_id(self) -> Sha1Git:
        """
        Dequeue the oldest queued revision.

        Returns:
            bytes: the identifier of the next revision to iterate on
        """
        queue = self._state.revs_to_visit
        return queue.popleft()
[docs]
class DFSPostRevisionsWalker(RevisionsWalker):
    """
    Revisions walker returning revisions in the order obtained when
    performing a depth-first search in post-order on the revisions
    DAG (i.e. after visiting a merge commit, the merged commit will
    be visited before the base it was merged on).
    """

    rw_type = "dfs_post"

    def process_rev(self, rev_id: Sha1Git) -> None:
        """
        Push the revision on top of a stack, unless it has already
        been handled.

        Args:
            rev_id (bytes): the newly visited revision identifier
        """
        if rev_id in self._state.done:
            return
        self._state.revs_to_visit.append(rev_id)

    def get_next_rev_id(self) -> Sha1Git:
        """
        Pop the revision found on top of the stack.

        Returns:
            bytes: the identifier of the next revision to iterate on
        """
        stack = self._state.revs_to_visit
        return stack.pop()
[docs]
class DFSRevisionsWalker(DFSPostRevisionsWalker):
    """
    Revisions walker returning revisions in the order obtained when
    performing a depth-first search in pre-order on the revisions
    DAG (i.e. after visiting a merge commit, the base commit it was
    merged on will be visited before the merged commit).
    """

    rw_type = "dfs"

    def process_parent_revs(self, rev: Dict) -> None:
        """
        Process the parents of an iterated revision, going through them
        in the reverse order of their declaration.

        As revisions to visit are stacked, the first declared parent is
        pushed last and is therefore the next one to be popped.

        Args:
            rev (dict): A dict describing a revision as returned by
                :meth:`swh.storage.interface.StorageInterface.revision_get`
        """
        parents_last_to_first = reversed(rev["parents"])
        for parent_rev_id in parents_last_to_first:
            self.process_rev(parent_rev_id)
[docs]
class PathRevisionsWalker(CommitterDateRevisionsWalker):
    """
    Revisions walker that returns revisions where a specific
    path in the source tree has been modified, in other terms
    it allows to get the history for a specific file or directory.

    It has a behaviour similar to what ``git log`` offers by default,
    meaning the returned history is simplified in order to only
    show relevant revisions (see the `History Simplification
    <https://git-scm.com/docs/git-log#_history_simplification>`_
    section of the associated manual for more details).

    Please note that to avoid walking the entire history, the iteration
    will stop once a revision where the path has been added is found.

    .. warning:: Due to client-side implementation, performances
        are not optimal when the total numbers of revisions to walk
        is large. This should only be used when the total number of
        revisions does not exceed a couple of thousands.

    Args:
        storage (swh.storage.interface.StorageInterface): instance of swh storage
            (either local or remote)
        rev_start (bytes): a revision identifier
        path (str): the path in the source tree to retrieve the history
        kwargs: optional keyword arguments forwarded as is to the parent
            class constructor (``max_revs``, ``state``,
            ``ignore_displayname``)
    """

    rw_type = "path"

    def __init__(self, storage, rev_start, path, **kwargs):
        super().__init__(storage, rev_start, **kwargs)
        # path components as bytes, in the form passed to
        # storage.directory_entry_get_by_path
        self._path = [part.encode("utf-8") for part in path.strip("/").split("/")]
        # cache: root directory id of a revision -> path id (None when the
        # lookup of the path in that directory failed)
        self._rev_dir_path = {}

    def _get_path_id(self, rev_id):
        """
        Return the path checksum identifier in the source tree of the
        provided revision, i.e. the ``target`` of the directory entry
        found for the path in the root directory of the revision.

        Args:
            rev_id (bytes): a revision identifier

        Returns:
            Optional[bytes]: the path identifier, or None if the revision
            data is missing or if the path could not be resolved in the
            revision source tree
        """
        rev = self._get_rev(rev_id)
        if rev is None:
            # revision data is missing from the archive (typically a parent
            # of a visited revision): the path cannot be resolved, report it
            # as absent instead of failing on the subscript below
            return None

        rev_dir_id = rev["directory"]
        if rev_dir_id not in self._rev_dir_path:
            try:
                dir_info = self.storage.directory_entry_get_by_path(
                    rev_dir_id, self._path
                )
                path_id = dir_info["target"]
            except Exception:
                # deliberate best effort: any failure of the lookup is
                # handled as "path not present in that directory"
                # NOTE(review): presumably covers storage returning None
                # for an unknown path - confirm against StorageInterface
                path_id = None
            self._rev_dir_path[rev_dir_id] = path_id
        return self._rev_dir_path[rev_dir_id]

    def is_finished(self):
        """
        Check if the revisions iteration is finished.

        This checks for the specified path's existence in the last
        returned revision's parents' source trees.
        If the path is found in none of them, the iteration is
        considered finished.

        Returns:
            bool: Whether the iteration is finished
        """
        # the last returned revision is tracked by the walker state
        last_rev = self._state.last_rev
        if self._path and last_rev:
            path_in_no_parent = all(
                self._get_path_id(parent_id) is None
                for parent_id in last_rev["parents"]
            )
            if path_in_no_parent:
                return True
        return super().is_finished()

    def process_parent_revs(self, rev):
        """
        Process parents when a new revision is iterated.

        It enables to get a simplified revisions history in the same
        manner as ``git log``. When a revision has multiple parents,
        the following process is applied. If the revision was a merge,
        and has the same path identifier to one parent, follow only that
        parent (even if there are several parents with the same path
        identifier, follow only one of them.) Otherwise, follow all parents.

        Args:
            rev (dict): A dict describing a revision as returned by
                :meth:`swh.storage.interface.StorageInterface.revision_get`
        """
        rev_path_id = self._get_path_id(rev["id"])
        if not rev_path_id:
            # path not found in that revision: follow every parent
            super().process_parent_revs(rev)
            return

        parents = rev["parents"]
        if len(parents) == 1:
            self.process_rev(parents[0])
            return

        parent_path_ids = [self._get_path_id(parent_id) for parent_id in parents]
        for parent_id, parent_path_id in zip(parents, parent_path_ids):
            if parent_path_id == rev_path_id:
                # first parent holding the same path content: follow only
                # that one
                self.process_rev(parent_id)
                return

        # the path differs from every parent: follow all of them
        for parent_id in parents:
            self.process_rev(parent_id)

    def should_return(self, rev):
        """
        Check if a revision should be returned when iterating.

        A revision is returned when its path identifier differs from the
        one of every parent, in order to get a simplified history. A
        revision without parents is returned when the path exists in
        its source tree.

        Args:
            rev (dict): A dict describing a revision as returned by
                :meth:`swh.storage.interface.StorageInterface.revision_get`

        Returns:
            bool: Whether to return the revision in the iteration
        """
        rev_path_id = self._get_path_id(rev["id"])
        parents = rev["parents"]
        if not parents:
            return rev_path_id is not None
        return all(
            self._get_path_id(parent_id) != rev_path_id for parent_id in parents
        )
[docs]
def get_revisions_walker(rev_walker_type, *args, **kwargs):
    """
    Factory function instantiating the revisions walker registered
    for a given type.

    A whole revisions history can be processed the following way::

        from swh.storage import get_storage

        storage = get_storage(...)
        revs_walker = get_revisions_walker('committer_date', storage, rev_id)
        for rev in revs_walker:
            # process revision rev

    A revisions history can also be walked in a paginated way, by
    exporting the state of a walker once a page is consumed and by
    providing it to the walker in charge of the next page::

        def get_revs_history_page(rw_type, storage, rev_id, page_num,
                                  page_size, rw_state):
            max_revs = (page_num + 1) * page_size
            revs_walker = get_revisions_walker(rw_type, storage, rev_id,
                                               max_revs=max_revs,
                                               state=rw_state)
            revs = list(revs_walker)
            return revs, revs_walker.export_state()

        rev_start = ...
        per_page = 50
        rw_state = None
        for page in range(0, 10):
            revs_page, rw_state = get_revs_history_page(
                'dfs', storage, rev_start, page, per_page, rw_state)
            # process revisions page

    Args:
        rev_walker_type (str): the type of revisions walker to return,
            possible values are: *committer_date*, *dfs*, *dfs_post*,
            *bfs* and *path*
        args (list): position arguments to pass to the revisions walker
            constructor
        kwargs (dict): keyword arguments to pass to the revisions walker
            constructor

    Returns:
        an instance of the revisions walker class registered for
        ``rev_walker_type``

    Raises:
        Exception: if no revisions walker is registered for the type
    """
    # registered values are classes, so None can only mean "unknown type"
    revs_walker_class = _revs_walker_classes.get(rev_walker_type)
    if revs_walker_class is None:
        raise Exception('No revisions walker found for type "%s"' % rev_walker_type)
    return revs_walker_class(*args, **kwargs)