[PATCH 03 of 14 V3] streamclone: define first iteration of version 2 of stream format

Yuya Nishihara yuya at tcha.org
Sat Jan 20 04:57:35 EST 2018


On Sat, 20 Jan 2018 00:47:08 +0100, Boris Feld wrote:
> # HG changeset patch
> # User Boris Feld <boris.feld at octobus.net>
> # Date 1516232936 -3600
> #      Thu Jan 18 00:48:56 2018 +0100
> # Node ID 542df1a9814ff6e7e688c59528e3d8bad82f8c11
> # Parent  5dbc3c53c923b8d11b5efcaf0f415b3d8c8c5180
> # EXP-Topic b2-stream
> # Available At https://bitbucket.org/octobus/mercurial-devel/
> #              hg pull https://bitbucket.org/octobus/mercurial-devel/ -r 542df1a9814f
> streamclone: define first iteration of version 2 of stream format

> +def _emit(repo, entries, totalfilesize):
> +    """actually emit the stream bundle"""
> +    progress = repo.ui.progress
> +    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
> +    vfs = repo.svfs
> +    try:
> +        seen = 0
> +        for name, size in entries:
> +            yield util.uvarintencode(len(name))
> +            fp = vfs(name)
> +            try:
> +                yield util.uvarintencode(size)
> +                yield name
> +                if size <= 65536:
> +                    chunks = (fp.read(size),)
> +                else:
> +                    chunks = util.filechunkiter(fp, limit=size)
> +                for chunk in chunks:
> +                    seen += len(chunk)
> +                    progress(_('bundle'), seen, total=totalfilesize,
> +                             unit=_('bytes'))
> +                    yield chunk
> +            finally:
> +                fp.close()
> +    finally:
> +        progress(_('bundle'), None)
> +
> +def generatev2(repo):
> +    """Emit content for version 2 of a streaming clone.
> +
> +    the data stream consists the following entries:
> +    1) A varint containing the length of the filename
> +    2) A varint containing the length of file data
> +    3) N bytes containing the filename (the internal, store-agnostic form)
> +    4) N bytes containing the file data
> +
> +    Returns a 3-tuple of (file count, file size, data iterator).
> +    """
> +
> +    with repo.lock():
> +
> +        entries = []
> +        totalfilesize = 0
> +
> +        repo.ui.debug('scanning\n')
> +        for name, ename, size in _walkstreamfiles(repo):
> +            if size:
> +                entries.append((name, size))
> +                totalfilesize += size
> +
> +        chunks = _emit(repo, entries, totalfilesize)

It seemed weird to release the lock before the chunks are consumed, but
generatev1() has a comment explaining why we acquire the lock. It would be
nice if generatev2() had a similar hint.

> +def consumev2(repo, fp, filecount, filesize):
> +    """Apply the contents from a version 2 streaming clone.
> +
> +    Data is read from an object that only needs to provide a ``read(size)``
> +    method.
> +    """
> +    with repo.lock():
> +        repo.ui.status(_('%d files to transfer, %s of data\n') %
> +                       (filecount, util.bytecount(filesize)))
> +
> +        start = util.timer()
> +        handledbytes = 0
> +        progress = repo.ui.progress
> +
> +        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
> +
> +        vfs = repo.svfs
> +
> +        with repo.transaction('clone'):
> +            with vfs.backgroundclosing(repo.ui):
> +                for i in range(filecount):
> +                    namelen = util.uvarintdecodestream(fp)
> +                    datalen = util.uvarintdecodestream(fp)
> +
> +                    name = fp.read(namelen)
> +
> +                    if repo.ui.debugflag:
> +                        repo.ui.debug('adding %s (%s)\n' %
> +                                      (name, util.bytecount(datalen)))
> +
> +                    with vfs(name, 'w') as ofp:
> +                        for chunk in util.filechunkiter(fp, limit=datalen):
> +                            handledbytes += len(chunk)
> +                            progress(_('clone'), handledbytes, total=filesize,
> +                                     unit=_('bytes'))
> +                            ofp.write(chunk)

Nit: readexactly() could be used here to detect a premature end of the
name/data chunks.


More information about the Mercurial-devel mailing list