Source code for swh.loader.cvs.cvsclient

# Copyright (C) 2015-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Minimal CVS client implementation

"""

import os.path
import socket
import subprocess
import tempfile
from typing import IO, Tuple

from tenacity import retry
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt

from swh.loader.exception import NotFound

# TCP port used for pserver connections when the origin URL gives no port.
CVS_PSERVER_PORT = 2401
# Maximum number of bytes requested per read from the server; also the amount
# of newline-free data after which a response line is treated as overlong.
CVS_PROTOCOL_BUFFER_SIZE = 8192
# Example origin URLs. EXAMPLE_PSERVER_URL is quoted in the error raised when a
# pserver URL lacks a user name; EXAMPLE_SSH_URL is not referenced by the code
# visible in this module (presumably kept for symmetry or external use).
EXAMPLE_PSERVER_URL = "pserver://user:password@cvs.example.com/cvsroot/repository"
EXAMPLE_SSH_URL = "ssh://user@cvs.example.com/cvsroot/repository"

# Response names advertised to the server, space-separated, in the
# "Valid-responses" request sent during the initial protocol handshake.
VALID_RESPONSES = [
    "ok",
    "error",
    "Valid-requests",
    "Checked-in",
    "New-entry",
    "Checksum",
    "Copy-file",
    "Updated",
    "Created",
    "Update-existing",
    "Merged",
    "Patched",
    "Rcs-diff",
    "Mode",
    "Removed",
    "Remove-entry",
    "Template",
    "Notified",
    "Module-expansion",
    "Wrapper-rcsOption",
    "M",
    "Mbinary",
    "E",
    "F",
    "MT",
]

# Trivially encode strings to protect them from innocent eyes (i.e.,
# inadvertent password compromises, like a network administrator
# who's watching packets for legitimate reasons and accidentally sees
# the password protocol go by).
#
# This is NOT secure encryption.


def scramble_password(password):
    """Scramble a password using the CVS pserver substitution table.

    The result is the scheme version marker ``"A"`` followed by one
    substituted character per input character. This merely obfuscates the
    password; it is NOT secure encryption.

    Args:
        password: the clear-text password. Every character must have a code
            point below 256, otherwise the table lookup raises IndexError.

    Returns:
        The scrambled password as a string.
    """
    # fmt: off
    substitutions = (
          0,   1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,  14,  15,  # noqa: E201,E241,B950
         16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,  27,  28,  29,  30,  31,  # noqa: E241,E131,B950
        114, 120,  53,  79,  96, 109,  72, 108,  70,  64,  76,  67, 116,  74,  68,  87,  # noqa: E241,E131,B950
        111,  52,  75, 119,  49,  34,  82,  81,  95,  65, 112,  86, 118, 110, 122, 105,  # noqa: E241,E131,B950
         41,  57,  83,  43,  46, 102,  40,  89,  38, 103,  45,  50,  42, 123,  91,  35,  # noqa: E241,E131,B950
        125,  55,  54,  66, 124, 126,  59,  47,  92,  71, 115,  78,  88, 107, 106,  56,  # noqa: E241,E131,B950
         36, 121, 117, 104, 101, 100,  69,  73,  99,  63,  94,  93,  39,  37,  61,  48,  # noqa: E241,E131,B950
         58, 113,  32,  90,  44,  98,  60,  51,  33,  97,  62,  77,  84,  80,  85, 223,  # noqa: E241,E131,B950
        225, 216, 187, 166, 229, 189, 222, 188, 141, 249, 148, 200, 184, 136, 248, 190,  # noqa: E241,E131,B950
        199, 170, 181, 204, 138, 232, 218, 183, 255, 234, 220, 247, 213, 203, 226, 193,  # noqa: E241,E131,B950
        174, 172, 228, 252, 217, 201, 131, 230, 197, 211, 145, 238, 161, 179, 160, 212,  # noqa: E241,E131,B950
        207, 221, 254, 173, 202, 146, 224, 151, 140, 196, 205, 130, 135, 133, 143, 246,  # noqa: E241,E131,B950
        192, 159, 244, 239, 185, 168, 215, 144, 139, 165, 180, 157, 147, 186, 214, 176,  # noqa: E241,E131,B950
        227, 231, 219, 169, 175, 156, 206, 198, 129, 164, 150, 210, 154, 177, 134, 127,  # noqa: E241,E131,B950
        182, 128, 158, 208, 162, 132, 167, 209, 149, 241, 153, 251, 237, 236, 171, 195,  # noqa: E241,E131,B950
        243, 233, 253, 240, 194, 250, 191, 155, 142, 137, 245, 235, 163, 242, 178, 152,  # noqa: E241,E131,B950
    )
    # fmt: on
    # "A" is the scramble scheme version number.
    return "A" + "".join(chr(substitutions[ord(ch)]) for ch in password)
def decode_path(path: bytes) -> Tuple[str, str]:
    """Decode a raw file path, trying encodings seen in real CVS repositories.

    The encodings are tried in the order ASCII, ISO-8859-1, UTF-8. The first
    two are decoded strictly; UTF-8 is the last resort and drops undecodable
    bytes.

    Args:
        path: raw bytes path

    Returns:
        A tuple (decoded path, encoding)
    """
    *strict_encodings, last_resort = ("ascii", "iso-8859-1", "utf-8")
    for encoding in strict_encodings:
        try:
            return path.decode(encoding), encoding
        except UnicodeError:
            continue
    # NOTE: ISO-8859-1 maps every byte value, so in practice the loop above
    # always returns and this lenient fallback is not reached.
    return path.decode(last_resort, "ignore"), last_resort
class CVSProtocolError(Exception):
    """Raised when the CVS server's response is missing, malformed or an error."""
