[PATCH 03 of 14 V3] streamclone: define first iteration of version 2 of stream format
Yuya Nishihara
yuya at tcha.org
Sat Jan 20 04:57:35 EST 2018
On Sat, 20 Jan 2018 00:47:08 +0100, Boris Feld wrote:
> # HG changeset patch
> # User Boris Feld <boris.feld at octobus.net>
> # Date 1516232936 -3600
> # Thu Jan 18 00:48:56 2018 +0100
> # Node ID 542df1a9814ff6e7e688c59528e3d8bad82f8c11
> # Parent 5dbc3c53c923b8d11b5efcaf0f415b3d8c8c5180
> # EXP-Topic b2-stream
> # Available At https://bitbucket.org/octobus/mercurial-devel/
> # hg pull https://bitbucket.org/octobus/mercurial-devel/ -r 542df1a9814f
> streamclone: define first iteration of version 2 of stream format
> +def _emit(repo, entries, totalfilesize):
> +    """actually emit the stream bundle"""
> +    progress = repo.ui.progress
> +    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
> +    vfs = repo.svfs
> +    try:
> +        seen = 0
> +        for name, size in entries:
> +            yield util.uvarintencode(len(name))
> +            fp = vfs(name)
> +            try:
> +                yield util.uvarintencode(size)
> +                yield name
> +                if size <= 65536:
> +                    chunks = (fp.read(size),)
> +                else:
> +                    chunks = util.filechunkiter(fp, limit=size)
> +                for chunk in chunks:
> +                    seen += len(chunk)
> +                    progress(_('bundle'), seen, total=totalfilesize,
> +                             unit=_('bytes'))
> +                    yield chunk
> +            finally:
> +                fp.close()
> +    finally:
> +        progress(_('bundle'), None)
> +
> +def generatev2(repo):
> +    """Emit content for version 2 of a streaming clone.
> +
> +    the data stream consists the following entries:
> +    1) A varint containing the length of the filename
> +    2) A varint containing the length of file data
> +    3) N bytes containing the filename (the internal, store-agnostic form)
> +    4) N bytes containing the file data
> +
> +    Returns a 3-tuple of (file count, file size, data iterator).
> +    """
> +
> +    with repo.lock():
> +
> +        entries = []
> +        totalfilesize = 0
> +
> +        repo.ui.debug('scanning\n')
> +        for name, ename, size in _walkstreamfiles(repo):
> +            if size:
> +                entries.append((name, size))
> +                totalfilesize += size
> +
> +        chunks = _emit(repo, entries, totalfilesize)
It seems odd to release the lock before the chunks are consumed, but generatev1()
has a comment explaining why we acquire the lock. It would be nice if
generatev2() had a similar hint.
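For reference, the lock is released early because `_emit()` is a generator: calling it only builds a generator object, so the `with repo.lock():` block in `generatev2()` exits before any store file is opened. A minimal, self-contained illustration, where `fakelock()`, `emit()` and `generate()` are hypothetical stand-ins for `repo.lock()`, `_emit()` and `generatev2()`:

```python
from contextlib import contextmanager

events = []  # records the order in which things happen


@contextmanager
def fakelock():
    # Hypothetical stand-in for repo.lock().
    events.append('acquire')
    try:
        yield
    finally:
        events.append('release')


def emit(names):
    # Like _emit(): a generator, so this body runs only when iterated.
    for name in names:
        events.append('read ' + name)
        yield name


def generate(names):
    # Like generatev2(): the generator is created while the lock is held.
    with fakelock():
        chunks = emit(names)
    return len(names), chunks


count, chunks = generate(['data/a.i', 'data/b.i'])
consumed = list(chunks)  # the reads happen here, after 'release'
print(events)
# ['acquire', 'release', 'read data/a.i', 'read data/b.i']
```

Only the `_walkstreamfiles()` scan runs under the lock; every file read happens after the lock has been released, once the caller starts iterating.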
> +def consumev2(repo, fp, filecount, filesize):
> +    """Apply the contents from a version 2 streaming clone.
> +
> +    Data is read from an object that only needs to provide a ``read(size)``
> +    method.
> +    """
> +    with repo.lock():
> +        repo.ui.status(_('%d files to transfer, %s of data\n') %
> +                       (filecount, util.bytecount(filesize)))
> +
> +        start = util.timer()
> +        handledbytes = 0
> +        progress = repo.ui.progress
> +
> +        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
> +
> +        vfs = repo.svfs
> +
> +        with repo.transaction('clone'):
> +            with vfs.backgroundclosing(repo.ui):
> +                for i in range(filecount):
> +                    namelen = util.uvarintdecodestream(fp)
> +                    datalen = util.uvarintdecodestream(fp)
> +
> +                    name = fp.read(namelen)
> +
> +                    if repo.ui.debugflag:
> +                        repo.ui.debug('adding %s (%s)\n' %
> +                                      (name, util.bytecount(datalen)))
> +
> +                    with vfs(name, 'w') as ofp:
> +                        for chunk in util.filechunkiter(fp, limit=datalen):
> +                            handledbytes += len(chunk)
> +                            progress(_('clone'), handledbytes, total=filesize,
> +                                     unit=_('bytes'))
> +                            ofp.write(chunk)
Nit: readexactly() could be used here to detect a premature end of the
name/data chunks.
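The nit matters because a plain `fp.read(n)` can return fewer than `n` bytes when the stream ends early, so a truncated clone could silently write a short name or file. Below is a minimal standalone sketch (Python 3) of the v2 entry layout from the `generatev2()` docstring, with a reader that checks every read. The helper names mirror Mercurial's but are local reimplementations, and the unsigned LEB128 varint encoding is an assumption about what `util.uvarintencode()` produces. For brevity, the sketch reads each file's data in one call instead of streaming it in chunks.

```python
import io


def uvarintencode(value):
    # Assumed unsigned LEB128: 7 bits per byte, least-significant group
    # first, high bit set on every byte except the last.
    if value < 0:
        raise ValueError('negative value')
    out = bytearray()
    while True:
        byte = value & 0x7f
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def readexactly(fp, n):
    # Fail loudly on a truncated stream instead of returning short data.
    data = fp.read(n)
    if len(data) < n:
        raise EOFError('stream ended unexpectedly (got %d bytes, expected %d)'
                       % (len(data), n))
    return data


def uvarintdecodestream(fp):
    # Decode one varint, one byte at a time, using checked reads.
    result = 0
    shift = 0
    while True:
        byte = readexactly(fp, 1)[0]
        result |= (byte & 0x7f) << shift
        if not byte & 0x80:
            return result
        shift += 7


def encodeentries(entries):
    # Each entry: varint(len(name)), varint(len(data)), name, data.
    parts = []
    for name, data in entries:
        parts += [uvarintencode(len(name)), uvarintencode(len(data)),
                  name, data]
    return b''.join(parts)


def decodeentries(fp, filecount):
    # Mirrors the consumev2() loop, but every read is length-checked.
    for _ in range(filecount):
        namelen = uvarintdecodestream(fp)
        datalen = uvarintdecodestream(fp)
        name = readexactly(fp, namelen)
        data = readexactly(fp, datalen)
        yield name, data


entries = [(b'data/a.i', b'x' * 300), (b'00changelog.i', b'y' * 5)]
stream = encodeentries(entries)
assert list(decodeentries(io.BytesIO(stream), 2)) == entries
```

Feeding `decodeentries()` a truncated copy such as `stream[:-10]` raises `EOFError` while reading the second filename, rather than producing a short name.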