Source code for swh.storage.algos.dir_iterators
# Copyright (C) 2018-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# Utility module to iterate on directory trees.
# The implementation is inspired from the work of Alberto Cortés
# for the go-git project. For more details, you can refer to:
# - this blog post: https://blog.sourced.tech/post/difftree/
# - the reference implementation in go:
# https://github.com/src-d/go-git/tree/master/utils/merkletrie
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional
from swh.model.model import Directory
from swh.storage.interface import StorageInterface
# get the hash identifier for an empty directory
_empty_dir_hash = Directory(entries=()).id
def _get_dir(
storage: StorageInterface, dir_id: Optional[bytes]
) -> List[Dict[str, Any]]:
"""
Return directory data from swh storage.
"""
return list(storage.directory_ls(dir_id)) if dir_id else []
[docs]
class DirectoryIterator:
"""
Helper class used to iterate on a directory tree in a depth-first search
way with some additional features:
- sibling nodes are iterated in lexicographic order by name
- it is possible to skip the visit of sub-directories nodes
for efficiency reasons when comparing two trees (no need to
go deeper if two directories have the same hash)
"""
def __init__(
self, storage: StorageInterface, dir_id: Optional[bytes], base_path: bytes = b""
):
"""
Args:
storage (swh.storage.interface.StorageInterface): instance of
swh storage (either local or remote)
dir_id (bytes): identifier of a root directory
base_path (bytes): optional base path used when traversing
a sub-directory
"""
self.storage = storage
self.root_dir_id = dir_id
self.base_path = base_path
self.restart()
[docs]
def restart(self) -> None:
"""
Restart the iteration at the beginning.
"""
# stack of frames representing currently visited directories:
# the root directory is at the bottom while the current one
# is at the top
self.frames: List[List[Dict[str, Any]]] = []
self._push_dir_frame(self.root_dir_id)
self.has_started = False
def _push_dir_frame(self, dir_id: Optional[bytes]) -> None:
"""
Visit a sub-directory by pushing a new frame to the stack.
Each frame is itself a stack of directory entries.
Args:
dir_id: identifier of a root directory
"""
# get directory entries
dir_data = _get_dir(self.storage, dir_id)
# sort them in lexicographical order and reverse the ordering
# in order to unstack the "smallest" entry each time the
# iterator advances
dir_data = sorted(dir_data, key=lambda e: e["name"], reverse=True)
# push the directory frame to the main stack
self.frames.append(dir_data)
[docs]
def top(self) -> Optional[List[Dict[str, Any]]]:
"""
Returns:
The top frame of the main directories stack
"""
if not self.frames:
return None
return self.frames[-1]
[docs]
def current(self) -> Optional[Dict[str, Any]]:
"""
Returns:
dict: The current visited directory entry, i.e. the
top element from the top frame
"""
top_frame = self.top()
if not top_frame:
return None
return top_frame[-1]
[docs]
def current_hash(self) -> bytes:
"""
Returns:
The hash value of the currently visited directory entry
"""
current = self.current()
assert current is not None
return current["target"]
[docs]
def current_perms(self) -> int:
"""
Returns:
The permissions value of the currently visited directory entry
"""
current = self.current()
assert current is not None
return current["perms"]
[docs]
def current_path(self) -> Optional[bytes]:
"""
Returns:
The absolute path from the root directory of the currently visited
directory entry
"""
top_frame = self.top()
if not top_frame:
return None
path = []
for frame in self.frames:
path.append(frame[-1]["name"])
return self.base_path + b"/".join(path)
[docs]
def current_is_dir(self) -> bool:
"""
Returns:
If the currently visited directory entry is a directory
"""
current = self.current()
assert current is not None
return current["type"] == "dir"
def _advance(self, descend: bool) -> Optional[Dict[str, Any]]:
"""
Advance in the tree iteration.
Args:
descend: whether or not to push a new frame if the currently
visited element is a sub-directory
Returns:
The description of the newly visited directory entry
"""
current = self.current()
if not self.has_started or not current:
self.has_started = True
return current
if descend and self.current_is_dir() and current["target"] != _empty_dir_hash:
self._push_dir_frame(current["target"])
else:
self.drop()
return self.current()
[docs]
def next(self) -> Optional[Dict[str, Any]]:
"""
Advance the tree iteration by dropping the current visited
directory entry from the top frame. If the top frame ends up empty,
the operation is recursively applied to remove all empty frames
as the tree is climbed up towards its root.
Returns:
The description of the newly visited directory entry
"""
return self._advance(False)
[docs]
def step(self) -> Optional[Dict[str, Any]]:
"""
Advance the tree iteration like the next operation with the
difference that if the current visited element is a sub-directory
a new frame representing its content is pushed to the main stack.
Returns:
The description of the newly visited directory entry
"""
return self._advance(True)
[docs]
def drop(self) -> None:
"""
Drop the current visited element from the top frame.
If the frame ends up empty, the operation is recursively
applied.
"""
frame = self.top()
if not frame:
return
frame.pop()
if not frame:
self.frames.pop()
self.drop()
def __next__(self) -> Dict[str, Any]:
entry = self.step()
if not entry:
raise StopIteration
entry["path"] = self.current_path()
return entry
def __iter__(self) -> DirectoryIterator:
return DirectoryIterator(self.storage, self.root_dir_id, self.base_path)
[docs]
def dir_iterator(storage: StorageInterface, dir_id: bytes) -> DirectoryIterator:
"""
Return an iterator for recursively visiting a directory and
its sub-directories. The associated paths are visited in
lexicographic depth-first search order.
Args:
storage: an instance of a swh storage
dir_id: a directory identifier
Returns:
an iterator returning a dict at each iteration step describing
a directory entry. A ``path`` field is added in that dict to store
the absolute path of the entry.
"""
return DirectoryIterator(storage, dir_id)
[docs]
class Remaining(Enum):
"""
Enum to represent the current state when iterating
on both directory trees at the same time.
"""
NoMoreFiles = 0
OnlyToFilesRemain = 1
OnlyFromFilesRemain = 2
BothHaveFiles = 3
[docs]
class DoubleDirectoryIterator:
"""
Helper class to traverse two directory trees at the same
time and compare their contents to detect changes between them.
"""
def __init__(
self, storage: StorageInterface, dir_from: Optional[bytes], dir_to: bytes
):
"""
Args:
storage: instance of swh storage
dir_from: hash identifier of the from directory
dir_to: hash identifier of the to directory
"""
self.storage = storage
self.dir_from = dir_from
self.dir_to = dir_to
self.restart()
[docs]
def restart(self) -> None:
"""
Restart the double iteration at the beginning.
"""
# initialize custom dfs iterators for the two directories
self.it_from = DirectoryIterator(self.storage, self.dir_from)
self.it_to = DirectoryIterator(self.storage, self.dir_to)
# grab the first element of each iterator
self.it_from.next()
self.it_to.next()
[docs]
def next_from(self) -> None:
"""
Apply the next operation on the from iterator.
"""
self.it_from.next()
[docs]
def next_to(self) -> None:
"""
Apply the next operation on the to iterator.
"""
self.it_to.next()
[docs]
def next_both(self) -> None:
"""
Apply the next operation on both iterators.
"""
self.next_from()
self.next_to()
[docs]
def step_from(self) -> None:
"""
Apply the step operation on the from iterator.
"""
self.it_from.step()
[docs]
def step_to(self) -> None:
"""
Apply the step operation on the from iterator.
"""
self.it_to.step()
[docs]
def step_both(self) -> None:
"""
Apply the step operation on the both iterators.
"""
self.step_from()
self.step_to()
[docs]
def remaining(self) -> Remaining:
"""
Returns:
Remaining: the current state of the double iteration
"""
from_current = self.it_from.current()
to_current = self.it_to.current()
# no more files to iterate in both iterators
if not from_current and not to_current:
return Remaining.NoMoreFiles
# still some files to iterate in the to iterator
elif not from_current and to_current:
return Remaining.OnlyToFilesRemain
# still some files to iterate in the from iterator
elif from_current and not to_current:
return Remaining.OnlyFromFilesRemain
# still files to iterate in the both iterators
else:
return Remaining.BothHaveFiles
[docs]
def compare(self) -> Dict[str, Any]:
"""
Compare the current iterated directory entries in both iterators
and return the comparison status.
Returns:
The status of the comparison with the following bool values:
* ``same_hash``: indicates if the two entries have the same hash
* ``same_perms``: indicates if the two entries have the same
permissions
* ``both_are_dirs``: indicates if the two entries are directories
* ``both_are_files``: indicates if the two entries are regular
files
* ``file_and_dir``: indicates if one of the entry is a directory
and the other a regular file
* ``from_is_empty_dir``: indicates if the from entry is the
empty directory
* ``to_is_empty_dir``: indicates if the to entry is the
empty directory
"""
from_current_hash = self.it_from.current_hash()
to_current_hash = self.it_to.current_hash()
from_current_perms = self.it_from.current_perms()
to_current_perms = self.it_to.current_perms()
from_is_dir = self.it_from.current_is_dir()
to_is_dir = self.it_to.current_is_dir()
status = {}
# compare hash
status["same_hash"] = from_current_hash == to_current_hash
# compare permissions
status["same_perms"] = from_current_perms == to_current_perms
# check if both elements are directories
status["both_are_dirs"] = from_is_dir and to_is_dir
# check if both elements are regular files
status["both_are_files"] = not from_is_dir and not to_is_dir
# check if one element is a directory, the other a regular file
status["file_and_dir"] = (
not status["both_are_dirs"] and not status["both_are_files"]
)
# check if the from element is the empty directory
status["from_is_empty_dir"] = (
from_is_dir and from_current_hash == _empty_dir_hash
)
# check if the to element is the empty directory
status["to_is_empty_dir"] = to_is_dir and to_current_hash == _empty_dir_hash
return status