# Copyright (C) 2016-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass
from datetime import datetime
import errno
from functools import lru_cache
import logging
import os
import re
import shutil
from subprocess import PIPE, Popen, call, run
import tempfile
from typing import Optional, Tuple
from urllib.parse import quote, unquote, urlparse, urlunparse
import iso8601
from subvertpy import SubversionException
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
[docs]
class OutputStream:
    """Helper class to read lines from a program output while
    it is running.

    Raw bytes are read from the file descriptor in chunks and decoded
    incrementally as UTF-8, so a multi-byte character or a CRLF line
    terminator that happens to be split across two reads is reassembled
    instead of raising ``UnicodeDecodeError`` or leaving a stray carriage
    return at the end of a returned line.

    Args:
        fileno (int): File descriptor of a program output stream
    """

    def __init__(self, fileno):
        # function-scope import: codecs is only needed by this class
        import codecs

        self._fileno = fileno
        # text received after the last line terminator, not returned yet
        self._buffer = ""
        # keeps the leading bytes of an incomplete UTF-8 sequence between reads
        self._decoder = codecs.getincrementaldecoder("utf-8")()

    def read_lines(self):
        """
        Read available lines from the output stream and return them.

        Returns:
            Tuple[List[str], bool]: A tuple whose first member is the read
            lines and second member a boolean indicating if there are
            still some other lines available to read.

        Raises:
            OSError: reading the file descriptor failed with an error
                other than ``EIO``
            UnicodeDecodeError: the stream content is not valid UTF-8
        """
        try:
            data = os.read(self._fileno, 1000)
        except OSError as e:
            if e.errno != errno.EIO:
                raise
            # EIO is handled like an end of stream
            data = b""

        # an empty read means there is nothing left to read
        eof = not data
        # final=eof makes the decoder complain about a truncated character
        # only once the stream is exhausted
        text = self._buffer + self._decoder.decode(data, final=eof)
        # normalize line endings after prepending the pending text so that a
        # "\r" buffered by the previous call meets its "\n"
        lines = text.replace("\r\n", "\n").split("\n")

        if not eof:
            # last item is an incomplete line (or ""), keep it for next call
            self._buffer = lines[-1]
            return (lines[:-1], True)

        self._buffer = ""
        if len(lines) == 1 and not lines[0]:
            lines = []
        return (lines, False)
[docs]
def init_svn_repo_from_dump(
    dump_path: str,
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
    root_dir: str = "/tmp",
    gzip: bool = False,
    cleanup_dump: bool = True,
    max_rev: int = -1,
) -> Tuple[str, str]:
    """Given a path to a svn dump, initialize an svn repository with the content of said
    dump.

    Args:
        dump_path: Path to the dump
        prefix: optional prefix file name for the working directory
        suffix: optional suffix file name for the working directory
        root_dir: the root directory where the working directory is created
        gzip: Boolean to determine whether we treat the dump as compressed or not.
        cleanup_dump: Whether we want this function call to clean up the dump at the end
            of the repository initialization.
        max_rev: when strictly positive, only revisions 1 to ``max_rev`` are loaded
            from the dump; a failed load is then tolerated if ``svnadmin info``
            reports that ``max_rev`` revisions were loaded

    Raises:
        ValueError in case of failure to run the command to uncompress and load the
        dump. The temporary folder is removed before any exception is propagated.

    Returns:
        A tuple:
            - temporary folder: containing the mounted repository
            - repo_path: path to the mounted repository inside the temporary folder

    """
    # the repository is named after the directory containing the dump
    project_name = os.path.basename(os.path.dirname(dump_path))
    temp_dir = tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=root_dir)

    try:
        repo_path = os.path.join(temp_dir, project_name)

        # create the repository that will be loaded with the dump
        cmd = ["svnadmin", "create", repo_path]
        r = call(cmd)
        if r != 0:
            raise ValueError(
                "Failed to initialize empty svn repo for %s" % project_name
            )

        read_dump_cmd = ["cat", dump_path]
        if gzip:
            read_dump_cmd = ["gzip", "-dc", dump_path]

        with Popen(read_dump_cmd, stdout=PIPE) as dump:
            # load dump and bypass properties validation as Unicode decoding errors
            # are already handled in loader implementation (see _ra_codecs_error_handler
            # in ra.py)
            cmd = [
                "svnadmin",
                "load",
                "-q",
                "--bypass-prop-validation",
                "--no-flush-to-disk",  # loading is significantly faster with this option
            ]
            if max_rev > 0:
                cmd.append(f"-r1:{max_rev}")
            cmd.append(repo_path)
            # the dump content is piped straight into svnadmin's standard input
            svnadmin_load = run(cmd, stdin=dump.stdout, capture_output=True, text=True)
            if svnadmin_load.returncode != 0:
                if max_rev > 0:
                    # if max_rev is specified, we might have a truncated dump due to
                    # an error when executing svnrdump, check if max_rev have been
                    # loaded and continue loading process if it is the case
                    svnadmin_info = run(
                        ["svnadmin", "info", repo_path], capture_output=True, text=True
                    )
                    if f"Revisions: {max_rev}\n" in svnadmin_info.stdout:
                        return temp_dir, repo_path
                raise ValueError(
                    f"Failed to mount the svn dump for project {project_name}\n"
                    + svnadmin_load.stderr
                )
            return temp_dir, repo_path
    except Exception:
        # do not leave a half-initialized repository behind
        shutil.rmtree(temp_dir)
        raise
    finally:
        if cleanup_dump:
            try:
                # At this time, the temporary svn repository is mounted from the dump or
                # the svn repository failed to mount. Either way, we can drop the dump.
                os.remove(dump_path)
            except OSError as e:
                # best effort: a leftover dump must not fail the whole operation
                logger.warning("Failure to remove the dump %s: %s", dump_path, e)
