[PATCH 15 of 15] streamclone: also stream caches to the client
Augie Fackler
raf at durin42.com
Fri Jan 19 15:39:42 EST 2018
On Fri, Jan 19, 2018 at 09:08:59PM +0100, Boris Feld wrote:
> # HG changeset patch
> # User Boris Feld <boris.feld at octobus.net>
> # Date 1516233012 -3600
> # Thu Jan 18 00:50:12 2018 +0100
> # Node ID cc93d342d0a692565edc6a1c8cf8acdea36a0980
> # Parent fec6950ccabdd6d93484732b341ae06697954890
> # EXP-Topic b2-stream
> # Available At https://bitbucket.org/octobus/mercurial-devel/
> # hg pull https://bitbucket.org/octobus/mercurial-devel/ -r cc93d342d0a6
> streamclone: also stream caches to the client
>
> When stream clone is used over bundle2, relevant cache files are also streamed.
> This is expected to be a massive performance win for clone since no important
> cache will have to be recomputed.
Some numbers here would be nice.
>
> diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py
> --- a/mercurial/streamclone.py
> +++ b/mercurial/streamclone.py
> @@ -11,10 +11,12 @@ import contextlib
> import os
> import struct
> import tempfile
> +import warnings
>
> from .i18n import _
> from . import (
> branchmap,
> + cacheutil,
> error,
> phases,
> store,
> @@ -435,6 +437,10 @@ class streamcloneapplier(object):
> _fileappend = 0 # append only file
> _filefull = 1 # full snapshot file
>
> +# Source of the file
> +_srcstore = 's' # store (svfs)
> +_srccache = 'c' # cache (cache)
> +
> # This is its own function so extensions can override it.
> def _walkstreamfullstorefiles(repo):
> """list snapshot file from the store"""
> @@ -443,12 +449,12 @@ def _walkstreamfullstorefiles(repo):
> fnames.append('phaseroots')
> return fnames
>
> -def _filterfull(entry, copy, vfs):
> +def _filterfull(entry, copy, vfsmap):
> """actually copy the snapshot files"""
> - name, ftype, data = entry
> + src, name, ftype, data = entry
> if ftype != _filefull:
> return entry
> - return (name, ftype, copy(vfs.join(name)))
> + return (src, name, ftype, copy(vfsmap[src].join(name)))
>
> @contextlib.contextmanager
> def maketempcopies():
> @@ -466,19 +472,33 @@ def maketempcopies():
> for tmp in files:
> util.tryunlink(tmp)
>
> +def _makemap(repo):
> + """make a (src -> vfs) map for the repo"""
> + vfsmap = {
> + _srcstore: repo.svfs,
> + _srccache: repo.cachevfs,
> + }
> +    # we keep repo.vfs out of the map on purpose, there are too many dangers there
> + # (eg: .hg/hgrc)
> + assert repo.vfs not in vfsmap.values()
> +
> + return vfsmap
> +
> def _emit(repo, entries, totalfilesize):
> """actually emit the stream bundle"""
> - vfs = repo.svfs
> + vfsmap = _makemap(repo)
> progress = repo.ui.progress
> progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
> with maketempcopies() as copy:
> try:
> # copy is delayed until we are in the try
> - entries = [_filterfull(e, copy, vfs) for e in entries]
> + entries = [_filterfull(e, copy, vfsmap) for e in entries]
> yield None # this releases the lock on the repository
> seen = 0
>
> - for name, ftype, data in entries:
> + for src, name, ftype, data in entries:
> + vfs = vfsmap[src]
> + yield src
> yield util.uvarintencode(len(name))
> if ftype == _fileappend:
> fp = vfs(name)
> @@ -507,10 +527,11 @@ def generatev2(repo):
> """Emit content for version 2 of a streaming clone.
>
> the data stream consists of the following entries:
> - 1) A varint containing the length of the filename
> - 2) A varint containing the length of file data
> - 3) N bytes containing the filename (the internal, store-agnostic form)
> - 4) N bytes containing the file data
> + 1) A char representing the file destination (eg: store or cache)
> + 2) A varint containing the length of the filename
> + 3) A varint containing the length of file data
> + 4) N bytes containing the filename (the internal, store-agnostic form)
> + 5) N bytes containing the file data
>
> Returns a 3-tuple of (file count, file size, data iterator).
> """
> @@ -523,12 +544,16 @@ def generatev2(repo):
> repo.ui.debug('scanning\n')
> for name, ename, size in _walkstreamfiles(repo):
> if size:
> - entries.append((name, _fileappend, size))
> + entries.append((_srcstore, name, _fileappend, size))
> totalfilesize += size
> for name in _walkstreamfullstorefiles(repo):
> if repo.svfs.exists(name):
> totalfilesize += repo.svfs.lstat(name).st_size
> - entries.append((name, _filefull, None))
> + entries.append((_srcstore, name, _filefull, None))
> + for name in cacheutil.cachetocopy(repo):
> + if repo.cachevfs.exists(name):
> + totalfilesize += repo.cachevfs.lstat(name).st_size
> + entries.append((_srccache, name, _filefull, None))
>
> chunks = _emit(repo, entries, totalfilesize)
> first = next(chunks)
> @@ -536,6 +561,16 @@ def generatev2(repo):
>
> return len(entries), totalfilesize, chunks
>
> + at contextlib.contextmanager
> +def nested(*ctxs):
> + with warnings.catch_warnings():
> +        # For some reason, Python deprecated 'nested' without providing a
> +        # replacement. The official advice is to filter out the deprecation
> +        # warning for people who actually need the feature.
> + warnings.filterwarnings("ignore",category=DeprecationWarning)
> + with contextlib.nested(*ctxs):
> + yield
> +
> def consumev2(repo, fp, filecount, filesize):
> """Apply the contents from a version 2 streaming clone.
>
> @@ -552,19 +587,23 @@ def consumev2(repo, fp, filecount, files
>
> progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
>
> - vfs = repo.svfs
> + vfsmap = _makemap(repo)
>
> with repo.transaction('clone'):
> - with vfs.backgroundclosing(repo.ui):
> + ctxs = (vfs.backgroundclosing(repo.ui)
> + for vfs in vfsmap.values())
> + with nested(*ctxs):
> for i in range(filecount):
> + src = fp.read(1)
> + vfs = vfsmap[src]
> namelen = util.uvarintdecodestream(fp)
> datalen = util.uvarintdecodestream(fp)
>
> name = fp.read(namelen)
>
> if repo.ui.debugflag:
> - repo.ui.debug('adding %s (%s)\n' %
> - (name, util.bytecount(datalen)))
> + repo.ui.debug('adding [%s] %s (%s)\n' %
> + (src, name, util.bytecount(datalen)))
>
> with vfs(name, 'w') as ofp:
> for chunk in util.filechunkiter(fp, limit=datalen):
> diff --git a/tests/test-clone-uncompressed.t b/tests/test-clone-uncompressed.t
> --- a/tests/test-clone-uncompressed.t
> +++ b/tests/test-clone-uncompressed.t
> @@ -38,8 +38,13 @@ Basic clone
> #if stream-bundle2
> $ hg clone --stream -U http://localhost:$HGPORT clone1
> streaming all changes
> - 1027 files to transfer, 96.3 KB of data
> - transferred 96.3 KB in * seconds (* */sec) (glob)
> + 1030 files to transfer, 96.4 KB of data
> + transferred 96.4 KB in * seconds (* */sec) (glob)
> +
> + $ ls -1 clone1/.hg/cache
> + branch2-served
> + rbc-names-v1
> + rbc-revs-v1
> #endif
>
> --uncompressed is an alias to --stream
> @@ -55,8 +60,8 @@ Basic clone
> #if stream-bundle2
> $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
> streaming all changes
> - 1027 files to transfer, 96.3 KB of data
> - transferred 96.3 KB in * seconds (* */sec) (glob)
> + 1030 files to transfer, 96.4 KB of data
> + transferred 96.4 KB in * seconds (* */sec) (glob)
> #endif
>
> Clone with background file closing enabled
> @@ -95,10 +100,11 @@ Clone with background file closing enabl
> bundle2-input-bundle: with-transaction
> bundle2-input-part: "stream" (params: 4 mandatory) supported
> applying stream bundle
> - 1027 files to transfer, 96.3 KB of data
> + 1030 files to transfer, 96.4 KB of data
> + starting 4 threads for background file closing
> starting 4 threads for background file closing
> - transferred 96.3 KB in * seconds (* */sec) (glob)
> - bundle2-input-part: total payload size 110887
> + transferred 96.4 KB in * seconds (* */sec) (glob)
> + bundle2-input-part: total payload size 112077
> bundle2-input-part: "listkeys" (params: 1 mandatory) supported
> bundle2-input-bundle: 1 parts total
> checking for updated bookmarks
> @@ -136,8 +142,8 @@ Streaming of secrets can be overridden b
> #if stream-bundle2
> $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
> streaming all changes
> - 1027 files to transfer, 96.3 KB of data
> - transferred 96.3 KB in * seconds (* */sec) (glob)
> + 1030 files to transfer, 96.4 KB of data
> + transferred 96.4 KB in * seconds (* */sec) (glob)
> #endif
>
> $ killdaemons.py
> @@ -253,8 +259,8 @@ clone it
> #if stream-bundle2
> $ hg clone --stream http://localhost:$HGPORT with-bookmarks
> streaming all changes
> - 1027 files to transfer, 96.3 KB of data
> - transferred 96.3 KB in * seconds (* */sec) (glob)
> + 1033 files to transfer, 96.6 KB of data
> + transferred 96.6 KB in * seconds (* */sec) (glob)
> updating to branch default
> 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
> #endif
> @@ -283,8 +289,8 @@ Clone as publishing
> #if stream-bundle2
> $ hg clone --stream http://localhost:$HGPORT phase-publish
> streaming all changes
> - 1027 files to transfer, 96.3 KB of data
> - transferred 96.3 KB in * seconds (* */sec) (glob)
> + 1033 files to transfer, 96.6 KB of data
> + transferred 96.6 KB in * seconds (* */sec) (glob)
> updating to branch default
> 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
> #endif
> @@ -318,8 +324,8 @@ Clone as non publishing
> #if stream-bundle2
> $ hg clone --stream http://localhost:$HGPORT phase-no-publish
> streaming all changes
> - 1028 files to transfer, 96.4 KB of data
> - transferred 96.4 KB in * seconds (* */sec) (glob)
> + 1034 files to transfer, 96.7 KB of data
> + transferred 96.7 KB in * seconds (* */sec) (glob)
> updating to branch default
> 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
> $ hg -R phase-no-publish phase -r 'all()'
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel at mercurial-scm.org
> https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
More information about the Mercurial-devel
mailing list