D2860: wireproto: buffer output frames when in half duplex mode

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Mon Mar 19 20:01:14 EDT 2018


indygreg updated this revision to Diff 7144.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D2860?vs=7052&id=7144

REVISION DETAIL
  https://phab.mercurial-scm.org/D2860

AFFECTED FILES
  mercurial/wireprotoframing.py
  mercurial/wireprotoserver.py
  tests/test-http-api-httpv2.t
  tests/test-wireproto-serverreactor.py

CHANGE DETAILS

diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -9,8 +9,8 @@
 
 ffs = framing.makeframefromhumanstring
 
-def makereactor():
-    return framing.serverreactor()
+def makereactor(deferoutput=False):
+    return framing.serverreactor(deferoutput=deferoutput)
 
 def sendframes(reactor, gen):
     """Send a generator of frame bytearray to a reactor.
@@ -95,6 +95,9 @@
             'data': None,
         })
 
+        result = reactor.oninputeof()
+        self.assertaction(result, 'noop')
+
     def test1argument(self):
         reactor = makereactor()
         results = list(sendcommandframes(reactor, b'mycommand',
@@ -310,6 +313,37 @@
             b'error-response application some message',
         ])
 
+    def test1commanddeferresponse(self):
+        """Responses when in deferred output mode are delayed until EOF."""
+        reactor = makereactor(deferoutput=True)
+        results = list(sendcommandframes(reactor, b'mycommand', {}))
+        self.assertEqual(len(results), 1)
+        self.assertaction(results[0], 'runcommand')
+
+        result = reactor.onbytesresponseready(b'response')
+        self.assertaction(result, 'noop')
+        result = reactor.oninputeof()
+        self.assertaction(result, 'sendframes')
+        self.assertframesequal(result[1]['framegen'], [
+            b'bytes-response eos response',
+        ])
+
+    def testmultiplecommanddeferresponse(self):
+        reactor = makereactor(deferoutput=True)
+        list(sendcommandframes(reactor, b'command1', {}))
+        list(sendcommandframes(reactor, b'command2', {}))
+
+        result = reactor.onbytesresponseready(b'response1')
+        self.assertaction(result, 'noop')
+        result = reactor.onbytesresponseready(b'response2')
+        self.assertaction(result, 'noop')
+        result = reactor.oninputeof()
+        self.assertaction(result, 'sendframes')
+        self.assertframesequal(result[1]['framegen'], [
+            b'bytes-response eos response1',
+            b'bytes-response eos response2'
+        ])
+
 if __name__ == '__main__':
     import silenttestrunner
     silenttestrunner.main(__name__)
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -401,14 +401,15 @@
   s>     Server: testing stub value\r\n
   s>     Date: $HTTP_DATE$\r\n
   s>     Content-Type: text/plain\r\n
-  s>     Content-Length: 291\r\n
+  s>     Content-Length: 310\r\n
   s>     \r\n
   s>     received: 1 2 command1\n
   s>     ["wantframe", {"state": "command-receiving-args"}]\n
   s>     received: 2 0 \x03\x00\x04\x00fooval1\n
   s>     ["wantframe", {"state": "command-receiving-args"}]\n
   s>     received: 2 2 \x04\x00\x03\x00bar1val\n
   s>     ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null}]\n
-  s>     received: <no frame>
+  s>     received: <no frame>\n
+  s>     {"action": "noop"}
 
   $ cat error.log
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -401,6 +401,10 @@
         states.append(json.dumps((action, meta), sort_keys=True,
                                  separators=(', ', ': ')))
 
+    action, meta = reactor.oninputeof()
+    meta['action'] = action
+    states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
+
     res.status = b'200 OK'
     res.headers[b'Content-Type'] = b'text/plain'
     res.setbodybytes(b'\n'.join(states))
@@ -411,7 +415,10 @@
     Called when the HTTP request contains unified frame-based protocol
     frames for evaluation.
     """
-    reactor = wireprotoframing.serverreactor()
+    # TODO Some HTTP clients are full duplex and can receive data before
+    # the entire request is transmitted. Figure out a way to indicate support
+    # for that so we can opt into full duplex mode.
+    reactor = wireprotoframing.serverreactor(deferoutput=True)
     seencommand = False
 
     while True:
@@ -448,6 +455,19 @@
             raise error.ProgrammingError(
                 'unhandled action from frame processor: %s' % action)
 
+    action, meta = reactor.oninputeof()
+    if action == 'sendframes':
+        # We assume we haven't started sending the response yet. If we're
+        # wrong, the response type will raise an exception.
+        res.status = b'200 OK'
+        res.headers[b'Content-Type'] = FRAMINGTYPE
+        res.setbodygen(meta['framegen'])
+    elif action == 'noop':
+        pass
+    else:
+        raise error.ProgrammingError('unhandled action from frame processor: %s'
+                                     % action)
+
 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
                       command):
     """Dispatch a wire protocol command made from HTTPv2 requests.
@@ -504,6 +524,8 @@
 
     if action == 'sendframes':
         res.setbodygen(meta['framegen'])
+    elif action == 'noop':
+        pass
     else:
         raise error.ProgrammingError('unhandled event from reactor: %s' %
                                      action)
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -308,10 +308,24 @@
     wantframe
        Indicates that nothing of interest happened and the server is waiting on
        more frames from the client before anything interesting can be done.
+
+    noop
+       Indicates no additional action is required.
     """
 
-    def __init__(self):
+    def __init__(self, deferoutput=False):
+        """Construct a new server reactor.
+
+        ``deferoutput`` indicates that the consumer should not be instructed
+        to send any output frames until input has been exhausted. In this mode,
+        events that would normally generate output frames (such as a command
+        response being ready) will instead defer instructing the consumer to
+        send those frames. This is useful for half-duplex transports where the
+        sender cannot receive until all data has been transmitted.
+        """
+        self._deferoutput = deferoutput
         self._state = 'idle'
+        self._bufferedframegens = []
         self._activecommand = None
         self._activeargs = None
         self._activedata = None
@@ -344,8 +358,33 @@
 
         The raw bytes response is passed as an argument.
         """
+        framegen = createbytesresponseframesfrombytes(data)
+
+        if self._deferoutput:
+            self._bufferedframegens.append(framegen)
+            return 'noop', {}
+        else:
+            return 'sendframes', {
+                'framegen': framegen,
+            }
+
+    def oninputeof(self):
+        """Signals that end of input has been received.
+
+        No more frames will be received. All pending activity should be
+        completed.
+        """
+        if not self._deferoutput or not self._bufferedframegens:
+            return 'noop', {}
+
+        # If we buffered all our responses, emit those.
+        def makegen():
+            for gen in self._bufferedframegens:
+                for frame in gen:
+                    yield frame
+
         return 'sendframes', {
-            'framegen': createbytesresponseframesfrombytes(data),
+            'framegen': makegen(),
         }
 
     def onapplicationerror(self, msg):



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list