D4924: wireprotov2: remove functions for creating response frames from bytes

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Wed Oct 10 10:58:19 EDT 2018


This revision was automatically updated to reflect the committed changes.
Closed by commit rHG966b5f7fd30b: wireprotov2: remove functions for creating response frames from bytes (authored by indygreg, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D4924?vs=11763&id=11788

REVISION DETAIL
  https://phab.mercurial-scm.org/D4924

AFFECTED FILES
  mercurial/wireprotoframing.py
  tests/test-wireproto-serverreactor.py

CHANGE DETAILS

diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -225,19 +225,22 @@
         results.append(self._sendsingleframe(
             reactor, ffs(b'1 1 stream-begin command-request new '
                          b"cbor:{b'name': b'command'}")))
-        result = reactor.oncommandresponseready(outstream, 1, b'response1')
+        result = reactor.oncommandresponsereadyobjects(
+            outstream, 1, [b'response1'])
         self.assertaction(result, b'sendframes')
         list(result[1][b'framegen'])
         results.append(self._sendsingleframe(
             reactor, ffs(b'1 1 stream-begin command-request new '
                          b"cbor:{b'name': b'command'}")))
-        result = reactor.oncommandresponseready(outstream, 1, b'response2')
+        result = reactor.oncommandresponsereadyobjects(
+            outstream, 1, [b'response2'])
         self.assertaction(result, b'sendframes')
         list(result[1][b'framegen'])
         results.append(self._sendsingleframe(
             reactor, ffs(b'1 1 stream-begin command-request new '
                          b"cbor:{b'name': b'command'}")))
-        result = reactor.oncommandresponseready(outstream, 1, b'response3')
+        result = reactor.oncommandresponsereadyobjects(
+            outstream, 1, [b'response3'])
         self.assertaction(result, b'sendframes')
         list(result[1][b'framegen'])
 
@@ -364,10 +367,13 @@
         list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
 
         outstream = reactor.makeoutputstream()
-        result = reactor.oncommandresponseready(outstream, 1, b'response')
+        result = reactor.oncommandresponsereadyobjects(
+            outstream, 1, [b'response'])
         self.assertaction(result, b'sendframes')
         self.assertframesequal(result[1][b'framegen'], [
-            b'1 2 stream-begin command-response eos %sresponse' % OK,
+            b'1 2 stream-begin command-response continuation %s' % OK,
+            b'1 2 0 command-response continuation cbor:b"response"',
+            b'1 2 0 command-response eos ',
         ])
 
     def testmultiframeresponse(self):
@@ -380,12 +386,16 @@
         list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
 
         outstream = reactor.makeoutputstream()
-        result = reactor.oncommandresponseready(outstream, 1, first + second)
+        result = reactor.oncommandresponsereadyobjects(
+            outstream, 1, [first + second])
         self.assertaction(result, b'sendframes')
         self.assertframesequal(result[1][b'framegen'], [
             b'1 2 stream-begin command-response continuation %s' % OK,
+            b'1 2 0 command-response continuation Y\x80d',
             b'1 2 0 command-response continuation %s' % first,
-            b'1 2 0 command-response eos %s' % second,
+            b'1 2 0 command-response continuation %s' % second,
+            b'1 2 0 command-response continuation ',
+            b'1 2 0 command-response eos '
         ])
 
     def testservererror(self):
@@ -412,12 +422,15 @@
         self.assertaction(results[0], b'runcommand')
 
         outstream = reactor.makeoutputstream()
-        result = reactor.oncommandresponseready(outstream, 1, b'response')
+        result = reactor.oncommandresponsereadyobjects(
+            outstream, 1, [b'response'])
         self.assertaction(result, b'noop')
         result = reactor.oninputeof()
         self.assertaction(result, b'sendframes')
         self.assertframesequal(result[1][b'framegen'], [
-            b'1 2 stream-begin command-response eos %sresponse' % OK,
+            b'1 2 stream-begin command-response continuation %s' % OK,
+            b'1 2 0 command-response continuation cbor:b"response"',
+            b'1 2 0 command-response eos ',
         ])
 
     def testmultiplecommanddeferresponse(self):
@@ -427,15 +440,21 @@
         list(sendcommandframes(reactor, instream, 3, b'command2', {}))
 
         outstream = reactor.makeoutputstream()
-        result = reactor.oncommandresponseready(outstream, 1, b'response1')
+        result = reactor.oncommandresponsereadyobjects(
+            outstream, 1, [b'response1'])
         self.assertaction(result, b'noop')
-        result = reactor.oncommandresponseready(outstream, 3, b'response2')
+        result = reactor.oncommandresponsereadyobjects(
+            outstream, 3, [b'response2'])
         self.assertaction(result, b'noop')
         result = reactor.oninputeof()
         self.assertaction(result, b'sendframes')
         self.assertframesequal(result[1][b'framegen'], [
-            b'1 2 stream-begin command-response eos %sresponse1' % OK,
-            b'3 2 0 command-response eos %sresponse2' % OK,
+            b'1 2 stream-begin command-response continuation %s' % OK,
+            b'1 2 0 command-response continuation cbor:b"response1"',
+            b'1 2 0 command-response eos ',
+            b'3 2 0 command-response continuation %s' % OK,
+            b'3 2 0 command-response continuation cbor:b"response2"',
+            b'3 2 0 command-response eos ',
         ])
 
     def testrequestidtracking(self):
@@ -447,16 +466,22 @@
 
         # Register results for commands out of order.
         outstream = reactor.makeoutputstream()
-        reactor.oncommandresponseready(outstream, 3, b'response3')
-        reactor.oncommandresponseready(outstream, 1, b'response1')
-        reactor.oncommandresponseready(outstream, 5, b'response5')
+        reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3'])
+        reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1'])
+        reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5'])
 
         result = reactor.oninputeof()
         self.assertaction(result, b'sendframes')
         self.assertframesequal(result[1][b'framegen'], [
-            b'3 2 stream-begin command-response eos %sresponse3' % OK,
-            b'1 2 0 command-response eos %sresponse1' % OK,
-            b'5 2 0 command-response eos %sresponse5' % OK,
+            b'3 2 stream-begin command-response continuation %s' % OK,
+            b'3 2 0 command-response continuation cbor:b"response3"',
+            b'3 2 0 command-response eos ',
+            b'1 2 0 command-response continuation %s' % OK,
+            b'1 2 0 command-response continuation cbor:b"response1"',
+            b'1 2 0 command-response eos ',
+            b'5 2 0 command-response continuation %s' % OK,
+            b'5 2 0 command-response continuation cbor:b"response5"',
+            b'5 2 0 command-response eos ',
         ])
 
     def testduplicaterequestonactivecommand(self):
@@ -477,7 +502,7 @@
         instream = framing.stream(1)
         list(sendcommandframes(reactor, instream, 1, b'command1', {}))
         outstream = reactor.makeoutputstream()
-        reactor.oncommandresponseready(outstream, 1, b'response')
+        reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
 
         # We've registered the response but haven't sent it. From the
         # perspective of the reactor, the command is still active.
@@ -494,7 +519,7 @@
         instream = framing.stream(1)
         list(sendcommandframes(reactor, instream, 1, b'command1', {}))
         outstream = reactor.makeoutputstream()
-        res = reactor.oncommandresponseready(outstream, 1, b'response')
+        res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
         list(res[1][b'framegen'])
 
         results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -365,75 +365,6 @@
             if done:
                 break
 
-def createcommandresponseframesfrombytes(stream, requestid, data,
-                                         maxframesize=DEFAULT_MAX_FRAME_SIZE):
-    """Create a raw frame to send a bytes response from static bytes input.
-
-    Returns a generator of bytearrays.
-    """
-    # Automatically send the overall CBOR response map.
-    overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
-    if len(overall) > maxframesize:
-        raise error.ProgrammingError('not yet implemented')
-
-    # Simple case where we can fit the full response in a single frame.
-    if len(overall) + len(data) <= maxframesize:
-        flags = FLAG_COMMAND_RESPONSE_EOS
-        yield stream.makeframe(requestid=requestid,
-                               typeid=FRAME_TYPE_COMMAND_RESPONSE,
-                               flags=flags,
-                               payload=overall + data)
-        return
-
-    # It's easier to send the overall CBOR map in its own frame than to track
-    # offsets.
-    yield stream.makeframe(requestid=requestid,
-                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
-                           flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
-                           payload=overall)
-
-    offset = 0
-    while True:
-        chunk = data[offset:offset + maxframesize]
-        offset += len(chunk)
-        done = offset == len(data)
-
-        if done:
-            flags = FLAG_COMMAND_RESPONSE_EOS
-        else:
-            flags = FLAG_COMMAND_RESPONSE_CONTINUATION
-
-        yield stream.makeframe(requestid=requestid,
-                               typeid=FRAME_TYPE_COMMAND_RESPONSE,
-                               flags=flags,
-                               payload=chunk)
-
-        if done:
-            break
-
-def createbytesresponseframesfromgen(stream, requestid, gen,
-                                     maxframesize=DEFAULT_MAX_FRAME_SIZE):
-    """Generator of frames from a generator of byte chunks.
-
-    This assumes that another frame will follow whatever this emits. i.e.
-    this always emits the continuation flag and never emits the end-of-stream
-    flag.
-    """
-    cb = util.chunkbuffer(gen)
-    flags = FLAG_COMMAND_RESPONSE_CONTINUATION
-
-    while True:
-        chunk = cb.read(maxframesize)
-        if not chunk:
-            break
-
-        yield stream.makeframe(requestid=requestid,
-                               typeid=FRAME_TYPE_COMMAND_RESPONSE,
-                               flags=flags,
-                               payload=chunk)
-
-        flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
-
 def createcommandresponseokframe(stream, requestid):
     overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
 
@@ -1020,30 +951,6 @@
 
         return meth(frame)
 
-    def oncommandresponseready(self, stream, requestid, data):
-        """Signal that a bytes response is ready to be sent to the client.
-
-        The raw bytes response is passed as an argument.
-        """
-        ensureserverstream(stream)
-
-        def sendframes():
-            for frame in createcommandresponseframesfrombytes(stream, requestid,
-                                                              data):
-                yield frame
-
-            self._activecommands.remove(requestid)
-
-        result = sendframes()
-
-        if self._deferoutput:
-            self._bufferedframegens.append(result)
-            return 'noop', {}
-        else:
-            return 'sendframes', {
-                'framegen': result,
-            }
-
     def oncommandresponsereadyobjects(self, stream, requestid, objs):
         """Signal that objects are ready to be sent to the client.
 
@@ -1053,6 +960,10 @@
         """
         ensureserverstream(stream)
 
+        # A more robust solution would be to check for objs.{next,__next__}.
+        if isinstance(objs, list):
+            objs = iter(objs)
+
         # We need to take care over exception handling. Uncaught exceptions
         # when generating frames could lead to premature end of the frame
         # stream and the possibility of the server or client process getting



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list