[docs] class CVSClient: # connection to an existing pserver might sometimes fail, # retrying the operation usually fixes the issue
# connection to an existing pserver might sometimes fail,
# retrying the operation usually fixes the issue
@retry(
    retry=retry_if_exception_type(NotFound),
    stop=stop_after_attempt(max_attempt_number=3),
    reraise=True,
)
def connect_pserver(self, hostname, port, username, password):
    """Open a TCP connection to a CVS pserver and authenticate.

    On success the connected socket is stored in ``self.socket``. The retry
    decorator re-runs the whole method when NotFound is raised, stops after
    three attempts and re-raises the last error.

    Args:
        hostname: server host name
        port: server TCP port, or None to use CVS_PSERVER_PORT
        username: login name (required)
        password: clear-text password; None is treated as an empty password

    Raises:
        NotFound: if no user name was given, the connection was refused, or
            the server rejected the authentication request.
    """
    if port is None:
        port = CVS_PSERVER_PORT
    if username is None:
        raise NotFound(
            "Username is required for "
            "a pserver connection: %s" % EXAMPLE_PSERVER_URL
        )
    try:
        self.socket = socket.create_connection((hostname, port))
    except ConnectionRefusedError as e:
        # Format the message here: passing hostname/port as extra exception
        # arguments would leave the "%s" placeholders unexpanded.
        raise NotFound("Could not connect to %s:%s" % (hostname, port)) from e

    # use empty password if it is None
    scrambled_password = scramble_password(password or "")
    request = "BEGIN AUTH REQUEST\n%s\n%s\n%s\nEND AUTH REQUEST\n" % (
        self.cvsroot_path,
        username,
        scrambled_password,
    )
    # The request is deliberately not printed: the scrambled password is
    # trivially reversible and must not end up on stdout or in logs.
    self.socket.sendall(request.encode("UTF-8"))

    response = self.conn_read_line()
    if response != b"I LOVE YOU\n":
        raise NotFound(
            "pserver authentication failed for %s:%s: %s"
            % (hostname, port, response)
        )
