[PATCH 09 of 11] worker: allow a function to be run in multiple worker processes

Idan Kamara idankk86 at gmail.com
Sat Feb 9 09:35:26 CST 2013


On Sat, Feb 9, 2013 at 4:06 PM, Bryan O'Sullivan <bos at serpentine.com> wrote:
>
> # HG changeset patch
> # User Bryan O'Sullivan <bryano at fb.com>
> # Date 1360418465 0
> # Node ID dfb4e4bfedfc6db5fddf08b95276de01f1b1366d
> # Parent  023956ed1b098df7f93a5c1857b2f5ec00f3e45d
> worker: allow a function to be run in multiple worker processes
>
> If we estimate that it will be worth the cost, we run the function in
> multiple processes. Otherwise, we run it in-process.
>
> Children report progress to the parent through a pipe.
>
> Not yet implemented on Windows.
>
> diff --git a/mercurial/worker.py b/mercurial/worker.py
> --- a/mercurial/worker.py
> +++ b/mercurial/worker.py
> @@ -5,7 +5,7 @@
>  # This software may be used and distributed according to the terms of the
>  # GNU General Public License version 2 or any later version.
>
> -import os
> +import os, signal, sys
>
>  def countcpus():
>      '''try to count the number of CPUs on the system'''
> @@ -54,3 +54,58 @@ def worthwhile(costperop, nops):
>      linear = costperop * nops
>      benefit = linear - (_startupcost * _numworkers + linear / _numworkers)
>      return benefit >= 0.15
> +
> +def worker(costperarg, func, staticargs, args):
> +    '''run a function, possibly in parallel in multiple worker
> +    processes.
> +
> +    returns a progress iterator
> +
> +    costperarg - cost of a single task
> +
> +    func - function to run
> +
> +    staticargs - arguments to pass to every invocation of the function
> +
> +    args - arguments to split into chunks, to pass to individual
> +    workers
> +    '''
> +    if worthwhile(costperarg, len(args)):
> +        return _platformworker(func, staticargs, args)
> +    return func(*staticargs + (args,))
> +
> +def _posixworker(func, staticargs, args):
> +    rfd, wfd = os.pipe()
> +    for pargs in partition(args, _numworkers):
> +        pid = os.fork()
> +        if pid == 0:
> +            try:
> +                os.close(rfd)
> +                for i, item in func(*(staticargs + (pargs,))):
> +                    os.write(wfd, '%d %s\n' % (i, item))
> +                os._exit(0)
> +            except KeyboardInterrupt:
> +                os._exit(255)

Isn't it a problem that you only exit here on KeyboardInterrupt?
Any other exception will propagate out of the forked child, which
matters when this code isn't run by 'hg' itself (e.g. things that
import mercurial, or the command server).
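
A minimal sketch of the concern, not the patch's code: run_in_child is
a hypothetical helper, written for current Python on POSIX. It assumes
the goal is that a forked child never returns into the caller's stack,
whatever func raises.

```python
import os

def run_in_child(func):
    """Fork, run func() in the child, return the child's exit code.

    Hypothetical helper for illustration only.
    """
    pid = os.fork()
    if pid == 0:
        # Child: default to failure, and always leave via os._exit so
        # control never unwinds into code that belongs to the parent.
        status = 1
        try:
            func()
            status = 0
        except KeyboardInterrupt:
            status = 255
        except BaseException:
            # A real implementation would likely report the error
            # before exiting; this sketch only records the failure.
            pass
        finally:
            os._exit(status)
    # Parent: wait for this child and decode its wait status.
    _, raw = os.waitpid(pid, 0)
    return os.WEXITSTATUS(raw) if os.WIFEXITED(raw) else -1
```

With only KeyboardInterrupt caught, a ValueError in the child would
instead unwind through the embedding application's frames in a second
process.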

> +    os.close(wfd)
> +    fp = os.fdopen(rfd, 'rb', 0)
> +    oldhandler = signal.getsignal(signal.SIGINT)
> +    signal.signal(signal.SIGINT, signal.SIG_IGN)
> +    def cleanup():
> +        # python 2.4 is too dumb for try/yield/finally
> +        signal.signal(signal.SIGINT, oldhandler)
> +        problems = 0
> +        for i in xrange(_numworkers):
> +            problems |= os.wait()[1]
> +        if problems:
> +            sys.exit(1)

This exit is also going to be a problem for things that
call into the internal API.

And doesn't it also lose the cause of the error?
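
One possible shape for this, as a rough sketch under assumptions: the
parent knows its children's pids, and WorkerError, describe and reap
are hypothetical names that do not appear in the patch. Raising an
exception leaves the decision to exit with the caller, and keeping the
statuses per child avoids OR-ing them into a single flag.

```python
import os

class WorkerError(Exception):
    """Hypothetical exception carrying a description per failed child."""
    def __init__(self, failures):
        Exception.__init__(self, 'worker(s) failed: %r' % (failures,))
        self.failures = failures

def describe(status):
    """Turn a raw wait status into a human-readable cause."""
    if os.WIFSIGNALED(status):
        return 'killed by signal %d' % os.WTERMSIG(status)
    if os.WIFEXITED(status):
        return 'exited with code %d' % os.WEXITSTATUS(status)
    return 'unknown status %d' % status

def reap(pids):
    """Wait for every child; raise WorkerError if any of them failed."""
    failures = {}
    for pid in pids:
        _, status = os.waitpid(pid, 0)
        if status != 0:
            failures[pid] = describe(status)
    if failures:
        raise WorkerError(failures)
```

A caller such as the 'hg' command line could still turn WorkerError
into an exit code, while library users and the command server could
handle it themselves.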