[PATCH 1 of 4 V2] chgcache: implement socketpair-based IPC

Yuya Nishihara yuya at tcha.org
Mon Feb 27 10:37:04 EST 2017


On Wed, 22 Feb 2017 18:16:08 -0800, Jun Wu wrote:
> # HG changeset patch
> # User Jun Wu <quark at fb.com>
> # Date 1487802520 28800
> #      Wed Feb 22 14:28:40 2017 -0800
> # Node ID aef9e96fb573b85f5731367a470f574dbe730839
> # Parent  80f04ba7f4d1f439d726068f02172f9a52b981fe
> # Available At https://bitbucket.org/quark-zju/hg-draft
> #              hg pull https://bitbucket.org/quark-zju/hg-draft -r aef9e96fb573
> chgcache: implement socketpair-based IPC

> 
> +class socketipc(object):
> +    """A simple interprocess communication mechanism that sets up an channel
> +    between the master server and (multiple) forked worker processes. The
> +    forked workers do non-blocking, unreliable writes, while the master server
> +    does blocking reads.
> +
> +    To use the object, create it in the master server, read from a thread, and
> +    write from forked processes:
> +
> +        # pid=1000, master, main thread
> +        ipc = socketipc() # initialize ipc before forking
> +
> +        # pid=1000, master, a background thread
> +        while True:
> +            msg = ipc.recv() # blocking
> +            ....
> +
> +        # pid=1001, worker
> +        ipc.send('foo') # non-blocking, silently ignore errors
> +
> +        # pid=1002, worker
> +        ipc.send('bar') # non-blocking, silently ignore errors
> +    """
> +
> +    def __init__(self):
> +        self._in, self._out = socket.socketpair(socket.AF_UNIX,
> +                                                socket.SOCK_DGRAM)
> +        self._out.setblocking(False)
> +
> +    def send(self, msg):
> +        """send msg without blocking. fail silently on errors, ex. msg is too
> +        long, or the queue is full.
> +        """
> +        try:
> +            return self._out.send(msg)
> +        except socket.error:
> +            pass
> +
> +    def recv(self):
> +        """receive a complete msg. will block."""
> +        select.select([self._in], [], [])
> +        # get message length, see "man tty_ioctl", not POSIX compliant
> +        intbuf = array.array('i', [0])
> +        fcntl.ioctl(self._in, termios.FIONREAD, intbuf)
> +        msglen = intbuf[0]
> +        # allocate one more byte, so we can detect bad msglen (bad OS)
> +        msg = self._in.recv(msglen + 1)
> +        assert len(msg) == msglen
> +        return msg

Looks okay, but can't we simply call recv() with reasonably large buffer size
(e.g. 8k) ?

Nit: if all peer ends were closed appropriately, recv() would return '' and the
assertion would fail.

> +    def __del__(self):
> +        self._in.close()
> +        self._out.close()

It's generally a bad idea to free resources by GC. Can't we have .close()
method?


More information about the Mercurial-devel mailing list