D2947: wireproto: explicit API to create outgoing streams

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Mon Mar 26 17:35:50 EDT 2018


indygreg updated this revision to Diff 7326.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D2947?vs=7309&id=7326

REVISION DETAIL
  https://phab.mercurial-scm.org/D2947

AFFECTED FILES
  mercurial/wireprotoframing.py
  mercurial/wireprotoserver.py
  tests/test-http-api-httpv2.t
  tests/test-wireproto-serverreactor.py

CHANGE DETAILS

diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -375,7 +375,7 @@
         """Multiple fully serviced commands with same request ID is allowed."""
         reactor = makereactor()
         results = []
-        outstream = framing.stream(2)
+        outstream = reactor.makeoutputstream()
         results.append(self._sendsingleframe(
             reactor, ffs(b'1 1 stream-begin command-name eos command')))
         result = reactor.onbytesresponseready(outstream, 1, b'response1')
@@ -530,7 +530,7 @@
         instream = framing.stream(1)
         list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
 
-        outstream = framing.stream(2)
+        outstream = reactor.makeoutputstream()
         result = reactor.onbytesresponseready(outstream, 1, b'response')
         self.assertaction(result, 'sendframes')
         self.assertframesequal(result[1]['framegen'], [
@@ -546,7 +546,7 @@
         instream = framing.stream(1)
         list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
 
-        outstream = framing.stream(2)
+        outstream = reactor.makeoutputstream()
         result = reactor.onbytesresponseready(outstream, 1, first + second)
         self.assertaction(result, 'sendframes')
         self.assertframesequal(result[1]['framegen'], [
@@ -559,7 +559,7 @@
         instream = framing.stream(1)
         list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
 
-        outstream = framing.stream(2)
+        outstream = reactor.makeoutputstream()
         result = reactor.onapplicationerror(outstream, 1, b'some message')
         self.assertaction(result, 'sendframes')
         self.assertframesequal(result[1]['framegen'], [
@@ -575,7 +575,7 @@
         self.assertEqual(len(results), 1)
         self.assertaction(results[0], 'runcommand')
 
-        outstream = framing.stream(2)
+        outstream = reactor.makeoutputstream()
         result = reactor.onbytesresponseready(outstream, 1, b'response')
         self.assertaction(result, 'noop')
         result = reactor.oninputeof()
@@ -590,7 +590,7 @@
         list(sendcommandframes(reactor, instream, 1, b'command1', {}))
         list(sendcommandframes(reactor, instream, 3, b'command2', {}))
 
-        outstream = framing.stream(2)
+        outstream = reactor.makeoutputstream()
         result = reactor.onbytesresponseready(outstream, 1, b'response1')
         self.assertaction(result, 'noop')
         result = reactor.onbytesresponseready(outstream, 3, b'response2')
@@ -610,7 +610,7 @@
         list(sendcommandframes(reactor, instream, 5, b'command3', {}))
 
         # Register results for commands out of order.
-        outstream = framing.stream(2)
+        outstream = reactor.makeoutputstream()
         reactor.onbytesresponseready(outstream, 3, b'response3')
         reactor.onbytesresponseready(outstream, 1, b'response1')
         reactor.onbytesresponseready(outstream, 5, b'response5')
@@ -640,7 +640,7 @@
         reactor = makereactor()
         instream = framing.stream(1)
         list(sendcommandframes(reactor, instream, 1, b'command1', {}))
-        outstream = framing.stream(2)
+        outstream = reactor.makeoutputstream()
         reactor.onbytesresponseready(outstream, 1, b'response')
 
         # We've registered the response but haven't sent it. From the
@@ -672,7 +672,7 @@
         reactor = makereactor()
         instream = framing.stream(1)
         list(sendcommandframes(reactor, instream, 1, b'command1', {}))
-        outstream = framing.stream(2)
+        outstream = reactor.makeoutputstream()
         res = reactor.onbytesresponseready(outstream, 1, b'response')
         list(res[1]['framegen'])
 
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -472,7 +472,7 @@
   s>     \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response
   s>     \r\n
   s>     25\r\n
-  s>     \x1d\x00\x00\x03\x00\x02\x01Bcustomreadonly bytes response
+  s>     \x1d\x00\x00\x03\x00\x02\x00Bcustomreadonly bytes response
   s>     \r\n
   s>     0\r\n
   s>     \r\n
@@ -511,7 +511,7 @@
   s>     \x00\x00\x00\x03\x00\x02\x01B
   s>     \r\n
   s>     26\r\n
-  s>     \x1e\x00\x00\x01\x00\x02\x01Bbookmarks	\n
+  s>     \x1e\x00\x00\x01\x00\x02\x00Bbookmarks	\n
   s>     namespaces	\n
   s>     phases	
   s>     \r\n
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -432,6 +432,8 @@
     reactor = wireprotoframing.serverreactor(deferoutput=True)
     seencommand = False
 
+    outstream = reactor.makeoutputstream()
+
     while True:
         frame = wireprotoframing.readframe(req.bodyfh)
         if not frame:
@@ -444,8 +446,8 @@
             continue
         elif action == 'runcommand':
             sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
-                                           reqcommand, reactor, meta,
-                                           issubsequent=seencommand)
+                                           reqcommand, reactor, outstream,
+                                           meta, issubsequent=seencommand)
 
             if sentoutput:
                 return
@@ -476,7 +478,7 @@
                                      % action)
 
 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
-                      command, issubsequent):
+                      outstream, command, issubsequent):
     """Dispatch a wire protocol command made from HTTPv2 requests.
 
     The authenticated permission (``authedperm``) along with the original
@@ -546,10 +548,9 @@
 
     res.status = b'200 OK'
     res.headers[b'Content-Type'] = FRAMINGTYPE
-    stream = wireprotoframing.stream(2)
 
     if isinstance(rsp, wireprototypes.bytesresponse):
-        action, meta = reactor.onbytesresponseready(stream,
+        action, meta = reactor.onbytesresponseready(outstream,
                                                     command['requestid'],
                                                     rsp.data)
     else:
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -533,9 +533,11 @@
         """
         self._deferoutput = deferoutput
         self._state = 'idle'
+        self._nextoutgoingstreamid = 2
         self._bufferedframegens = []
         # stream id -> stream instance for all active streams from the client.
         self._incomingstreams = {}
+        self._outgoingstreams = {}
         # request id -> dict of commands that are actively being received.
         self._receivingcommands = {}
         # Request IDs that have been received and are actively being processed.
@@ -638,6 +640,16 @@
                                          application=True),
         }
 
+    def makeoutputstream(self):
+        """Create a stream to be used for sending data to the client."""
+        streamid = self._nextoutgoingstreamid
+        self._nextoutgoingstreamid += 2
+
+        s = stream(streamid)
+        self._outgoingstreams[streamid] = s
+
+        return s
+
     def _makeerrorresult(self, msg):
         return 'error', {
             'message': msg,



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list