[PATCH 1 of 1] add a (still incomplete) SSHStore based on 'hg serve' mechanism

david.douard at logilab.fr david.douard at logilab.fr
Tue Nov 3 03:15:47 CST 2009


# HG changeset patch
# User David Douard <david.douard at logilab.fr>
# Date 1257238494 -3600
# Node ID 0ea32cb55d4cff41d626078d70c6a0ddb771d09c
# Parent  178d2b6ec1eac0c21d72556b9da8c9299e154fd6
add a (still incomplete) SSHStore based on 'hg serve' mechanism

diff --git a/bfiles.py b/bfiles.py
--- a/bfiles.py
+++ b/bfiles.py
@@ -10,6 +10,7 @@
 
 from mercurial import \
      util, extensions, dirstate, cmdutil, match, url as url_, node, error
+from mercurial import sshserver
 from mercurial.i18n import _
 
 # -- Commands ----------------------------------------------------------
@@ -399,13 +400,104 @@
                     # someone might delete a big file before committing it
                     pass
                 _move_pending(ui, repo, bfdirstate, ctx, filename, realfile)
-                
+
         bfdirstate.write()
         return node
 
     extensions.wrapfunction(repo, 'status', localrepo_status)
     extensions.wrapfunction(repo, 'commitctx', localrepo_commitctx)
 
+    serversetup()
+
+
+def serversetup():
+    """
+    Monkey patch mercurial's sshserver to add bfiles protocol support:
+    advertise the 'bfstore' capability and add the bfput/bfget commands.
+    """
+
+    # -- wrap sshserver.do_hello ---------------------------------------
+    # dirty hack to be able to add 'bfstore' capability to sshserve
+    def _sshserver_do_hello_wrapper(origfn, self):
+        # capture what origfn responds instead of sending it right away
+        respond = self.respond
+        caps = []
+        self.respond = caps.append
+        try:
+            origfn(self)
+        finally:
+            # always restore the real responder, even if origfn raised
+            self.respond = respond
+        caps.append('bfstore')
+        allcaps = ' '.join([x.strip() for x in caps])
+        self.respond("%s\n" % allcaps)
+
+    # much cleaner version, if sshserver has the new getcapabilities method
+    def _sshserver_getcapabilities_wrapper(origfn, self):
+        caps = origfn(self)
+        caps.append('bfstore')
+        return caps
+
+    if hasattr(sshserver.sshserver, "getcapabilities"):
+        extensions.wrapfunction(sshserver.sshserver, "getcapabilities",
+                                _sshserver_getcapabilities_wrapper)
+    else: # no support for getcapabilities, use good ol' dirty hack
+        extensions.wrapfunction(sshserver.sshserver, "do_hello",
+                                _sshserver_do_hello_wrapper)
+
+    # -- add sshserver.do_xxx commands ---------------------------------
+    # NOTE(review): the client-supplied path is used as is by both commands
+    def _sshserver_do_bfput(self):
+        """Shim this function into the sshserver so that it responds to
+        the bfput command: write the chunks sent by the client to a new
+        file, until a zero-length chunk is received.
+        """
+        key, fname = self.getarg()
+        if os.path.exists(fname):
+            self.respond('file %s already exists' % fname)
+            return
+        destdir = os.path.dirname(fname)
+        util.makedirs(destdir)
+        try:
+            fd = open(fname, "wb")
+        except IOError:
+            self.respond('cannot create file')
+            return  # there is no file to write to: stop here
+        self.respond("")
+
+        try:
+            count = int(self.fin.readline())
+            while count:
+                fd.write(self.fin.read(count))
+                count = int(self.fin.readline())
+            self.respond('')
+        finally:
+            fd.close()
+
+    def _sshserver_do_bfget(self):
+        """Shim this function into the sshserver so that it responds to
+        the bfget command: send the file size, then the content of the
+        file in chunks, then an empty chunk.
+        """
+        key, fname = self.getarg()
+        if not os.path.isfile(fname):
+            self.respond('file %s does not exists' % fname)
+            return
+        try:
+            fd = open(fname, "rb")
+        except IOError:
+            self.respond('cannot read file')
+            return
+        try:
+            self.respond(str(util.fstat(fd).st_size))
+            for d in iter(lambda: fd.read(4096), ''):
+                self.respond(d)
+            self.respond('')
+        finally:
+            fd.close()
+
+    sshserver.sshserver.do_bfput = _sshserver_do_bfput
+    sshserver.sshserver.do_bfget = _sshserver_do_bfget
 
 # -- Private worker functions ------------------------------------------
 
