Source code for swh.loader.svn.utils

# Copyright (C) 2016-2023  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from dataclasses import dataclass
from datetime import datetime
import errno
from functools import lru_cache
import logging
import os
import re
import shutil
from subprocess import PIPE, Popen, call, run
import tempfile
from typing import Optional, Tuple
from urllib.parse import quote, unquote, urlparse, urlunparse

import iso8601
from subvertpy import SubversionException

logger = logging.getLogger(__name__)

[docs] class OutputStream: """Helper class to read lines from a program output while it is running Args: fileno (int): File descriptor of a program output stream opened in text mode """ def __init__(self, fileno): self._fileno = fileno self._buffer = ""
[docs] def read_lines(self): """ Read available lines from the output stream and return them. Returns: Tuple[List[str], bool]: A tuple whose first member is the read lines and second member a boolean indicating if there are still some other lines available to read. """ try: output =, 1000).decode() except OSError as e: if e.errno != errno.EIO: raise output = "" output = output.replace("\r\n", "\n") lines = output.split("\n") lines[0] = self._buffer + lines[0] if output: self._buffer = lines[-1] return (lines[:-1], True) else: self._buffer = "" if len(lines) == 1 and not lines[0]: lines = [] return (lines, False)
[docs] def init_svn_repo_from_dump( dump_path: str, prefix: Optional[str] = None, suffix: Optional[str] = None, root_dir: str = "/tmp", gzip: bool = False, cleanup_dump: bool = True, max_rev: int = -1, ) -> Tuple[str, str]: """Given a path to a svn dump, initialize an svn repository with the content of said dump. Args: dump_path: The dump to the path prefix: optional prefix file name for the working directory suffix: optional suffix file name for the working directory root_dir: the root directory where the working directory is created gzip: Boolean to determine whether we treat the dump as compressed or not. cleanup_dump: Whether we want this function call to clean up the dump at the end of the repository initialization. Raises: ValueError in case of failure to run the command to uncompress and load the dump. Returns: A tuple: - temporary folder: containing the mounted repository - repo_path: path to the mounted repository inside the temporary folder """ project_name = os.path.basename(os.path.dirname(dump_path)) temp_dir = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_dir) try: repo_path = os.path.join(temp_dir, project_name) # create the repository that will be loaded with the dump cmd = ["svnadmin", "create", repo_path] r = call(cmd) if r != 0: raise ValueError( "Failed to initialize empty svn repo for %s" % project_name ) read_dump_cmd = ["cat", dump_path] if gzip: read_dump_cmd = ["gzip", "-dc", dump_path] with Popen(read_dump_cmd, stdout=PIPE) as dump: # load dump and bypass properties validation as Unicode decoding errors # are already handled in loader implementation (see _ra_codecs_error_handler # in cmd = [ "svnadmin", "load", "-q", "--bypass-prop-validation", "--no-flush-to-disk", # loading is significantly faster with this option ] if max_rev > 0: cmd.append(f"-r1:{max_rev}") cmd.append(repo_path) svnadmin_load = run(cmd, stdin=dump.stdout, capture_output=True, text=True) if svnadmin_load.returncode != 0: if max_rev > 0: # if max_rev is specified, we might have a truncated dump due to # an error when executing svnrdump, check if max_rev have been # loaded and continue loading process if it is the case svnadmin_info = run( ["svnadmin", "info", repo_path], capture_output=True, text=True ) if f"Revisions: {max_rev}\n" in svnadmin_info.stdout: return temp_dir, repo_path raise ValueError( f"Failed to mount the svn dump for project {project_name}\n" + svnadmin_load.stderr ) return temp_dir, repo_path except Exception as e: shutil.rmtree(temp_dir) raise e finally: if cleanup_dump: try: # At this time, the temporary svn repository is mounted from the dump or # the svn repository failed to mount. Either way, we can drop the dump. os.remove(dump_path) assert not os.path.exists(dump_path) except OSError as e: logger.warn("Failure to remove the dump %s: %s", dump_path, e)
[docs] def init_svn_repo_from_archive_dump( archive_path: str, prefix: Optional[str] = None, suffix: Optional[str] = None, root_dir: str = "/tmp", cleanup_dump: bool = True, ) -> Tuple[str, str]: """Given a path to an archive containing an svn dump, initializes an svn repository with the content of the uncompressed dump. Args: archive_path: The archive svn dump path prefix: optional prefix file name for the working directory suffix: optional suffix file name for the working directory root_dir: the root directory where the working directory is created gzip: Boolean to determine whether we treat the dump as compressed or not. cleanup_dump: Whether we want this function call to clean up the dump at the end of the repository initialization. Raises: ValueError in case of failure to run the command to uncompress and load the dump. Returns: A tuple: - temporary folder: containing the mounted repository - repo_path: path to the mounted repository inside the temporary folder """ return init_svn_repo_from_dump( archive_path, prefix=prefix, suffix=suffix, root_dir=root_dir, gzip=True, cleanup_dump=cleanup_dump, )
[docs] def svn_urljoin(base_url: str, *args) -> str: """Join a base URL and a list of paths in a SVN way. For instance: - svn_urljoin("", "foo", "bar") will return " - svn_urljoin("", "../bar") will return " Args: base_url: Base URL to join paths with args: path components Returns: The joined URL """ parsed_url = urlparse(base_url) path = os.path.abspath( os.path.join(parsed_url.path or "/", *[arg.strip("/") for arg in args]) ) return f"{parsed_url.scheme}://{parsed_url.netloc}{path}"
[docs] @dataclass(frozen=True) class ExternalDefinition: path: str url: str revision: Optional[int] peg_revision: Optional[int] relative_url: bool legacy_format: bool
[docs] def parse_external_definition( external: str, dir_path: str, repo_url: str ) -> ExternalDefinition: """Parse a subversion external definition. Args: external: an external definition, extracted from the lines split of a svn:externals property value dir_path: The path of the directory in the subversion repository where the svn:externals property was set repo_url: URL of the subversion repository Returns: A tuple with the following members: - path relative to dir_path where the external should be exported - URL of the external to export - optional revision of the external to export - optional peg revision of the external to export - boolean indicating if the external URL is relative to the repository URL and targets a path not in the repository """ path = "" external_url = "" revision = None peg_revision = None relative_url = False prev_part = None # turn multiple spaces into a single one and split on space for external_part in external.split(): if prev_part == "-r": # parse revision in the form "-r XXX" revision = int(external_part) elif external_part.startswith("-r") and external_part != "-r": # parse revision in the form "-rXXX" revision = int(external_part[2:]) elif external_part.startswith("^/"): # URL relative to the root of the repository in which the svn:externals # property is versioned external_url = svn_urljoin(repo_url, external_part[2:]) relative_url = not external_url.startswith(repo_url) elif external_part.startswith("//"): # URL relative to the scheme of the URL of the directory on which the # svn:externals property is set scheme = urlparse(repo_url).scheme external_url = f"{scheme}:{external_part}" relative_url = not external_url.startswith(repo_url) elif external_part.startswith("/"): # URL relative to the root URL of the server on which the svn:externals # property is versioned parsed_url = urlparse(repo_url) root_url = f"{parsed_url.scheme}://{parsed_url.netloc}" external_url = svn_urljoin(root_url, external_part) relative_url = not external_url.startswith(repo_url) elif external_part.startswith("../"): # URL relative to the URL of the directory on which the svn:externals # property is set external_url = svn_urljoin(repo_url, dir_path, external_part) relative_url = not external_url.startswith(repo_url) elif re.match(r"^.*:*//.*", external_part): # absolute external URL external_url = external_part # subversion >= 1.6 added a quoting and escape mechanism to the syntax so # that the path of the external working copy may contain whitespace. elif external_part.startswith('\\"'): external_split = external.split('\\"') path = [ e.replace("\\ ", " ") for e in external_split if e.startswith(external_part[2:]) ][0] path = f'"{path}"' elif external_part.endswith('\\"'): continue elif external_part.startswith(('"', "'")): external_split = external.split(external_part[0]) path_prefix = external_part.strip(external_part[0]) path = next(iter([e for e in external_split if e.startswith(path_prefix)])) elif external_part.endswith(('"', "'")): continue elif not external_part.startswith("\\") and external_part != "-r": # path of the external relative to dir_path path = external_part.replace("\\\\", "\\") if path == external_part: path = external_part.replace("\\", "") if path.startswith("./"): path = path.replace("./", "", 1) prev_part = external_part parsed_external_url = urlparse(external_url) if "@" in external_url: # try to extract revision number if external URL is in the form # url, revision_s = external_url.rsplit("@", maxsplit=1) try: # ensure revision_s can be parsed to int peg_revision = int(revision_s) external_url = url except ValueError: if parsed_external_url.username is None: # handle URL like external_url = url if revision_s.startswith("{") and revision_s.endswith("}"): # revision as a date special case, subvertpy does not support such revision # format in its API so we need to get the HEAD revision number at that date try: date = iso8601.parse_date(revision_s[1:-1]) repo_root_url = get_repo_root_url(external_url) peg_revision = get_head_revision_at_date(repo_root_url, date) except Exception as e: # typically when repository no longer exists or temporary network failures, # for the latter case if the loader did not export the external at the right # revision it will detect it at next origin visit and perform a full reload. logger.debug(e) pass if not external_url or not path: raise ValueError(f"Failed to parse external definition '{external}'") return ExternalDefinition( path=path.rstrip("/"), url=unquote(external_url), revision=revision, peg_revision=peg_revision, relative_url=relative_url, legacy_format=( external.strip().startswith(path) and external.strip().endswith(external_url) and parsed_external_url.scheme is not None ), )
[docs] def is_recursive_external( origin_url: str, dir_path: str, external_path: str, external_url: str ) -> bool: """ Check if an external definition can lead to a recursive subversion export operation ( Args: origin_url: repository URL dir_path: path of the directory where external is defined external_path: path of the external relative to the directory external_url: external URL Returns: Whether the external definition is recursive """ assert external_url parsed_origin_url = urlparse(origin_url) parsed_external_url = urlparse(external_url) external_url = urlunparse( parsed_external_url._replace(scheme=parsed_origin_url.scheme) ) return svn_urljoin(origin_url, quote(dir_path), quote(external_path)).startswith( external_url )
[docs] @lru_cache() def get_head_revision_at_date(svn_url: str, date: datetime) -> int: """Get HEAD revision for repository at given date. This function wraps calls to :meth:`swh.loader.svn.svn_repo.SvnRepo.get_head_revision_at_date` and put result in cache. Args: svn_url: URL of subversion repository date: the reference date Returns: the revision number of the HEAD revision at that date Raises: SubversionException: repository URL is not valid ValueError: first revision date is greater than given date """ from swh.loader.svn.svn_repo import get_svn_repo return get_svn_repo(svn_url).get_head_revision_at_date(date)
@lru_cache() def _get_repo_root_url(svn_url: str) -> str: from swh.loader.svn.svn_repo import get_svn_repo return get_svn_repo(svn_url).repos_root_url
[docs] def get_repo_root_url(svn_url): """Get root URL for a repository. Suversion URL might target a sub-project in a repository. That function computes the root URL of the repository and put result in cache. Args: svn_url: URL of subversion repository Returns: the root URL of the repository Raises: SubversionException: subversion URL is not valid """ url_splitted = svn_url.split("/") urls = [svn_url] + [ "/".join(url_splitted[:-i]) for i in range(1, len(url_splitted)) ] for url in urls: try: return _get_repo_root_url(url) except SubversionException: # URL no longer valid, retry by removing last sub-path in it as targeted # path might no longer exists in HEAD revision pass return svn_url
[docs] def quote_svn_url(url: str) -> str: """Quote Subversion URL with special characters in it for subversion operations involving URLs (export for instance) to succeed. """ return quote(url, safe="/:!$&'()*+,=@")