D4473: wireprotoframing: buffer emitted data to reduce frame count

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Wed Sep 12 10:43:36 EDT 2018


This revision was automatically updated to reflect the committed changes.
Closed by commit rHG84bf6ded9317: wireprotoframing: buffer emitted data to reduce frame count (authored by indygreg, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D4473?vs=10770&id=10934

REVISION DETAIL
  https://phab.mercurial-scm.org/D4473

AFFECTED FILES
  mercurial/wireprotoframing.py

CHANGE DETAILS

diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -511,6 +511,98 @@
                            flags=0,
                            payload=payload)
 
+class bufferingcommandresponseemitter(object):
+    """Helper object to emit command response frames intelligently.
+
+    Raw command response data is likely emitted in chunks much smaller
+    than what can fit in a single frame. This class exists to buffer
+    chunks until enough data is available to fit in a single frame.
+
+    TODO we'll need something like this when compression is supported.
+    So it might make sense to implement this functionality at the stream
+    level.
+    """
+    def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
+        self._stream = stream
+        self._requestid = requestid
+        self._maxsize = maxframesize
+        # Pending chunks and their combined length (never above maxsize).
+        self._chunks = []
+        self._chunkssize = 0
+
+    def send(self, data):
+        """Send new data for emission.
+
+        Is a generator of new frames that were derived from the new input.
+
+        If the special input ``None`` is received, flushes all buffered
+        data to frames. Nothing is emitted if the buffer is empty.
+        """
+        if data is None:
+            for frame in self._flush():
+                yield frame
+            return
+
+        # There is a ton of potential to do more complicated things here.
+        # Our immediate goal is to coalesce small chunks into big frames,
+        # not achieve the fewest number of frames possible. So we go with
+        # a simple implementation:
+        #
+        # * If a chunk is too large for a frame, we flush and emit frames
+        #   for the new chunk.
+        # * If a chunk can be buffered without total buffered size limits
+        #   being exceeded, we do that.
+        # * If a chunk causes us to go over our buffering limit, we flush
+        #   and then buffer the new chunk.
+
+        if len(data) > self._maxsize:
+            for frame in self._flush():
+                yield frame
+
+            # Now emit one frame per maxsize-long slice of the big chunk.
+            for offset in range(0, len(data), self._maxsize):
+                yield self._stream.makeframe(
+                    self._requestid,
+                    typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                    flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+                    payload=data[offset:offset + self._maxsize])
+            return
+
+        # If we don't have enough to constitute a full frame, buffer and
+        # return.
+        if len(data) + self._chunkssize < self._maxsize:
+            self._chunks.append(data)
+            self._chunkssize += len(data)
+            return
+
+        # Else flush what we have and buffer the new chunk. We could do
+        # something more intelligent here, like break the chunk. Let's
+        # keep things simple for now.
+        for frame in self._flush():
+            yield frame
+
+        self._chunks.append(data)
+        self._chunkssize = len(data)
+
+    def _flush(self):
+        """Emit buffered data as a single frame and reset the buffer.
+
+        Nothing is emitted when the buffer is empty, because callers flush
+        unconditionally and an empty frame would carry no response data.
+        """
+        payload = b''.join(self._chunks)
+        assert len(payload) <= self._maxsize
+        self._chunks[:] = []
+        self._chunkssize = 0
+        if not payload:
+            return
+
+        yield self._stream.makeframe(
+            self._requestid,
+            typeid=FRAME_TYPE_COMMAND_RESPONSE,
+            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+            payload=payload)
+
 class stream(object):
     """Represents a logical unidirectional series of frames."""
 
@@ -716,10 +808,14 @@
 
         def sendframes():
             emitted = False
+            emitter = bufferingcommandresponseemitter(stream, requestid)
             while True:
                 try:
                     o = next(objs)
                 except StopIteration:
+                    for frame in emitter.send(None):
+                        yield frame
+
                     if emitted:
                         yield createcommandresponseeosframe(stream, requestid)
                     break
@@ -743,11 +839,10 @@
                         yield createcommandresponseokframe(stream, requestid)
                         emitted = True
 
-                    # TODO buffer chunks so emitted frame payloads can be
-                    # larger.
-                    for frame in createbytesresponseframesfromgen(
-                        stream, requestid, cborutil.streamencode(o)):
-                        yield frame
+                    for chunk in cborutil.streamencode(o):
+                        for frame in emitter.send(chunk):
+                            yield frame
+
                 except Exception as e:
                     for frame in createerrorframe(stream, requestid,
                                                   '%s' % e,



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list