@@ -495,7 +587,7 @@
 
     (outfd, tmp_name) = tempfile.mkstemp(dir=pending_dir)
     tmpfile = os.fdopen(outfd, 'w')
-   
+
     ui.debug('copying %s to %s\n' % (filename, tmp_name))
     hasher = hashlib.sha1()
     while True:
@@ -704,6 +796,8 @@
     match = url_.scheme_re.match(url)
     if not match:                       # regular filesystem path
         scheme = 'file'
+    else:
+        scheme = match.group(1)
     try:
         klass = _store_klass[scheme]
     except KeyError:
@@ -795,7 +889,7 @@
 
         write(_('searching %d changesets for big files\n') % len(revs))
         verified = set()                # set of (filename, filenode) tuples
-        
+
         for rev in revs:
             cctx = self.repo[rev]
             cset = "%d:%s" % (cctx.rev(), node.short(cctx.node()))
@@ -856,10 +950,201 @@
         finally:
             infile.close()
 
-
 class SSHStore(BaseStore):
     '''A store accessed via SSH.'''
-    
+
+    def __init__(self, ui, repo, url):
+        """Parse url, then connect to the remote store (created if needed)."""
+        super(SSHStore, self).__init__(ui, repo, url)
+        # expected form: ssh://[user@]host[:port][/path]
+        m = re.match(r'^ssh://(([^@]+)@)?([^:/]+)(:(\d+))?(/(.*))?$', url)
+        if not m:
+            self.abort(error.RepoError(_("couldn't parse location %s") % url))
+
+        self.user = m.group(2)
+        self.host = m.group(3)
+        self.port = m.group(5)
+        self.path = m.group(7) or "."
+
+        sshcmd = self.ui.config("ui", "ssh", "ssh")
+        remotecmd = self.ui.config("ui", "remotecmd", "hg")
+
+        args = util.sshargs(sshcmd, self.host, self.user, self.port)
+
+        try:
+            self.validate_repo(ui, sshcmd, args, remotecmd)
+        except error.RepoError:
+            # if validate_repo fails, it means that we must create a
+            # repo in the remote store. This repo is only used to be
+            # able to run a sshserver on the remote store.
+            # XXX hg's sshserve could be fixed to be able to run
+            # without a remote repo.
+            self.cleanup()
+            cmd = '%s %s "%s init %s"'
+            cmd = cmd % (sshcmd, args, remotecmd, self.path)
+
+            ui.debug(_('running %s\n') % cmd)
+            res = util.system(cmd)
+            if res != 0:
+                self.abort(error.RepoError(_("could not create remote repo")))
+            self.validate_repo(ui, sshcmd, args, remotecmd)
+
+    def put(self, source, filename, hash):
+        """Upload the local file source to path/filename/hash remotely."""
+        dest = os.path.join(self.path, filename, hash)
+        self.sendfile(source, dest)
+        self.ui.debug('put %s to remote store\n' % source)
+
+    def get(self, files):
+        """Fetch (filename, hash) pairs; return (success, missing) lists."""
+        success, missing = [], []
+        for (filename, hash) in files:
+            self.ui.note(_('getting %s\n') % filename)
+            dest = self.repo.wjoin(filename)
+            destdir = os.path.dirname(dest)
+            util.makedirs(destdir)
+            if not os.path.isdir(destdir):
+                self.abort(error.RepoError(_('cannot create dest directory %s') % destdir))
+            try:
+                self.recvfile(dest, os.path.join(self.path, filename, hash))
+                success.append((filename, hash))  # was never recorded before
+            except error.RepoError:
+                missing.append((filename, hash))
+        return (success, missing)
+
+    def sendfile(self, filename, destname):
+        """Upload the local file filename as destname on the remote."""
+        self.ui.debug('sshstore.sendfile(%s, %s)\n' % (filename, destname))
+        self.do_cmd("bfput", destname=destname)
+        r = self._recv()
+        if r:
+            # a non-empty answer is an error message from the remote
+            self.abort(error.RepoError(_("put failed: %s") % r))
+        fd = open(filename, 'rb')  # binary mode: content is sent verbatim
+        try:
+            for d in iter(lambda: fd.read(4096), ''):
+                self._send(d)
+        finally:
+            fd.close()
+        self._send("", flush=True)  # a zero-length chunk ends the transfer
+        r = self._recv()
+        if r:
+            self.abort(error.RepoError(_("put failed: %s") % r))
+
+    def recvfile(self, filename, destname):
+        """Download the remote file destname into the local file filename."""
+        self.ui.debug('sshstore.recvfile(%s, %s)\n' % (filename, destname))
+        self.do_cmd("bfget", destname=destname)
+        r = self._recv()
+        try:
+            size = int(r)
+        except ValueError:
+            # not a size: the remote answered with an error message
+            self.abort(error.RepoError(_("get failed, unexpected response: %s") % r))
+        fd = open(filename, 'wb')
+        try:
+            for d in iter(self._recv, ''):
+                fd.write(d)
+            fd.flush()
+            wsize = util.fstat(fd).st_size
+        finally:
+            fd.close()
+        if wsize != size:
+            self.abort(error.RepoError(_('get failed, file size does not match (%s vs %s)') % (size, wsize)))
+
+    # -- XXX The code below is stolen and adapted from
+    #    mercurial's sshrepo 
+
+    def validate_repo(self, ui, sshcmd, args, remotecmd):
+        """Start the remote 'serve --stdio'; abort unless it has bfstore."""
+        self.cleanup()  # clean up any previous run
+
+        cmd = '%s %s "%s -R %s serve --stdio"'
+        cmd = cmd % (sshcmd, args, remotecmd, self.path)
+
+        cmd = util.quotecommand(cmd)
+        ui.debug(_('running %s\n') % cmd)
+        self.pipeo, self.pipei, self.pipee = util.popen3(cmd)
+
+        # skip any noise generated by remote shell
+        self.do_cmd("hello")
+        r = self.do_cmd("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
+        lines = ["", "dummy"]
+        max_noise = 500
+        while lines[-1] and max_noise:
+            l = r.readline()
+            self.readerr()
+            if lines[-1] == "1\n" and l == "\n":
+                break
+            if l:
+                ui.debug(_("remote: "), l)
+            lines.append(l)
+            max_noise -= 1
+        else:  # EOF or too much noise before the expected answer
+            self.abort(error.RepoError(_("no suitable response from remote hg")))
+
+        self.capabilities = set()
+        for l in reversed(lines):
+            if l.startswith("capabilities:"):
+                self.capabilities.update(l[:-1].split(":")[1].split())
+                break
+        if "bfstore" not in self.capabilities:
+            self.abort(error.RepoError(_("remote hg does not support bfile")))
+
+    def readerr(self):
+        """Forward the pending lines of the remote's stderr to ui.status."""
+        while util.fstat(self.pipee).st_size:
+            line = self.pipee.readline()
+            if not line:
+                break
+            self.ui.status(_("remote: "), line)
+
+    def abort(self, exception):  # close the connection, then raise exception
+        self.cleanup(reporterr=self.ui.debugflag)  # show stderr in debug mode
+        raise exception
+
+    def cleanup(self, reporterr=True):
+        """Close the pipes to the remote, ignoring any error (best effort)."""
+        try:
+            self.pipeo.close()
+            self.pipei.close()
+            if reporterr:  # read the error descriptor until EOF
+                for l in self.pipee:
+                    self.ui.status(_("remote: "), l)
+            self.pipee.close()
+        except:  # also covers pipes that were never opened (used as __del__)
+            pass
+
+    __del__ = cleanup
+
+    def do_cmd(self, cmd, **args):
+        """Send cmd and its named arguments; return the pipe to read from."""
+        self.ui.debug(_("sending %s command\n") % cmd)
+        self.pipeo.write("%s\n" % cmd)
+        for name, value in args.iteritems():
+            self.pipeo.write("%s %d\n" % (name, len(value)))
+            self.pipeo.write(value)
+        self.pipeo.flush()
+        return self.pipei
+
+    def _recv(self):  # read one length-prefixed chunk and return its data
+        l = self.pipei.readline()
+        self.readerr()
+        try:
+            l = int(l)
+        except ValueError:  # the header line is not a chunk length
+            self.abort(error.ResponseError(_("unexpected response:"), l))
+        return self.pipei.read(l)
+
+    def _send(self, data, flush=False):
+        """Write data as one length-prefixed chunk; flush when asked to."""
+        # an empty chunk is just its "0" header line
+        self.pipeo.write("%d\n%s" % (len(data), data))
+        if flush:
+            self.pipeo.flush()
+        self.readerr()
+
+
 
 _store_klass = {
     'file': LocalStore,


More information about the Mercurial-devel mailing list