[PATCH 5 of 8] commandserver: add IPC channel to teach repository path on command finished

Yuya Nishihara yuya at tcha.org
Thu Dec 6 07:45:12 EST 2018


# HG changeset patch
# User Yuya Nishihara <yuya at tcha.org>
# Date 1540991943 -32400
#      Wed Oct 31 22:19:03 2018 +0900
# Node ID 5f4354d1aa370a028eee8bcc6de245b1212cf35c
# Parent  af9746ae6d62457788853eda54374b192b7a7134
commandserver: add IPC channel to teach repository path on command finished

The idea is to load recently-used repositories first in the master process,
and fork(). The forked worker can reuse a warm repository if it's preloaded.

There are a few ways to cache repositories in memory. Each has pros and
cons:

 a. "preload by master"
    pros: can use a single cache dict, maximizing cache hit rate
    cons: need to reload a repo in master process (because worker process
          dies per command)
 b. "prefork"
    pros: can cache a repo without reloading (as worker processes persist)
    cons: lower cache hit rate since each worker has to maintain its own cache
 c. "shared memory" (or separate key-value store server)
    pros: no need to reload a repo in master process, ideally
    cons: need to serialize objects to sharable form

Since my primary goal is to get rid of the cost of loading obsstore without
massive rewrites, (c) doesn't work. (b) isn't ideal since it would require
much more SDRAM than (a). So I take (a).

Credit for the idea goes to Jun Wu.

diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py
--- a/mercurial/commandserver.py
+++ b/mercurial/commandserver.py
@@ -506,12 +506,19 @@ class unixforkingservice(object):
             raise error.Abort(_('no socket path specified with --address'))
         self._servicehandler = handler or unixservicehandler(ui)
         self._sock = None
+        self._mainipc = None
+        self._workeripc = None
         self._oldsigchldhandler = None
         self._workerpids = set()  # updated by signal handler; do not iterate
         self._socketunlinked = None
 
     def init(self):
         self._sock = socket.socket(socket.AF_UNIX)
+        # IPC channel from many workers to one main process; this is actually
+        # a uni-directional pipe, but is backed by a DGRAM socket so each
+        # message can be easily separated.
+        o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
+        self._mainipc, self._workeripc = o
         self._servicehandler.bindsocket(self._sock, self.address)
         if util.safehasattr(procutil, 'unblocksignal'):
             procutil.unblocksignal(signal.SIGCHLD)
@@ -527,6 +534,8 @@ class unixforkingservice(object):
     def _cleanup(self):
         signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
         self._sock.close()
+        self._mainipc.close()
+        self._workeripc.close()
         self._unlinksocket()
         # don't kill child processes as they have active clients, just wait
         self._reapworkers(0)
@@ -543,6 +552,8 @@ class unixforkingservice(object):
         selector = selectors.DefaultSelector()
         selector.register(self._sock, selectors.EVENT_READ,
                           self._acceptnewconnection)
+        selector.register(self._mainipc, selectors.EVENT_READ,
+                          self._handlemainipc)
         while True:
             if not exiting and h.shouldexit():
                 # clients can no longer connect() to the domain socket, so
@@ -592,8 +603,10 @@ class unixforkingservice(object):
             try:
                 selector.close()
                 sock.close()
+                self._mainipc.close()
                 self._runworker(conn)
                 conn.close()
+                self._workeripc.close()
                 os._exit(0)
             except:  # never return, hence no re-raises
                 try:
@@ -601,6 +614,17 @@ class unixforkingservice(object):
                 finally:
                     os._exit(255)
 
+    def _handlemainipc(self, sock, selector):
+        """Process messages sent from a worker"""
+        try:
+            path = sock.recv(32768)  # large enough to receive path
+        except socket.error as inst:
+            if inst.args[0] == errno.EINTR:
+                return
+            raise
+
+        self.ui.log(b'cmdserver', b'repository: %s\n', path)
+
     def _sigchldhandler(self, signal, frame):
         self._reapworkers(os.WNOHANG)
 
@@ -628,6 +652,22 @@ class unixforkingservice(object):
         h = self._servicehandler
         try:
             _serverequest(self.ui, self.repo, conn, h.createcmdserver,
-                          prereposetups=None)  # TODO: pass in hook functions
+                          prereposetups=[self._reposetup])
         finally:
             gc.collect()  # trigger __del__ since worker process uses os._exit
+
+    def _reposetup(self, ui, repo):
+        if not repo.local():
+            return
+
+        class unixcmdserverrepo(repo.__class__):
+            def close(self):
+                super(unixcmdserverrepo, self).close()
+                try:
+                    self._cmdserveripc.send(self.root)
+                except socket.error:
+                    self.ui.log(b'cmdserver',
+                                b'failed to send repo root to master\n')
+
+        repo.__class__ = unixcmdserverrepo
+        repo._cmdserveripc = self._workeripc
diff --git a/tests/test-chg.t b/tests/test-chg.t
--- a/tests/test-chg.t
+++ b/tests/test-chg.t
@@ -230,7 +230,6 @@ print only the last 10 lines, since we a
 preserved:
 
   $ cat log/server.log.1 log/server.log | tail -10 | filterlog
-  YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> setprocname: ...
   YYYY/MM/DD HH:MM:SS (PID)> received fds: ...
   YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload'
@@ -238,5 +237,6 @@ preserved:
   YYYY/MM/DD HH:MM:SS (PID)> setenv: ...
   YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ...
   YYYY/MM/DD HH:MM:SS (PID)> validate: []
+  YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload
   YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.


More information about the Mercurial-devel mailing list