[PATCH 3 of 7] commandserver: add new forking server implemented without using SocketServer

Yuya Nishihara yuya at tcha.org
Thu Jul 14 11:20:41 EDT 2016


# HG changeset patch
# User Yuya Nishihara <yuya at tcha.org>
# Date 1463884998 -32400
#      Sun May 22 11:43:18 2016 +0900
# Node ID e56508ea57c23af68d05d941d344321b6ade8184
# Parent  e374627f118e8aca6243d263154aab9309abf1fc
commandserver: add new forking server implemented without using SocketServer

SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as:

 - race condition that leads to 100% CPU usage (Python 2.6)
   https://bugs.python.org/issue21491
 - can't wait for children belonging to different process groups (Python 2.6)
 - leaves at least one zombie process (Python 2.6, 2.7)
   https://bugs.python.org/issue11109

The first two are critical because we do setpgid(0, 0) in child process to
isolate terminal signals. The last one isn't, but ForkingMixIn seems to be
doing silly. So there are two choices:

 a) backport and maintain SocketServer until we can drop support for Python 2.x
 b) replace SocketServer by simpler one and eliminate glue codes

I chose (b) because it's great time for getting rid of utterly complicated
SocketServer stuff, and preparing for future move towards prefork service.

New unixforkingservice is implemented loosely based on chg 531f8ef64be6. It
is monolithic but much simpler than SocketServer. unixservicehandler provides
customizing points for chg, and it will be shared with future prefork service.

Old unixservice class is still used by chgserver. It will be removed later.

Thanks to Jun Wu for investigating these issues.

diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py
--- a/mercurial/commandserver.py
+++ b/mercurial/commandserver.py
@@ -11,6 +11,9 @@ import errno
 import gc
 import os
 import random
+import select
+import signal
+import socket
 import struct
 import sys
 import traceback
@@ -385,6 +388,41 @@ def _serverequest(ui, repo, conn, create
             # trigger __del__ since ForkingMixIn uses os._exit
             gc.collect()
 
+class unixservicehandler(object):
+    """Set of pluggable operations for unix-mode services
+
+    Almost all methods except for createcmdserver() are called in the main
+    process. You can't pass mutable resource back from createcmdserver().
+    """
+
+    pollinterval = None
+
+    def __init__(self, ui):
+        self.ui = ui
+
+    def bindsocket(self, sock, address):
+        util.bindunixsocket(sock, address)
+
+    def unlinksocket(self, address):
+        os.unlink(address)
+
+    def printbanner(self, address):
+        self.ui.status(_('listening at %s\n') % address)
+        self.ui.flush()  # avoid buffering of status message
+
+    def shouldexit(self):
+        """True if server should shut down; checked per pollinterval"""
+        return False
+
+    def newconnection(self):
+        """Called when main process notices new connection"""
+        pass
+
+    def createcmdserver(self, repo, conn, fin, fout):
+        """Create new command server instance; called in the process that
+        serves for the current connection"""
+        return server(self.ui, repo, fin, fout)
+
 class _requesthandler(socketserver.BaseRequestHandler):
     def handle(self):
         _serverequest(self.server.ui, self.server.repo, self.request,
@@ -424,9 +462,96 @@ class unixservice(object):
         finally:
             self._cleanup()
 
+class unixforkingservice(unixservice):
+    def __init__(self, ui, repo, opts, handler=None):
+        super(unixforkingservice, self).__init__(ui, repo, opts)
+        self._servicehandler = handler or unixservicehandler(ui)
+        self._sock = None
+        self._oldsigchldhandler = None
+        self._workerpids = set()  # updated by signal handler; do not iterate
+
+    def init(self):
+        self._sock = socket.socket(socket.AF_UNIX)
+        self._servicehandler.bindsocket(self._sock, self.address)
+        self._sock.listen(5)
+        o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
+        self._oldsigchldhandler = o
+        self._servicehandler.printbanner(self.address)
+
+    def _cleanup(self):
+        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
+        self._sock.close()
+        self._servicehandler.unlinksocket(self.address)
+        # don't kill child processes as they have active clients, just wait
+        self._reapworkers(0)
+
+    def run(self):
+        try:
+            self._mainloop()
+        finally:
+            self._cleanup()
+
+    def _mainloop(self):
+        h = self._servicehandler
+        while not h.shouldexit():
+            try:
+                ready = select.select([self._sock], [], [], h.pollinterval)[0]
+                if not ready:
+                    continue
+                conn, _addr = self._sock.accept()
+            except (select.error, socket.error) as inst:
+                if inst.args[0] == errno.EINTR:
+                    continue
+                raise
+
+            pid = os.fork()
+            if pid:
+                try:
+                    self.ui.debug('forked worker process (pid=%d)\n' % pid)
+                    self._workerpids.add(pid)
+                    h.newconnection()
+                finally:
+                    conn.close()  # release handle in parent process
+            else:
+                try:
+                    self._serveworker(conn)
+                    conn.close()
+                    os._exit(0)
+                except:  # never return, hence no re-raises
+                    try:
+                        self.ui.traceback(force=True)
+                    finally:
+                        os._exit(255)
+
+    def _sigchldhandler(self, signal, frame):
+        self._reapworkers(os.WNOHANG)
+
+    def _reapworkers(self, options):
+        while self._workerpids:
+            try:
+                pid, _status = os.waitpid(-1, options)
+            except OSError as inst:
+                if inst.errno == errno.EINTR:
+                    continue
+                if inst.errno != errno.ECHILD:
+                    raise
+                # no child processes at all (reaped by other waitpid()?)
+                self._workerpids.clear()
+                return
+            if pid == 0:
+                # no waitable child processes
+                return
+            self.ui.debug('worker process exited (pid=%d)\n' % pid)
+            self._workerpids.discard(pid)
+
+    def _serveworker(self, conn):
+        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
+        h = self._servicehandler
+        _serverequest(self.ui, self.repo, conn, h.createcmdserver)
+
 _servicemap = {
     'pipe': pipeservice,
-    'unix': unixservice,
+    'unix': unixforkingservice,
     }
 
 def createservice(ui, repo, opts):


More information about the Mercurial-devel mailing list