[PATCH 2 of 2 V2] upgraderepo: add a config option for parallel computation
Augie Fackler
raf at durin42.com
Fri Dec 6 14:11:31 EST 2019
> On Dec 2, 2019, at 05:50, Pierre-Yves David <pierre-yves.david at ens-lyon.org> wrote:
>
> # HG changeset patch
> # User Pierre-Yves David <pierre-yves.david at octobus.net>
> # Date 1569765632 -7200
> # Sun Sep 29 16:00:32 2019 +0200
> # Node ID 096a7d53095e3ecf883d1a743c84aaf8d4f15fe2
> # Parent eb24da344625c3c7c34ff71abce165889de3cc2f
> # EXP-Topic sidedata-copies-perf
> # Available At https://dev.heptapod.net/octobus/mercurial-devel/
> # hg pull https://dev.heptapod.net/octobus/mercurial-devel/ -r 096a7d53095e
> upgraderepo: add a config option for parallel computation
queued, but I'd appreciate seeing numbers in the near-ish future for how much time this saves, or we should drop the use of multiprocessing IMO
>
> The option is put to use to compute new copy tracing side data in parallel. It
> uses the multiprocessing module as it had the appropriate primitives for what we
> needed. Gregory Szorc had concerns about Windows, so we disabled it there.
>
> See inline comment for details on the parallel implementation.
>
> diff --git a/mercurial/configitems.py b/mercurial/configitems.py
> --- a/mercurial/configitems.py
> +++ b/mercurial/configitems.py
> @@ -706,6 +706,9 @@ coreconfigitem(
> b'experimental', b'worker.wdir-get-thread-safe', default=False,
> )
> coreconfigitem(
> + b'experimental', b'worker.repository-upgrade', default=False,
> +)
> +coreconfigitem(
> b'experimental', b'xdiff', default=False,
> )
> coreconfigitem(
> diff --git a/mercurial/copies.py b/mercurial/copies.py
> --- a/mercurial/copies.py
> +++ b/mercurial/copies.py
> @@ -8,6 +8,7 @@
> from __future__ import absolute_import
>
> import collections
> +import multiprocessing
> import os
>
> from .i18n import _
> @@ -989,6 +990,102 @@ def _getsidedata(srcrepo, rev):
>
>
> def getsidedataadder(srcrepo, destrepo):
> + use_w = srcrepo.ui.configbool('experimental', 'worker.repository-upgrade')
> + if pycompat.iswindows or not use_w:
> + return _get_simple_sidedata_adder(srcrepo, destrepo)
> + else:
> + return _get_worker_sidedata_adder(srcrepo, destrepo)
> +
> +
> +def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
> + """The function used by worker precomputing sidedata
> +
> + It reads an input queue containing revision numbers
> + It writes to an output queue containing (rev, <sidedata-map>)
> +
> + The `None` input value is used as a stop signal.
> +
> + The `tokens` semaphore is used to avoid having too many unprocessed
> + entries. The workers need to acquire one token before fetching a task.
> + They will be released by the consumer of the produced data.
> + """
> + tokens.acquire()
> + rev = revs_queue.get()
> + while rev is not None:
> + data = _getsidedata(srcrepo, rev)
> + sidedata_queue.put((rev, data))
> + tokens.acquire()
> + rev = revs_queue.get()
> + # processing of `None` is completed, release the token.
> + tokens.release()
> +
> +
> +BUFF_PER_WORKER = 50
> +
> +
> +def _get_worker_sidedata_adder(srcrepo, destrepo):
> + """The parallel version of the sidedata computation
> +
> + This code spawns a pool of workers that precompute a buffer of sidedata
> + before we actually need them"""
> + # avoid circular import copies -> scmutil -> worker -> copies
> + from . import worker
> +
> + nbworkers = worker._numworkers(srcrepo.ui)
> +
> + tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
> + revsq = multiprocessing.Queue()
> + sidedataq = multiprocessing.Queue()
> +
> + assert srcrepo.filtername is None
> + # queue all tasks beforehand, revision numbers are small and it makes
> + # synchronisation simpler
> + #
> + # Since the computation for each node can be quite expensive, the overhead
> + # of using a single queue is not relevant. In practice, most computations
> + # are fast but some are very expensive and dominate all the other smaller
> + # costs.
> + for r in srcrepo.changelog.revs():
> + revsq.put(r)
> + # queue the "no more tasks" markers
> + for i in range(nbworkers):
> + revsq.put(None)
> +
> + allworkers = []
> + for i in range(nbworkers):
> + args = (srcrepo, revsq, sidedataq, tokens)
> + w = multiprocessing.Process(target=_sidedata_worker, args=args)
> + allworkers.append(w)
> + w.start()
> +
> + # dictionary to store results for revisions higher than the one we are
> + # looking for. For example, if we need the sidedatamap for 42, and 43 is
> + # received, we shelve 43 for later use.
> + staging = {}
> +
> + def sidedata_companion(revlog, rev):
> + sidedata = {}
> + if util.safehasattr(revlog, b'filteredrevs'): # this is a changelog
> + # Is the data previously shelved ?
> + sidedata = staging.pop(rev, None)
> + if sidedata is None:
> + # look at the queued results until we find the one we are looking
> + # for (shelve the other ones)
> + r, sidedata = sidedataq.get()
> + while r != rev:
> + staging[r] = sidedata
> + r, sidedata = sidedataq.get()
> + tokens.release()
> + return False, (), sidedata
> +
> + return sidedata_companion
> +
> +
> +def _get_simple_sidedata_adder(srcrepo, destrepo):
> + """The simple version of the sidedata computation
> +
> + It just computes it in the same thread on request
> +
> def sidedatacompanion(revlog, rev):
> sidedata = {}
> if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel at mercurial-scm.org
> https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
More information about the Mercurial-devel
mailing list