[PATCH 2 of 2 V2] upgraderepo: add a config option for parallel computation

Augie Fackler raf at durin42.com
Fri Dec 6 14:11:31 EST 2019



> On Dec 2, 2019, at 05:50, Pierre-Yves David <pierre-yves.david at ens-lyon.org> wrote:
> 
> # HG changeset patch
> # User Pierre-Yves David <pierre-yves.david at octobus.net>
> # Date 1569765632 -7200
> #      Sun Sep 29 16:00:32 2019 +0200
> # Node ID 096a7d53095e3ecf883d1a743c84aaf8d4f15fe2
> # Parent  eb24da344625c3c7c34ff71abce165889de3cc2f
> # EXP-Topic sidedata-copies-perf
> # Available At https://dev.heptapod.net/octobus/mercurial-devel/
> #              hg pull https://dev.heptapod.net/octobus/mercurial-devel/ -r 096a7d53095e
> upgraderepo: add a config option for parallel computation

queued, but I'd appreciate seeing numbers in the near-ish future for how much time this saves, or we should drop the use of multiprocessing IMO.

> 
> The option is put to use to compute new copy tracing side data in parallel. It
> uses the multiprocessing module as it has the appropriate primitives for what we
> need. Gregory Szorc had concerns about Windows, so we disabled it there.
> 
> See inline comment for details on the parallel implementation.
> 
> diff --git a/mercurial/configitems.py b/mercurial/configitems.py
> --- a/mercurial/configitems.py
> +++ b/mercurial/configitems.py
> @@ -706,6 +706,9 @@ coreconfigitem(
>     b'experimental', b'worker.wdir-get-thread-safe', default=False,
> )
> coreconfigitem(
> +    b'experimental', b'worker.repository-upgrade', default=False,
> +)
> +coreconfigitem(
>     b'experimental', b'xdiff', default=False,
> )
> coreconfigitem(
> diff --git a/mercurial/copies.py b/mercurial/copies.py
> --- a/mercurial/copies.py
> +++ b/mercurial/copies.py
> @@ -8,6 +8,7 @@
> from __future__ import absolute_import
> 
> import collections
> +import multiprocessing
> import os
> 
> from .i18n import _
> @@ -989,6 +990,102 @@ def _getsidedata(srcrepo, rev):
> 
> 
> def getsidedataadder(srcrepo, destrepo):
> +    use_w = srcrepo.ui.configbool('experimental', 'worker.repository-upgrade')
> +    if pycompat.iswindows or not use_w:
> +        return _get_simple_sidedata_adder(srcrepo, destrepo)
> +    else:
> +        return _get_worker_sidedata_adder(srcrepo, destrepo)
> +
> +
> +def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
> +    """The function used by worker precomputing sidedata
> +
> +    It read an input queue containing revision numbers
> +    It write in an output queue containing (rev, <sidedata-map>)
> +
> +    The `None` input value is used as a stop signal.
> +
> +    The `tokens` semaphore is user to avoid having too many unprocessed
> +    entries. The workers needs to acquire one token before fetching a task.
> +    They will be released by the consumer of the produced data.
> +    """
> +    tokens.acquire()
> +    rev = revs_queue.get()
> +    while rev is not None:
> +        data = _getsidedata(srcrepo, rev)
> +        sidedata_queue.put((rev, data))
> +        tokens.acquire()
> +        rev = revs_queue.get()
> +    # processing of `None` is completed, release the token.
> +    tokens.release()
> +
> +
> +BUFF_PER_WORKER = 50
> +
> +
> +def _get_worker_sidedata_adder(srcrepo, destrepo):
> +    """The parallel version of the sidedata computation
> +
> +    This code spawn a pool of worker that precompute a buffer of sidedata
> +    before we actually need them"""
> +    # avoid circular import copies -> scmutil -> worker -> copies
> +    from . import worker
> +
> +    nbworkers = worker._numworkers(srcrepo.ui)
> +
> +    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
> +    revsq = multiprocessing.Queue()
> +    sidedataq = multiprocessing.Queue()
> +
> +    assert srcrepo.filtername is None
> +    # queue all tasks beforehand, revision numbers are small and it make
> +    # synchronisation simpler
> +    #
> +    # Since the computation for each node can be quite expensive, the overhead
> +    # of using a single queue is not revelant. In practice, most computation
> +    # are fast but some are very expensive and dominate all the other smaller
> +    # cost.
> +    for r in srcrepo.changelog.revs():
> +        revsq.put(r)
> +    # queue the "no more tasks" markers
> +    for i in range(nbworkers):
> +        revsq.put(None)
> +
> +    allworkers = []
> +    for i in range(nbworkers):
> +        args = (srcrepo, revsq, sidedataq, tokens)
> +        w = multiprocessing.Process(target=_sidedata_worker, args=args)
> +        allworkers.append(w)
> +        w.start()
> +
> +    # dictionnary to store results for revision higher than we one we are
> +    # looking for. For example, if we need the sidedatamap for 42, and 43 is
> +    # received, when shelve 43 for later use.
> +    staging = {}
> +
> +    def sidedata_companion(revlog, rev):
> +        sidedata = {}
> +        if util.safehasattr(revlog, b'filteredrevs'):  # this is a changelog
> +            # Is the data previously shelved ?
> +            sidedata = staging.pop(rev, None)
> +            if sidedata is None:
> +                # look at the queued result until we find the one we are lookig
> +                # for (shelve the other ones)
> +                r, sidedata = sidedataq.get()
> +                while r != rev:
> +                    staging[r] = sidedata
> +                    r, sidedata = sidedataq.get()
> +            tokens.release()
> +        return False, (), sidedata
> +
> +    return sidedata_companion
> +
> +
> +def _get_simple_sidedata_adder(srcrepo, destrepo):
> +    """The simple version of the sidedata computation
> +
> +    It just compute it in the same thread on request"""
> +
>     def sidedatacompanion(revlog, rev):
>         sidedata = {}
>         if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel at mercurial-scm.org
> https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel



More information about the Mercurial-devel mailing list