[PATCH 1 of 4 V2] chgcache: implement socketpair-based IPC

Jun Wu quark at fb.com
Mon Feb 27 12:49:01 EST 2017


Excerpts from Yuya Nishihara's message of 2017-02-28 00:37:04 +0900:
> On Wed, 22 Feb 2017 18:16:08 -0800, Jun Wu wrote:
> > # HG changeset patch
> > # User Jun Wu <quark at fb.com>
> > # Date 1487802520 28800
> > #      Wed Feb 22 14:28:40 2017 -0800
> > # Node ID aef9e96fb573b85f5731367a470f574dbe730839
> > # Parent  80f04ba7f4d1f439d726068f02172f9a52b981fe
> > # Available At https://bitbucket.org/quark-zju/hg-draft 
> > #              hg pull https://bitbucket.org/quark-zju/hg-draft  -r aef9e96fb573
> > chgcache: implement socketpair-based IPC
> 
> > 
> > +class socketipc(object):
> > +    """A simple interprocess communication mechanism that sets up a channel
> > +    between the master server and (multiple) forked worker processes. The
> > +    forked workers do non-blocking, unreliable writes, while the master server
> > +    does blocking reads.
> > +
> > +    To use the object, create it in the master server, read from a thread, and
> > +    write from forked processes:
> > +
> > +        # pid=1000, master, main thread
> > +        ipc = socketipc() # initialize ipc before forking
> > +
> > +        # pid=1000, master, a background thread
> > +        while True:
> > +            msg = ipc.recv() # blocking
> > +            ....
> > +
> > +        # pid=1001, worker
> > +        ipc.send('foo') # non-blocking, silently ignore errors
> > +
> > +        # pid=1002, worker
> > +        ipc.send('bar') # non-blocking, silently ignore errors
> > +    """
> > +
> > +    def __init__(self):
> > +        self._in, self._out = socket.socketpair(socket.AF_UNIX,
> > +                                                socket.SOCK_DGRAM)
> > +        self._out.setblocking(False)
> > +
> > +    def send(self, msg):
> > +        """send msg without blocking. fail silently on errors, ex. msg is too
> > +        long, or the queue is full.
> > +        """
> > +        try:
> > +            return self._out.send(msg)
> > +        except socket.error:
> > +            pass
> > +
> > +    def recv(self):
> > +        """receive a complete msg. will block."""
> > +        select.select([self._in], [], [])
> > +        # get message length, see "man tty_ioctl", not POSIX compliant
> > +        intbuf = array.array('i', [0])
> > +        fcntl.ioctl(self._in, termios.FIONREAD, intbuf)
> > +        msglen = intbuf[0]
> > +        # allocate one more byte, so we can detect bad msglen (bad OS)
> > +        msg = self._in.recv(msglen + 1)
> > +        assert len(msg) == msglen
> > +        return msg
> 
> Looks okay, but can't we simply call recv() with a reasonably large buffer size
> (e.g. 8k)?

That's actually a good idea. I wasn't very comfortable with the non-POSIX
API either.
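A minimal sketch of the reviewer's suggestion, assuming Python 3 and an upper bound on message size (MAXMSGLEN = 8192 is a hypothetical constant, not part of the patch; the largest datagram an AF_UNIX socket accepts is OS-dependent). recv() reads one byte more than the limit so that a silently truncated datagram can still be detected, the same trick the original code used with FIONREAD. The select() call is no longer needed, because a blocking recv() on a SOCK_DGRAM socket already waits for and returns exactly one datagram.

```python
import socket


class socketipc(object):
    """Sketch: socketpair IPC whose recv() uses a fixed buffer, not FIONREAD."""

    # Assumption: hypothetical upper bound on a single message, in bytes.
    MAXMSGLEN = 8192

    def __init__(self):
        self._in, self._out = socket.socketpair(socket.AF_UNIX,
                                                socket.SOCK_DGRAM)
        self._out.setblocking(False)

    def send(self, msg):
        """Send msg without blocking; silently drop it on error or if too long."""
        if len(msg) > self.MAXMSGLEN:
            return None
        try:
            return self._out.send(msg)
        except OSError:  # socket.error is an alias of OSError in Python 3
            return None

    def recv(self):
        """Block until one datagram arrives and return it."""
        # One extra byte: a datagram longer than MAXMSGLEN would otherwise be
        # truncated without any error, and the extra byte makes that visible.
        msg = self._in.recv(self.MAXMSGLEN + 1)
        if len(msg) > self.MAXMSGLEN:
            raise ValueError('message longer than %d bytes' % self.MAXMSGLEN)
        return msg
```

Since send() already drops anything longer than MAXMSGLEN, the check in recv() only guards against writers that bypass send().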

> Nit: if all peer ends were closed appropriately, recv() would return '' and the
> assertion would fail.

Currently, that's impossible: the master does not close the fds.
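A sketch of the fork-based flow from the patch's docstring, assuming Python 3 on a Unix system with os.fork(). It uses a raw socketpair so it stands alone; demo(), the 5-second timeout, and the message format are hypothetical. The master creates the pair before forking, so every worker inherits the write end, and it keeps both ends open until it has read one datagram per worker, which is the situation described above.

```python
import os
import socket


def demo(nworkers=2):
    """Fork nworkers children that each send one datagram to the master."""
    # Create the pair before forking so the children inherit the write end.
    rsock, wsock = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
    wsock.setblocking(False)
    rsock.settimeout(5)  # assumption: fail instead of hanging if a send was dropped
    pids = []
    for i in range(nworkers):
        pid = os.fork()
        if pid == 0:
            # Worker: non-blocking, unreliable write; errors are ignored.
            try:
                wsock.send(b'worker %d' % i)
            except OSError:
                pass
            os._exit(0)
        pids.append(pid)
    for pid in pids:
        os.waitpid(pid, 0)
    # Master: both ends are still open; read back one datagram per worker.
    msgs = sorted(rsock.recv(8192) for _ in range(nworkers))
    rsock.close()
    wsock.close()
    return msgs
```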

> 
> > +    def __del__(self):
> > +        self._in.close()
> > +        self._out.close()
> 
> It's generally a bad idea to free resources via GC. Can't we have a .close()
> method?
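A minimal sketch of an explicit close() with context-manager support, assuming Python 3 and, crucially, that close() is only called once no other thread can still be blocked in recv(). socket.close() is a no-op on an already-closed socket in Python 3, so calling close() twice is harmless. The bufsize default of 8192 is an assumption borrowed from the 8k suggestion above.

```python
import socket


class socketipc(object):
    """Sketch: socketpair IPC with an explicit close() instead of __del__."""

    def __init__(self):
        self._in, self._out = socket.socketpair(socket.AF_UNIX,
                                                socket.SOCK_DGRAM)
        self._out.setblocking(False)

    def send(self, msg):
        """Non-blocking, unreliable send; returns None on any socket error."""
        try:
            return self._out.send(msg)
        except OSError:
            return None

    def recv(self, bufsize=8192):
        """Block until one datagram arrives and return it."""
        return self._in.recv(bufsize)

    def close(self):
        # Assumption: no other thread is blocked in recv() at this point.
        # socket.close() does nothing if the socket is already closed.
        self._in.close()
        self._out.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()
        return False
```

The single-threaded assumption is the whole difficulty: if a background thread is parked in recv(), closing the fd underneath it is not a reliable way to wake that thread.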

Actually, I tried that, but it seemed overcomplicated. "close" and "recv" may
be called from different threads, so a thread lock may be necessary. It also
requires a way to interrupt a blocking "recv" from "close", which is quite
painful to do "correctly" (see the reply to that patch). shm does not seem to
introduce these kinds of problems.
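One common way to interrupt the blocking read is the self-pipe trick: recv() waits in select() on both the datagram socket and a private pipe, and close() writes a byte to the pipe to wake it. The sketch below (Python 3; the pipe, the lock, and release() are hypothetical additions, not part of the patch) shows how much extra state that takes: close() only signals, and the fds can be freed only by release(), once the reader thread has returned.

```python
import os
import select
import socket
import threading


class socketipc(object):
    """Sketch: a socketpair IPC whose blocking recv() can be woken by close()."""

    def __init__(self):
        self._in, self._out = socket.socketpair(socket.AF_UNIX,
                                                socket.SOCK_DGRAM)
        self._out.setblocking(False)
        # Self-pipe: written once by close() and never drained, so every
        # later select() on it returns immediately.
        self._wakeread, self._wakewrite = os.pipe()
        self._lock = threading.Lock()
        self._closed = False
        self._released = False

    def send(self, msg):
        """Non-blocking, unreliable send; returns None on any socket error."""
        try:
            return self._out.send(msg)
        except OSError:
            return None

    def recv(self, bufsize=8192):
        """Block for one datagram; return None once close() has been called."""
        readable, _, _ = select.select([self._in, self._wakeread], [], [])
        if self._wakeread in readable:
            return None
        return self._in.recv(bufsize)

    def close(self):
        """Ask any thread blocked in recv() to stop. Does not free the fds."""
        with self._lock:
            if self._closed:
                return
            self._closed = True
        os.write(self._wakewrite, b'x')

    def release(self):
        """Free all fds; call only after the reader thread has returned."""
        with self._lock:
            if self._released:
                return
            self._released = True
            self._closed = True  # a later close() must not write to a freed fd
        self._in.close()
        self._out.close()
        os.close(self._wakeread)
        os.close(self._wakewrite)
```

The split between close() and release() is deliberate: freeing the fds inside close() while another thread may still be in select() on them would reintroduce exactly the race that the lock and pipe are meant to avoid.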


More information about the Mercurial-devel mailing list