[PATCH 2 of 2 V2] upgraderepo: add a config option for parallel computation

Pierre-Yves David pierre-yves.david at ens-lyon.org
Mon Dec 2 05:50:19 EST 2019


# HG changeset patch
# User Pierre-Yves David <pierre-yves.david at octobus.net>
# Date 1569765632 -7200
#      Sun Sep 29 16:00:32 2019 +0200
# Node ID 096a7d53095e3ecf883d1a743c84aaf8d4f15fe2
# Parent  eb24da344625c3c7c34ff71abce165889de3cc2f
# EXP-Topic sidedata-copies-perf
# Available At https://dev.heptapod.net/octobus/mercurial-devel/
#              hg pull https://dev.heptapod.net/octobus/mercurial-devel/ -r 096a7d53095e
upgraderepo: add a config option for parallel computation

The option is put to use to compute new copy tracing side data in parallel. It
use the multiprocessing module as it had the appropriate primitive for what we
needed. Gregory Szorc had concerned on windows so we disabled it there.

See inline comment for details on the parallel implementation.

diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -706,6 +706,9 @@ coreconfigitem(
     b'experimental', b'worker.wdir-get-thread-safe', default=False,
 )
 coreconfigitem(
+    b'experimental', b'worker.repository-upgrade', default=False,
+)
+coreconfigitem(
     b'experimental', b'xdiff', default=False,
 )
 coreconfigitem(
diff --git a/mercurial/copies.py b/mercurial/copies.py
--- a/mercurial/copies.py
+++ b/mercurial/copies.py
@@ -8,6 +8,7 @@
 from __future__ import absolute_import
 
 import collections
+import multiprocessing
 import os
 
 from .i18n import _
@@ -989,6 +990,102 @@ def _getsidedata(srcrepo, rev):
 
 
 def getsidedataadder(srcrepo, destrepo):
+    use_w = srcrepo.ui.configbool('experimental', 'worker.repository-upgrade')
+    if pycompat.iswindows or not use_w:
+        return _get_simple_sidedata_adder(srcrepo, destrepo)
+    else:
+        return _get_worker_sidedata_adder(srcrepo, destrepo)
+
+
+def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
+    """The function used by worker precomputing sidedata
+
+    It read an input queue containing revision numbers
+    It write in an output queue containing (rev, <sidedata-map>)
+
+    The `None` input value is used as a stop signal.
+
+    The `tokens` semaphore is user to avoid having too many unprocessed
+    entries. The workers needs to acquire one token before fetching a task.
+    They will be released by the consumer of the produced data.
+    """
+    tokens.acquire()
+    rev = revs_queue.get()
+    while rev is not None:
+        data = _getsidedata(srcrepo, rev)
+        sidedata_queue.put((rev, data))
+        tokens.acquire()
+        rev = revs_queue.get()
+    # processing of `None` is completed, release the token.
+    tokens.release()
+
+
+BUFF_PER_WORKER = 50
+
+
+def _get_worker_sidedata_adder(srcrepo, destrepo):
+    """The parallel version of the sidedata computation
+
+    This code spawn a pool of worker that precompute a buffer of sidedata
+    before we actually need them"""
+    # avoid circular import copies -> scmutil -> worker -> copies
+    from . import worker
+
+    nbworkers = worker._numworkers(srcrepo.ui)
+
+    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
+    revsq = multiprocessing.Queue()
+    sidedataq = multiprocessing.Queue()
+
+    assert srcrepo.filtername is None
+    # queue all tasks beforehand, revision numbers are small and it make
+    # synchronisation simpler
+    #
+    # Since the computation for each node can be quite expensive, the overhead
+    # of using a single queue is not revelant. In practice, most computation
+    # are fast but some are very expensive and dominate all the other smaller
+    # cost.
+    for r in srcrepo.changelog.revs():
+        revsq.put(r)
+    # queue the "no more tasks" markers
+    for i in range(nbworkers):
+        revsq.put(None)
+
+    allworkers = []
+    for i in range(nbworkers):
+        args = (srcrepo, revsq, sidedataq, tokens)
+        w = multiprocessing.Process(target=_sidedata_worker, args=args)
+        allworkers.append(w)
+        w.start()
+
+    # dictionnary to store results for revision higher than we one we are
+    # looking for. For example, if we need the sidedatamap for 42, and 43 is
+    # received, when shelve 43 for later use.
+    staging = {}
+
+    def sidedata_companion(revlog, rev):
+        sidedata = {}
+        if util.safehasattr(revlog, b'filteredrevs'):  # this is a changelog
+            # Is the data previously shelved ?
+            sidedata = staging.pop(rev, None)
+            if sidedata is None:
+                # look at the queued result until we find the one we are lookig
+                # for (shelve the other ones)
+                r, sidedata = sidedataq.get()
+                while r != rev:
+                    staging[r] = sidedata
+                    r, sidedata = sidedataq.get()
+            tokens.release()
+        return False, (), sidedata
+
+    return sidedata_companion
+
+
+def _get_simple_sidedata_adder(srcrepo, destrepo):
+    """The simple version of the sidedata computation
+
+    It just compute it in the same thread on request"""
+
     def sidedatacompanion(revlog, rev):
         sidedata = {}
         if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog


More information about the Mercurial-devel mailing list