def connect_ssh(self, hostname, port, username):
    """Start ``ssh <hostname> cvs server`` and keep the process in ``self.ssh``.

    Args:
        hostname: server host name
        port: SSH port, or None to let ssh pick its default
        username: login name, or None to let ssh pick its default
    """
    # Assume 'auth' contains only a user name.
    # We do not support password authentication with SSH since the
    # anoncvs user is usually granted access without a password.
    login_args = [] if username is None else ["-l", "%s" % username]
    port_args = [] if port is None else ["-p", "%d" % port]
    argv = [
        "ssh",
        *login_args,
        *port_args,
        # accept new SSH hosts keys upon first use; changed host keys
        # will require intervention
        "-o",
        "StrictHostKeyChecking=accept-new",
        # disable interactive prompting
        "-o",
        "BatchMode=yes",
        # disable further option processing by adding '--'
        "--",
        "%s" % hostname,
        "cvs",
        "server",
    ]
    # use non-buffered I/O to match behaviour of self.socket
    self.ssh = subprocess.Popen(
        argv,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        bufsize=0,
    )
def connect_fake(self):
    """Start a local ``cvs server`` process and keep it in ``self.ssh``.

    The process is stored in the same attribute as an SSH connection, so the
    read/write/close helpers treat both kinds of connection alike.
    """
    # use non-buffered I/O to match behaviour of self.socket
    self.ssh = subprocess.Popen(
        ["cvs", "server"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        bufsize=0,
    )
def conn_read_line(self, require_newline=True):
    """Return the next line received from the server.

    Lines split off by an earlier read are served from ``self.linebuffer``
    first. Otherwise data is read in chunks of at most
    CVS_PROTOCOL_BUFFER_SIZE bytes until a newline shows up. Everything up
    to the last newline is split into lines (line endings kept) and queued;
    the bytes after it are kept in ``self.incomplete_line`` and prepended to
    the first line produced by the next read.

    Args:
        require_newline: when true, data without any newline is an error.
            When false (used for raw file content), such data is returned
            as-is once CVS_PROTOCOL_BUFFER_SIZE bytes were read or the
            stream ended.

    Returns:
        The next line as bytes, or None if the connection is at end of
        stream and nothing could be read.

    Raises:
        CVSProtocolError: if require_newline is set and no newline was found
            within CVS_PROTOCOL_BUFFER_SIZE bytes or before end of stream.
        Exception: if neither a socket nor a subprocess connection exists.
    """
    if self.linebuffer:
        return self.linebuffer.pop(0)
    buf = b""
    idx = -1
    while idx == -1:
        if len(buf) >= CVS_PROTOCOL_BUFFER_SIZE:
            if require_newline:
                raise CVSProtocolError(
                    "Overlong response from CVS server: %s" % buf
                )
            break
        if self.socket:
            chunk = self.socket.recv(CVS_PROTOCOL_BUFFER_SIZE)
        elif self.ssh:
            chunk = self.ssh.stdout.read(CVS_PROTOCOL_BUFFER_SIZE)
        else:
            raise Exception("No valid connection")
        if not chunk:
            if not buf:
                return None
            # End of stream in the middle of a line: stop reading instead of
            # polling a closed connection forever; the newline-less data is
            # handled below like any other response without a newline.
            break
        buf += chunk
        idx = buf.rfind(b"\n")
    if idx != -1:
        self.linebuffer = buf[: idx + 1].splitlines(keepends=True)
    elif require_newline:
        raise CVSProtocolError("Invalid response from CVS server: %s" % buf)
    else:
        self.linebuffer.append(buf)
    if len(self.incomplete_line) > 0:
        # complete the partial line left over from the previous read
        self.linebuffer[0] = self.incomplete_line + self.linebuffer[0]
    if idx != -1:
        self.incomplete_line = buf[idx + 1 :]
    else:
        self.incomplete_line = b""
    return self.linebuffer.pop(0)
def conn_write(self, data):
    """Send raw bytes to the server over whichever connection is open.

    A socket connection takes precedence; otherwise the data is written to
    the subprocess's stdin and flushed.

    Args:
        data: bytes to send

    Raises:
        Exception: if neither a socket nor a subprocess connection exists.
    """
    if not self.socket:
        if not self.ssh:
            raise Exception("No valid connection")
        self.ssh.stdin.write(data)
        return self.ssh.stdin.flush()
    return self.socket.sendall(data)
def conn_write_str(self, s, encoding="utf-8"):
    """Encode a string and send it to the server.

    Args:
        s: text to send
        encoding: codec used to turn the text into bytes

    Returns:
        Whatever ``conn_write`` returns for the encoded data.
    """
    payload = s.encode(encoding)
    return self.conn_write(payload)
def conn_close(self):
    """Close the server connection, whichever kind is open.

    A socket is closed; a subprocess is killed and then waited for.

    Raises:
        subprocess.TimeoutExpired: if the killed subprocess has not exited
            within 10 seconds.
    """
    if self.socket:
        self.socket.close()
    if self.ssh:
        self.ssh.kill()
        # Let wait()'s own TimeoutExpired propagate. It already carries the
        # command and the timeout; building a new TimeoutExpired from just a
        # message fails with TypeError because 'timeout' is a required
        # constructor argument.
        self.ssh.wait(timeout=10)
def __init__(self, url):
    """
    Connect to a CVS server at the specified URL and perform the initial
    CVS protocol handshake.

    Args:
        url: parsed origin URL; its ``scheme`` (``pserver``, ``ssh`` or
            ``fake``), ``hostname``, ``port``, ``username``, ``password``
            and ``path`` attributes are used. The directory part of the
            path is the CVS root, its last component the module name.

    Raises:
        NotFound: if the URL scheme is not supported (or connecting fails).
        CVSProtocolError: if the server does not answer the handshake with
            a ``Valid-requests`` line followed by ``ok``.
    """
    # Set the connection attributes first so that __del__ / conn_close can
    # run safely even if anything below raises.
    self.socket = None
    self.ssh = None
    self.hostname = url.hostname
    self.cvsroot_path = os.path.dirname(url.path)
    self.cvs_module_name = os.path.basename(url.path)
    self.linebuffer = list()
    self.incomplete_line = b""
    # size above which temporary rlog buffers spill from memory to disk
    self.tempfile_cutoff = 100 * 1024 * 1024

    if url.scheme == "pserver":
        self.connect_pserver(url.hostname, url.port, url.username, url.password)
    elif url.scheme == "ssh":
        self.connect_ssh(url.hostname, url.port, url.username)
    elif url.scheme == "fake":
        self.connect_fake()
    else:
        raise NotFound("Invalid CVS origin URL '%s'" % url)

    # we should have a connection now
    assert self.socket or self.ssh

    self.conn_write_str(
        "Root %s\nValid-responses %s\nvalid-requests\n"
        "UseUnchanged\n" % (self.cvsroot_path, " ".join(VALID_RESPONSES))
    )
    response = self.conn_read_line()
    if not response:
        raise CVSProtocolError("No response from CVS server")
    # Slicing never raises IndexError, so a plain prefix test is enough.
    if not response.startswith(b"Valid-requests "):
        raise CVSProtocolError("Invalid response from CVS server: %s" % response)

    response = self.conn_read_line()
    if response != b"ok\n":
        raise CVSProtocolError("Invalid response from CVS server: %s" % response)


def __del__(self):
    """Close the connection when the client object is garbage-collected."""
    self.conn_close()


def _parse_rlog_response(self, fp: IO[bytes]):
    """Extract the rlog text from buffered CVS protocol response lines.

    ``M`` lines and ``MT text`` / ``MT date`` / ``MT newline`` lines carry
    the rlog output; their protocol prefix is stripped and the payload is
    appended to a spooled temporary file. Parsing stops at the ``ok`` line.

    Args:
        fp: file-like object yielding the raw response lines

    Returns:
        A SpooledTemporaryFile, rewound to the start, holding the rlog text.

    Raises:
        CVSProtocolError: on an ``error`` response or any unexpected line.
    """
    rlog_output = tempfile.SpooledTemporaryFile(max_size=self.tempfile_cutoff)
    for line in fp:
        if line == b"ok\n":
            break
        elif line[0:2] == b"M ":
            rlog_output.write(line[2:])
        elif line[0:8] == b"MT text ":
            rlog_output.write(line[8:-1])
        elif line[0:8] == b"MT date ":
            rlog_output.write(line[8:-1])
        elif line[0:10] == b"MT newline":
            rlog_output.write(line[10:])
        elif line.startswith(b"error "):
            # Report the error line itself. A 7-byte slice compared against
            # the 6-byte b"error " can never match a real "error ...\n"
            # line, and deferring the report to a following line would
            # swallow an error that ends the response.
            raise CVSProtocolError("CVS server error: %r" % line)
        else:
            raise CVSProtocolError("Bad CVS protocol response: %r" % line)
    rlog_output.seek(0)
    return rlog_output
def fetch_rlog(self, path: bytes = b"", state=""):
    """Run ``rlog`` on the server and return its output.

    Args:
        path: raw path of the file to query; when empty, the whole module
            (``self.cvs_module_name``) is queried.
        state: when non-empty, passed to rlog as ``-s<state>``.

    Returns:
        A file-like object with the rlog output as produced by
        ``_parse_rlog_response``, or None if a specific path was requested
        and the server reports that it cannot be read.

    Raises:
        CVSProtocolError: if the connection yields no response or the server
            sends an error or an unexpected response.
    """
    if not path:
        path_arg, encoding = self.cvs_module_name, "utf-8"
    else:
        path_arg, encoding = decode_path(path)
    state_arg = "Argument -s%s\n" % state if len(state) > 0 else ""
    fp = tempfile.SpooledTemporaryFile(max_size=self.tempfile_cutoff)
    request = "".join(
        (
            "Global_option -q\n",
            state_arg,
            "Argument --\n",
            f"Argument {path_arg}\n",
            "rlog\n",
        )
    )
    self.conn_write_str(request, encoding=encoding)

    # Buffer every response line up to and including the final "ok".
    response = None
    while response != b"ok\n":
        response = self.conn_read_line()
        if response is None:
            raise CVSProtocolError("No response from CVS server")
        if response[0:2] == b"E ":
            path_missing = len(path) > 0 and (
                response.endswith(b" - ignored\n")
                or b"could not read RCS file" in response
            )
            if not path_missing:
                raise CVSProtocolError(
                    "Error response from CVS server: %s" % response
                )
            response = self.conn_read_line()
            if response not in (b"error \n", b"ok\n"):
                raise CVSProtocolError(
                    "Invalid response from CVS server: %s" % response
                )
            return None  # requested path does not exist (ignore)
        fp.write(response)
    fp.seek(0)
    return self._parse_rlog_response(fp)
def checkout(self, path: bytes, rev: str, dest_path: bytes, expand_keywords: bool):
    """
    Download a file revision from the cvs server and store
    the file's contents in the file at dest_path.
    If expand_keywords is set then ask the server to expand RCS keywords
    in file content.

    From the server's point of view this function behaves much like
    'cvs update -r rev path'. The server is unaware that we do not
    actually maintain a CVS working copy. Because of this it sends
    more information than we need. We simply skip responses that are
    of no interest to us.

    Args:
        path: raw path of the file to fetch
        rev: revision to fetch
        dest_path: file the content is written to (opened in "wb" mode)
        expand_keywords: if false, force binary mode (-kb) so that the
            server does not expand RCS keywords

    Raises:
        CVSProtocolError: if the server reports an error, sends an
            unexpected response, or the connection ends prematurely.
    """
    skip_line = False
    expect_modeline = False
    expect_bytecount = False
    have_bytecount = False
    bytecount = 0
    path_str, encoding = decode_path(path)
    dirname = os.path.dirname(path_str)
    if dirname:
        # Use the encoding the path was decoded with, so that the server
        # receives the directory name as the original bytes.
        self.conn_write_str(
            "Directory %s\n%s\n"
            % (dirname, os.path.join(self.cvsroot_path, dirname)),
            encoding=encoding,
        )
    if expand_keywords:
        # use server-side per-file default expansion rules
        karg = ""
    else:
        # force binary file mode
        karg = "Argument -kb\n"
    # TODO: cvs <= 1.10 servers expect to be given every Directory along the path.
    self.conn_write_str(
        "Global_option -q\n"
        "Argument -N\n"
        "Argument -P\n"
        f"Argument -r{rev}\n"
        f"{karg}"
        "Argument --\n"
        f"Argument {path_str}\n"
        "Directory .\n"
        f"{os.path.join(self.cvsroot_path, self.cvs_module_name)}\n"
        "co\n",
        encoding=encoding,
    )
    with open(dest_path, "wb") as co_output:
        while True:
            if have_bytecount:
                # File content phase: copy exactly `bytecount` bytes.
                if bytecount < 0:
                    raise CVSProtocolError("server sent too much file content data")
                response = self.conn_read_line(require_newline=False)
                if response is None:
                    raise CVSProtocolError("Incomplete response from CVS server")
                if len(response) > bytecount:
                    # When a file lacks a final newline we receive a line which
                    # contains file content as well as CVS protocol response data.
                    # Split last line of file content from CVS protocol data...
                    co_output.write(response[:bytecount])
                    response = response[bytecount:]
                    bytecount = 0
                    # ...and process the CVS protocol response below.
                else:
                    co_output.write(response)
                    bytecount -= len(response)
                    continue
            else:
                response = self.conn_read_line()
                if response is None:
                    # connection ended before the server finished replying
                    raise CVSProtocolError("Incomplete response from CVS server")
            if response[0:2] == b"E ":
                if (
                    b"Skipping `$Log$' keyword due to excessive comment leader"
                    in response
                ):
                    # non fatal error, continue checkout operation without `$Log$'
                    # keyword expansion
                    continue
                raise CVSProtocolError("Error from CVS server: %s" % response)
            if response == b"ok\n":
                if have_bytecount:
                    break
                else:
                    raise CVSProtocolError("server sent 'ok' but no file contents")
            if skip_line:
                # line following an update-type response; not needed
                skip_line = False
                continue
            elif expect_bytecount:
                try:
                    bytecount = int(response[0:-1])  # strip trailing \n
                except ValueError as e:
                    raise CVSProtocolError(
                        "Bad CVS protocol response: %s" % response
                    ) from e
                have_bytecount = True
                continue
            elif response in (b"M \n", b"MT +updated\n", b"MT -updated\n"):
                continue
            elif response[0:9] == b"MT fname ":
                continue
            elif response.split(b" ")[0] in (
                b"Created",
                b"Checked-in",
                b"Update-existing",
                b"Updated",
                b"Removed",
            ):
                skip_line = True
                continue
            elif response[0:1] == b"/":
                # a line starting with "/" is followed by the file mode line
                expect_modeline = True
                continue
            elif expect_modeline and response[0:2] == b"u=":
                # the mode line is followed by the content byte count
                expect_modeline = False
                expect_bytecount = True
                continue
            elif response[0:2] == b"M ":
                continue
            elif response[0:8] == b"MT text ":
                continue
            elif response[0:10] == b"MT newline":
                continue
            else:
                raise CVSProtocolError("Bad CVS protocol response: %s" % response)