[PATCH 1 of 8 stream clone bundles V2] streamclone: support for producing and consuming stream clone bundles

Gregory Szorc gregory.szorc at gmail.com
Sat Oct 17 18:44:57 UTC 2015


# HG changeset patch
# User Gregory Szorc <gregory.szorc at gmail.com>
# Date 1445105692 25200
#      Sat Oct 17 11:14:52 2015 -0700
# Node ID c2930ef1a6b6b7f69957cbf64a6ffee80ecea873
# Parent  e1568d5eb0522b3ab20a1cbb15ec06f6d198c9a8
streamclone: support for producing and consuming stream clone bundles

Up to this point, stream clones only existed as a dynamically generated
data format produced and consumed during streaming clones. In order to
support this efficient cloning format with the clone bundles feature, we
need a more formal, on-disk representation of the streaming clone data.

This patch introduces a new "bundle" type for streaming clones. Unlike
existing bundles, it does not contain changegroup data. It does,
however, share concepts such as the 4-byte header that identifies the
type of data that follows and the 2-byte abbreviation for the
compression type (of which only "UN" is currently supported).

The new bundle format is essentially the existing stream clone version 1
data format with some headers at the beginning.
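The headers described in the `generatebundlev1()` docstring in the diff can be sketched as a standalone Python 3 function that packs only the bytes preceding the stream clone v1 payload. It uses bytes rather than Python 2 str, and the name `encodeheader` is hypothetical. It is an illustration, not part of this patch.

```python
import struct


def encodeheader(filecount, bytecount, requirements, compression=b'UN'):
    # Hypothetical helper (not part of this patch) mirroring the layout
    # documented in generatebundlev1(), written for Python 3 bytes.
    if compression != b'UN':
        # As in the patch, only uncompressed bundles are handled.
        raise ValueError('only UN compression is supported')
    # Comma-delimited, sorted for deterministic output, NUL-terminated.
    requires = b','.join(sorted(requirements)) + b'\0'
    return b''.join([
        b'HGS1',                                   # 4-byte type/version
        compression,                               # 2-byte compression id
        struct.pack('>QQ', filecount, bytecount),  # two big-endian uint64s
        struct.pack('>H', len(requires)),          # big-endian uint16 length
        requires,                                  # requirements + trailing \0
    ])


header = encodeheader(3, 1024, {b'revlogv1', b'generaldelta'})
```

Here `len(requires)` already counts the trailing NUL. That corresponds to `len(requires) + 1` in the patch, where `requires` is built without it. In this example the header is 46 bytes: 24 bytes of fixed-size fields followed by the 22-byte string `generaldelta,revlogv1\0`.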

Content negotiation at stream clone request time checks for repository
format/requirements compatibility before initiating a stream clone. We
can't do active content negotiation when using clone bundles. So, we put
this set of requirements inside the payload, giving consumers a built-in
mechanism for checking compatibility before reading and applying lots of
data. We will also advertise this requirements set in clone bundles, but
that's for another patch.
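To make the built-in compatibility check concrete, here is a minimal Python 3 sketch of how a consumer could read the header fields and work out which requirements it cannot satisfy before touching the payload. It follows the spirit of `applybundlev1()` in the diff. The helper name `readheader` and the `supported` argument are hypothetical; the real code compares against `repo.supportedformats`. Unlike `applybundlev1()`, this sketch also reads and checks the 4-byte magic itself.

```python
import io
import struct


def readheader(fp, supported):
    # Hypothetical helper: parse a stream clone bundle v1 header and
    # return (filecount, bytecount, missing requirements).
    if fp.read(4) != b'HGS1':
        raise ValueError('not a stream clone bundle v1')
    compression = fp.read(2)
    if compression != b'UN':
        raise ValueError('unsupported compression: %r' % compression)
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    (requireslen,) = struct.unpack('>H', fp.read(2))
    requires = fp.read(requireslen)
    if not requires.endswith(b'\0'):
        raise ValueError('requirements not properly encoded')
    # Skip empty entries so an empty requirements string yields no entries.
    requirements = {r for r in requires.rstrip(b'\0').split(b',') if r}
    return filecount, bytecount, requirements - set(supported)


# Build a sample header by hand: 2 files, 512 bytes, two requirements.
reqs = b'generaldelta,revlogv1\0'
sample = (b'HGS1' + b'UN' + struct.pack('>QQ', 2, 512)
          + struct.pack('>H', len(reqs)) + reqs)
result = readheader(io.BytesIO(sample), {b'revlogv1'})
```

Because the requirements appear before any file data, a client that lacks `generaldelta` in this example can refuse the bundle after reading only 46 bytes.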

Nothing calls the new functions yet, so nothing actually produces or
consumes this new bundle format. That will come in upcoming patches.

It's worth noting that if a legacy client attempts to `hg unbundle` a
stream clone bundle (with the "HGS1" header), it will abort with
"unknown bundle version S1", which seems appropriate.
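Why "S1"? A reader that treats the first 2 bytes of a bundle as the "HG" magic and the next 2 bytes as a version sees "HGS1" as version "S1", which it does not recognize. The Python 3 sketch below models that dispatch. The function and exception names and the set of known versions ("10" and "20") are assumptions for illustration, not a copy of Mercurial's bundle-reading code.

```python
class UnknownBundleError(Exception):
    """Raised for headers the modeled legacy reader cannot handle."""


# Assumption: versions understood by a client predating stream clone bundles.
KNOWN_VERSIONS = {b'10', b'20'}


def legacyversion(header):
    # Split the 4-byte header into a 2-byte magic and a 2-byte version,
    # as an older reader is assumed to do, and reject unknown versions.
    magic, version = header[0:2], header[2:4]
    if magic != b'HG':
        raise UnknownBundleError('not a Mercurial bundle')
    if version not in KNOWN_VERSIONS:
        raise UnknownBundleError(
            'unknown bundle version %s' % version.decode('ascii'))
    return version


try:
    legacyversion(b'HGS1')
except UnknownBundleError as exc:
    message = str(exc)
```

Reusing the "HG" prefix therefore makes old clients fail with a clear version error instead of misreading stream data as a changegroup.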

diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py
--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -6,8 +6,9 @@
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
+import struct
 import time
 
 from .i18n import _
 from . import (
@@ -235,8 +236,63 @@ def generatev1wireproto(repo):
     yield '%d %d\n' % (filecount, bytecount)
     for chunk in it:
         yield chunk
 
+def generatebundlev1(repo, compression='UN'):
+    """Emit content for version 1 of a stream clone bundle.
+
+    The first 4 bytes of the output ("HGS1") denote this as stream clone
+    bundle version 1.
+
+    The next 2 bytes indicate the compression type. Only "UN" is currently
+    supported.
+
+    The next 16 bytes are two 64-bit big endian unsigned integers indicating
+    file count and byte count, respectively.
+
+    The next 2 bytes are a 16-bit big endian unsigned short giving the length
+    of the requirements string, including a trailing \0. The following N bytes
+    are the requirements string, which is ASCII containing a comma-delimited
+    list of repo requirements that are needed to support the data.
+
+    The remaining content is the output of ``generatev1()`` (which may be
+    compressed in the future).
+
+    Returns a tuple of (requirements, data generator).
+    """
+    if compression != 'UN':
+        raise ValueError('we do not support the compression argument yet')
+
+    requirements = repo.requirements & repo.supportedformats
+    requires = ','.join(sorted(requirements))
+
+    def gen():
+        yield 'HGS1'
+        yield compression
+
+        filecount, bytecount, it = generatev1(repo)
+        repo.ui.status(_('writing %d bytes for %d files\n') %
+                         (bytecount, filecount))
+
+        yield struct.pack('>QQ', filecount, bytecount)
+        yield struct.pack('>H', len(requires) + 1)
+        yield requires + '\0'
+
+        # This is where we'll add compression in the future.
+        assert compression == 'UN'
+
+        seen = 0
+        repo.ui.progress(_('bundle'), 0, total=bytecount)
+
+        for chunk in it:
+            seen += len(chunk)
+            repo.ui.progress(_('bundle'), seen, total=bytecount)
+            yield chunk
+
+        repo.ui.progress(_('bundle'), None)
+
+    return requirements, gen()
+
 def consumev1(repo, fp, filecount, bytecount):
     """Apply the contents from version 1 of a streaming clone file handle.
 
     This takes the output from "streamout" and applies it to the specified
@@ -289,4 +345,48 @@ def consumev1(repo, fp, filecount, bytec
                        (util.bytecount(bytecount), elapsed,
                         util.bytecount(bytecount / elapsed)))
     finally:
         lock.release()
+
+def applybundlev1(repo, fp):
+    """Apply the content from a stream clone bundle version 1.
+
+    We assume the 4 byte header has been read and validated and the file handle
+    is at the 2 byte compression identifier.
+    """
+    if len(repo):
+        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
+                            'repo'))
+
+    compression = fp.read(2)
+    if compression != 'UN':
+        raise error.Abort(_('only uncompressed stream clone bundles are '
+            'supported; got %s') % compression)
+
+    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
+    requireslen = struct.unpack('>H', fp.read(2))[0]
+    requires = fp.read(requireslen)
+
+    if not requires.endswith('\0'):
+        raise error.Abort(_('malformed stream clone bundle: '
+                            'requirements not properly encoded'))
+
+    requirements = set(requires.rstrip('\0').split(','))
+    missingreqs = requirements - repo.supportedformats
+    if missingreqs:
+        raise error.Abort(_('unable to apply stream clone: '
+                            'unsupported format: %s') %
+                            ', '.join(sorted(missingreqs)))
+
+    consumev1(repo, fp, filecount, bytecount)
+
+class streamcloneapplier(object):
+    """Class to manage applying streaming clone bundles.
+
+    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
+    readers to perform bundle type-specific functionality.
+    """
+    def __init__(self, fh):
+        self._fh = fh
+
+    def apply(self, repo):
+        return applybundlev1(repo, self._fh)
