[PATCH 2 of 3] exchange: move code for consuming streaming clone into exchange

Gregory Szorc gregory.szorc at gmail.com
Thu May 21 12:41:33 CDT 2015


# HG changeset patch
# User Gregory Szorc <gregory.szorc at gmail.com>
# Date 1432229265 25200
#      Thu May 21 10:27:45 2015 -0700
# Node ID 6daeec09937f66004fd962e11f03019e3c508f37
# Parent  0aa9c408c2be4492495dce4ae04aea4472eefbeb
exchange: move code for consuming streaming clone into exchange

For reasons outlined in the previous commit, we want to make the code
for consuming "stream bundles" reusable. This patch extracts the code
into a standalone function.

diff --git a/mercurial/exchange.py b/mercurial/exchange.py
--- a/mercurial/exchange.py
+++ b/mercurial/exchange.py
@@ -4,8 +4,9 @@
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
+import time
 from i18n import _
 from node import hex, nullid
 import errno, urllib
 import util, scmutil, changegroup, base85, error, store
@@ -1396,4 +1397,67 @@ def generatestreamclone(repo):
                 for chunk in util.filechunkiter(sopener(name), limit=size):
                     yield chunk
     finally:
         sopener.mustaudit = oldaudit
+
+def consumestreamclone(repo, fp):
+    """Apply the contents from a streaming clone file.
+
+    This takes the output from "streamout" and applies it to the specified
+    repository.
+
+    Like "streamout," the status line added by the wire protocol is not handled
+    by this function.
+    """
+    lock = repo.lock()
+    try:
+        repo.ui.status(_('streaming all changes\n'))
+        l = fp.readline()
+        try:
+            total_files, total_bytes = map(int, l.split(' ', 1))
+        except (ValueError, TypeError):
+            raise error.ResponseError(
+                _('unexpected response from remote server:'), l)
+        repo.ui.status(_('%d files to transfer, %s of data\n') %
+                       (total_files, util.bytecount(total_bytes)))
+        handled_bytes = 0
+        repo.ui.progress(_('clone'), 0, total=total_bytes)
+        start = time.time()
+
+        tr = repo.transaction(_('clone'))
+        try:
+            for i in xrange(total_files):
+                # XXX doesn't support '\n' or '\r' in filenames
+                l = fp.readline()
+                try:
+                    name, size = l.split('\0', 1)
+                    size = int(size)
+                except (ValueError, TypeError):
+                    raise error.ResponseError(
+                        _('unexpected response from remote server:'), l)
+                if repo.ui.debugflag:
+                    repo.ui.debug('adding %s (%s)\n' %
+                                  (name, util.bytecount(size)))
+                # for backwards compat, name was partially encoded
+                ofp = repo.svfs(store.decodedir(name), 'w')
+                for chunk in util.filechunkiter(fp, limit=size):
+                    handled_bytes += len(chunk)
+                    repo.ui.progress(_('clone'), handled_bytes,
+                                     total=total_bytes)
+                    ofp.write(chunk)
+                ofp.close()
+            tr.close()
+        finally:
+            tr.release()
+
+        # Writing straight to files circumvented the inmemory caches
+        repo.invalidate()
+
+        elapsed = time.time() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        repo.ui.progress(_('clone'), None)
+        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+                       (util.bytecount(total_bytes), elapsed,
+                        util.bytecount(total_bytes / elapsed)))
+    finally:
+        lock.release()
diff --git a/mercurial/localrepo.py b/mercurial/localrepo.py
--- a/mercurial/localrepo.py
+++ b/mercurial/localrepo.py
@@ -1775,57 +1775,10 @@ class localrepository(object):
             elif resp == 2:
                 raise util.Abort(_('locking the remote repository failed'))
             elif resp != 0:
                 raise util.Abort(_('the server sent an unknown error code'))
-            self.ui.status(_('streaming all changes\n'))
-            l = fp.readline()
-            try:
-                total_files, total_bytes = map(int, l.split(' ', 1))
-            except (ValueError, TypeError):
-                raise error.ResponseError(
-                    _('unexpected response from remote server:'), l)
-            self.ui.status(_('%d files to transfer, %s of data\n') %
-                           (total_files, util.bytecount(total_bytes)))
-            handled_bytes = 0
-            self.ui.progress(_('clone'), 0, total=total_bytes)
-            start = time.time()
 
-            tr = self.transaction(_('clone'))
-            try:
-                for i in xrange(total_files):
-                    # XXX doesn't support '\n' or '\r' in filenames
-                    l = fp.readline()
-                    try:
-                        name, size = l.split('\0', 1)
-                        size = int(size)
-                    except (ValueError, TypeError):
-                        raise error.ResponseError(
-                            _('unexpected response from remote server:'), l)
-                    if self.ui.debugflag:
-                        self.ui.debug('adding %s (%s)\n' %
-                                      (name, util.bytecount(size)))
-                    # for backwards compat, name was partially encoded
-                    ofp = self.svfs(store.decodedir(name), 'w')
-                    for chunk in util.filechunkiter(fp, limit=size):
-                        handled_bytes += len(chunk)
-                        self.ui.progress(_('clone'), handled_bytes,
-                                         total=total_bytes)
-                        ofp.write(chunk)
-                    ofp.close()
-                tr.close()
-            finally:
-                tr.release()
-
-            # Writing straight to files circumvented the inmemory caches
-            self.invalidate()
-
-            elapsed = time.time() - start
-            if elapsed <= 0:
-                elapsed = 0.001
-            self.ui.progress(_('clone'), None)
-            self.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
-                           (util.bytecount(total_bytes), elapsed,
-                            util.bytecount(total_bytes / elapsed)))
+            exchange.consumestreamclone(self, fp)
 
             # new requirements = old non-format requirements +
             #                    new format-related remote requirements
             # requirements from the streamed-in repository


More information about the Mercurial-devel mailing list