D1459: workers: handling exceptions in windows workers

wlis (Wojciech Lis) phabricator at mercurial-scm.org
Mon Nov 20 13:37:15 EST 2017


wlis created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This adds handling of exceptions from worker threads and resurfaces them as if the function ran without workers.
  If any of the threads raises an exception, the main thread interrupts all running threads, gives each of them up to 5 seconds to handle the interruption, and then re-raises the first exception received.
  
  We don't have to join threads if is_alive() is false

TEST PLAN
  Ran multiple updates and enabled/disabled sparse profiles; everything worked well.
  
  Ran the test suite on CentOS: all tests that pass on @ also pass here.
  
  Added a forged exception into the worker code and got it properly resurfaced and the rest of workers killed: P58642088
  PS C:\open\<repo>> ..\facebook-hg-rpms\build\hg\hg.exe --config extensions.fsmonitor=! sparse --enable-profile <profile>
  updating [==>                                                                                                                ]  1300/39166 1m57sException in thread Thread-3:
  Traceback (most recent call last):
  
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner
      self.run()
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run
      raise e
  
  Exception: Forged exception
  
  Exception in thread Thread-2:
  Traceback (most recent call last):
  
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner
      self.run()
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run
      raise e
  
  Exception: Forged exception
  <...>
  Traceback (most recent call last):
  
    File "C:\open\facebook-hg-rpms\build\hg\hgexe.py", line 41, in <module>
      dispatch.run()
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 85, in run
      status = (dispatch(req) or 0) & 255
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 173, in dispatch
      ret = _runcatch(req)
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 324, in _runcatch
      return _callcatch(ui, _runcatchfunc)
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 332, in _callcatch
      return scmutil.callcatch(ui, func)
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\scmutil.py", line 154, in callcatch
      return func()
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 314, in _runcatchfunc
      return _dispatch(req)
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 951, in _dispatch
      cmdpats, cmdoptions)
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 415, in runcommand
      return orig(lui, repo, *args, **kwargs)
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\undo.py", line 118, in _runcommandwrapper
      result = orig(lui, repo, cmd, fullargs, *args)
    File "C:\open\facebook-hg-rpms\build\hg\hgext\journal.py", line 84, in runcommand
      return orig(lui, repo, cmd, fullargs, *args)
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 268, in _tracksparseprofiles
      res = runcommand(lui, repo, *args)
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 256, in _trackdirstatesizes
      res = runcommand(lui, repo, *args)
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\copytrace.py", line 144, in _runcommand
      return orig(lui, repo, cmd, fullargs, ui, *args, **kwargs)
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbamend\hiddenoverride.py", line 119, in runcommand
      result = orig(lui, repo, cmd, fullargs, *args)
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 712, in runcommand
      ret = _runcommand(ui, options, cmd, d)
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 959, in _runcommand
      return cmdfunc()
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 948, in <lambda>
      d = lambda: util.checksignature(func)(ui, *args, **strcmdopt)
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\util.py", line 1183, in check
      return func(*args, **kwargs)
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 860, in sparse
      disableprofile=disableprofile, force=force)
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 949, in _config
      len, _refresh(ui, repo, oldstatus, oldsparsematch, force))
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 1116, in _refresh
      mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False)
    File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 311, in applyupdates
      return orig(repo, actions, wctx, mctx, overwrite, labels=labels)
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\merge.py", line 1464, in applyupdates
      for i, item in prog:
    File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 286, in _windowsworker
      raise t.exception
  
  Exception: Forged exception
  PS C:\open\ovrsource>

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1459

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -214,18 +214,45 @@
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
+            self._interrupted = False
+            self.exception = None
+
+        def interrupt(self):
+            self._interrupted = True
 
         def run(self):
-            while not self._taskqueue.empty():
-                try:
-                    args = self._taskqueue.get_nowait()
-                    for res in self._func(*self._staticargs + (args,)):
-                        self._resultqueue.put(res)
-                except util.empty:
-                    break
+            try:
+                while not self._taskqueue.empty():
+                    try:
+                        args = self._taskqueue.get_nowait()
+                        for res in self._func(*self._staticargs + (args,)):
+                            self._resultqueue.put(res)
+                            # threading doesn't provide a native way to
+                            # interrupt execution. handle it manually at every
+                            # iteration.
+                            if self._interrupted:
+                                return
+                    except util.empty:
+                        break
+            except Exception as e:
+                # store the exception such that the main thread can resurface
+                # it as if the func was running without workers.
+                self.exception = e
+                raise
+
+    threads = []
+    def killworkers():
+        for t in threads:
+            t.interrupt()
+        for t in threads:
+            # try to let the threads handle interruption, but don't wait
+            # indefinitely. the thread could be in an infinite loop, handling
+            # a very long task or in a deadlock situation
+            t.join(5)
+            if t.is_alive():
+                raise error.Abort(_('failed to join worker thread'))
 
     workers = _numworkers(ui)
-    threads = []
     resultqueue = util.queue()
     taskqueue = util.queue()
     # partition work to more pieces than workers to minimize the chance
@@ -236,12 +263,24 @@
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-    while any(t.is_alive() for t in threads):
+
+    while len(threads) > 0:
         while not resultqueue.empty():
             yield resultqueue.get()
-        t = threads[0]
-        t.join(0.05)
-        if not t.is_alive():
+        threads[0].join(0.05)
+        finishedthreads = [_t for _t in threads if not _t.is_alive()]
+        for t in finishedthreads:
+            if t.exception is not None:
+                try:
+                    killworkers()
+                except Exception:
+                    # pass over the workers joining failure. it is more
+                    # important to surface the initial exception than the
+                    # fact that one of workers may be processing a large
+                    # task and does not get to handle the interruption.
+                    ui.warn(_("failed to kill worker threads while handling "
+                              "an exception"))
+                raise t.exception
             threads.remove(t)
     while not resultqueue.empty():
         yield resultqueue.get()



To: wlis, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list