[PATCH 1 of 3] exchange: move code for generating a streaming clone into exchange

Gregory Szorc gregory.szorc at gmail.com
Thu May 21 12:49:32 CDT 2015


marmoute, durin42, et al.: I'd like you to consider integrating the
"stream clone" data format into changegroups/bundles. I'm not sure whether
this is something we could sneak into cg1 or cg2 as an alternative
"compression" format or whether it will require a bundle2 part. Whatever
the solution, I'd like to see this land in 3.5 so servers can emit stream
clone data to vanilla clients.

As it stands, I imagine Mozilla will copy the stream handling code into our
bundleclone extension until vanilla Mercurial can consume stream clones,
presumably via bundle2.

On Thu, May 21, 2015 at 10:41 AM, Gregory Szorc <gregory.szorc at gmail.com>
wrote:

> # HG changeset patch
> # User Gregory Szorc <gregory.szorc at gmail.com>
> # Date 1432229242 25200
> #      Thu May 21 10:27:22 2015 -0700
> # Node ID 0aa9c408c2be4492495dce4ae04aea4472eefbeb
> # Parent  451df92cec4912aefac57a4cf82e9268192c867b
> exchange: move code for generating a streaming clone into exchange
>
> Streaming clones are fast because they are essentially tar files.
> On mozilla-central, a streaming clone consumes only ~55s of CPU time
> on clients, as opposed to ~340s of CPU time for a regular clone or for
> unbundling a gzip bundle.
>
> Mozilla is deploying static file "lookaside" support to our Mercurial
> server. Static bundles are pre-generated and uploaded to S3. When a
> clone is performed, the static file is fetched and applied, and then an
> incremental pull is performed. Unfortunately, even on an ideal network
> connection this still takes as much wall-clock and CPU time as a regular
> clone (although it does save significant server resources).
>
> We like the client-side wall-clock time wins of streaming clones, but we
> also want to leverage pre-generated files hosted on S3 to serve the bulk
> of clone data.
>
> This patch moves the code for producing a "stream bundle" into its
> own standalone function, away from the wire protocol. This will enable
> stream bundle files to be produced outside the context of the wire
> protocol.
>
> A bikeshed on whether exchange is the best module for this function
> might be warranted. I selected exchange instead of changegroup because
> "stream bundles" aren't changegroups (yet).
>
> diff --git a/mercurial/exchange.py b/mercurial/exchange.py
> --- a/mercurial/exchange.py
> +++ b/mercurial/exchange.py
> @@ -7,9 +7,9 @@
>
>  from i18n import _
>  from node import hex, nullid
>  import errno, urllib
> -import util, scmutil, changegroup, base85, error
> +import util, scmutil, changegroup, base85, error, store
>  import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
>  import lock as lockmod
>
>  def readbundle(ui, fh, fname, vfs=None):
> @@ -1331,4 +1331,69 @@ def unbundle(repo, cg, heads, source, ur
>          lockmod.release(tr, lock, wlock)
>          if recordout is not None:
>              recordout(repo.ui.popbuffer())
>      return r
> +
> +# This is its own function so extensions can override it.
> +def _walkstreamfiles(repo):
> +    return repo.store.walk()
> +
> +def generatestreamclone(repo):
> +    """Emit content for a streaming clone.
> +
> +    This is a generator of raw chunks that constitute a streaming clone.
> +
> +    The stream begins with a line of 2 space-delimited integers
> containing the
> +    number of entries and total bytes size.
> +
> +    Next, are N entries for each file being transferred. Each file entry
> starts
> +    as a line with the file name and integer size delimited by a null
> byte.
> +    The raw file data follows. Following the raw file data is the next
> file
> +    entry, or EOF.
> +
> +    When used on the wire protocol, an additional line indicating protocol
> +    success will be prepended to the stream. This function is not
> responsible
> +    for adding it.
> +
> +    This function will obtain a repository lock to ensure a consistent
> view of
> +    the store is captured. It therefore may raise LockError.
> +    """
> +    entries = []
> +    total_bytes = 0
> +    # Get consistent snapshot of repo, lock during scan.
> +    lock = repo.lock()
> +    try:
> +        repo.ui.debug('scanning\n')
> +        for name, ename, size in _walkstreamfiles(repo):
> +            if size:
> +                entries.append((name, size))
> +                total_bytes += size
> +    finally:
> +            lock.release()
> +
> +    repo.ui.debug('%d files, %d bytes to transfer\n' %
> +                  (len(entries), total_bytes))
> +    yield '%d %d\n' % (len(entries), total_bytes)
> +
> +    sopener = repo.svfs
> +    oldaudit = sopener.mustaudit
> +    debugflag = repo.ui.debugflag
> +    sopener.mustaudit = False
> +
> +    try:
> +        for name, size in entries:
> +            if debugflag:
> +                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
> +            # partially encode name over the wire for backwards compat
> +            yield '%s\0%d\n' % (store.encodedir(name), size)
> +            if size <= 65536:
> +                fp = sopener(name)
> +                try:
> +                    data = fp.read(size)
> +                finally:
> +                    fp.close()
> +                yield data
> +            else:
> +                for chunk in util.filechunkiter(sopener(name),
> limit=size):
> +                    yield chunk
> +    finally:
> +        sopener.mustaudit = oldaudit
> diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
> --- a/mercurial/wireproto.py
> +++ b/mercurial/wireproto.py
> @@ -743,75 +743,29 @@ def pushkey(repo, proto, namespace, key,
>
>  def _allowstream(ui):
>      return ui.configbool('server', 'uncompressed', True, untrusted=True)
>
> -def _walkstreamfiles(repo):
> -    # this is it's own function so extensions can override it
> -    return repo.store.walk()
> -
>  @wireprotocommand('stream_out')
>  def stream(repo, proto):
>      '''If the server supports streaming clone, it advertises the "stream"
>      capability with a value representing the version and flags of the repo
>      it is serving. Client checks to see if it understands the format.
> -
> -    The format is simple: the server writes out a line with the amount
> -    of files, then the total amount of bytes to be transferred (separated
> -    by a space). Then, for each file, the server first writes the filename
> -    and file size (separated by the null character), then the file
> contents.
>      '''
> -
>      if not _allowstream(repo.ui):
>          return '1\n'
>
> -    entries = []
> -    total_bytes = 0
> +    def getstream(it):
> +        yield '0\n'
> +        for chunk in it:
> +            yield chunk
> +
>      try:
> -        # get consistent snapshot of repo, lock during scan
> -        lock = repo.lock()
> -        try:
> -            repo.ui.debug('scanning\n')
> -            for name, ename, size in _walkstreamfiles(repo):
> -                if size:
> -                    entries.append((name, size))
> -                    total_bytes += size
> -        finally:
> -            lock.release()
> +        # LockError may be raised before the first result is yielded.
> Don't
> +        # emit output until we're sure we got the lock successfully.
> +        it = exchange.generatestreamclone(repo)
> +        return streamres(getstream(it))
>      except error.LockError:
> -        return '2\n' # error: 2
> -
> -    def streamer(repo, entries, total):
> -        '''stream out all metadata files in repository.'''
> -        yield '0\n' # success
> -        repo.ui.debug('%d files, %d bytes to transfer\n' %
> -                      (len(entries), total_bytes))
> -        yield '%d %d\n' % (len(entries), total_bytes)
> -
> -        sopener = repo.svfs
> -        oldaudit = sopener.mustaudit
> -        debugflag = repo.ui.debugflag
> -        sopener.mustaudit = False
> -
> -        try:
> -            for name, size in entries:
> -                if debugflag:
> -                    repo.ui.debug('sending %s (%d bytes)\n' % (name,
> size))
> -                # partially encode name over the wire for backwards compat
> -                yield '%s\0%d\n' % (store.encodedir(name), size)
> -                if size <= 65536:
> -                    fp = sopener(name)
> -                    try:
> -                        data = fp.read(size)
> -                    finally:
> -                        fp.close()
> -                    yield data
> -                else:
> -                    for chunk in util.filechunkiter(sopener(name),
> limit=size):
> -                        yield chunk
> -        finally:
> -            sopener.mustaudit = oldaudit
> -
> -    return streamres(streamer(repo, entries, total_bytes))
> +        return '2\n'
>
>  @wireprotocommand('unbundle', 'heads')
>  def unbundle(repo, proto, heads):
>      their_heads = decodelist(heads)
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://selenic.com/pipermail/mercurial-devel/attachments/20150521/f7ca9fa4/attachment.html>


More information about the Mercurial-devel mailing list