# Copyright (C) 2018-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# Utility module to efficiently compute the list of changed files
# between two directory trees.
# The implementation is inspired from the work of Alberto Cortés
# for the go-git project. For more details, you can refer to:
# - this blog post: https://blog.sourced.tech/post/difftree/
# - the reference implementation in go:
# https://github.com/src-d/go-git/tree/master/utils/merkletrie
import collections
from typing import Any, Dict, List, Optional
from swh.model.model import Directory
from swh.storage.interface import StorageInterface
from .dir_iterators import DirectoryIterator, DoubleDirectoryIterator, Remaining
# get the hash identifier for an empty directory
_empty_dir_hash = Directory(entries=()).id
def _get_rev(storage: StorageInterface, rev_id: bytes) -> Dict[str, Any]:
Return revision data from swh storage.
revision = storage.revision_get([rev_id])[0]
assert revision is not None
return revision.to_dict()
class _RevisionChangesList:
Helper class to track the changes between two
revision directories.
def __init__(self, storage: StorageInterface, track_renaming: bool):
storage: instance of swh storage
track_renaming: whether to track or not files renaming
self.storage = storage
self.track_renaming = track_renaming
self.result: List[Dict[str, Any]] = []
# dicts used to track file renaming based on hash value
# we use a list instead of a single entry to handle the corner
# case when a repository contains multiple instance of
# the same file in different directories and a commit
# renames all of them
self.inserted_hash_idx: Dict[bytes, List[int]] = collections.defaultdict(list)
self.deleted_hash_idx: Dict[bytes, List[int]] = collections.defaultdict(list)
def add_insert(self, it_to: DirectoryIterator) -> None:
Add a file insertion in the to directory.
it_to: iterator on the to directory
to_hash = it_to.current_hash()
# if the current file hash has been previously marked as deleted,
# the file has been renamed
if self.track_renaming and self.deleted_hash_idx[to_hash]:
# pop the delete change index in the same order it was inserted
change = self.result[self.deleted_hash_idx[to_hash].pop(0)]
# change the delete change as a rename one
change["type"] = "rename"
change["to"] = it_to.current()
change["to_path"] = it_to.current_path()
# add the insert change in the list
"type": "insert",
"from": None,
"from_path": None,
"to": it_to.current(),
"to_path": it_to.current_path(),
# if rename tracking is activated, add the change index in
# the inserted_hash_idx dict
if self.track_renaming:
self.inserted_hash_idx[to_hash].append(len(self.result) - 1)
def add_delete(self, it_from: DirectoryIterator) -> None:
Add a file deletion in the from directory.
it_from: iterator on the from directory
from_hash = it_from.current_hash()
# if the current file has been previously marked as inserted,
# the file has been renamed
if self.track_renaming and self.inserted_hash_idx[from_hash]:
# pop the insert change index in the same order it was inserted
change = self.result[self.inserted_hash_idx[from_hash].pop(0)]
# change the insert change as a rename one
change["type"] = "rename"
change["from"] = it_from.current()
change["from_path"] = it_from.current_path()
# add the delete change in the list
"type": "delete",
"from": it_from.current(),
"from_path": it_from.current_path(),
"to": None,
"to_path": None,
# if rename tracking is activated, add the change index in
# the deleted_hash_idx dict
if self.track_renaming:
self.deleted_hash_idx[from_hash].append(len(self.result) - 1)
def add_modify(self, it_from: DirectoryIterator, it_to: DirectoryIterator) -> None:
Add a file modification in the to directory.
it_from: iterator on the from directory
it_to: iterator on the to directory
"type": "modify",
"from": it_from.current(),
"from_path": it_from.current_path(),
"to": it_to.current(),
"to_path": it_to.current_path(),
def add_recursive(self, it: DirectoryIterator, insert: bool) -> None:
Recursively add changes from a directory.
it: iterator on a directory
insert: the type of changes to add (insertion or deletion)
# current iterated element is a regular file,
# simply add adequate change in the list
if not it.current_is_dir():
if insert:
# current iterated element is a directory,
dir_id = it.current_hash()
# handle empty dir insertion/deletion as the swh model allow
# to have such object compared to git
if dir_id == _empty_dir_hash:
if insert:
# iterate on files reachable from it and add
# adequate changes in the list
current_path = it.current_path()
assert current_path is not None
sub_it = DirectoryIterator(self.storage, dir_id, current_path + b"/")
sub_it_current = sub_it.step()
while sub_it_current:
if not sub_it.current_is_dir():
if insert:
sub_it_current = sub_it.step()
def add_recursive_insert(self, it_to: DirectoryIterator) -> None:
Recursively add files insertion from a to directory.
it_to: iterator on a to directory
self.add_recursive(it_to, True)
def add_recursive_delete(self, it_from: DirectoryIterator) -> None:
Recursively add files deletion from a from directory.
it_from: iterator on a from directory
self.add_recursive(it_from, False)
def _diff_elts_same_name(
changes: _RevisionChangesList, it: DoubleDirectoryIterator
) -> None:
""" "
Compare two directory entries with the same name and add adequate
changes if any.
changes: the list of changes between two revisions
it: the iterator traversing two revision directories at the same time
# compare the two current directory elements of the iterator
status = it.compare()
# elements have same hash and same permissions:
# no changes to add and call next on the two iterators
if status["same_hash"] and status["same_perms"]:
# elements are regular files and have been modified:
# insert the modification change in the list and
# call next on the two iterators
elif status["both_are_files"]:
changes.add_modify(it.it_from, it.it_to)
# one element is a regular file, the other a directory:
# recursively add delete/insert changes and call next
# on the two iterators
elif status["file_and_dir"]:
# both elements are directories:
elif status["both_are_dirs"]:
# from directory is empty:
# recursively add insert changes in the to directory
# and call next on the two iterators
if status["from_is_empty_dir"]:
# to directory is empty:
# recursively add delete changes in the from directory
# and call next on the two iterators
elif status["to_is_empty_dir"]:
# both directories are not empty:
# call step on the two iterators to descend further in
# the directory trees.
elif not status["from_is_empty_dir"] and not status["to_is_empty_dir"]:
def _compare_paths(path1: bytes, path2: bytes) -> int:
Compare paths in lexicographic depth-first order.
For instance, it returns:
- "a" < "b"
- "b/c/d" < "b"
- "c/foo.txt" < "c.txt"
path1_parts = path1.split(b"/")
path2_parts = path2.split(b"/")
i = 0
while True:
if len(path1_parts) == len(path2_parts) and i == len(path1_parts):
return 0
elif len(path2_parts) == i:
return 1
elif len(path1_parts) == i:
return -1
if path2_parts[i] > path1_parts[i]:
return -1
elif path2_parts[i] < path1_parts[i]:
return 1
i = i + 1
def _diff_elts(changes: _RevisionChangesList, it: DoubleDirectoryIterator) -> None:
Compare two directory entries.
changes: the list of changes between two revisions
it: the iterator traversing two revision directories at the same time
from_current_path = it.it_from.current_path()
to_current_path = it.it_to.current_path()
assert from_current_path is not None
assert to_current_path is not None
# compare current to and from path in depth-first lexicographic order
c = _compare_paths(from_current_path, to_current_path)
# current from path is lower than the current to path:
# the from path has been deleted
if c < 0:
# current from path is greater than the current to path:
# the to path has been inserted
elif c > 0:
# paths are the same and need more processing
_diff_elts_same_name(changes, it)
def diff_directories(
storage: StorageInterface,
from_dir: Optional[bytes],
to_dir: bytes,
track_renaming: bool = False,
) -> List[Dict[str, Any]]:
Compute the differential between two directories, i.e. the list of
file changes (insertion / deletion / modification / renaming)
between them.
storage: instance of a swh storage (either local or remote, for optimal
performance the use of a local storage is recommended)
from_dir: the swh identifier of the directory to compare from
to_dir: the swh identifier of the directory to compare to
track_renaming: whether or not to track files renaming
A list of dict representing the changes between the two
revisions. Each dict contains the following entries:
- ``type``: a string describing the type of change
(``insert`` / ``delete`` / ``modify`` / ``rename``)
- ``from``: a dict containing the directory entry metadata in the
from revision (:const:`None` in case of an insertion)
- ``from_path``: bytes string corresponding to the absolute path
of the from revision entry (:const:`None` in case of an insertion)
- ``to``: a dict containing the directory entry metadata in the
to revision (:const:`None` in case of a deletion)
- ``to_path``: bytes string corresponding to the absolute path
of the to revision entry (:const:`None` in case of a deletion)
The returned list is sorted in lexicographic depth-first order
according to the value of the ``to_path`` field.
.. warning::
The algorithm used to track files renaming is quite naive (it compares hashes
between deleted and inserted files) and might fail to detect all renamings for
some edge cases.
changes = _RevisionChangesList(storage, track_renaming)
it = DoubleDirectoryIterator(storage, from_dir, to_dir)
while True:
r = it.remaining()
if r == Remaining.NoMoreFiles:
elif r == Remaining.OnlyFromFilesRemain:
elif r == Remaining.OnlyToFilesRemain:
_diff_elts(changes, it)
return changes.result
def diff_revisions(
storage: StorageInterface,
from_rev: Optional[bytes],
to_rev: bytes,
track_renaming: bool = False,
) -> List[Dict[str, Any]]:
Compute the differential between two revisions,
i.e. the list of file changes between the two associated directories.
storage: instance of a swh storage (either local or remote, for optimal
performance the use of a local storage is recommended)
from_rev: the identifier of the revision to compare from
to_rev: the identifier of the revision to compare to
track_renaming: whether or not to track files renaming
A list of dict describing the introduced file changes
(see :func:`swh.storage.algos.diff.diff_directories`).
.. warning::
The algorithm used to track files renaming is quite naive (it compares hashes
between deleted and inserted files) and might fail to detect all renamings for
some edge cases.
from_dir = None
if from_rev is not None:
from_dir = _get_rev(storage, from_rev)["directory"]
to_dir = _get_rev(storage, to_rev)["directory"]
return diff_directories(storage, from_dir, to_dir, track_renaming)
def diff_revision(
storage: StorageInterface, revision: bytes, track_renaming: bool = False
) -> List[Dict[str, Any]]:
Computes the differential between a revision and its first parent.
If the revision has no parents, the directory to compare from
is considered as empty.
In other words, it computes the file changes introduced in a
specific revision.
storage: instance of a swh storage (either local or remote, for
optimal performance the use of a local storage is recommended)
revision: the identifier of the revision from which to compute the
introduced changes.
track_renaming: whether or not to track files renaming
A list of dict describing the introduced file changes
(see :func:`swh.storage.algos.diff.diff_directories`).
.. warning::
The algorithm used to track files renaming is quite naive (it compares hashes
between deleted and inserted files) and might fail to detect all renamings for
some edge cases.
rev_data = _get_rev(storage, revision)
parent = None
if rev_data["parents"]:
parent = rev_data["parents"][0]
return diff_revisions(storage, parent, revision, track_renaming)