[PATCH 1 of 3 side-word (+4)] util: introduce a bufferedinputpipe utility

Pierre-Yves David pierre-yves.david at ens-lyon.org
Sun May 31 07:22:32 UTC 2015


# HG changeset patch
# User Pierre-Yves David <pierre-yves.david at fb.com>
# Date 1433055324 25200
#      Sat May 30 23:55:24 2015 -0700
# Node ID 1a883412355af1e057324b4ab2de885473f54432
# Parent  7d24a41200d32b48ff63638c9309877c2e187c91
util: introduce a bufferedinputpipe utility

To restore real-time server output through ssh, we need to use a polling feature
(like select) on the pipes used to communicate with the ssh client. However,
we cannot use select alongside Python-level buffering of these pipes (because we
need to know if the buffer is non-empty before calling select).

However, unbuffered performance is terrible, presumably because the 'readline'
call issues 'read(1)' calls until it finds a '\n'. To work around that, we
introduce our own overlay that does buffering by hand, exposing the state of the
buffer to the outside world.

diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -230,10 +230,104 @@ except NameError:
             return memoryview(sliceable)[offset:]
 
 import subprocess
 closefds = os.name == 'posix'
 
+_chunksize = 4096
+
+class bufferedinputpipe(object):
+    """a manually buffered input pipe
+
+    Python will not let us use buffered IO and lazy reading with 'polling' at
+    the same time. We cannot probe the buffer state and select will not detect
+    that data are ready to read if they are already buffered.
+
+    This class lets us work around that by implementing its own buffering
+    (allowing efficient readline) while offering a way to know if the buffer is
+    empty from the outside (allowing collaboration of the buffer with polling).
+    """
+
+    def __init__(self, input):
+        self._input = input
+        self._buffer = []
+        self._eof = False
+
+    @property
+    def hasbuffer(self):
+        """True if any data is currently buffered
+
+        This will be used externally as a pre-step for polling IO. If there is
+        already data then no polling should be set in place."""
+        return bool(self._buffer)
+
+    @property
+    def closed(self):
+        return self._input.closed
+
+    def fileno(self):
+        return self._input.fileno()
+
+    def close(self):
+        return self._input.close()
+
+    def read(self, size):
+        while (not self._eof) and (self._lenbuf < size):
+            self._fillbuffer()
+        return self._frombuffer(size)
+
+    def readline(self, *args, **kwargs):
+        if 1 < len(self._buffer):
+            # this should not happen because both read and readline end with a
+            # _frombuffer call that collapses it.
+            self._buffer = [''.join(self._buffer)]
+        lfi = -1
+        if self._buffer:
+            lfi = self._buffer[-1].find('\n')
+        while (not self._eof) and lfi < 0:
+            self._fillbuffer()
+            if self._buffer:
+                lfi = self._buffer[-1].find('\n')
+        size = lfi + 1
+        if lfi < 0: # end of file
+            size = self._lenbuf
+        elif 1 < len(self._buffer):
+            # we need to take previous chunks into account
+            size += self._lenbuf - len(self._buffer[-1])
+        return self._frombuffer(size)
+
+    @property
+    def _lenbuf(self):
+        """return the current length of buffered data"""
+        return sum(len(d) for d in self._buffer)
+
+    def _frombuffer(self, size):
+        """return at most 'size' data from the buffer
+
+        The data are removed from the buffer."""
+        if size == 0 or not self._buffer:
+            return ''
+        buf = self._buffer[0]
+        if 1 < len(self._buffer):
+            buf = ''.join(self._buffer)
+
+        data = buf[:size]
+        buf = buf[len(data):]
+        if buf:
+            self._buffer = [buf]
+        else:
+            self._buffer = []
+        return data
+
+    def _fillbuffer(self):
+        """read data to the buffer"""
+        data = os.read(self._input.fileno(), _chunksize)
+        if not data:
+            self._eof = True
+        else:
+            # inefficient add
+            self._buffer.append(data)
+
 def popen2(cmd, env=None, newlines=False):
     # Setting bufsize to -1 lets the system decide the buffer size.
     # The default for bufsize is 0, meaning unbuffered. This leads to
     # poor performance on Mac OS X: http://bugs.python.org/issue4194
     p = subprocess.Popen(cmd, shell=True, bufsize=-1,


More information about the Mercurial-devel mailing list