[PATCH 1 of 4 V2] chgcache: implement socketpair-based IPC

Jun Wu quark at fb.com
Thu Feb 23 02:16:08 UTC 2017


# HG changeset patch
# User Jun Wu <quark at fb.com>
# Date 1487802520 28800
#      Wed Feb 22 14:28:40 2017 -0800
# Node ID aef9e96fb573b85f5731367a470f574dbe730839
# Parent  80f04ba7f4d1f439d726068f02172f9a52b981fe
# Available At https://bitbucket.org/quark-zju/hg-draft
#              hg pull https://bitbucket.org/quark-zju/hg-draft -r aef9e96fb573
chgcache: implement socketpair-based IPC

The new chgcache module will store chg state, so that repo-related contents
can be preloaded to speed up operations.

Motivation:

  Currently, chg only preloads extensions and one chg server is responsible
  for multiple repos (to avoid unnecessary memory usage).

  With the new module, eventually the master chg server will have a state
  storing multiple components of multiple repos, like the changelog index
  (with radix tree prebuilt), the dirstate, the phasecache, the bookmarks,
  the obsstore, etc.

How it works:

  The preloading logic is intended to work as follows:

    1. a request comes in
    2. master fork()s, worker starts to respond to the request
    3. worker sends "repo location" to master at exit
    4. master preloads that repo, asynchronously
    5. another request comes in
    6. master fork()s, and the worker gets the preloaded state for free
       (note: if step 4 has not completed, the worker won't get the state)
    7. worker double-checks that the state is valid (e.g. by checking file
       size and mtime)
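
Step 7 could be sketched as follows. This is only an illustration and not
code from this series: the helper names snapshot() and still_valid() are
hypothetical, and it assumes Python 3 and that size plus mtime are an
acceptable staleness signal.

```python
import os


def snapshot(path):
    """Record the (size, mtime) pair of a file at preload time."""
    st = os.stat(path)
    return (st.st_size, st.st_mtime_ns)


def still_valid(path, recorded):
    """Return True if the file still matches what was recorded."""
    try:
        return snapshot(path) == recorded
    except OSError:  # file removed or unreadable: treat the state as stale
        return False
```

In this sketch the master would keep the snapshot next to the preloaded
object, and the worker would call still_valid() before trusting it.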

Why a new module:

  The feature is non-trivial, so it is developed in a standalone module to
  keep it separate from the core chg logic.

  As discussed at [1], it's better to start as a new module, instead of an
  extension.

This patch adds the basic building block, the IPC utility, so that chg
workers can send information to the chg master.

  The choice of IPC is DGRAM socketpair, because:

    - Why not pipes:
      DGRAM eliminates the complexity of dealing with message boundaries,
      as long as the messages are small. Our messages are supposed to be
      paths, which can be considered "small".
      Note: We don't really need bidirectional communication. It's only
      worker -> master.

    - Why not shared memory:
      Shared memory requires polling in the master, and requires extra
      message consistency checks.

    - Why not Python stdlib multiprocessing.(Array|Queue|*):
      multiprocessing requires a special fork(), which runs its own object
      serialization and has extra costs. Our IPC is simple, and we have
      some special needs: non-blocking on the sender side, blocking on the
      receiver side. We also probably want to just use "os.fork()". So a
      utility of a few dozen lines is probably the right choice.
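
The message-boundary argument can be illustrated with a small sketch. It is
not part of the patch: it assumes Python 3 (bytes instead of str), and the
helper names dgram_roundtrip() and pipe_roundtrip() are made up for the
comparison.

```python
import os
import socket


def dgram_roundtrip(messages):
    """Send each message over a SOCK_DGRAM socketpair, one recv per message."""
    rx, tx = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        for m in messages:
            tx.send(m)
        # each recv() returns exactly one datagram
        return [rx.recv(4096) for _ in messages]
    finally:
        rx.close()
        tx.close()


def pipe_roundtrip(messages):
    """Write each message to a pipe, then read once from the byte stream."""
    r, w = os.pipe()
    try:
        for m in messages:
            os.write(w, m)
        # a pipe has no framing: the writes come back concatenated
        return os.read(r, 4096)
    finally:
        os.close(r)
        os.close(w)


if __name__ == "__main__":
    paths = [b"/repo/a", b"/repo/b"]
    print(dgram_roundtrip(paths))  # [b'/repo/a', b'/repo/b']
    print(pipe_roundtrip(paths))   # b'/repo/a/repo/b'
```

With a pipe, the reader would need a delimiter or a length prefix to split
the paths again; with datagrams the kernel keeps them apart.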

The patch was tested using the following steps:

  1. Copy the class to a single .py file. Make an instance of it.
  2. Create two children by using os.fork()
  3. Use the socketipc object to send messages back to the parent.
  4. Read messages in the parent.
  5. Make sure that:
     - the children won't be blocked, even if the parent does not read
     - the parent won't receive incomplete messages
     - the parent may lose messages
     - super long messages won't be sent
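
A rough sketch of such a manual test is shown below. It is not the script
that was actually used: it assumes Python 3 on a POSIX system, talks to a
raw socketpair instead of importing the socketipc class, and the helper
names (make_channel, try_send, fork_and_collect, flood) are hypothetical.

```python
import os
import socket


def make_channel():
    """Create the DGRAM socketpair; the sending end is non-blocking."""
    rx, tx = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
    tx.setblocking(False)
    return rx, tx


def try_send(tx, msg):
    """Return True if msg was queued, False if the kernel rejected it."""
    try:
        tx.send(msg)
        return True
    except OSError:  # buffer full, message too long, ...
        return False


def fork_and_collect(payloads):
    """Fork one child per payload; each child sends its payload and exits.
    The parent waits for the children, then reads the queued messages."""
    rx, tx = make_channel()
    rx.settimeout(5.0)  # do not hang forever if a message was lost
    pids = []
    for payload in payloads:
        pid = os.fork()
        if pid == 0:  # child
            try:
                try_send(tx, payload)
            finally:
                os._exit(0)  # leave without running the parent's code
        pids.append(pid)
    for pid in pids:
        os.waitpid(pid, 0)
    received = [rx.recv(4096) for _ in payloads]
    rx.close()
    tx.close()
    return sorted(received)


def flood(limit=100000):
    """Send with nobody reading until the kernel refuses; never blocks."""
    rx, tx = make_channel()
    sent = 0
    while sent < limit and try_send(tx, b"x"):
        sent += 1
    rx.close()
    tx.close()
    return sent


if __name__ == "__main__":
    print(fork_and_collect([b"child-0", b"child-1"]))
    print(flood())  # number of messages accepted before the first refusal
```

flood() returning at all shows that the sender is not blocked by an idle
reader, and the messages it could not queue are the ones the parent would
lose.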

[1]: https://www.mercurial-scm.org/pipermail/mercurial-devel/2017-February/093036.html

diff --git a/mercurial/chgcache.py b/mercurial/chgcache.py
new file mode 100644
--- /dev/null
+++ b/mercurial/chgcache.py
@@ -0,0 +1,70 @@
+# chgcache.py - dynamic caching state for chg
+#
+# Copyright 2017 Facebook, Inc.
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+import array
+import fcntl
+import select
+import socket
+import termios
+
+# -- shared by both the forked workers, and the master ------------------------
+
+class socketipc(object):
+    """A simple interprocess communication mechanism that sets up a channel
+    between the master server and (multiple) forked worker processes. The
+    forked workers do non-blocking, unreliable writes, while the master server
+    does blocking reads.
+
+    To use the object, create it in the master server, read from a thread, and
+    write from forked processes:
+
+        # pid=1000, master, main thread
+        ipc = socketipc() # initialize ipc before forking
+
+        # pid=1000, master, a background thread
+        while True:
+            msg = ipc.recv() # blocking
+            ....
+
+        # pid=1001, worker
+        ipc.send('foo') # non-blocking, silently ignore errors
+
+        # pid=1002, worker
+        ipc.send('bar') # non-blocking, silently ignore errors
+    """
+
+    def __init__(self):
+        self._in, self._out = socket.socketpair(socket.AF_UNIX,
+                                                socket.SOCK_DGRAM)
+        self._out.setblocking(False)
+
+    def send(self, msg):
+        """send msg without blocking. fail silently on errors, e.g. msg is too
+        long, or the queue is full.
+        """
+        try:
+            return self._out.send(msg)
+        except socket.error:
+            pass
+
+    def recv(self):
+        """receive a complete msg. will block."""
+        select.select([self._in], [], [])
+        # get message length, see "man tty_ioctl", not POSIX compliant
+        intbuf = array.array('i', [0])
+        fcntl.ioctl(self._in, termios.FIONREAD, intbuf)
+        msglen = intbuf[0]
+        # allocate one more byte, so we can detect bad msglen (bad OS)
+        msg = self._in.recv(msglen + 1)
+        assert len(msg) == msglen
+        return msg
+
+    def __del__(self):
+        self._in.close()
+        self._out.close()