[docs]
def init_svn_repo_from_archive_dump(
    archive_path: str,
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
    root_dir: str = "/tmp",
    cleanup_dump: bool = True,
) -> Tuple[str, str]:
    """Initialize an svn repository from a gzip-compressed svn dump.

    This is a shortcut for :func:`init_svn_repo_from_dump` called with
    ``gzip=True``.

    Args:
        archive_path: path to the compressed svn dump
        prefix: optional prefix file name for the working directory
        suffix: optional suffix file name for the working directory
        root_dir: the root directory where the working directory is created
        cleanup_dump: whether the archive must be removed once the repository
            initialization is over

    Raises:
        ValueError in case of failure to run the command to uncompress
        and load the dump.

    Returns:
        A tuple:
            - temporary folder: containing the mounted repository
            - repo_path: path to the mounted repository inside the
              temporary folder

    """
    # the archive is decompressed on the fly by the dump loading function
    return init_svn_repo_from_dump(
        archive_path,
        gzip=True,
        cleanup_dump=cleanup_dump,
        root_dir=root_dir,
        prefix=prefix,
        suffix=suffix,
    )
[docs]
def svn_urljoin(base_url: str, *args) -> str:
    """Join a base URL and a list of paths in a SVN way.

    Leading and trailing slashes of each path component are ignored and
    ``..`` segments are resolved against the path of the base URL.

    For instance:

        - svn_urljoin("http://example.org", "foo", "bar")
            will return "http://example.org/foo/bar"

        - svn_urljoin("http://example.org/foo", "../bar")
            will return "http://example.org/bar"

    Args:
        base_url: Base URL to join paths with
        args: path components

    Returns:
        The joined URL, made of the scheme and network location of the base
        URL followed by the normalized path

    """
    base = urlparse(base_url)
    components = [component.strip("/") for component in args]
    # an empty base path is handled as the root path
    joined_path = os.path.join(base.path or "/", *components)
    # collapse "." and ".." segments as well as duplicated slashes
    normalized_path = os.path.abspath(joined_path)
    return f"{base.scheme}://{base.netloc}{normalized_path}"
[docs]
@dataclass(frozen=True)
class ExternalDefinition:
    """Immutable result of the parsing of a subversion external definition,
    as returned by :func:`parse_external_definition`."""

    # path, relative to the directory holding the svn:externals property,
    # where the external should be exported
    path: str
    # URL of the external to export
    url: str
    # optional operative revision, parsed from "-r XXX" or "-rXXX"
    revision: Optional[int]
    # optional peg revision, parsed from the "@XXX" suffix of the external URL
    peg_revision: Optional[int]
    # whether the external URL was expressed relatively to the repository URL
    # and targets a path not in the repository
    relative_url: bool
    # whether the definition starts with the external path and ends with the
    # external URL
    legacy_format: bool
