[PATCH 2 of 4] worker: rewrite error handling so os._exit covers all cases

Jun Wu quark at fb.com
Tue May 2 20:09:36 EDT 2017


# HG changeset patch
# User Jun Wu <quark at fb.com>
# Date 1492905008 25200
#      Sat Apr 22 16:50:08 2017 -0700
# Node ID 44a3524b51508ebc0175f7e89196341f6cf61d64
# Parent  59296cf853f194570bfa4e714e412c427851c4ca
# Available At https://bitbucket.org/quark-zju/hg-draft
#              hg pull https://bitbucket.org/quark-zju/hg-draft -r 44a3524b5150
worker: rewrite error handling so os._exit covers all cases

Previously, the worker error handling looked like this:

    pid = os.fork()   --+
    if pid == 0:        |
        ....            | problematic
        ....          --+
        try:          --+
            ....        | worker error handling
                      --+

If a signal arrives while Python is executing the "problematic" lines, an
external error handler (dispatch.py) will take over the control flow and
it's no longer guaranteed that "os._exit" is called (see 86cd09bc13ba for
why it is necessary).

This patch rewrites the error handling so it covers all possible code paths
for a worker even during fork.

Note: "os.getpid() == parentpid" is intentionally used to test whether the
process is the parent, instead of checking "pid", because "pid = os.fork()"
may not be atomic - it's possible that a signal hits the worker before the
assignment completes [1].  The newly added test replaces "os.fork" to
exercise that extreme case.

[1]: CPython compiles the call and the assignment in "pid = os.fork()" to 2
separate bytecode instructions, "CALL_FUNCTION" and "STORE_FAST", so the
statement is probably not atomic:

    def f():
        pid = os.fork()

    dis.dis(f)
      2           0 LOAD_GLOBAL              0 (os)
                  3 LOAD_ATTR                1 (fork)
                  6 CALL_FUNCTION            0
                  9 STORE_FAST               0 (pid)
                 12 LOAD_CONST               0 (None)
                 15 RETURN_VALUE

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -135,35 +135,41 @@ def _posixworker(ui, func, staticargs, a
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     ui.flush()
+    parentpid = os.getpid()
     for pargs in partition(args, workers):
-        pid = os.fork()
-        if pid == 0:
-            signal.signal(signal.SIGINT, oldhandler)
-            signal.signal(signal.SIGCHLD, oldchldhandler)
-
-            def workerfunc():
-                os.close(rfd)
-                for i, item in func(*(staticargs + (pargs,))):
-                    os.write(wfd, '%d %s\n' % (i, item))
-                return 0
+        # make sure we use os._exit in all worker code paths. otherwise the
+        # worker may do some clean-ups which could cause surprises like
+        # deadlock. see sshpeer.cleanup for example.
+        # override error handling *before* fork. this is necessary because
+        # exception (signal) may arrive after fork, before "pid =" assignment
+        # completes, and other exception handler (dispatch.py) can lead to
+        # unexpected code path without os._exit.
+        ret = -1
+        try:
+            pid = os.fork()
+            if pid == 0:
+                signal.signal(signal.SIGINT, oldhandler)
+                signal.signal(signal.SIGCHLD, oldchldhandler)
 
-            # make sure we use os._exit in all code paths. otherwise the worker
-            # may do some clean-ups which could cause surprises like deadlock.
-            # see sshpeer.cleanup for example.
-            ret = 0
-            try:
+                def workerfunc():
+                    os.close(rfd)
+                    for i, item in func(*(staticargs + (pargs,))):
+                        os.write(wfd, '%d %s\n' % (i, item))
+                    return 0
+
+                ret = scmutil.callcatch(ui, workerfunc)
+        except: # parent re-raises, child never returns
+            if os.getpid() == parentpid:
+                raise
+            exctype = sys.exc_info()[0]
+            force = not issubclass(exctype, KeyboardInterrupt)
+            ui.traceback(force=force)
+        finally:
+            if os.getpid() != parentpid:
                 try:
-                    ret = scmutil.callcatch(ui, workerfunc)
-                finally:
                     ui.flush()
-            except KeyboardInterrupt:
-                os._exit(255)
-            except: # never return, therefore no re-raises
-                try:
-                    ui.traceback(force=True)
-                    ui.flush()
+                except: # never returns, no re-raises
+                    pass
                 finally:
-                    os._exit(255)
-            else:
-                os._exit(ret & 255)
+                    os._exit(ret & 255)
         pids.add(pid)
     os.close(wfd)
diff --git a/tests/test-worker.t b/tests/test-worker.t
--- a/tests/test-worker.t
+++ b/tests/test-worker.t
@@ -92,3 +92,35 @@ Traceback must be printed for unknown ex
   Traceback (most recent call last):
 
+Workers should not do cleanups in all cases
+
+  $ cat > $TESTTMP/detectcleanup.py <<EOF
+  > from __future__ import absolute_import
+  > import atexit
+  > import os
+  > import time
+  > oldfork = os.fork
+  > count = 0
+  > parentpid = os.getpid()
+  > def delayedfork():
+  >     global count
+  >     count += 1
+  >     pid = oldfork()
+  >     # make it easier to test SIGTERM hitting other workers when they have
+  >     # not set up error handling yet.
+  >     if count > 1 and pid == 0:
+  >         time.sleep(0.1)
+  >     return pid
+  > os.fork = delayedfork
+  > def cleanup():
+  >     if os.getpid() != parentpid:
+  >         os.write(1, 'should never happen\n')
+  > atexit.register(cleanup)
+  > EOF
+
+  $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
+  > "extensions.d=$TESTTMP/detectcleanup.py" test 100000 abort
+  start
+  abort: known exception
+  [255]
+
 #endif


More information about the Mercurial-devel mailing list