D1564: worker: make windows workers daemons

wlis (Wojciech Lis) phabricator at mercurial-scm.org
Thu Nov 30 19:26:11 UTC 2017


wlis created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  The Windows workers weren't daemon threads and were not correctly killed when the command was interrupted with Ctrl-C from the terminal. With this change, when the main thread is killed, all daemon threads get killed as well.
  I also reduced the time we give workers to clean up nicely (from 5 seconds per thread to 1 second in total), so that people don't press Ctrl-C again when they get impatient.
  
  The output when threads cleaned up nicely:
  
    PS C:\<dir>> hg.exe sparse --disable-profile SparseProfiles/<profile>.sparse
    interrupted!
  
  The output when threads don't clean up within 1 second:
  
    PS C:\<dir>> hg.exe sparse --enable-profile SparseProfiles/<profile>.sparse
    failed to kill worker threads while handling an exception
    interrupted!
    Exception in thread Thread-4 (most likely raised during interpreter shutdown):
    PS C:\<dir>>
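
The two ideas in the summary (daemon worker threads, and a single shared cleanup budget instead of a per-thread timeout) can be illustrated with a minimal standalone sketch. This is not the code from mercurial/worker.py: the `Worker` class here, its `cooperative` flag, and the `try_kill_workers` helper with its `budget` parameter are hypothetical names made up for the illustration.

```python
import threading
import time


class Worker(threading.Thread):
    """Hypothetical stand-in for a worker thread (not Mercurial's class)."""

    def __init__(self, work_seconds, cooperative):
        threading.Thread.__init__(self)
        # Daemon threads do not keep the process alive once the main
        # thread exits, which is what lets Ctrl-C end the whole command.
        self.daemon = True
        self._work_seconds = work_seconds
        self._cooperative = cooperative
        self._stop_requested = threading.Event()

    def interrupt(self):
        self._stop_requested.set()

    def run(self):
        if self._cooperative:
            # Returns early as soon as interrupt() is called.
            self._stop_requested.wait(self._work_seconds)
        else:
            # Simulates a long task that never checks for interruption.
            time.sleep(self._work_seconds)


def try_kill_workers(threads, budget=1.0):
    """Interrupt all threads and wait at most `budget` seconds in total.

    Returns True if every thread finished, False if one is still alive.
    """
    deadline = time.time() + budget
    for t in threads:
        t.interrupt()
    for t in threads:
        # All joins share one deadline, so the total wait stays bounded
        # no matter how many threads there are.
        remaining = max(0.0, deadline - time.time())
        t.join(remaining)
        if t.is_alive():
            return False
    return True
```

With a shared deadline the worst-case wait is the budget itself, whereas a fixed per-thread timeout can add up to the timeout multiplied by the number of threads. Any thread still alive after the budget is simply abandoned, and because it is a daemon it dies with the process.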

TEST PLAN
  Ran hg commands on Windows (pull/update/sparse). Interrupted a sparse --enable-profile command that was using threads with Ctrl-C and observed in Process Explorer that all threads got killed.
  Ran tests on CentOS.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1564

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -12,6 +12,7 @@
 import signal
 import sys
 import threading
+import time
 
 from .i18n import _
 from . import (
@@ -216,6 +217,7 @@
             self._func = func
             self._staticargs = staticargs
             self._interrupted = False
+            self.daemon = True
             self.exception = None
 
         def interrupt(self):
@@ -242,16 +244,22 @@
                 raise
 
     threads = []
-    def killworkers():
+    def trykillworkers():
+        # Allow up to 1 second to clean worker threads nicely
+        cleanupend = time.time() + 1
         for t in threads:
             t.interrupt()
         for t in threads:
-            # try to let the threads handle interruption, but don't wait
-            # indefintely. the thread could be in infinite loop, handling
-            # a very long task or in a deadlock situation
-            t.join(5)
+            remainingtime = cleanupend - time.time()
+            t.join(remainingtime)
             if t.is_alive():
-                raise error.Abort(_('failed to join worker thread'))
+                # pass over the workers joining failure. it is more
+                # important to surface the initial exception than the
+                # fact that one of workers may be processing a large
+                # task and does not get to handle the interruption.
+                ui.warn(_("failed to kill worker threads while "
+                          "handling an exception\n"))
+                return
 
     workers = _numworkers(ui)
     resultqueue = util.queue()
@@ -264,25 +272,20 @@
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-
-    while len(threads) > 0:
-        while not resultqueue.empty():
-            yield resultqueue.get()
-        threads[0].join(0.05)
-        finishedthreads = [_t for _t in threads if not _t.is_alive()]
-        for t in finishedthreads:
-            if t.exception is not None:
-                try:
-                    killworkers()
-                except Exception:
-                    # pass over the workers joining failure. it is more
-                    # important to surface the inital exception than the
-                    # fact that one of workers may be processing a large
-                    # task and does not get to handle the interruption.
-                    ui.warn(_("failed to kill worker threads while handling "
-                              "an exception"))
-                raise t.exception
-            threads.remove(t)
+    try:
+        while len(threads) > 0:
+            while not resultqueue.empty():
+                yield resultqueue.get()
+            threads[0].join(0.05)
+            finishedthreads = [_t for _t in threads if not _t.is_alive()]
+            for t in finishedthreads:
+                if t.exception is not None:
+                    trykillworkers()
+                    raise t.exception
+                threads.remove(t)
+    except KeyboardInterrupt:
+        trykillworkers()
+        raise
     while not resultqueue.empty():
         yield resultqueue.get()
 



To: wlis, #hg-reviewers
Cc: mercurial-devel