[docs]
def parse_external_definition(
    external: str, dir_path: str, repo_url: str
) -> ExternalDefinition:
    """Parse a subversion external definition.

    The definition is split on whitespace and each part is classified as a
    revision option, an external URL (absolute or in one of the relative
    forms supported by subversion) or the path of the external. A peg
    revision is then extracted from the ``@XXX`` suffix of the URL, if any.

    Args:
        external: an external definition, extracted from the lines split of a
            svn:externals property value
        dir_path: The path of the directory in the subversion repository where
            the svn:externals property was set
        repo_url: URL of the subversion repository

    Returns:
        An :class:`ExternalDefinition` with the following members:

            - path: path relative to dir_path where the external should be
              exported, without trailing slash

            - url: percent-decoded URL of the external to export

            - revision: optional revision of the external to export

            - peg_revision: optional peg revision of the external to export

            - relative_url: boolean indicating if the external URL is relative
              to the repository URL and targets a path not in the repository

            - legacy_format: boolean indicating if the definition starts with
              the external path and ends with the external URL

    Raises:
        ValueError: no URL or no path could be extracted from the definition,
            or the value of a ``-r`` option is not an integer

    """
    path = ""
    external_url = ""
    revision = None
    peg_revision = None
    relative_url = False
    # previously processed part, used to detect the value of a "-r" option
    prev_part = None
    # turn multiple spaces into a single one and split on space
    for external_part in external.split():
        if prev_part == "-r":
            # parse revision in the form "-r XXX"
            revision = int(external_part)
        elif external_part.startswith("-r") and external_part != "-r":
            # parse revision in the form "-rXXX"
            revision = int(external_part[2:])
        elif external_part.startswith("^/"):
            # URL relative to the root of the repository in which the svn:externals
            # property is versioned
            external_url = svn_urljoin(repo_url, external_part[2:])
            relative_url = not external_url.startswith(repo_url)
        elif external_part.startswith("//"):
            # URL relative to the scheme of the URL of the directory on which the
            # svn:externals property is set
            scheme = urlparse(repo_url).scheme
            external_url = f"{scheme}:{external_part}"
            relative_url = not external_url.startswith(repo_url)
        elif external_part.startswith("/"):
            # URL relative to the root URL of the server on which the svn:externals
            # property is versioned
            parsed_url = urlparse(repo_url)
            root_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
            external_url = svn_urljoin(root_url, external_part)
            relative_url = not external_url.startswith(repo_url)
        elif external_part.startswith("../"):
            # URL relative to the URL of the directory on which the svn:externals
            # property is set
            external_url = svn_urljoin(repo_url, dir_path, external_part)
            relative_url = not external_url.startswith(repo_url)
        elif re.match(r"^.*:*//.*", external_part):
            # absolute external URL
            external_url = external_part
        # subversion >= 1.6 added a quoting and escape mechanism to the syntax so
        # that the path of the external working copy may contain whitespace.
        elif external_part.startswith('\\"'):
            # path delimited by escaped double quotes: find it back in the whole
            # definition as whitespace splitting broke it into several parts, the
            # double quotes are kept in the resulting path
            external_split = external.split('\\"')
            path = [
                e.replace("\\ ", " ")
                for e in external_split
                if e.startswith(external_part[2:])
            ][0]
            path = f'"{path}"'
        elif external_part.endswith('\\"'):
            # end of an escaped quoted path, already processed by the branch above
            continue
        elif external_part.startswith(('"', "'")):
            # path delimited by single or double quotes: find it back in the whole
            # definition, the quotes are not kept in the resulting path
            external_split = external.split(external_part[0])
            path_prefix = external_part.strip(external_part[0])
            path = next(iter([e for e in external_split if e.startswith(path_prefix)]))
        elif external_part.endswith(('"', "'")):
            # end of a quoted path, already processed by the branch above
            continue
        elif not external_part.startswith("\\") and external_part != "-r":
            # path of the external relative to dir_path
            path = external_part.replace("\\\\", "\\")
            if path == external_part:
                # no escaped backslash in the path, drop the remaining backslashes
                path = external_part.replace("\\", "")
            if path.startswith("./"):
                path = path.replace("./", "", 1)
        prev_part = external_part

    # parsed before the peg revision is stripped from the URL
    parsed_external_url = urlparse(external_url)
    if "@" in external_url:
        # try to extract revision number if external URL is in the form
        # http://svn.example.org/repos/test/path@XXX
        url, revision_s = external_url.rsplit("@", maxsplit=1)
        try:
            # ensure revision_s can be parsed to int
            peg_revision = int(revision_s)
            external_url = url
        except ValueError:
            if parsed_external_url.username is None:
                # no user info in the URL so what follows the "@" character is
                # dropped, URL like http://user@svn.example.org/ is left untouched
                external_url = url
            if revision_s.startswith("{") and revision_s.endswith("}"):
                # revision as a date special case, subvertpy does not support such revision
                # format in its API so we need to get the HEAD revision number at that date
                try:
                    date = iso8601.parse_date(revision_s[1:-1])
                    repo_root_url = get_repo_root_url(external_url)
                    peg_revision = get_head_revision_at_date(repo_root_url, date)
                except Exception as e:
                    # typically when repository no longer exists or temporary network failures,
                    # for the latter case if the loader did not export the external at the right
                    # revision it will detect it at next origin visit and perform a full reload.
                    logger.debug(e)
                    pass

    if not external_url or not path:
        raise ValueError(f"Failed to parse external definition '{external}'")

    return ExternalDefinition(
        path=path.rstrip("/"),
        url=unquote(external_url),
        revision=revision,
        peg_revision=peg_revision,
        relative_url=relative_url,
        legacy_format=(
            # definition written with the path first and the URL last
            external.strip().startswith(path)
            and external.strip().endswith(external_url)
            # NOTE(review): urlparse sets scheme to "" and not None when it is
            # missing so this last check looks always true - confirm the intent
            and parsed_external_url.scheme is not None
        ),
    )
