[PATCH 09 of 11] worker: allow a function to be run in multiple worker processes
Bryan O'Sullivan
bos at serpentine.com
Sat Feb 9 08:06:49 CST 2013
# HG changeset patch
# User Bryan O'Sullivan <bryano at fb.com>
# Date 1360418465 0
# Node ID dfb4e4bfedfc6db5fddf08b95276de01f1b1366d
# Parent 023956ed1b098df7f93a5c1857b2f5ec00f3e45d
worker: allow a function to be run in multiple worker processes
If we estimate that it will be worth the cost, we run the function in
multiple processes. Otherwise, we run it in-process.
Children report progress to the parent through a pipe.
The multi-process path is not yet implemented on Windows.
diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -5,7 +5,7 @@
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
-import os
+import os, signal, sys
def countcpus():
'''try to count the number of CPUs on the system'''
@@ -54,3 +54,58 @@ def worthwhile(costperop, nops):
linear = costperop * nops
benefit = linear - (_startupcost * _numworkers + linear / _numworkers)
return benefit >= 0.15
+
def worker(costperarg, func, staticargs, args):
    '''run func over args, either in-process or split across several
    worker processes, depending on whether worthwhile() estimates that
    parallelism will pay for its startup cost.

    returns a progress iterator

    costperarg - estimated cost of processing a single element of args

    func - function to run; it is called as func(*(staticargs + (chunk,)))

    staticargs - tuple of arguments passed to every invocation of func

    args - arguments to split into chunks for individual workers (the
    in-process path hands all of args to func as a single chunk)
    '''
    nargs = len(args)
    if not worthwhile(costperarg, nargs):
        # too little work to repay process startup: run everything here
        return func(*(staticargs + (args,)))
    return _platformworker(func, staticargs, args)
+
def _posixworker(func, staticargs, args):
    '''run func over chunks of args in forked child processes.

    This is a generator, so nothing is forked until the first value is
    requested.  args is split with partition(args, _numworkers).  Each
    chunk goes to its own child, which calls
    func(*(staticargs + (chunk,))) and writes every (i, item) pair that
    call yields to a shared pipe as one "i item" line.  The parent
    yields (int(i), item) back to its caller as lines arrive.

    SIGINT is ignored in the parent while the children run and is
    restored afterwards.  Once the pipe is exhausted every child is
    reaped.  If any child exited with a non-zero status, the parent
    calls sys.exit(1).  If the consumer raises or abandons the iterator,
    the children are sent SIGTERM and reaped, and then the original
    exception is re-raised.

    Each item travels as one newline-terminated line, so an item that
    itself contains a newline would be misparsed by the parent.
    '''
    import errno, traceback

    rfd, wfd = os.pipe()
    pids = []

    def killworkers():
        # best effort; ESRCH only means that child is already gone
        for pid in pids:
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError:
                if sys.exc_info()[1].errno != errno.ESRCH:
                    raise

    def waitforworkers():
        # Reap exactly the children forked here.  os.wait() could reap
        # unrelated children of this process, or fail/block when
        # partition() produced fewer chunks than _numworkers.
        # Returns the ORed raw wait statuses.
        problems = 0
        for pid in pids:
            while True:
                try:
                    problems |= os.waitpid(pid, 0)[1]
                    break
                except OSError:
                    if sys.exc_info()[1].errno != errno.EINTR:
                        raise
        del pids[:]
        return problems

    try:
        for pargs in partition(args, _numworkers):
            pid = os.fork()
            if pid == 0:
                # child: every path must end in os._exit(), so that an
                # unexpected exception can never propagate out of this
                # generator and carry on running the parent's program
                try:
                    os.close(rfd)
                    for i, item in func(*(staticargs + (pargs,))):
                        os.write(wfd, '%d %s\n' % (i, item))
                    os._exit(0)
                except KeyboardInterrupt:
                    os._exit(255)
                except: # never returns
                    try:
                        traceback.print_exc()
                    finally:
                        os._exit(255)
            pids.append(pid)
    except: # re-raises
        # forking (or partitioning) failed part-way: don't leak the pipe
        # or leave already-started children running unsupervised
        os.close(rfd)
        os.close(wfd)
        killworkers()
        waitforworkers()
        raise
    os.close(wfd)
    fp = os.fdopen(rfd, 'rb', 0)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    def cleanup():
        # python 2.4 is too dumb for try/yield/finally
        signal.signal(signal.SIGINT, oldhandler)
        # Close our read end before waiting.  A child still writing to a
        # pipe nobody reads then gets EPIPE/SIGPIPE instead of blocking
        # forever while we wait for it.
        fp.close()
        return waitforworkers()
    try:
        for line in fp:
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    if cleanup():
        sys.exit(1)
+
# _platformworker(func, staticargs, args) is the backend that worker()
# calls whenever worthwhile() estimates that parallelism will pay off.
if os.name != 'nt':
    _platformworker = _posixworker
else:
    # There is no multi-process backend on Windows yet.  Run the whole task
    # in-process, so worker() cannot fail with a NameError even if
    # worthwhile() ever estimates a benefit here.
    def _platformworker(func, staticargs, args):
        return func(*(staticargs + (args,)))
More information about the Mercurial-devel mailing list