[PATCH 3 of 7] commandserver: add new forking server implemented without using SocketServer

Jun Wu quark at fb.com
Fri Jul 15 11:27:56 EDT 2016


Excerpts from Yuya Nishihara's message of 2016-07-15 00:20:41 +0900:
> # HG changeset patch
> # User Yuya Nishihara <yuya at tcha.org>
> # Date 1463884998 -32400
> #      Sun May 22 11:43:18 2016 +0900
> # Node ID e56508ea57c23af68d05d941d344321b6ade8184
> # Parent  e374627f118e8aca6243d263154aab9309abf1fc
> commandserver: add new forking server implemented without using SocketServer
> 
> SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as:
> 
>  - race condition that leads to 100% CPU usage (Python 2.6)
>    https://bugs.python.org/issue21491 
>  - can't wait for children belonging to different process groups (Python 2.6)
>  - leaves at least one zombie process (Python 2.6, 2.7)
>    https://bugs.python.org/issue11109 
> 
> The first two are critical because we do setpgid(0, 0) in child process to
> isolate terminal signals. The last one isn't, but ForkingMixIn seems to be
> doing silly. So there are two choices:
> 
>  a) backport and maintain SocketServer until we can drop support for Python 2.x
>  b) replace SocketServer by simpler one and eliminate glue codes
> 
> I chose (b) because it's great time for getting rid of utterly complicated
> SocketServer stuff, and preparing for future move towards prefork service.
> 
> New unixforkingservice is implemented loosely based on chg 531f8ef64be6. It
> is monolithic but much simpler than SocketServer. unixservicehandler provides
> customizing points for chg, and it will be shared with future prefork service.
> 
> Old unixservice class is still used by chgserver. It will be removed later.
> 
> Thanks to Jun Wu for investigating these issues.
> 
> diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py
> --- a/mercurial/commandserver.py
> +++ b/mercurial/commandserver.py
> @@ -11,6 +11,9 @@ import errno
>  import gc
>  import os
>  import random
> +import select
> +import signal
> +import socket
>  import struct
>  import sys
>  import traceback
> @@ -385,6 +388,41 @@ def _serverequest(ui, repo, conn, create
>              # trigger __del__ since ForkingMixIn uses os._exit
>              gc.collect()
>  
> +class unixservicehandler(object):
> +    """Set of pluggable operations for unix-mode services
> +
> +    Almost all methods except for createcmdserver() are called in the main
> +    process. You can't pass mutable resource back from createcmdserver().
> +    """
> +
> +    pollinterval = None
> +
> +    def __init__(self, ui):
> +        self.ui = ui

Do we really need the ui object? I'm a big fan of avoiding "self.ui = ui"
whenever possible.

For chgunixservicehandler, we can change its signature a bit and pass the
configs from the caller: __init__(self, idletimeout, skiphash)

> +    def bindsocket(self, sock, address):
> +        util.bindunixsocket(sock, address)
> +
> +    def unlinksocket(self, address):
> +        os.unlink(address)
> +
> +    def printbanner(self, address):
> +        self.ui.status(_('listening at %s\n') % address)
> +        self.ui.flush()  # avoid buffering of status message

To get rid of "self.ui" here, printbanner could be replaced by a boolean
flag, or by a function that returns the banner string for the caller to print.

> +    def shouldexit(self):
> +        """True if server should shut down; checked per pollinterval"""
> +        return False
> +
> +    def newconnection(self):
> +        """Called when main process notices new connection"""
> +        pass
> +
> +    def createcmdserver(self, repo, conn, fin, fout):

Also, add a ui parameter here so that we can remove "self.ui = ui". Since
"repo" is already passed here, that will feel more consistent.

> +        """Create new command server instance; called in the process that
> +        serves for the current connection"""
> +        return server(self.ui, repo, fin, fout)
> +
>  class _requesthandler(socketserver.BaseRequestHandler):
>      def handle(self):
>          _serverequest(self.server.ui, self.server.repo, self.request,
> @@ -424,9 +462,96 @@ class unixservice(object):
>          finally:
>              self._cleanup()
>  
> +class unixforkingservice(unixservice):
> +    def __init__(self, ui, repo, opts, handler=None):
> +        super(unixforkingservice, self).__init__(ui, repo, opts)
> +        self._servicehandler = handler or unixservicehandler(ui)
> +        self._sock = None
> +        self._oldsigchldhandler = None
> +        self._workerpids = set()  # updated by signal handler; do not iterate
> +
> +    def init(self):
> +        self._sock = socket.socket(socket.AF_UNIX)
> +        self._servicehandler.bindsocket(self._sock, self.address)
> +        self._sock.listen(5)

Maybe make the listen backlog (5) a class constant or, since we have ui, read
it from a config option. But that can be done in a later patch.

> +        o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
> +        self._oldsigchldhandler = o
> +        self._servicehandler.printbanner(self.address)
> +
> +    def _cleanup(self):
> +        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
> +        self._sock.close()
> +        self._servicehandler.unlinksocket(self.address)
> +        # don't kill child processes as they have active clients, just wait
> +        self._reapworkers(0)
> +
> +    def run(self):
> +        try:
> +            self._mainloop()
> +        finally:
> +            self._cleanup()
> +
> +    def _mainloop(self):
> +        h = self._servicehandler
> +        while not h.shouldexit():
> +            try:
> +                ready = select.select([self._sock], [], [], h.pollinterval)[0]
> +                if not ready:
> +                    continue
> +                conn, _addr = self._sock.accept()
> +            except (select.error, socket.error) as inst:
> +                if inst.args[0] == errno.EINTR:
> +                    continue
> +                raise
> +
> +            pid = os.fork()
> +            if pid:
> +                try:
> +                    self.ui.debug('forked worker process (pid=%d)\n' % pid)
> +                    self._workerpids.add(pid)
> +                    h.newconnection()
> +                finally:
> +                    conn.close()  # release handle in parent process
> +            else:
> +                try:
> +                    self._serveworker(conn)
> +                    conn.close()
> +                    os._exit(0)
> +                except:  # never return, hence no re-raises
> +                    try:
> +                        self.ui.traceback(force=True)
> +                    finally:
> +                        os._exit(255)
> +
> +    def _sigchldhandler(self, signal, frame):
> +        self._reapworkers(os.WNOHANG)
> +
> +    def _reapworkers(self, options):
> +        while self._workerpids:
> +            try:
> +                pid, _status = os.waitpid(-1, options)
> +            except OSError as inst:
> +                if inst.errno == errno.EINTR:
> +                    continue
> +                if inst.errno != errno.ECHILD:
> +                    raise
> +                # no child processes at all (reaped by other waitpid()?)
> +                self._workerpids.clear()
> +                return
> +            if pid == 0:
> +                # no waitable child processes
> +                return
> +            self.ui.debug('worker process exited (pid=%d)\n' % pid)
> +            self._workerpids.discard(pid)
> +
> +    def _serveworker(self, conn):

I find the name "_serveworker" a bit ambiguous. "serve" is usually a verb,
but if I understand correctly it is used as a noun here. Maybe just use
"_serverequest", or move the body into _mainloop. The latter makes sense to me
since it's simple and the fork logic is already there, and I think we don't
care about subclassing for now.

> +        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
> +        h = self._servicehandler
> +        _serverequest(self.ui, self.repo, conn, h.createcmdserver)
> +
>  _servicemap = {
>      'pipe': pipeservice,
> -    'unix': unixservice,
> +    'unix': unixforkingservice,
>      }
>  
>  def createservice(ui, repo, opts):


More information about the Mercurial-devel mailing list