Source code for swh.loader.cvs.loader

# Copyright (C) 2015-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Loader in charge of injecting either new or existing cvs repositories to
swh-storage.

"""
from datetime import datetime
import os
import os.path
import subprocess
import tempfile
import time
from typing import (
    Any,
    BinaryIO,
    Dict,
    Iterator,
    List,
    Optional,
    Sequence,
    TextIO,
    Tuple,
    cast,
)
from urllib.parse import urlparse

import sentry_sdk
from tenacity import retry
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt

from swh.loader.core.loader import BaseLoader
from swh.loader.core.utils import clean_dangling_folders
from swh.loader.cvs.cvs2gitdump.cvs2gitdump import (
    CHANGESET_FUZZ_SEC,
    ChangeSetKey,
    CvsConv,
    FileRevision,
    RcsKeywords,
    file_path,
)
from swh.loader.cvs.cvsclient import CVSClient, CVSProtocolError, decode_path
import swh.loader.cvs.rcsparse as rcsparse
from swh.loader.cvs.rlog import RlogConv
from swh.loader.exception import NotFound
from swh.model import from_disk, hashutil
from swh.model.model import (
    Content,
    Directory,
    Person,
    Revision,
    RevisionType,
    Sha1Git,
    SkippedContent,
    Snapshot,
    SnapshotBranch,
    TargetType,
    TimestampWithTimezone,
)
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.storage.interface import StorageInterface

DEFAULT_BRANCH = b"HEAD"

TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs."


[docs] def rsync_retry(): return retry( retry=retry_if_exception_type(subprocess.CalledProcessError), stop=stop_after_attempt(max_attempt_number=4), reraise=True, )
[docs] class BadPathException(Exception): pass
[docs] class CvsLoader(BaseLoader): """Swh cvs loader. The repository is local. The loader deals with update on an already previously loaded repository. """ visit_type = "cvs" cvs_module_name: str cvsclient: CVSClient # remote CVS repository access (history is parsed from CVS rlog): rlog_file: BinaryIO swh_revision_gen: Iterator[ Tuple[List[Content], List[SkippedContent], List[Directory], Revision] ] def __init__( self, storage: StorageInterface, url: str, origin_url: Optional[str] = None, visit_date: Optional[datetime] = None, cvsroot_path: Optional[str] = None, temp_directory: str = "/tmp", **kwargs: Any, ): self.cvsroot_url = url.rstrip("/") # origin url as unique identifier for origin in swh archive origin_url = origin_url.rstrip("/") if origin_url else self.cvsroot_url super().__init__(storage=storage, origin_url=origin_url, **kwargs) self.temp_directory = temp_directory # internal state used to store swh objects self._contents: List[Content] = [] self._skipped_contents: List[SkippedContent] = [] self._directories: List[Directory] = [] self._revisions: List[Revision] = [] # internal state, current visit self._last_revision: Optional[Revision] = None self._visit_status = "full" self.visit_date = visit_date or self.visit_date self.cvsroot_path = cvsroot_path self.custom_id_keyword: Optional[str] = None self.excluded_keywords: List[str] = [] self.swh_dir = from_disk.Directory() self.snapshot: Optional[Snapshot] = None self.last_snapshot: Optional[Snapshot] = snapshot_get_latest( self.storage, self.origin.url )
[docs] def compute_swh_revision( self, k: ChangeSetKey, logmsg: Optional[bytes] ) -> Tuple[Revision, from_disk.Directory]: """Compute swh hash data per CVS changeset. Returns: tuple (rev, swh_directory) - rev: current SWH revision computed from checked out work tree - swh_directory: dictionary of path, swh hash data with type """ # Compute SWH revision from the on-disk state parents: Tuple[Sha1Git, ...] if self._last_revision: parents = (self._last_revision.id,) else: parents = () swh_dir = self.swh_dir[self.cvs_module_name.encode()] revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents) self.log.debug("SWH revision ID: %s", hashutil.hash_to_hex(revision.id)) self._last_revision = revision return (revision, swh_dir)
[docs] def file_path_is_safe(self, wtpath: bytes): tempdir = os.fsencode(self.tempdir_path) if os.fsencode("%s..%s" % (os.path.sep, os.path.sep)) in wtpath: # Paths with back-references should not appear # in CVS protocol messages or CVS rlog output return False elif os.path.commonpath([tempdir, os.path.normpath(wtpath)]) != tempdir: # The path must be a child of our temporary directory. return False else: return True
[docs] def add_content(self, path: bytes, wtpath: bytes): path_parts = path.split(b"/") current_path = b"" for p in path_parts[:-1]: current_path = os.path.join(current_path, p) if current_path not in self.swh_dir: self.swh_dir[current_path] = from_disk.Directory() self.swh_dir[path] = from_disk.Content.from_file(path=wtpath)
[docs] def checkout_file_with_rcsparse( self, k: ChangeSetKey, f: FileRevision, rcsfile: rcsparse.rcsfile ) -> None: assert self.cvsroot_path assert self.server_style_cvsroot path = file_path(os.fsencode(self.cvsroot_path), f.path) wtpath = os.path.join(os.fsencode(self.tempdir_path), path) if not self.file_path_is_safe(wtpath): raise BadPathException(f"unsafe path found in RCS file: {f.path!r}") self.log.debug("rev %s state %s file %s", f.rev, f.state, f.path) if f.state == "dead": # remove this file from work tree try: os.remove(wtpath) except FileNotFoundError: pass if path in self.swh_dir: del self.swh_dir[path] else: # create, or update, this file in the work tree if not rcsfile: rcsfile = rcsparse.rcsfile(f.path) rcs = RcsKeywords() # We try our best to generate the same commit hashes over both pserver # and rsync. To avoid differences in file content due to expansion of # RCS keywords which contain absolute file paths (such as "Header"), # attempt to expand such paths in the same way as a regular CVS server # would expand them. # Whether this will avoid content differences depends on pserver and # rsync servers exposing the same server-side path to the CVS repository. # However, this is the best we can do, and only matters if an origin can # be fetched over both pserver and rsync. Each will still be treated as # a distinct origin, but will hopefully point at the same SWH snapshot. # In any case, an absolute path based on the origin URL looks nicer than # an absolute path based on a temporary directory used by the CVS loader. path_str, encoding = decode_path(f.path) server_style_path = path_str.replace( self.cvsroot_path, self.server_style_cvsroot ) if server_style_path[0] != "/": server_style_path = "/" + server_style_path if self.custom_id_keyword is not None: rcs.add_id_keyword(self.custom_id_keyword) contents = rcs.expand_keyword( server_style_path, rcsfile, f.rev, self.excluded_keywords, filename_encoding=encoding, ) os.makedirs(os.path.dirname(wtpath), exist_ok=True) outfile = open(wtpath, mode="wb") outfile.write(contents) outfile.close() self.add_content(path, wtpath)
[docs] def checkout_file_with_cvsclient( self, k: ChangeSetKey, f: FileRevision, cvsclient: CVSClient ): assert self.cvsroot_path path = file_path(os.fsencode(self.cvsroot_path), f.path) wtpath = os.path.join(os.fsencode(self.tempdir_path), path) if not self.file_path_is_safe(wtpath): raise BadPathException(f"unsafe path found in cvs rlog output: {f.path!r}") self.log.debug("rev %s state %s file %s", f.rev, f.state, f.path) if f.state == "dead": # remove this file from work tree try: os.remove(wtpath) except FileNotFoundError: pass if path in self.swh_dir: del self.swh_dir[path] else: dirname = os.path.dirname(wtpath) os.makedirs(dirname, exist_ok=True) self.log.debug("checkout to %s\n", wtpath) cvsclient.checkout(path, f.rev, dest_path=wtpath, expand_keywords=True) self.add_content(path, wtpath)
[docs] def process_cvs_changesets( self, cvs_changesets: List[ChangeSetKey], use_rcsparse: bool, ) -> Iterator[ Tuple[List[Content], List[SkippedContent], List[Directory], Revision] ]: """Process CVS revisions. At each CVS revision, check out contents and compute swh hashes. Yields: tuple (contents, skipped-contents, directories, revision) of dict as a dictionary with keys, sha1_git, sha1, etc... """ for k in cvs_changesets: tstr = time.strftime("%c", time.gmtime(k.max_time)) self.log.debug( "changeset from %s by %s on branch %s", tstr, k.author, k.branch ) logmsg: Optional[bytes] = b"" # Check out all files of this revision and get a log message. # # The log message is obtained from the first file in the changeset. # The message will usually be the same for all affected files, and # the SWH archive will only store one version of the log message. for f in k.revs: rcsfile = None if use_rcsparse: if rcsfile is None: rcsfile = rcsparse.rcsfile(f.path) if not logmsg: logmsg = rcsfile.getlog(k.revs[0].rev) self.checkout_file_with_rcsparse(k, f, rcsfile) else: if not logmsg: logmsg = self.rlog.getlog(self.rlog_file, f.path, k.revs[0].rev) self.checkout_file_with_cvsclient(k, f, self.cvsclient) # TODO: prune empty directories? (revision, swh_dir) = self.compute_swh_revision(k, logmsg) contents: List[Content] = [] skipped_contents: List[SkippedContent] = [] directories: List[Directory] = [] for obj_node in swh_dir.collect(): obj = obj_node.to_model() # type: ignore obj_type = obj.object_type if obj_type in ( Content.object_type, from_disk.DiskBackedContent.object_type, ): contents.append(obj.with_data()) elif obj_type == SkippedContent.object_type: skipped_contents.append(obj) elif obj_type == Directory.object_type: directories.append(obj) else: assert False, obj_type yield contents, skipped_contents, directories, revision
[docs] def pre_cleanup(self) -> None: """Cleanup potential dangling files from prior runs (e.g. OOM killed tasks) """ clean_dangling_folders( self.temp_directory, pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, log=self.log, )
[docs] def cleanup(self) -> None: self.log.debug("cleanup")
[docs] def configure_custom_id_keyword(self, cvsconfig: TextIO): """Parse CVSROOT/config and look for a custom keyword definition. There are two different configuration directives in use for this purpose. The first variant stems from a patch which was never accepted into upstream CVS and uses the tag directive: tag=MyName With this, the "MyName" keyword becomes an alias for the "Id" keyword. This variant is prelevant in CVS versions shipped on BSD. The second variant stems from upstream CVS 1.12 and looks like: LocalKeyword=MyName=SomeKeyword KeywordExpand=iMyName We only support "SomeKeyword" if it specifies "Id" or "CVSHeader", for now. The KeywordExpand directive can be used to suppress expansion of keywords by listing keywords after an initial "e" character ("exclude", as opposed to an "include" list which uses an initial "i" character). For example, this disables expansion of the Date and Name keywords: KeywordExpand=eDate,Name """ for line in cvsconfig: line = line.strip() try: (config_key, value) = line.split("=", 1) except ValueError: continue config_key = config_key.strip() value = value.strip() if config_key == "tag": self.custom_id_keyword = value elif config_key == "LocalKeyword": try: (custom_kwname, kwname) = value.split("=", 1) except ValueError: continue if kwname.strip() in ("Id", "CVSHeader"): self.custom_id_keyword = custom_kwname.strip() elif config_key == "KeywordExpand" and value.startswith("e"): excluded_keywords = value[1:].split(",") for k in excluded_keywords: self.excluded_keywords.append(k.strip())
[docs] @rsync_retry() def execute_rsync( self, rsync_cmd: List[str], **run_opts ) -> subprocess.CompletedProcess: rsync = subprocess.run(rsync_cmd, **run_opts) rsync.check_returncode() return rsync
[docs] def fetch_cvs_repo_with_rsync(self, host: str, path: str) -> None: # URL *must* end with a trailing slash in order to get CVSROOT listed url = "rsync://%s%s/" % (host, os.path.dirname(path)) try: rsync = self.execute_rsync( ["rsync", url], capture_output=True, encoding="utf-8" ) except subprocess.CalledProcessError as cpe: if cpe.returncode == 23 and "No such file or directory" in cpe.stderr: raise NotFound("CVS repository not found at {url}") raise have_cvsroot = False have_module = False for line in rsync.stdout.split("\n"): self.log.debug("rsync server: %s", line) if line.endswith(" CVSROOT"): have_cvsroot = True elif line.endswith(" %s" % self.cvs_module_name): have_module = True if have_module and have_cvsroot: break if not have_module: raise NotFound(f"CVS module {self.cvs_module_name} not found at {url}") if not have_cvsroot: raise NotFound(f"No CVSROOT directory found at {url}") # Fetch the CVSROOT directory and the desired CVS module. assert self.cvsroot_path for d in ("CVSROOT", self.cvs_module_name): target_dir = os.path.join(self.cvsroot_path, d) os.makedirs(target_dir, exist_ok=True) # Append trailing path separators ("/" in the URL and os.path.sep in the # local target directory path) to ensure that rsync will place files # directly within our target directory . self.execute_rsync( ["rsync", "-az", url + d + "/", target_dir + os.path.sep] )
[docs] def prepare(self) -> None: self._last_revision = None self.tempdir_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) url = urlparse(self.origin.url) self.log.debug( "prepare; origin_url=%s scheme=%s path=%s", self.origin.url, url.scheme, url.path, ) if not url.path: raise NotFound(f"Invalid CVS origin URL '{self.origin.url}'") self.cvs_module_name = os.path.basename(url.path) self.server_style_cvsroot = os.path.dirname(url.path) self.worktree_path = os.path.join(self.tempdir_path, self.cvs_module_name) if url.scheme == "file" or url.scheme == "rsync": # local CVS repository conversion if not self.cvsroot_path: self.cvsroot_path = tempfile.mkdtemp( suffix="-%s" % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=self.temp_directory, ) if url.scheme == "file": if not os.path.exists(url.path): raise NotFound elif url.scheme == "rsync": assert url.hostname is not None self.fetch_cvs_repo_with_rsync(url.hostname, url.path) have_rcsfile = False have_cvsroot = False for root, dirs, files in os.walk(os.fsencode(self.cvsroot_path)): if b"CVSROOT" in dirs: have_cvsroot = True dirs.remove(b"CVSROOT") continue for f in files: filepath = os.path.join(root, f) if f[-2:] == b",v": rcsfile = rcsparse.rcsfile(filepath) # noqa: F841 self.log.debug( "Looks like we have data to convert; " "found a valid RCS file at %s", filepath, ) have_rcsfile = True break if have_rcsfile: break if not have_rcsfile: raise NotFound( f"Directory {self.cvsroot_path} does not contain any valid " "RCS files", ) if not have_cvsroot: self.log.warn( "The CVS repository at '%s' lacks a CVSROOT directory; " "we might be ingesting an incomplete copy of the repository", self.cvsroot_path, ) # The file CVSROOT/config will usually contain ASCII data only. # We allow UTF-8 just in case. Other encodings may result in an # error and will require manual intervention, for now. cvsconfig_path = os.path.join(self.cvsroot_path, "CVSROOT", "config") if os.path.exists(cvsconfig_path): cvsconfig = open(cvsconfig_path, mode="r", encoding="utf-8") self.configure_custom_id_keyword(cvsconfig) cvsconfig.close() # Unfortunately, there is no way to convert CVS history in an # iterative fashion because the data is not indexed by any kind # of changeset ID. We need to walk the history of each and every # RCS file in the repository during every visit, even if no new # changes will be added to the SWH archive afterwards. # "CVS’s repository is the software equivalent of a telephone book # sorted by telephone number." # https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/ # # An implicit assumption made here is that self.cvs_changesets will # fit into memory in its entirety. If it won't fit then the CVS walker # will need to be modified such that it spools the list of changesets # to disk instead. cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC) self.log.debug("Walking CVS module %s", self.cvs_module_name) cvs.walk(self.cvs_module_name) cvs_changesets = sorted(cvs.changesets) self.log.debug( "CVS changesets found in %s: %d", self.cvs_module_name, len(cvs_changesets), ) self.swh_revision_gen = self.process_cvs_changesets( cvs_changesets, use_rcsparse=True ) elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh": # remote CVS repository conversion if not self.cvsroot_path: self.cvsroot_path = os.path.dirname(url.path) self.cvsclient = CVSClient(url) cvsroot_path = os.path.dirname(url.path) self.log.debug( "Fetching CVS rlog from %s:%s/%s", url.hostname, cvsroot_path, self.cvs_module_name, ) try: main_rlog_file = self.cvsclient.fetch_rlog() except CVSProtocolError as cvs_err: if "cannot find module" in str(cvs_err): raise NotFound( f"CVS module named {self.cvs_module_name} cannot be found" ) else: raise self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC) self.rlog.parse_rlog(main_rlog_file) # Find file deletion events only visible in Attic directories. main_changesets = self.rlog.changesets attic_paths = [] attic_rlog_files = [] assert self.cvsroot_path cvsroot_path_bytes = os.fsencode(self.cvsroot_path) for k in main_changesets: for changed_file in k.revs: path = file_path(cvsroot_path_bytes, changed_file.path) if path.startswith(cvsroot_path_bytes): path = path[ len(os.path.commonpath([cvsroot_path_bytes, path])) + 1 : ] parent_path = os.path.dirname(path) if parent_path.split(b"/")[-1] == b"Attic": continue attic_path = parent_path + b"/Attic" if attic_path in attic_paths: continue attic_paths.append(attic_path) # avoid multiple visits # Try to fetch more rlog data from this Attic directory. attic_rlog_file = self.cvsclient.fetch_rlog( path=attic_path, state="dead", ) if attic_rlog_file: attic_rlog_files.append(attic_rlog_file) if len(attic_rlog_files) == 0: self.rlog_file = main_rlog_file else: # Combine all the rlog pieces we found and re-parse. fp = tempfile.TemporaryFile() for attic_rlog_file in attic_rlog_files: for line in attic_rlog_file: fp.write(line) attic_rlog_file.close() main_rlog_file.seek(0) for line in main_rlog_file: fp.write(line) main_rlog_file.close() fp.seek(0) self.rlog.parse_rlog(cast(BinaryIO, fp)) self.rlog_file = cast(BinaryIO, fp) cvs_changesets = sorted(self.rlog.changesets) self.log.debug( "CVS changesets found for %s: %d", self.cvs_module_name, len(cvs_changesets), ) self.swh_revision_gen = self.process_cvs_changesets( cvs_changesets, use_rcsparse=False ) else: raise NotFound(f"Invalid CVS origin URL '{self.origin.url}'")
[docs] def fetch_data(self) -> bool: """Fetch the next CVS revision.""" try: data = next(self.swh_revision_gen) except StopIteration: self.snapshot = self.generate_and_load_snapshot(self._last_revision) self.log.debug( "SWH snapshot ID: %s", hashutil.hash_to_hex(self.snapshot.id) ) self.flush() self.loaded_snapshot_id = self.snapshot.id return False except Exception: self.log.exception("Exception in fetch_data:") sentry_sdk.capture_exception() self._visit_status = "failed" return False # Stopping iteration self._contents, self._skipped_contents, self._directories, rev = data self._revisions = [rev] return True
[docs] def build_swh_revision( self, k: ChangeSetKey, logmsg: Optional[bytes], dir_id: bytes, parents: Sequence[bytes], ) -> Revision: """Given a CVS revision, build a swh revision. Args: k: changeset data logmsg: the changeset's log message dir_id: the tree's hash identifier parents: the revision's parents identifier Returns: The swh revision dictionary. """ author = Person.from_fullname(k.author.encode("UTF-8")) date = TimestampWithTimezone.from_dict(k.max_time) return Revision( type=RevisionType.CVS, date=date, committer_date=date, directory=dir_id, message=logmsg, author=author, committer=author, synthetic=True, extra_headers=[], parents=tuple(parents), )
[docs] def generate_and_load_snapshot( self, revision: Optional[Revision] = None ) -> Snapshot: """Create the snapshot either from existing revision. Args: revision (dict): Last revision seen if any (None by default) Returns: Optional[Snapshot] The newly created snapshot """ snap = Snapshot( branches=( { DEFAULT_BRANCH: SnapshotBranch( target=revision.id, target_type=TargetType.REVISION ) } if revision is not None else {} ) ) self.log.debug("snapshot: %s", snap) self.storage.snapshot_add([snap]) return snap
[docs] def store_data(self) -> None: "Add our current CVS changeset to the archive." self.storage.skipped_content_add(self._skipped_contents) self.storage.content_add(self._contents) self.storage.directory_add(self._directories) self.storage.revision_add(self._revisions) self.flush() self._skipped_contents = [] self._contents = [] self._directories = [] self._revisions = []
[docs] def load_status(self) -> Dict[str, Any]: if self.snapshot is None: load_status = "failed" elif self.last_snapshot == self.snapshot or not self.snapshot.branches: load_status = "uneventful" else: load_status = "eventful" return { "status": load_status, }
[docs] def visit_status(self) -> str: return self._visit_status