[PATCH 09 of 11] worker: allow a function to be run in multiple worker processes

Bryan O'Sullivan bos at serpentine.com
Sat Feb 9 08:06:49 CST 2013


# HG changeset patch
# User Bryan O'Sullivan <bryano at fb.com>
# Date 1360418465 0
# Node ID dfb4e4bfedfc6db5fddf08b95276de01f1b1366d
# Parent  023956ed1b098df7f93a5c1857b2f5ec00f3e45d
worker: allow a function to be run in multiple worker processes

If we estimate that it will be worth the cost, we run the function in
multiple processes. Otherwise, we run it in-process.

Children report progress to the parent through a pipe.

Not yet implemented on Windows.

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -5,7 +5,7 @@
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
-import os
+import os, signal, sys
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
@@ -54,3 +54,78 @@ def worthwhile(costperop, nops):
     linear = costperop * nops
     benefit = linear - (_startupcost * _numworkers + linear / _numworkers)
     return benefit >= 0.15
+
+def worker(costperarg, func, staticargs, args):
+    '''run a function, possibly in parallel in multiple worker
+    processes.
+
+    returns a progress iterator of (int, str) pairs when the work is
+    spread over worker processes; otherwise returns whatever func
+    itself returns
+
+    costperarg - cost of a single task
+
+    func - function to run; it is called as func(*staticargs + (chunk,))
+    and must produce an iterable of (int, item) pairs
+
+    staticargs - tuple of arguments to pass to every invocation of the
+    function
+
+    args - arguments to split into chunks, to pass to individual
+    workers
+    '''
+    if worthwhile(costperarg, len(args)):
+        return _platformworker(func, staticargs, args)
+    return func(*staticargs + (args,))
+
+def _posixworker(func, staticargs, args):
+    '''fork one child per chunk of args and yield the (int, str)
+    progress pairs the children write back over a shared pipe.
+
+    Calls sys.exit(1) after all progress has been read if any child
+    exited with a nonzero status.'''
+    rfd, wfd = os.pipe()
+    pids = []
+    for pargs in partition(args, _numworkers):
+        pid = os.fork()
+        if pid == 0:
+            try:
+                os.close(rfd)
+                for i, item in func(*(staticargs + (pargs,))):
+                    os.write(wfd, '%d %s\n' % (i, item))
+                os._exit(0)
+            except: # never re-raised: a child must not unwind into our caller
+                os._exit(255)
+        pids.append(pid)
+    os.close(wfd)
+    fp = os.fdopen(rfd, 'rb', 0)
+    oldhandler = signal.getsignal(signal.SIGINT)
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+    def cleanup():
+        # python 2.4 does not allow yield inside try/finally, so this
+        # is called explicitly on both the normal and the error path
+        signal.signal(signal.SIGINT, oldhandler)
+        # close our read end first: a child still writing then fails
+        # with EPIPE (or SIGPIPE) and exits instead of blocking forever
+        fp.close()
+        problems = 0
+        for pid in pids:
+            # reap exactly the children forked above, no other process
+            problems |= os.waitpid(pid, 0)[1]
+        return problems
+    try:
+        # readline() rather than 'for line in fp': python 2 file
+        # iteration uses a read-ahead buffer that delays progress
+        for line in iter(fp.readline, ''):
+            l = line.split(' ', 1)
+            yield int(l[0]), l[1][:-1]
+    except: # re-raises; don't mask the original error with sys.exit
+        cleanup()
+        raise
+    if cleanup():
+        sys.exit(1)
+
+# not yet implemented on Windows; NOTE(review): worker() raises NameError
+# there unless worthwhile() can never return True on Windows -- confirm
+if os.name != 'nt':
+    _platformworker = _posixworker


More information about the Mercurial-devel mailing list