D3960: worker: use one pipe per posix worker and select() in parent process

hooper (Danny Hooper) phabricator at mercurial-scm.org
Tue Jul 17 22:44:10 UTC 2018


hooper created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
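  Give each posix worker its own pipe and have the parent process select()
  across them. This allows us to pass results larger than PIPE_BUF through the
  pipes without interleaving them: writes larger than PIPE_BUF are not atomic,
  so on a single shared pipe, large results from different workers could be
  mixed together. This is necessary now because "hg fix" sends file contents
  as the result from workers.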

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3960

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -9,6 +9,7 @@
 
 import errno
 import os
+import select
 import signal
 import sys
 import threading
@@ -89,7 +90,6 @@
     return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
-    rfd, wfd = os.pipe()
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +138,15 @@
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     ui.flush()
     parentpid = os.getpid()
+    pipes = []
     for pargs in partition(args, workers):
+        # Every worker gets its own pipe to send results on, so we don't have to
+        # implement atomic writes larger than PIPE_BUF. Each forked process has
+        # its own pipe's descriptors in the local variables, and the parent
+        # process has the full list of pipe descriptors (and it doesn't really
+        # care what order they're in).
+        rfd, wfd = os.pipe()
+        pipes.append((rfd, wfd))
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
@@ -175,8 +183,10 @@
                 finally:
                     os._exit(ret & 255)
         pids.add(pid)
-    os.close(wfd)
-    fp = os.fdopen(rfd, r'rb', 0)
+    fps = []
+    for rfd, wfd in pipes:
+        os.close(wfd)
+        fps.append(os.fdopen(rfd, r'rb', 0))
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         waitforworkers()
@@ -187,15 +197,23 @@
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
-        while True:
+        while fps:
             try:
-                yield util.pickle.load(fp)
-            except EOFError:
-                break
-            except IOError as e:
-                if e.errno == errno.EINTR:
+                rlist, wlist, xlist = select.select(fps, [], fps)
+            except select.error as e:
+                if e[0] == errno.EINTR:
                     continue
                 raise
+            for fp in rlist + xlist:
+                try:
+                    yield util.pickle.load(fp)
+                except EOFError:
+                    fp.close()
+                except IOError as e:
+                    if e.errno == errno.EINTR:
+                        continue
+                    raise
+            fps = [fp for fp in fps if not fp.closed]
     except: # re-raises
         killworkers()
         cleanup()



To: hooper, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list