[PATCH 09 of 11] worker: allow a function to be run in multiple worker processes
Idan Kamara
idankk86 at gmail.com
Sat Feb 9 09:35:26 CST 2013
On Sat, Feb 9, 2013 at 4:06 PM, Bryan O'Sullivan <bos at serpentine.com> wrote:
>
> # HG changeset patch
> # User Bryan O'Sullivan <bryano at fb.com>
> # Date 1360418465 0
> # Node ID dfb4e4bfedfc6db5fddf08b95276de01f1b1366d
> # Parent 023956ed1b098df7f93a5c1857b2f5ec00f3e45d
> worker: allow a function to be run in multiple worker processes
>
> If we estimate that it will be worth the cost, we run the function in
> multiple processes. Otherwise, we run it in-process.
>
> Children report progress to the parent through a pipe.
>
> Not yet implemented on Windows.
>
> diff --git a/mercurial/worker.py b/mercurial/worker.py
> --- a/mercurial/worker.py
> +++ b/mercurial/worker.py
> @@ -5,7 +5,7 @@
> # This software may be used and distributed according to the terms of the
> # GNU General Public License version 2 or any later version.
>
> -import os
> +import os, signal, sys
>
> def countcpus():
>      '''try to count the number of CPUs on the system'''
> @@ -54,3 +54,58 @@ def worthwhile(costperop, nops):
>      linear = costperop * nops
>      benefit = linear - (_startupcost * _numworkers + linear / _numworkers)
>      return benefit >= 0.15
> +
> +def worker(costperarg, func, staticargs, args):
> +    '''run a function, possibly in parallel in multiple worker
> +    processes.
> +
> +    returns a progress iterator
> +
> +    costperarg - cost of a single task
> +
> +    func - function to run
> +
> +    staticargs - arguments to pass to every invocation of the function
> +
> +    args - arguments to split into chunks, to pass to individual
> +    workers
> +    '''
> +    if worthwhile(costperarg, len(args)):
> +        return _platformworker(func, staticargs, args)
> +    return func(*staticargs + (args,))
> +
> +def _posixworker(func, staticargs, args):
> +    rfd, wfd = os.pipe()
> +    for pargs in partition(args, _numworkers):
> +        pid = os.fork()
> +        if pid == 0:
> +            try:
> +                os.close(rfd)
> +                for i, item in func(*(staticargs + (pargs,))):
> +                    os.write(wfd, '%d %s\n' % (i, item))
> +                os._exit(0)
> +            except KeyboardInterrupt:
> +                os._exit(255)
Isn't it a problem that you're not exiting here for other exceptions
too? If this code isn't run by 'hg' itself (e.g. by things that
import mercurial, or by the command server), the child keeps running
the caller's code after the exception propagates out of here.
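A minimal sketch of one way to make sure the child always exits,
written in current Python rather than the Python 2 of the patch.
run_in_child is a hypothetical helper, not something in worker.py, and
the exit code 255 for every failure is an assumption chosen to match
the KeyboardInterrupt branch above:

```python
import os
import sys
import traceback


def run_in_child(func):
    """Fork, run func() in the child, and return the child's exit code.

    Hypothetical helper for illustration; not part of the patch.
    """
    pid = os.fork()
    if pid == 0:
        status = 255  # assume failure until func() completes
        try:
            func()
            status = 0
        except KeyboardInterrupt:
            pass  # exit quietly with 255, as the patch does
        except BaseException:
            # Report the error before exiting, since nobody else will.
            traceback.print_exc()
            sys.stderr.flush()
        finally:
            # The child must never return into the caller's stack,
            # whatever happened above.
            os._exit(status)
    _, rawstatus = os.waitpid(pid, 0)
    if os.WIFEXITED(rawstatus):
        return os.WEXITSTATUS(rawstatus)
    return -1  # child was killed by a signal
```

With this shape, an exception in func() cannot leak out of the forked
child into the embedding program's code.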
> +    os.close(wfd)
> +    fp = os.fdopen(rfd, 'rb', 0)
> +    oldhandler = signal.getsignal(signal.SIGINT)
> +    signal.signal(signal.SIGINT, signal.SIG_IGN)
> +    def cleanup():
> +        # python 2.4 is too dumb for try/yield/finally
> +        signal.signal(signal.SIGINT, oldhandler)
> +        problems = 0
> +        for i in xrange(_numworkers):
> +            problems |= os.wait()[1]
> +        if problems:
> +            sys.exit(1)
This exit is also going to be a problem for things that
call into the internal API.
And doesn't it also lose the cause of the error?
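A rough sketch of the alternative, again in current Python: decode each
child's wait status and raise an exception that records which worker
failed and how, instead of OR-ing the raw statuses together and calling
sys.exit(1). WorkerError, waitforworkers and spawn are hypothetical
names for illustration, not part of the patch or of Mercurial's API:

```python
import os


class WorkerError(Exception):
    """Hypothetical exception listing the workers that failed.

    failures is a list of (pid, code) pairs, where code is the exit
    code, or the negated signal number if the child was killed.
    """

    def __init__(self, failures):
        Exception.__init__(self, 'worker(s) failed: %r' % (failures,))
        self.failures = failures


def waitforworkers(pids):
    """Wait for every pid; raise WorkerError if any did not exit with 0."""
    failures = []
    for pid in pids:
        _, status = os.waitpid(pid, 0)
        if os.WIFSIGNALED(status):
            failures.append((pid, -os.WTERMSIG(status)))
        elif os.WEXITSTATUS(status) != 0:
            failures.append((pid, os.WEXITSTATUS(status)))
    if failures:
        raise WorkerError(failures)


def spawn(code):
    """Fork a child that exits immediately with the given code (demo only)."""
    pid = os.fork()
    if pid == 0:
        os._exit(code)
    return pid
```

A caller of the internal API can then catch the exception and decide
what to do, and 'hg' itself can still turn it into an exit status at
the top level.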