# Copyright (C) 2021-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This document contains a SWH loader for ingesting repository data
from Bazaar or Breezy.
"""
from __future__ import annotations
from datetime import datetime
from functools import lru_cache, partial
import itertools
import os
import tempfile
from typing import Any, Dict, Generator, List, Optional, Set, Tuple, TypeVar, Union
from breezy import errors as bzr_errors
from breezy.branch import Branch as BzrBranch
from breezy.builtins import cmd_branch, cmd_upgrade
from breezy.export import export
from breezy.revision import NULL_REVISION
from breezy.revision import Revision as BzrRevision
from breezy.revision import RevisionID as BzrRevisionId
from breezy.tree import Tree, TreeChange
from swh.loader.core.loader import BaseLoader
from swh.loader.core.utils import clean_dangling_folders, clone_with_timeout
from swh.model import from_disk, swhids
from swh.model.hashutil import hash_to_hex
from swh.model.model import (
Content,
ExtID,
ObjectType,
Person,
Release,
Revision,
RevisionType,
Sha1Git,
Snapshot,
SnapshotBranch,
SnapshotTargetType,
Timestamp,
TimestampWithTimezone,
)
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.storage.interface import StorageInterface
TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.bzr.from_disk"
EXTID_TYPE = "bzr-nodeid"
EXTID_VERSION: int = 1
T = TypeVar("T")
# These are all the old Bazaar repository formats that we might encounter
# in the wild. Bazaar's `clone` does not result in an upgrade; upgrading
# needs to be done explicitly.
older_repository_formats = {
b"Bazaar Knit Repository Format 3 (bzr 0.15)\n",
b"Bazaar Knit Repository Format 4 (bzr 1.0)\n",
b"Bazaar RepositoryFormatKnitPack5 (bzr 1.6)\n",
b"Bazaar RepositoryFormatKnitPack5RichRoot (bzr 1.6)\n",
b"Bazaar RepositoryFormatKnitPack5RichRoot (bzr 1.6.1)\n",
b"Bazaar RepositoryFormatKnitPack6 (bzr 1.9)\n",
b"Bazaar RepositoryFormatKnitPack6RichRoot (bzr 1.9)\n",
b"Bazaar development format 2 with subtree support \
(needs bzr.dev from before 1.8)\n",
b"Bazaar development format 8\n",
b"Bazaar pack repository format 1 (needs bzr 0.92)\n",
b"Bazaar pack repository format 1 with rich root (needs bzr 1.0)\n",
b"Bazaar pack repository format 1 with subtree support (needs bzr 0.92)\n",
b"Bazaar-NG Knit Repository Format 1",
}
# Latest format as of this writing, unlikely to change
expected_repository_format = b"Bazaar repository format 2a (needs bzr 1.16 or later)\n"


class UnknownRepositoryFormat(Exception):
    """The repository we are trying to load uses an unknown format.

    It is possible (though unlikely) that a new format has come out; we should
    check before dismissing the repository as broken or unsupported.
    """
class BzrDirectory(from_disk.Directory):
"""A more practical directory to create missing parent directories
when adding a path.
"""
def __setitem__(
self, path: bytes, value: Union[from_disk.Content, BzrDirectory]
) -> None:
if b"/" in path:
head, tail = path.split(b"/", 1)
directory = self.get(head)
if directory is None or isinstance(directory, from_disk.Content):
directory = BzrDirectory()
self[head] = directory
directory[tail] = value
else:
super().__setitem__(path, value)
def get(
self, path: bytes, default: Optional[T] = None
) -> Optional[Union[from_disk.Content, BzrDirectory, T]]:
# TODO move to swh.model.from_disk.Directory
try:
return self[path]
except KeyError:
return default
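
# The function below is an illustrative sketch only (it is not used by the
# loader): it demonstrates the implicit creation of parent directories
# described in the class docstring, with hypothetical paths and a dummy
# sha1_git value.
def _bzr_directory_example() -> None:
    d = BzrDirectory()
    # Assigning a nested path implicitly creates the b"a" and b"a/b" parents:
    d[b"a/b/c"] = from_disk.Content(
        {"sha1_git": b"\x00" * 20, "perms": from_disk.DentryPerms.content}
    )
    assert isinstance(d[b"a"], BzrDirectory)
    assert isinstance(d[b"a/b"], BzrDirectory)
    # `get` mirrors `dict.get`, returning the default for missing paths:
    assert d.get(b"does/not/exist") is None
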
def sort_changes(change: TreeChange) -> str:
"""Key function for sorting the changes by path.
Sorting allows us to group the folders together (for example "b", then "a/a",
then "a/b"). Reversing this sort in the `sorted()` call will make it
so the files appear before the folder ("a/a", then "a") if the folder has
changed. This removes a bug where the order of operations is:
- "a" goes from directory to file, removing all of its subtree
- "a/a" is removed, but our structure has already forgotten it"""
source_path, target_path = change.path
# Neither path can be the empty string
return source_path or target_path
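
# An illustrative sketch of the ordering this enables (hypothetical paths;
# with real `TreeChange` values only the `path` attribute matters here).
# Reverse-sorting places entries under a directory before the directory
# itself:
#
#   paths = ["a", "a/a", "b"]
#   assert sorted(paths, reverse=True) == ["b", "a/a", "a"]
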
class BazaarLoader(BaseLoader):
"""Loads a Bazaar repository"""
visit_type = "bzr"
def __init__(
self,
storage: StorageInterface,
url: str,
directory: Optional[str] = None,
visit_date: Optional[datetime] = None,
temp_directory: str = "/tmp",
clone_timeout_seconds: int = 7200,
check_revision: int = 0,
**kwargs: Any,
):
super().__init__(storage=storage, origin_url=url, **kwargs)
self._temp_directory = temp_directory
self._clone_timeout = clone_timeout_seconds
self._revision_id_to_sha1git: Dict[BzrRevisionId, Sha1Git] = {}
self._last_root = BzrDirectory()
self._tags: Optional[Dict[bytes, BzrRevisionId]] = None
self._head_revision_id: Optional[bytes] = None
# Remember the previous revision to only compute the delta between
# revisions
self._prev_revision: Optional[BzrRevision] = None
self._branch: Optional[BzrBranch] = None
        # Revisions that are pointed to, but don't exist in the current
        # branch. Rare, but they usually come from cross-VCS references.
self._ghosts: Set[BzrRevisionId] = set()
        # Set during an incremental run: the latest head revision recorded
        # for this origin
self._latest_head: Optional[BzrRevisionId] = None
self._load_status = "eventful"
self.visit_date = visit_date or self.visit_date
self.directory = directory
self.branch: Optional[BzrBranch] = None
self.check_revision = check_revision
def pre_cleanup(self) -> None:
"""As a first step, will try and check for dangling data to cleanup.
This should do its best to avoid raising issues.
"""
clean_dangling_folders(
self._temp_directory,
pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
log=self.log,
)
def prepare(self) -> None:
"""Second step executed by the loader to prepare some state needed by
the loader.
"""
latest_snapshot = snapshot_get_latest(self.storage, self.origin.url)
if latest_snapshot:
self._set_recorded_state(latest_snapshot)
def load_status(self) -> Dict[str, str]:
"""Detailed loading status.
Defaults to logging an eventful load.
Returns: a dictionary that is eventually passed back as the task's
result to the scheduler, allowing tuning of the task recurrence
mechanism.
"""
return {
"status": self._load_status,
}
def _set_recorded_state(self, latest_snapshot: Snapshot) -> None:
if not latest_snapshot.branches:
# Last snapshot was empty
return
head = latest_snapshot.branches[b"trunk"]
bzr_head = self._get_extids_for_targets([head.target])[0].extid
self._latest_head = BzrRevisionId(bzr_head)
def _get_extids_for_targets(self, targets: List[Sha1Git]) -> List[ExtID]:
"""Get all Bzr ExtIDs for the targets in the latest snapshot"""
extids = []
for extid in self.storage.extid_get_from_target(
swhids.ObjectType.REVISION,
targets,
extid_type=EXTID_TYPE,
extid_version=EXTID_VERSION,
):
extids.append(extid)
self._revision_id_to_sha1git[BzrRevisionId(extid.extid)] = (
extid.target.object_id
)
if extids:
# Filter out dangling extids, we need to load their target again
revisions_missing = self.storage.revision_missing(
[extid.target.object_id for extid in extids]
)
extids = [
extid
for extid in extids
if extid.target.object_id not in revisions_missing
]
return extids
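
    # A hedged sketch of the ExtID mapping used above (the extid value is
    # hypothetical): each loaded bzr revision id is recorded as an ExtID of
    # type "bzr-nodeid" pointing at the corresponding SWH revision, e.g.
    #
    #   ExtID(
    #       extid_type=EXTID_TYPE,        # "bzr-nodeid"
    #       extid_version=EXTID_VERSION,  # 1
    #       extid=b"joe@example.com-20230101000000-0123456789abcdef",
    #       target=swhids.CoreSWHID(
    #           object_type=swhids.ObjectType.REVISION, object_id=sha1_git
    #       ),
    #   )
    #
    # so an incremental run can translate the previous snapshot's head back
    # into a bzr revision id via `extid_get_from_target`.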
def cleanup(self) -> None:
if self.branch is not None:
self.branch.unlock()
def get_branch(self) -> BzrBranch:
return BzrBranch.open(self._repo_directory)
def run_upgrade(self):
"""Upgrade both repository and branch to the most recent supported version
to be compatible with the loader."""
cmd_upgrade().run(self._repo_directory, clean=True)
def fetch_data(self) -> bool:
"""Fetch the data from the source the loader is currently loading
Returns:
a value that is interpreted as a boolean. If True, fetch_data needs
to be called again to complete loading.
"""
if not self.directory: # no local repository
self._repo_directory = tempfile.mkdtemp(
prefix=TEMPORARY_DIR_PREFIX_PATTERN,
suffix=f"-{os.getpid()}",
dir=self._temp_directory,
)
msg = "Cloning '%s' to '%s' with timeout %s seconds"
self.log.debug(
msg, self.origin.url, self._repo_directory, self._clone_timeout
)
closure = partial(
cmd_branch().run,
self.origin.url,
self._repo_directory,
no_tree=True,
use_existing_dir=True,
)
clone_with_timeout(
self.origin.url, self._repo_directory, closure, self._clone_timeout
)
else: # existing local repository
            # Allow loading an on-disk repository without cloning,
            # for testing purposes.
self.log.debug("Using local directory '%s'", self.directory)
self._repo_directory = self.directory
branch = self.get_branch()
repository_format = branch.repository._format.get_format_string()
        if repository_format != expected_repository_format:
if repository_format in older_repository_formats:
self.log.debug(
"Upgrading repository from format '%s'",
repository_format.decode("ascii").strip("\n"),
)
self.run_upgrade()
branch = self.get_branch()
else:
raise UnknownRepositoryFormat()
if not branch.supports_tags():
# Some repos have the right format marker but their branches do not
# support tags
self.log.debug("Branch does not support tags, upgrading")
self.run_upgrade()
branch = self.get_branch()
branch.lock_read()
self.branch = branch
        self.tags  # compute and cache the tags property
return False
def store_data(self) -> None:
"""Store fetched data in the database."""
assert self.branch is not None
assert self.tags is not None
        # Insert revisions using a topological ordering
length_ingested_revs = 0
for rev in self._get_bzr_revs_to_load():
self.store_revision(self.branch.repository.get_revision(rev))
length_ingested_revs += 1
if length_ingested_revs == 0 and (
self.branch.last_revision() in (self._latest_head, NULL_REVISION)
):
            # No new revision was ingested, so the load is uneventful;
            # we still create a snapshot, so we continue
self._load_status = "uneventful"
snapshot_branches: Dict[bytes, Optional[SnapshotBranch]] = {}
for tag_name, target in self.tags.items():
label = b"tags/%s" % tag_name
if target == NULL_REVISION:
# Some very rare repositories have meaningless tags that point
# to the null revision.
self.log.debug("Tag '%s' points to the null revision", tag_name)
snapshot_branches[label] = None
continue
try:
# Used only to detect corruption
self.branch.revision_id_to_dotted_revno(target)
except (
bzr_errors.NoSuchRevision,
bzr_errors.GhostRevisionsHaveNoRevno,
bzr_errors.UnsupportedOperation,
):
# Bad tag data/merges can lead to tagged revisions
# which are not in this branch. We cannot point a tag there.
snapshot_branches[label] = None
continue
snp_target = self._get_revision_id_from_bzr_id(target)
snapshot_branches[label] = SnapshotBranch(
target=self.store_release(tag_name, snp_target),
target_type=SnapshotTargetType.RELEASE,
)
if self.branch.last_revision() != NULL_REVISION:
head_revision_git_hash = self._get_revision_id_from_bzr_id(
self.branch.last_revision()
)
snapshot_branches[b"trunk"] = SnapshotBranch(
target=head_revision_git_hash, target_type=SnapshotTargetType.REVISION
)
snapshot_branches[b"HEAD"] = SnapshotBranch(
target=b"trunk",
target_type=SnapshotTargetType.ALIAS,
)
snapshot = Snapshot(branches=snapshot_branches)
self.storage.snapshot_add([snapshot])
self.flush()
self.loaded_snapshot_id = snapshot.id
def store_revision(self, bzr_rev: BzrRevision) -> None:
self.log.debug("Storing revision '%s'", bzr_rev.revision_id)
directory = self.store_directories(bzr_rev)
associated_bugs = [
(b"bug", b"%s %s" % (status.encode(), url.encode()))
for url, status in bzr_rev.iter_bugs()
]
extra_headers = [
(
b"time_offset_seconds",
str(bzr_rev.timezone).encode(),
),
*associated_bugs,
]
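        # bzr stores the timezone as a signed offset in seconds, while SWH
        # models it as an offset in minutes: e.g. a bzr timezone of 3600
        # (UTC+01:00) becomes an offset of 60 minutes below.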
timestamp = Timestamp(int(bzr_rev.timestamp), 0)
timezone = round(int(bzr_rev.timezone) / 60)
date = TimestampWithTimezone.from_numeric_offset(timestamp, timezone, False)
committer = (
Person.from_fullname(bzr_rev.committer.encode())
if bzr_rev.committer is not None
else None
)
# TODO (how) should we store multiple authors? (T3887)
revision = Revision(
author=Person.from_fullname(bzr_rev.get_apparent_authors()[0].encode()),
date=date,
committer=committer,
committer_date=date if committer is not None else None,
type=RevisionType.BAZAAR,
directory=directory,
message=bzr_rev.message.encode(),
extra_headers=extra_headers,
synthetic=False,
parents=self._get_revision_parents(bzr_rev),
)
revno = (
self.branch.revision_id_to_dotted_revno(bzr_rev.revision_id)[0]
if self.branch
else None
)
if self.check_revision and revno and revno % self.check_revision == 0:
with tempfile.TemporaryDirectory() as tmp_dir:
export(self._get_revision_tree(bzr_rev.revision_id), tmp_dir)
exported_dir = from_disk.Directory.from_disk(path=tmp_dir.encode())
if directory != exported_dir.hash:
raise ValueError(
f"Hash tree computation divergence detected at revision {revno}"
f" ({bzr_rev.revision_id.decode()}), "
f"expected: {hash_to_hex(exported_dir.hash)}, "
f"actual: {hash_to_hex(directory)}"
)
self._revision_id_to_sha1git[bzr_rev.revision_id] = revision.id
self.storage.revision_add([revision])
self.storage.extid_add(
[
ExtID(
extid_type=EXTID_TYPE,
extid_version=EXTID_VERSION,
extid=bzr_rev.revision_id,
target=revision.swhid(),
)
]
)
def store_directories(self, bzr_rev: BzrRevision) -> Sha1Git:
"""Store a revision's directories."""
new_tree = self._get_revision_tree(bzr_rev.revision_id)
if self._prev_revision is None:
self._store_directories_slow(bzr_rev, new_tree)
return self._store_tree(bzr_rev)
old_tree = self._get_revision_tree(self._prev_revision.revision_id)
delta = new_tree.changes_from(old_tree)
if delta.renamed or delta.copied:
            # Figuring out all nested and possibly conflicting renames is a
            # lot of effort for very few revisions; just go the slow way
self._store_directories_slow(bzr_rev, new_tree)
return self._store_tree(bzr_rev)
to_remove = sorted(
delta.removed + delta.missing, key=sort_changes, reverse=True
)
for change in to_remove:
path = change.path[0]
del self._last_root[path.encode()]
# `delta.kind_changed` needs to happen before `delta.added` since a file
# could be added under a node that changed from directory to file at the
# same time, for example
for change in itertools.chain(delta.kind_changed, delta.added, delta.modified):
path = change.path[1]
(kind, size, executable, _sha1_or_link) = new_tree.path_content_summary(
path
)
dir_entry: Union[BzrDirectory, from_disk.Content]
if kind in ("file", "symlink"):
dir_entry = self.store_content(
bzr_rev,
path,
kind,
executable,
size,
_sha1_or_link if kind == "symlink" else None,
)
elif kind in ("directory", "tree-reference"):
                # A nested tree is not recursively imported, as it might be
                # missing from the repository; create an empty directory
                # instead, as `bzr export` does by default
dir_entry = BzrDirectory()
else:
raise RuntimeError(
f"Unable to process directory entry {path} of kind {kind}"
)
self._last_root[path.encode()] = dir_entry
self._prev_revision = bzr_rev
return self._store_tree(bzr_rev)
def store_release(self, name: bytes, target: Sha1Git) -> Sha1Git:
"""Store a release given its name and its target.
Args:
name: name of the release.
target: sha1_git of the target revision.
Returns:
the sha1_git of the stored release.
"""
release = Release(
name=name,
target=target,
target_type=ObjectType.REVISION,
message=None,
metadata=None,
synthetic=False,
author=Person(name=None, email=None, fullname=b""),
date=None,
)
self.storage.release_add([release])
return release.id
def store_content(
self,
bzr_rev: BzrRevision,
file_path: str,
kind: str,
executable: bool,
size: int,
symlink_target: Optional[str] = None,
) -> from_disk.Content:
assert kind in ("file", "symlink")
if executable:
perms = from_disk.DentryPerms.executable_content
elif kind == "symlink":
perms = from_disk.DentryPerms.symlink
else: # kind == "file":
perms = from_disk.DentryPerms.content
if kind == "file":
rev_tree = self._get_revision_tree(bzr_rev.revision_id)
with rev_tree.get_file(file_path) as f:
data = f.read()
assert len(data) == size
else: # kind == "symlink":
assert symlink_target is not None
data = symlink_target.encode()
content = Content.from_data(data)
self.storage.content_add([content])
return from_disk.Content({"sha1_git": content.sha1_git, "perms": perms})
def _get_bzr_revs_to_load(self) -> Generator[BzrRevision, None, None]:
assert self.branch is not None
if self._latest_head is not None:
common_revisions = [self._latest_head]
else:
common_revisions = []
# there's nothing to fetch in NULL_REVISION
common_revisions.append(NULL_REVISION)
graph = self.branch.repository.get_graph()
todo = graph.find_unique_ancestors(
self.branch.last_revision(), common_revisions
)
return graph.iter_topo_order(todo)
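
    # A hedged sketch of the graph calls above (revision ids are
    # hypothetical): `find_unique_ancestors(tip, [old_head, NULL_REVISION])`
    # returns the set of revisions reachable from `tip` but not from
    # `old_head`, i.e. exactly the revisions added since the previous run:
    #
    #   old_head --> r1 --> r2 --> tip   =>   {r1, r2, tip}
    #
    # `iter_topo_order` then yields parents before children, so
    # `_get_revision_parents` always finds the parents already loaded.
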
    # We want to cache at most the current revision tree and the previous
    # one, so there is no need to cache more than two entries.
@lru_cache(maxsize=2)
def _get_revision_tree(self, rev: BzrRevisionId) -> Tree:
assert self.branch is not None
return self.branch.repository.revision_tree(rev)
def _store_tree(self, bzr_rev: BzrRevision) -> Sha1Git:
"""Save the current in-memory tree to storage."""
directories: List[from_disk.Directory] = [self._last_root]
while directories:
directory = directories.pop()
self.storage.directory_add([directory.to_model()])
directories.extend(
[
item
for item in directory.values()
if isinstance(item, from_disk.Directory)
]
)
self._prev_revision = bzr_rev
return self._last_root.hash
def _store_directories_slow(self, bzr_rev: BzrRevision, tree: Tree) -> None:
"""Store a revision's directories.
This is the slow variant: it does not use a diff from the last revision
but lists all the files. It is used for the first revision of a load
(the null revision for a full run, the last recorded head for an
        incremental one) or for cases where the headache of figuring out the
        delta from the Breezy primitives is not worth it.
"""
        # Don't reuse the last root: we're listing everything anyway, and we
        # could otherwise keep around deleted files
self._last_root = BzrDirectory()
for path, entry in tree.iter_entries_by_dir():
if path == "":
                # the root directory is created by default
continue
dir_entry: Union[BzrDirectory, from_disk.Content]
if entry.kind in ("file", "symlink"):
dir_entry = self.store_content(
bzr_rev,
path,
entry.kind,
entry.executable,
entry.text_size,
entry.symlink_target if entry.kind == "symlink" else None,
)
elif entry.kind in ("directory", "tree-reference"):
                # A nested tree is not recursively imported, as it might be
                # missing from the repository; create an empty directory
                # instead, as `bzr export` does by default
dir_entry = BzrDirectory()
else:
raise RuntimeError(
f"Unable to process directory entry {path} of kind {entry.kind}"
)
self._last_root[path.encode()] = dir_entry
def _get_revision_parents(self, bzr_rev: BzrRevision) -> Tuple[Sha1Git, ...]:
parents = []
for parent_id in bzr_rev.parent_ids:
if parent_id == NULL_REVISION:
                # Paranoid check; this should not actually happen
continue
try:
revision_id = self._get_revision_id_from_bzr_id(parent_id)
except LookupError:
assert self.branch is not None
                # Check if this is a ghost, i.e. a revision that is merely
                # referenced, not actually present in the repository:
                if (
                    self.branch.repository.get_parent_map([parent_id]).get(
                        parent_id
                    )
                    != ()
                ):
self._ghosts.add(parent_id)
# We can't store ghosts in any meaningful way (yet?). They
# have no contents by definition, and they're pretty rare,
# so just ignore them.
continue
raise
parents.append(revision_id)
return tuple(parents)
def _get_revision_id_from_bzr_id(self, bzr_id: BzrRevisionId) -> Sha1Git:
"""Return the git sha1 of a revision given its bazaar revision id."""
from_cache = self._revision_id_to_sha1git.get(bzr_id)
if from_cache is not None:
return from_cache
        # The parent was not loaded in this run; fetch it from storage
from_storage = self.storage.extid_get_from_extid(
EXTID_TYPE, ids=[bzr_id], version=EXTID_VERSION
)
if len(from_storage) != 1:
msg = "Expected 1 match from storage for bzr node %r, got %d"
raise LookupError(msg % (bzr_id.hex(), len(from_storage)))
return from_storage[0].target.object_id
@property
def tags(self) -> Optional[Dict[bytes, BzrRevisionId]]:
assert self.branch is not None
if self._tags is None:
self._tags = {
n.encode(): r for n, r in self.branch.tags.get_tag_dict().items()
}
return self._tags
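

# A minimal usage sketch (assumptions: the origin URL is illustrative, and the
# in-memory storage backend is only suitable for experiments, not production):
#
#   from swh.storage import get_storage
#
#   storage = get_storage(cls="memory")
#   loader = BazaarLoader(storage, "https://example.org/some-bzr-repo")
#   result = loader.load()  # runs pre_cleanup/prepare/fetch_data/store_data
#   print(result)           # {"status": "eventful"} on success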