[PATCH 3 of 3] worker: handle worker failures more aggressively

Isaac Jurado diptongo at gmail.com
Tue Feb 19 16:23:59 CST 2013


Replying to Bryan O'Sullivan:
> # HG changeset patch
> # User Bryan O'Sullivan <bryano at fb.com>
> # Date 1361297550 28800
> # Node ID 42c14cff887e20d033dbaa8f8c00100e807a1149
> # Parent  9ef52f0a93a0cba939742743ff59e4c2a2463fab
> worker: handle worker failures more aggressively
> 
> We now wait for worker processes in a separate thread, so that we can
> spot failures in a timely way, without waiting for the progress pipe
> to drain.
> 
> If a worker fails, we recover the pre-parallel-update behaviour of
> failing early by killing its peers before propagating the failure.
> 
> diff --git a/mercurial/worker.py b/mercurial/worker.py
> --- a/mercurial/worker.py
> +++ b/mercurial/worker.py
> @@ -6,7 +6,7 @@
>  # GNU General Public License version 2 or any later version.
>  
>  from i18n import _
> -import os, signal, sys, util
> +import os, signal, sys, threading, util
>  
>  def countcpus():
>      '''try to count the number of CPUs on the system'''
> @@ -77,6 +77,7 @@ def _posixworker(ui, func, staticargs, a
>      workers = _numworkers(ui)
>      oldhandler = signal.getsignal(signal.SIGINT)
>      signal.signal(signal.SIGINT, signal.SIG_IGN)
> +    pids, problem = [], [0]
>      for pargs in partition(args, workers):
>          pid = os.fork()
>          if pid == 0:
> @@ -88,25 +89,40 @@ def _posixworker(ui, func, staticargs, a
>                  os._exit(0)
>              except KeyboardInterrupt:
>                  os._exit(255)
> +        pids.append(pid)
> +    pids.reverse()
>      os.close(wfd)
>      fp = os.fdopen(rfd, 'rb', 0)
> -    def cleanup():
> -        # python 2.4 is too dumb for try/yield/finally
> -        signal.signal(signal.SIGINT, oldhandler)
> -        problem = None
> -        for i in xrange(workers):
> +    def killworkers():
> +        # if one worker bails, there's no good reason to wait for the rest
> +        for p in pids:
> +            try:
> +                os.kill(p, signal.SIGTERM)
> +            except OSError, err:
> +                if err.errno != errno.ESRCH:
> +                    raise
> +    def waitforworkers():
> +        for p in pids:
>              pid, st = os.wait()
>              if st and not problem:
> -                problem = _exitstatus(st)
> -        if problem:
> -            if problem < 0:
> -                os.kill(os.getpid(), -problem)
> -            sys.exit(problem)
> +                problem[0] = _exitstatus(st)

This single-element list looks quite strange.  Is it a workaround for
Python 2's closure scoping, i.e. that the nested function cannot rebind
a variable of the enclosing function?  Just curious.
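
(For the record, that does seem to be the usual Python 2 idiom: a closure
can read, but not rebind, a variable from the enclosing function, since
there is no 'nonlocal', so the value is smuggled inside a mutable
container.  A minimal standalone sketch, with hypothetical names, not the
patch's actual code:)

```python
def collect_status(statuses):
    # Mutable cell shared with the closure below; a plain integer could
    # not be reassigned from inside note() in Python 2.
    problem = [0]
    def note(st):
        if st and not problem[0]:
            problem[0] = st   # mutation is allowed; 'problem = st' would
                              # just create a local variable in note()
    for st in statuses:
        note(st)
    return problem[0]
```

So collect_status([0, 255, 1]) keeps the first nonzero status, 255, just
as the patch keeps the first failing worker's exit status.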

> +                killworkers()
> +    t = threading.Thread(target=waitforworkers)
> +    t.start()

Please pardon my ignorance, but the execution flow is starting to get
confusing.  If I understood correctly, the idea is to kill the other
children whenever one fails, right?

What about putting all workers in the same process group, but a
different one from the parent's?  Something like:

    workers = _numworkers(ui)
    pgid = 0
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            try:
                os.setpgid(0, pgid)
                os.close(rfd)
                for i, item in func(...):
                    os.write(...)
                os._exit(0)
            except KeyboardInterrupt:
                os.kill(0, signal.SIGTERM)  # I can kill my siblings
                os._exit(255)
        elif not pgid:
            # Place the rest of the children in the same group as the
            # first
            pgid = pid
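
To make the idea concrete, here is a self-contained sketch (hypothetical
helper name, POSIX only, placeholder sleep instead of real work) where
the parent also calls setpgid, so that neither side races the other, and
can then take the whole group down with a single os.killpg:

```python
import os, signal, time

def spawn_group(n):
    """Fork n placeholder workers into one dedicated process group.

    Hypothetical helper for illustration; returns (pgid, pids).
    """
    pgid = 0
    pids = []
    for _ in range(n):
        pid = os.fork()
        if pid == 0:
            os.setpgid(0, pgid)   # first child starts a new group,
            time.sleep(60)        # later children join it
            os._exit(0)
        if not pgid:
            pgid = pid
        # The parent sets the group as well; setpgid to the same value
        # from both sides is harmless and avoids the fork/setpgid race.
        os.setpgid(pid, pgid)
        pids.append(pid)
    return pgid, pids
```

The parent is not in the group, so a single os.killpg(pgid,
signal.SIGTERM) terminates every worker without touching the parent, and
a worker can do the same for its siblings with os.kill(0, ...).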

I hope it helps.  Regards.

-- 
Isaac Jurado

"The noblest pleasure is the joy of understanding."
                                  Leonardo da Vinci


More information about the Mercurial-devel mailing list