[PATCH 3 of 3] localrepo: extract stream clone application into reusable function

Gregory Szorc gregory.szorc at gmail.com
Thu May 21 12:41:34 CDT 2015


# HG changeset patch
# User Gregory Szorc <gregory.szorc at gmail.com>
# Date 1432230066 25200
#      Thu May 21 10:41:06 2015 -0700
# Node ID d7de3df8ca970d8757bdd9ed95faa56c01f47414
# Parent  6daeec09937f66004fd962e11f03019e3c508f37
localrepo: extract stream clone application into reusable function

The existing stream_in method assumes a streaming clone is applied via
the wire protocol. Previous patches have enabled streaming clone data to
be produced and consumed outside the context of the wire protocol.
However, the consuming side was incomplete because it did not handle
things like updating the branch caches or writing out the requirements
file.

This patch finishes the separation of stream clone handling from the
wire protocol. After this patch, it is possible to consume stream clones
from arbitrary sources, including files. Mozilla plans to leverage this
to serve pre-generated stream clone files to consumers, drastically
reducing the wall-clock and CPU time required to clone large repositories.
This will enable clones to be nearly as fast as `tar`.

diff --git a/mercurial/localrepo.py b/mercurial/localrepo.py
--- a/mercurial/localrepo.py
+++ b/mercurial/localrepo.py
@@ -1754,30 +1754,42 @@ class localrepository(object):
         """
         return util.hooks()
 
     def stream_in(self, remote, remotereqs):
+        # Save remote branchmap. We will use it later
+        # to speed up branchcache creation
+        rbranchmap = None
+        if remote.capable("branchmap"):
+            rbranchmap = remote.branchmap()
+
+        fp = remote.stream_out()
+        l = fp.readline()
+        try:
+            resp = int(l)
+        except ValueError:
+            raise error.ResponseError(
+                _('unexpected response from remote server:'), l)
+        if resp == 1:
+            raise util.Abort(_('operation forbidden by server'))
+        elif resp == 2:
+            raise util.Abort(_('locking the remote repository failed'))
+        elif resp != 0:
+            raise util.Abort(_('the server sent an unknown error code'))
+
+        self.applystreamclone(remotereqs, rbranchmap, fp)
+        return len(self.heads()) + 1
+
+    def applystreamclone(self, remotereqs, remotebranchmap, fp):
+        """Apply stream clone data to this repository.
+
+        "remotereqs" is a set of requirements to handle the incoming data.
+        "remotebranchmap" is the result of a branchmap lookup on the remote. It
+        can be None.
+        "fp" is a file object containing the raw stream data, suitable for
+        feeding into exchange.consumestreamclone.
+        """
         lock = self.lock()
         try:
-            # Save remote branchmap. We will use it later
-            # to speed up branchcache creation
-            rbranchmap = None
-            if remote.capable("branchmap"):
-                rbranchmap = remote.branchmap()
-
-            fp = remote.stream_out()
-            l = fp.readline()
-            try:
-                resp = int(l)
-            except ValueError:
-                raise error.ResponseError(
-                    _('unexpected response from remote server:'), l)
-            if resp == 1:
-                raise util.Abort(_('operation forbidden by server'))
-            elif resp == 2:
-                raise util.Abort(_('locking the remote repository failed'))
-            elif resp != 0:
-                raise util.Abort(_('the server sent an unknown error code'))
-
             exchange.consumestreamclone(self, fp)
 
             # new requirements = old non-format requirements +
             #                    new format-related remote requirements
@@ -1786,12 +1798,12 @@ class localrepository(object):
                     self.requirements - self.supportedformats)
             self._applyopenerreqs()
             self._writerequirements()
 
-            if rbranchmap:
+            if remotebranchmap:
                 rbheads = []
                 closed = []
-                for bheads in rbranchmap.itervalues():
+                for bheads in remotebranchmap.itervalues():
                     rbheads.extend(bheads)
                     for h in bheads:
                         r = self.changelog.rev(h)
                         b, c = self.changelog.branchinfo(r)
@@ -1800,9 +1812,9 @@ class localrepository(object):
 
                 if rbheads:
                     rtiprev = max((int(self.changelog.rev(node))
                             for node in rbheads))
-                    cache = branchmap.branchcache(rbranchmap,
+                    cache = branchmap.branchcache(remotebranchmap,
                                                   self[rtiprev].node(),
                                                   rtiprev,
                                                   closednodes=closed)
                     # Try to stick it as low as possible
@@ -1813,9 +1825,8 @@ class localrepository(object):
                             self._branchcaches[candidate] = cache
                             cache.write(rview)
                             break
             self.invalidate()
-            return len(self.heads()) + 1
         finally:
             lock.release()
 
     def clone(self, remote, heads=[], stream=None):


More information about the Mercurial-devel mailing list