D1458: workers: implemented worker on windows

wlis (Wojciech Lis) phabricator at mercurial-scm.org
Mon Nov 20 13:37:14 EST 2017


wlis created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This change implements a thread-based worker on Windows.
  Handling of exceptions raised inside the worker threads will be addressed in a separate diff.
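  The thread-based approach can be illustrated with a minimal Python 3 sketch that uses only the standard library (`queue`, `threading`) instead of Mercurial's `util.queue`/`util.empty` wrappers. The name `threaded_worker` and its `nworkers`/`nslices` parameters are hypothetical, not part of worker.py. Like the diff, it pre-fills a task queue, lets each thread drain it, and yields results from the calling thread while polling. Also like the diff, an exception raised by `func` ends that thread without being propagated to the caller.

```python
import queue
import threading


def threaded_worker(func, staticargs, args, nworkers=4, nslices=None):
    """Hypothetical sketch: run func(*staticargs, chunk) on worker threads
    and yield every result it produces, in completion order."""
    if nslices is None:
        nslices = nworkers * 20  # more slices than threads evens out the load
    taskqueue = queue.Queue()
    resultqueue = queue.Queue()
    for i in range(nslices):
        chunk = args[i::nslices]  # every nslices-th item, one slice per task
        if chunk:
            taskqueue.put(chunk)

    def run():
        while True:
            try:
                chunk = taskqueue.get_nowait()
            except queue.Empty:
                return  # task queue drained: let the thread finish
            for res in func(*staticargs, chunk):
                resultqueue.put(res)

    threads = [threading.Thread(target=run) for _ in range(nworkers)]
    for t in threads:
        t.start()
    # Yield results while the threads are still working.
    while any(t.is_alive() for t in threads):
        try:
            yield resultqueue.get(timeout=0.05)
        except queue.Empty:
            pass
    for t in threads:
        t.join()
    # No producers remain, so empty() is reliable here.
    while not resultqueue.empty():
        yield resultqueue.get_nowait()


def square_plus(offset, chunk):
    # Example func: a generator, matching how the diff iterates over func's output.
    for x in chunk:
        yield x * x + offset


print(sorted(threaded_worker(square_plus, (1,), list(range(10)))))
# [1, 2, 5, 10, 17, 26, 37, 50, 65, 82]
```

  Results arrive in completion order, so callers that need a particular order must sort or key them themselves.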
  
  For now, the worker is used in mercurial/merge.py and in the lfs extension.
  
  After multiple tests, with millions of files materialized and thousands of lfs
  files fetched, it appears that both merge.py and lfs/blobstore.py are thread
  safe. I also looked through the code, and apart from the backgroundfilecloser
  (handled in the revision this diff is based on), things look good.
  
  The performance improvement on Windows is:
  
  - ~50% for sparse --enable-profile
  - speedup of hg up/rebase (not precisely measured)
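  Whether workers are used at all depends on worker.py's startup-cost model, which is why the diff below sets `_startupcost` to 0.01 on Windows instead of 1e30. The sketch restates that model as we understand it: the estimated serial cost is compared with a fixed per-worker startup cost plus an even share of the work, against a 0.15 threshold. `is_worthwhile` is a hypothetical stand-in for `worthwhile()` in mercurial/worker.py; treat the formula and threshold as assumptions to verify against the source.

```python
def is_worthwhile(costperop, nops, nworkers, startupcost, threshold=0.15):
    """Hypothetical restatement of worker.py's cost model (assumed shape)."""
    linear = costperop * nops  # estimated serial cost
    # Parallel estimate: fixed startup cost per worker plus an even share of the work.
    parallel = startupcost * nworkers + linear / nworkers
    return linear - parallel >= threshold


# With the pre-diff Windows value of 1e30, workers never pay off.
print(is_worthwhile(0.001, 1000, 4, 1e30))  # False
# With 0.01, a job with estimated cost 1.0 is worth splitting over 4 workers...
print(is_worthwhile(0.001, 1000, 4, 0.01))  # True
# ...but a job with estimated cost 0.1 is not.
print(is_worthwhile(0.001, 100, 4, 0.01))   # False
```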

TEST PLAN
  Ran tens of hg sparse --enable-profile and --disable-profile operations on large profiles and verified that the workers were running. Used the Sysinternals suite to confirm that all threads are spawned and run as expected.
  
  Ran various other operations on the repo, including update and rebase.
  
  Ran the test suite on CentOS; all tests that pass on @ also pass with this change.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1458

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -11,6 +11,7 @@
 import os
 import signal
 import sys
+import threading
 
 from .i18n import _
 from . import (
@@ -53,7 +54,7 @@
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
-if pycompat.isposix:
+if pycompat.isposix or pycompat.iswindows:
     _startupcost = 0.01
 else:
     _startupcost = 1e30
@@ -203,7 +204,51 @@
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
-if not pycompat.iswindows:
+def _windowsworker(ui, func, staticargs, args):
+    class Worker(threading.Thread):
+        def __init__(self, taskqueue, resultqueue, func, staticargs,
+                     group=None, target=None, name=None, verbose=None):
+            threading.Thread.__init__(self, group=group, target=target,
+                                      name=name, verbose=verbose)
+            self._taskqueue = taskqueue
+            self._resultqueue = resultqueue
+            self._func = func
+            self._staticargs = staticargs
+
+        def run(self):
+            while not self._taskqueue.empty():
+                try:
+                    args = self._taskqueue.get_nowait()
+                    for res in self._func(*self._staticargs + (args,)):
+                        self._resultqueue.put(res)
+                except util.empty:
+                    break
+
+    workers = _numworkers(ui)
+    threads = []
+    resultqueue = util.queue()
+    taskqueue = util.queue()
+    # partition work to more pieces than workers to minimize the chance
+    # of uneven distribution of large tasks between the workers
+    for pargs in partition(args, workers * 20):
+        taskqueue.put(pargs)
+    for _i in range(workers):
+        t = Worker(taskqueue, resultqueue, func, staticargs)
+        threads.append(t)
+        t.start()
+    while any(t.is_alive() for t in threads):
+        while not resultqueue.empty():
+            yield resultqueue.get()
+        t = threads[0]
+        t.join(0.05)
+        if not t.is_alive():
+            threads.remove(t)
+    while not resultqueue.empty():
+        yield resultqueue.get()
+
+if pycompat.iswindows:
+    _platformworker = _windowsworker
+else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
 
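The comment in `_windowsworker` about splitting the work into `workers * 20` pieces can be illustrated with a small simulation. This sketch assumes `partition()` takes every N-th element of the input (stride slicing), which is our reading of worker.py and worth checking against the source; `makespan()` and the cost numbers are hypothetical. Threads pulling slices from a shared queue are modeled as greedy list scheduling: each slice goes to whichever thread becomes free first.

```python
import heapq


def partition(lst, nslices):
    # Assumption: stride slicing, i.e. slice i gets lst[i], lst[i + nslices], ...
    for i in range(nslices):
        yield lst[i::nslices]


def makespan(costs, nworkers, nslices):
    """Hypothetical helper: simulated finish time when nworkers threads pull
    slices from a shared queue in order, each slice going to the first free thread."""
    free_at = [0] * nworkers  # time at which each simulated thread becomes free
    for chunk in partition(costs, nslices):
        start = heapq.heappop(free_at)  # earliest-free thread takes the next slice
        heapq.heappush(free_at, start + sum(chunk))
    return max(free_at)


# Hypothetical workload: every 4th file is large (cost 100), the rest cost 1.
costs = [100 if i % 4 == 0 else 1 for i in range(80)]
coarse = makespan(costs, nworkers=4, nslices=4)      # one slice per worker
fine = makespan(costs, nworkers=4, nslices=4 * 20)   # workers * 20, as in the diff
print(coarse)         # 2000
print(fine < coarse)  # True
```

With one slice per worker, slice 0 holds all 20 large files, so one thread does 2000 units of work while the others finish after 20. With 80 slices the large files are spread across all four threads, which is the uneven distribution the comment in the diff guards against.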



To: wlis, #hg-reviewers
Cc: mercurial-devel

