[PATCH 1 of 4 V2] chgcache: implement socketpair-based IPC
Yuya Nishihara
yuya at tcha.org
Mon Feb 27 10:37:04 EST 2017
On Wed, 22 Feb 2017 18:16:08 -0800, Jun Wu wrote:
> # HG changeset patch
> # User Jun Wu <quark at fb.com>
> # Date 1487802520 28800
> # Wed Feb 22 14:28:40 2017 -0800
> # Node ID aef9e96fb573b85f5731367a470f574dbe730839
> # Parent 80f04ba7f4d1f439d726068f02172f9a52b981fe
> # Available At https://bitbucket.org/quark-zju/hg-draft
> # hg pull https://bitbucket.org/quark-zju/hg-draft -r aef9e96fb573
> chgcache: implement socketpair-based IPC
>
> +class socketipc(object):
> +    """A simple interprocess communication mechanism that sets up a channel
> +    between the master server and (multiple) forked worker processes. The
> +    forked workers do non-blocking, unreliable writes, while the master server
> +    does blocking reads.
> +
> +    To use the object, create it in the master server, read from a thread, and
> +    write from forked processes:
> +
> +        # pid=1000, master, main thread
> +        ipc = socketipc() # initialize ipc before forking
> +
> +        # pid=1000, master, a background thread
> +        while True:
> +            msg = ipc.recv() # blocking
> +            ....
> +
> +        # pid=1001, worker
> +        ipc.send('foo') # non-blocking, silently ignore errors
> +
> +        # pid=1002, worker
> +        ipc.send('bar') # non-blocking, silently ignore errors
> +    """
> +
> +    def __init__(self):
> +        self._in, self._out = socket.socketpair(socket.AF_UNIX,
> +                                                socket.SOCK_DGRAM)
> +        self._out.setblocking(False)
> +
> +    def send(self, msg):
> +        """send msg without blocking. fail silently on errors, ex. msg is too
> +        long, or the queue is full.
> +        """
> +        try:
> +            return self._out.send(msg)
> +        except socket.error:
> +            pass
> +
> +    def recv(self):
> +        """receive a complete msg. will block."""
> +        select.select([self._in], [], [])
> +        # get message length, see "man tty_ioctl", not POSIX compliant
> +        intbuf = array.array('i', [0])
> +        fcntl.ioctl(self._in, termios.FIONREAD, intbuf)
> +        msglen = intbuf[0]
> +        # allocate one more byte, so we can detect bad msglen (bad OS)
> +        msg = self._in.recv(msglen + 1)
> +        assert len(msg) == msglen
> +        return msg
Looks okay, but can't we simply call recv() with a reasonably large buffer
size (e.g. 8k)?
Nit: if all peer ends were closed appropriately, recv() would return '' and the
assertion would fail.
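
For illustration, a minimal sketch of the fixed-buffer variant suggested above. The class name simpleipc and the constant BUFSIZE are hypothetical and not part of the patch; the 8k value is only the example figure from the comment. Because the socket is SOCK_DGRAM, a single recv() call returns one whole datagram, so no FIONREAD ioctl or select() is needed. The trade-off is that a message longer than BUFSIZE would be truncated silently.

```python
import socket

# Hypothetical upper bound on a message; 8k is the example size from the review.
BUFSIZE = 8192

class simpleipc(object):
    """Sketch only: same channel setup as the patch, simpler recv()."""

    def __init__(self):
        self._in, self._out = socket.socketpair(socket.AF_UNIX,
                                                socket.SOCK_DGRAM)
        self._out.setblocking(False)

    def send(self, msg):
        """send msg without blocking, ignoring errors as the patch does"""
        try:
            return self._out.send(msg)
        except socket.error:
            pass

    def recv(self):
        """block until one datagram arrives and return it

        An empty result is handed back to the caller instead of being
        asserted on, so the caller decides how to treat it.
        """
        return self._in.recv(BUFSIZE)
```

Datagram boundaries are preserved, so two send() calls are read back as two separate messages in order.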
> +    def __del__(self):
> +        self._in.close()
> +        self._out.close()
It's generally a bad idea to rely on GC to free resources. Can't we have a
.close() method?
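
A minimal sketch of what an explicit close() could look like, assuming the same two-socket layout as the patch. The class name closableipc and the context-manager methods are hypothetical additions for illustration, not something the patch or this review specifies.

```python
import socket

class closableipc(object):
    """Sketch only: release both sockets explicitly instead of in __del__."""

    def __init__(self):
        self._in, self._out = socket.socketpair(socket.AF_UNIX,
                                                socket.SOCK_DGRAM)
        self._out.setblocking(False)

    def close(self):
        # socket.close() may be called more than once, so close() is
        # safe to call repeatedly.
        self._in.close()
        self._out.close()

    # Optional convenience (an assumption, not requested in the review):
    # lets callers write "with closableipc() as ipc: ...".
    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalue, traceback):
        self.close()
```

With this shape the owner of the object decides when the file descriptors go away, rather than leaving it to whenever the garbage collector runs.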