[PATCH 3 of 7] commandserver: add new forking server implemented without using SocketServer
Jun Wu
quark at fb.com
Fri Jul 15 11:27:56 EDT 2016
Excerpts from Yuya Nishihara's message of 2016-07-15 00:20:41 +0900:
> # HG changeset patch
> # User Yuya Nishihara <yuya at tcha.org>
> # Date 1463884998 -32400
> # Sun May 22 11:43:18 2016 +0900
> # Node ID e56508ea57c23af68d05d941d344321b6ade8184
> # Parent e374627f118e8aca6243d263154aab9309abf1fc
> commandserver: add new forking server implemented without using SocketServer
>
> SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as:
>
> - race condition that leads to 100% CPU usage (Python 2.6)
> https://bugs.python.org/issue21491
> - can't wait for children belonging to different process groups (Python 2.6)
> - leaves at least one zombie process (Python 2.6, 2.7)
> https://bugs.python.org/issue11109
>
> The first two are critical because we do setpgid(0, 0) in child process to
> isolate terminal signals. The last one isn't, but ForkingMixIn seems to be
> doing silly. So there are two choices:
>
> a) backport and maintain SocketServer until we can drop support for Python 2.x
> b) replace SocketServer by simpler one and eliminate glue codes
>
> I chose (b) because it's great time for getting rid of utterly complicated
> SocketServer stuff, and preparing for future move towards prefork service.
>
> New unixforkingservice is implemented loosely based on chg 531f8ef64be6. It
> is monolithic but much simpler than SocketServer. unixservicehandler provides
> customizing points for chg, and it will be shared with future prefork service.
>
> Old unixservice class is still used by chgserver. It will be removed later.
>
> Thanks to Jun Wu for investigating these issues.
>
> diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py
> --- a/mercurial/commandserver.py
> +++ b/mercurial/commandserver.py
> @@ -11,6 +11,9 @@ import errno
> import gc
> import os
> import random
> +import select
> +import signal
> +import socket
> import struct
> import sys
> import traceback
> @@ -385,6 +388,41 @@ def _serverequest(ui, repo, conn, create
> # trigger __del__ since ForkingMixIn uses os._exit
> gc.collect()
>
> +class unixservicehandler(object):
> + """Set of pluggable operations for unix-mode services
> +
> + Almost all methods except for createcmdserver() are called in the main
> + process. You can't pass mutable resource back from createcmdserver().
> + """
> +
> + pollinterval = None
> +
> + def __init__(self, ui):
> + self.ui = ui
Do we really need the ui object? I'm a big fan of avoiding "self.ui = ui"
whenever possible.
For chgunixservicehandler, we can change its signature a bit and pass the
configs from the caller: __init__(self, idletimeout, skiphash)
> + def bindsocket(self, sock, address):
> + util.bindunixsocket(sock, address)
> +
> + def unlinksocket(self, address):
> + os.unlink(address)
> +
> + def printbanner(self, address):
> + self.ui.status(_('listening at %s\n') % address)
> + self.ui.flush() # avoid buffering of status message
To get rid of "self.ui", it could be a boolean, or a function returning a
string.
> + def shouldexit(self):
> + """True if server should shut down; checked per pollinterval"""
> + return False
> +
> + def newconnection(self):
> + """Called when main process notices new connection"""
> + pass
> +
> + def createcmdserver(self, repo, conn, fin, fout):
And add a ui parameter here so we can remove "self.ui = ui". As "repo" is
here already, it will feel more consistent.
> + """Create new command server instance; called in the process that
> + serves for the current connection"""
> + return server(self.ui, repo, fin, fout)
> +
> class _requesthandler(socketserver.BaseRequestHandler):
> def handle(self):
> _serverequest(self.server.ui, self.server.repo, self.request,
> @@ -424,9 +462,96 @@ class unixservice(object):
> finally:
> self._cleanup()
>
> +class unixforkingservice(unixservice):
> + def __init__(self, ui, repo, opts, handler=None):
> + super(unixforkingservice, self).__init__(ui, repo, opts)
> + self._servicehandler = handler or unixservicehandler(ui)
> + self._sock = None
> + self._oldsigchldhandler = None
> + self._workerpids = set() # updated by signal handler; do not iterate
> +
> + def init(self):
> + self._sock = socket.socket(socket.AF_UNIX)
> + self._servicehandler.bindsocket(self._sock, self.address)
> + self._sock.listen(5)
Maybe make 5 a class constant, or as we have ui, read from some config
option. But that can be a later patch.
> + o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
> + self._oldsigchldhandler = o
> + self._servicehandler.printbanner(self.address)
> +
> + def _cleanup(self):
> + signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
> + self._sock.close()
> + self._servicehandler.unlinksocket(self.address)
> + # don't kill child processes as they have active clients, just wait
> + self._reapworkers(0)
> +
> + def run(self):
> + try:
> + self._mainloop()
> + finally:
> + self._cleanup()
> +
> + def _mainloop(self):
> + h = self._servicehandler
> + while not h.shouldexit():
> + try:
> + ready = select.select([self._sock], [], [], h.pollinterval)[0]
> + if not ready:
> + continue
> + conn, _addr = self._sock.accept()
> + except (select.error, socket.error) as inst:
> + if inst.args[0] == errno.EINTR:
> + continue
> + raise
> +
> + pid = os.fork()
> + if pid:
> + try:
> + self.ui.debug('forked worker process (pid=%d)\n' % pid)
> + self._workerpids.add(pid)
> + h.newconnection()
> + finally:
> + conn.close() # release handle in parent process
> + else:
> + try:
> + self._serveworker(conn)
> + conn.close()
> + os._exit(0)
> + except: # never return, hence no re-raises
> + try:
> + self.ui.traceback(force=True)
> + finally:
> + os._exit(255)
> +
> + def _sigchldhandler(self, signal, frame):
> + self._reapworkers(os.WNOHANG)
> +
> + def _reapworkers(self, options):
> + while self._workerpids:
> + try:
> + pid, _status = os.waitpid(-1, options)
> + except OSError as inst:
> + if inst.errno == errno.EINTR:
> + continue
> + if inst.errno != errno.ECHILD:
> + raise
> + # no child processes at all (reaped by other waitpid()?)
> + self._workerpids.clear()
> + return
> + if pid == 0:
> + # no waitable child processes
> + return
> + self.ui.debug('worker process exited (pid=%d)\n' % pid)
> + self._workerpids.discard(pid)
> +
> + def _serveworker(self, conn):
I find the name "_serveworker" a bit ambiguous. "serve" is usually a verb
and iiuc it's used as a noun here. Maybe just use "_serverequest" or move it
to mainloop (makes sense to me since it's simple and the fork logic is
already there, and I think we don't care about subclass now).
> + signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
> + h = self._servicehandler
> + _serverequest(self.ui, self.repo, conn, h.createcmdserver)
> +
> _servicemap = {
> 'pipe': pipeservice,
> - 'unix': unixservice,
> + 'unix': unixforkingservice,
> }
>
> def createservice(ui, repo, opts):
More information about the Mercurial-devel
mailing list