D2860: wireproto: buffer output frames when in half duplex mode
indygreg (Gregory Szorc)
phabricator at mercurial-scm.org
Mon Mar 19 20:01:14 EDT 2018
indygreg updated this revision to Diff 7144.
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D2860?vs=7052&id=7144
REVISION DETAIL
https://phab.mercurial-scm.org/D2860
AFFECTED FILES
mercurial/wireprotoframing.py
mercurial/wireprotoserver.py
tests/test-http-api-httpv2.t
tests/test-wireproto-serverreactor.py
CHANGE DETAILS
diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -9,8 +9,8 @@
ffs = framing.makeframefromhumanstring
-def makereactor():
- return framing.serverreactor()
+def makereactor(deferoutput=False):
+ return framing.serverreactor(deferoutput=deferoutput)
def sendframes(reactor, gen):
"""Send a generator of frame bytearray to a reactor.
@@ -95,6 +95,9 @@
'data': None,
})
+ result = reactor.oninputeof()
+ self.assertaction(result, 'noop')
+
def test1argument(self):
reactor = makereactor()
results = list(sendcommandframes(reactor, b'mycommand',
@@ -310,6 +313,37 @@
b'error-response application some message',
])
+ def test1commanddeferresponse(self):
+ """Responses when in deferred output mode are delayed until EOF."""
+ reactor = makereactor(deferoutput=True)
+ results = list(sendcommandframes(reactor, b'mycommand', {}))
+ self.assertEqual(len(results), 1)
+ self.assertaction(results[0], 'runcommand')
+
+ result = reactor.onbytesresponseready(b'response')
+ self.assertaction(result, 'noop')
+ result = reactor.oninputeof()
+ self.assertaction(result, 'sendframes')
+ self.assertframesequal(result[1]['framegen'], [
+ b'bytes-response eos response',
+ ])
+
+ def testmultiplecommanddeferresponse(self):
+ reactor = makereactor(deferoutput=True)
+ list(sendcommandframes(reactor, b'command1', {}))
+ list(sendcommandframes(reactor, b'command2', {}))
+
+ result = reactor.onbytesresponseready(b'response1')
+ self.assertaction(result, 'noop')
+ result = reactor.onbytesresponseready(b'response2')
+ self.assertaction(result, 'noop')
+ result = reactor.oninputeof()
+ self.assertaction(result, 'sendframes')
+ self.assertframesequal(result[1]['framegen'], [
+ b'bytes-response eos response1',
+ b'bytes-response eos response2'
+ ])
+
if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -401,14 +401,15 @@
s> Server: testing stub value\r\n
s> Date: $HTTP_DATE$\r\n
s> Content-Type: text/plain\r\n
- s> Content-Length: 291\r\n
+ s> Content-Length: 310\r\n
s> \r\n
s> received: 1 2 command1\n
s> ["wantframe", {"state": "command-receiving-args"}]\n
s> received: 2 0 \x03\x00\x04\x00fooval1\n
s> ["wantframe", {"state": "command-receiving-args"}]\n
s> received: 2 2 \x04\x00\x03\x00bar1val\n
s> ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null}]\n
- s> received: <no frame>
+ s> received: <no frame>\n
+ s> {"action": "noop"}
$ cat error.log
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -401,6 +401,10 @@
states.append(json.dumps((action, meta), sort_keys=True,
separators=(', ', ': ')))
+ action, meta = reactor.oninputeof()
+ meta['action'] = action
+ states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
+
res.status = b'200 OK'
res.headers[b'Content-Type'] = b'text/plain'
res.setbodybytes(b'\n'.join(states))
@@ -411,7 +415,10 @@
Called when the HTTP request contains unified frame-based protocol
frames for evaluation.
"""
- reactor = wireprotoframing.serverreactor()
+ # TODO Some HTTP clients are full duplex and can receive data before
+ # the entire request is transmitted. Figure out a way to indicate support
+ # for that so we can opt into full duplex mode.
+ reactor = wireprotoframing.serverreactor(deferoutput=True)
seencommand = False
while True:
@@ -448,6 +455,19 @@
raise error.ProgrammingError(
'unhandled action from frame processor: %s' % action)
+ action, meta = reactor.oninputeof()
+ if action == 'sendframes':
+ # We assume we haven't started sending the response yet. If we're
+ # wrong, the response type will raise an exception.
+ res.status = b'200 OK'
+ res.headers[b'Content-Type'] = FRAMINGTYPE
+ res.setbodygen(meta['framegen'])
+ elif action == 'noop':
+ pass
+ else:
+ raise error.ProgrammingError('unhandled action from frame processor: %s'
+ % action)
+
def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
command):
"""Dispatch a wire protocol command made from HTTPv2 requests.
@@ -504,6 +524,8 @@
if action == 'sendframes':
res.setbodygen(meta['framegen'])
+ elif action == 'noop':
+ pass
else:
raise error.ProgrammingError('unhandled event from reactor: %s' %
action)
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -308,10 +308,24 @@
wantframe
Indicates that nothing of interest happened and the server is waiting on
more frames from the client before anything interesting can be done.
+
+ noop
+ Indicates no additional action is required.
"""
- def __init__(self):
+ def __init__(self, deferoutput=False):
+ """Construct a new server reactor.
+
+ ``deferoutput`` can be used to indicate that no output frames should be
+ instructed to be sent until input has been exhausted. In this mode,
+ events that would normally generate output frames (such as a command
+ response being ready) will instead defer instructing the consumer to
+ send those frames. This is useful for half-duplex transports where the
+ sender cannot receive until all data has been transmitted.
+ """
+ self._deferoutput = deferoutput
self._state = 'idle'
+ self._bufferedframegens = []
self._activecommand = None
self._activeargs = None
self._activedata = None
@@ -344,8 +358,33 @@
The raw bytes response is passed as an argument.
"""
+ framegen = createbytesresponseframesfrombytes(data)
+
+ if self._deferoutput:
+ self._bufferedframegens.append(framegen)
+ return 'noop', {}
+ else:
+ return 'sendframes', {
+ 'framegen': framegen,
+ }
+
+ def oninputeof(self):
+ """Signals that end of input has been received.
+
+ No more frames will be received. All pending activity should be
+ completed.
+ """
+ if not self._deferoutput or not self._bufferedframegens:
+ return 'noop', {}
+
+ # If we buffered all our responses, emit those.
+ def makegen():
+ for gen in self._bufferedframegens:
+ for frame in gen:
+ yield frame
+
return 'sendframes', {
- 'framegen': createbytesresponseframesfrombytes(data),
+ 'framegen': makegen(),
}
def onapplicationerror(self, msg):
To: indygreg, #hg-reviewers
Cc: mercurial-devel
More information about the Mercurial-devel mailing list