D3960: worker: use one pipe per posix worker and select() in parent process
hooper (Danny Hooper)
phabricator at mercurial-scm.org
Tue Jul 17 22:44:10 UTC 2018
hooper created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.
REVISION SUMMARY
This allows us to pass results larger than PIPE_BUF through the pipes without
interleaving them. This is necessary now because "hg fix" sends file contents
as the result from workers.
REPOSITORY
rHG Mercurial
REVISION DETAIL
https://phab.mercurial-scm.org/D3960
AFFECTED FILES
mercurial/worker.py
CHANGE DETAILS
diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -9,6 +9,7 @@
import errno
import os
+import select
import signal
import sys
import threading
@@ -89,7 +90,6 @@
return func(*staticargs + (args,))
def _posixworker(ui, func, staticargs, args):
- rfd, wfd = os.pipe()
workers = _numworkers(ui)
oldhandler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +138,15 @@
oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
ui.flush()
parentpid = os.getpid()
+ pipes = []
for pargs in partition(args, workers):
+ # Every worker gets its own pipe to send results on, so we don't have to
+ # implement atomic writes larger than PIPE_BUF. Each forked process has
+ # its own pipe's descriptors in the local variables, and the parent
+ # process has the full list of pipe descriptors (and it doesn't really
+ # care what order they're in).
+ rfd, wfd = os.pipe()
+ pipes.append((rfd, wfd))
# make sure we use os._exit in all worker code paths. otherwise the
# worker may do some clean-ups which could cause surprises like
# deadlock. see sshpeer.cleanup for example.
@@ -175,8 +183,10 @@
finally:
os._exit(ret & 255)
pids.add(pid)
- os.close(wfd)
- fp = os.fdopen(rfd, r'rb', 0)
+ fps = []
+ for rfd, wfd in pipes:
+ os.close(wfd)
+ fps.append(os.fdopen(rfd, r'rb', 0))
def cleanup():
signal.signal(signal.SIGINT, oldhandler)
waitforworkers()
@@ -187,15 +197,23 @@
os.kill(os.getpid(), -status)
sys.exit(status)
try:
- while True:
+ while fps:
try:
- yield util.pickle.load(fp)
- except EOFError:
- break
- except IOError as e:
- if e.errno == errno.EINTR:
+ rlist, wlist, xlist = select.select(fps, [], fps)
+ except select.error as e:
+ if e[0] == errno.EINTR:
continue
raise
+ for fp in rlist + xlist:
+ try:
+ yield util.pickle.load(fp)
+ except EOFError:
+ fp.close()
+ except IOError as e:
+ if e.errno == errno.EINTR:
+ continue
+ raise
+ fps = [fp for fp in fps if not fp.closed]
except: # re-raises
killworkers()
cleanup()
To: hooper, #hg-reviewers
Cc: mercurial-devel
More information about the Mercurial-devel
mailing list