[docs]
def is_recursive_external(
    origin_url: str, dir_path: str, external_path: str, external_url: str
) -> bool:
    """
    Tell whether exporting an external would end up exporting itself again,
    leading to a recursive subversion export operation
    (https://issues.apache.org/jira/browse/SVN-1703).

    The URL where the external would be exported, built from the origin URL,
    the directory path and the external path, is compared with the external
    URL once the latter is given the same scheme as the origin URL.

    Args:
        origin_url: repository URL
        dir_path: path of the directory where external is defined
        external_path: path of the external relative to the directory
        external_url: external URL

    Returns:
        Whether the external definition is recursive
    """
    assert external_url
    origin_scheme = urlparse(origin_url).scheme
    # align the scheme of the external URL on the one of the origin URL so that
    # only the network locations and paths matter in the comparison
    same_scheme_external_url = urlunparse(
        urlparse(external_url)._replace(scheme=origin_scheme)
    )
    export_url = svn_urljoin(origin_url, quote(dir_path), quote(external_path))
    return export_url.startswith(same_scheme_external_url)
[docs]
@lru_cache()
def get_head_revision_at_date(svn_url: str, date: datetime) -> int:
    """Return the number of the HEAD revision of a repository at a given date.

    The computation is delegated to
    :meth:`swh.loader.svn.svn_repo.SvnRepo.get_head_revision_at_date`
    and its result is kept in cache for each (URL, date) pair.

    Args:
        svn_url: URL of subversion repository
        date: the reference date

    Returns:
        the revision number of the HEAD revision at that date

    Raises:
        SubversionException: repository URL is not valid
        ValueError: first revision date is greater than given date
    """
    # function-scope import, kept out of the module-level imports
    from swh.loader.svn.svn_repo import get_svn_repo

    repo = get_svn_repo(svn_url)
    return repo.get_head_revision_at_date(date)
@lru_cache()
def _get_repo_root_url(svn_url: str) -> str:
    """Return the ``repos_root_url`` attribute of the repository object
    obtained for the given URL, the result is kept in cache."""
    # function-scope import, kept out of the module-level imports
    from swh.loader.svn.svn_repo import get_svn_repo

    repo = get_svn_repo(svn_url)
    return repo.repos_root_url
[docs]
def get_repo_root_url(svn_url):
    """Get root URL for a repository.

    Subversion URL might target a sub-project in a repository.
    That function computes the root URL of the repository and
    put result in cache.

    The given URL is tried first, then each URL obtained by removing its
    trailing path components one after the other. When none of them can be
    resolved, the given URL is returned unchanged.

    Args:
        svn_url: URL of subversion repository

    Returns:
        the root URL of the repository
    """
    segments = svn_url.split("/")
    # candidates go from the whole URL down to its first "/" separated segment
    for segments_count in range(len(segments), 0, -1):
        candidate_url = "/".join(segments[:segments_count])
        try:
            return _get_repo_root_url(candidate_url)
        except SubversionException:
            # URL no longer valid, retry by removing last sub-path in it as targeted
            # path might no longer exists in HEAD revision
            continue
    return svn_url
[docs]
def quote_svn_url(url: str) -> str:
    """Percent-encode the special characters of a Subversion URL so that
    subversion operations involving URLs (export for instance) succeed.

    Characters with a meaning in the URL syntax handled here (slash, colon,
    at sign and a few sub-delimiters) are left untouched.
    """
    unquoted_characters = "/:!$&'()*+,=@"
    return quote(url, safe=unquoted_characters)