D2870: wireproto: support for receiving multiple requests

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Mon Mar 19 20:01:14 EDT 2018


indygreg updated this revision to Diff 7146.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D2870?vs=7054&id=7146

REVISION DETAIL
  https://phab.mercurial-scm.org/D2870

AFFECTED FILES
  mercurial/wireprotoframing.py
  tests/test-http-api-httpv2.t
  tests/test-wireproto-serverreactor.py

CHANGE DETAILS

diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -196,6 +196,19 @@
             'message': b'expected command frame; got 2',
         })
 
+    def testunexpectedcommandargumentreceiving(self):
+        """Same as above but the command is receiving."""
+        results = list(sendframes(makereactor(), [
+            ffs(b'1 command-name have-data command'),
+            ffs(b'1 command-argument eoa ignored'),
+        ]))
+
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'received command argument frame for request that is '
+                       b'not expecting arguments: 1',
+        })
+
     def testunexpectedcommanddata(self):
         """Command argument frame when not running a command is an error."""
         result = self._sendsingleframe(makereactor(),
@@ -205,6 +218,19 @@
             'message': b'expected command frame; got 3',
         })
 
+    def testunexpectedcommanddatareceiving(self):
+        """Same as above except the command is receiving."""
+        results = list(sendframes(makereactor(), [
+            ffs(b'1 command-name have-args command'),
+            ffs(b'1 command-data eos ignored'),
+        ]))
+
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'received command data frame for request that is not '
+                       b'expecting data: 1',
+        })
+
     def testmissingcommandframeflags(self):
         """Command name frame must have flags set."""
         result = self._sendsingleframe(makereactor(),
@@ -214,19 +240,77 @@
             'message': b'missing frame flags on command frame',
         })
 
+    def testconflictingrequestid(self):
+        """Multiple fully serviced commands with same request ID is allowed."""
+        results = list(sendframes(makereactor(), [
+            ffs(b'1 command-name eos command'),
+            ffs(b'1 command-name eos command'),
+            ffs(b'1 command-name eos command'),
+        ]))
+        for i in range(3):
+            self.assertaction(results[i], 'runcommand')
+            self.assertEqual(results[i][1], {
+                'requestid': 1,
+                'command': b'command',
+                'args': {},
+                'data': None,
+            })
+
+    def testconflictingrequestid(self):
+        """Request ID for new command matching in-flight command is illegal."""
+        results = list(sendframes(makereactor(), [
+            ffs(b'1 command-name have-args command'),
+            ffs(b'1 command-name eos command'),
+        ]))
+
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'request with ID 1 already received',
+        })
+
+    def testinterleavedcommands(self):
+        results = list(sendframes(makereactor(), [
+            ffs(b'1 command-name have-args command1'),
+            ffs(b'3 command-name have-args command3'),
+            ffs(br'1 command-argument 0 \x03\x00\x03\x00foobar'),
+            ffs(br'3 command-argument 0 \x03\x00\x03\x00bizbaz'),
+            ffs(br'3 command-argument eoa \x03\x00\x03\x00keyval'),
+            ffs(br'1 command-argument eoa \x04\x00\x03\x00key1val'),
+        ]))
+
+        self.assertEqual([t[0] for t in results], [
+            'wantframe',
+            'wantframe',
+            'wantframe',
+            'wantframe',
+            'runcommand',
+            'runcommand',
+        ])
+
+        self.assertEqual(results[4][1], {
+            'requestid': 3,
+            'command': 'command3',
+            'args': {b'biz': b'baz', b'key': b'val'},
+            'data': None,
+        })
+        self.assertEqual(results[5][1], {
+            'requestid': 1,
+            'command': 'command1',
+            'args': {b'foo': b'bar', b'key1': b'val'},
+            'data': None,
+        })
+
     def testmissingargumentframe(self):
+        # This test is meant to exercise behavior when the reactor has an
+        # incomplete command request waiting on argument data. The reactor
+        # doesn't handle that scenario yet, so this test checks very little.
         frames = [
             ffs(b'1 command-name have-args command'),
-            ffs(b'1 command-name 0 ignored'),
         ]
 
         results = list(sendframes(makereactor(), frames))
-        self.assertEqual(len(results), 2)
         self.assertaction(results[0], 'wantframe')
-        self.assertaction(results[1], 'error')
-        self.assertEqual(results[1][1], {
-            'message': b'expected command argument frame; got 1',
-        })
 
     def testincompleteargumentname(self):
         """Argument frame with incomplete name."""
@@ -259,17 +343,16 @@
         })
 
     def testmissingcommanddataframe(self):
+        # The reactor doesn't currently handle partially received commands,
+        # so this test doesn't verify anything about request 1.
         frames = [
             ffs(b'1 command-name have-data command1'),
-            ffs(b'1 command-name eos command2'),
+            ffs(b'3 command-name eos command2'),
         ]
         results = list(sendframes(makereactor(), frames))
         self.assertEqual(len(results), 2)
         self.assertaction(results[0], 'wantframe')
-        self.assertaction(results[1], 'error')
-        self.assertEqual(results[1][1], {
-            'message': b'expected command data frame; got 1',
-        })
+        self.assertaction(results[1], 'runcommand')
 
     def testmissingcommanddataframeflags(self):
         frames = [
@@ -284,6 +367,18 @@
             'message': b'command data frame without flags',
         })
 
+    def testframefornonreceivingrequest(self):
+        """Receiving a frame for a command that is not receiving is illegal."""
+        results = list(sendframes(makereactor(), [
+            ffs(b'1 command-name eos command1'),
+            ffs(b'3 command-name have-data command3'),
+            ffs(b'1 command-argument eoa ignored'),
+        ]))
+        self.assertaction(results[2], 'error')
+        self.assertEqual(results[2][1], {
+            'message': b'received frame for request that is not receiving: 1',
+        })
+
     def testsimpleresponse(self):
         """Bytes response to command sends result frames."""
         reactor = makereactor()
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -401,12 +401,12 @@
   s>     Server: testing stub value\r\n
   s>     Date: $HTTP_DATE$\r\n
   s>     Content-Type: text/plain\r\n
-  s>     Content-Length: 332\r\n
+  s>     Content-Length: 322\r\n
   s>     \r\n
   s>     received: 1 2 1 command1\n
-  s>     ["wantframe", {"state": "command-receiving-args"}]\n
+  s>     ["wantframe", {"state": "command-receiving"}]\n
   s>     received: 2 0 1 \x03\x00\x04\x00fooval1\n
-  s>     ["wantframe", {"state": "command-receiving-args"}]\n
+  s>     ["wantframe", {"state": "command-receiving"}]\n
   s>     received: 2 2 1 \x04\x00\x03\x00bar1val\n
   s>     ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null, "requestid": 1}]\n
   s>     received: <no frame>\n
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -327,6 +327,23 @@
 
     noop
        Indicates no additional action is required.
+
+    Known Issues
+    ------------
+
+    There are no limits to the number of partially received commands or their
+    size. A malicious client could stream command request data and exhaust the
+    server's memory.
+
+    Partially received commands are not acted upon when end of input is
+    reached. Should the server error if it receives a partial request?
+    Should the client send a message to abort a partially transmitted request
+    to facilitate graceful shutdown?
+
+    Active requests that haven't been responded to aren't tracked. This means
+    that if we receive a command and instruct its dispatch, another command
+    with the same request ID can come in over the wire, and there will be a
+    race over which response belongs to which request.
     """
 
     def __init__(self, deferoutput=False):
@@ -342,14 +359,8 @@
         self._deferoutput = deferoutput
         self._state = 'idle'
         self._bufferedframegens = []
-        self._activerequestid = None
-        self._activecommand = None
-        self._activeargs = None
-        self._activedata = None
-        self._expectingargs = None
-        self._expectingdata = None
-        self._activeargname = None
-        self._activeargchunks = None
+        # request id -> dict describing the command being actively received.
+        self._receivingcommands = {}
 
     def onframerecv(self, requestid, frametype, frameflags, payload):
         """Process a frame that has been received off the wire.
@@ -359,8 +370,7 @@
         """
         handlers = {
             'idle': self._onframeidle,
-            'command-receiving-args': self._onframereceivingargs,
-            'command-receiving-data': self._onframereceivingdata,
+            'command-receiving': self._onframecommandreceiving,
             'errored': self._onframeerrored,
         }
 
@@ -391,6 +401,8 @@
         No more frames will be received. All pending activity should be
         completed.
         """
+        # TODO should we do anything about in-flight commands?
+
         if not self._deferoutput or not self._bufferedframegens:
             return 'noop', {}
 
@@ -414,12 +426,20 @@
             'message': msg,
         }
 
-    def _makeruncommandresult(self):
+    def _makeruncommandresult(self, requestid):
+        entry = self._receivingcommands[requestid]
+        del self._receivingcommands[requestid]
+
+        if self._receivingcommands:
+            self._state = 'command-receiving'
+        else:
+            self._state = 'idle'
+
         return 'runcommand', {
-            'requestid': self._activerequestid,
-            'command': self._activecommand,
-            'args': self._activeargs,
-            'data': self._activedata.getvalue() if self._activedata else None,
+            'requestid': requestid,
+            'command': entry['command'],
+            'args': entry['args'],
+            'data': entry['data'].getvalue() if entry['data'] else None,
         }
 
     def _makewantframeresult(self):
@@ -435,34 +455,76 @@
             return self._makeerrorresult(
                 _('expected command frame; got %d') % frametype)
 
-        self._activerequestid = requestid
-        self._activecommand = payload
-        self._activeargs = {}
-        self._activedata = None
+        if requestid in self._receivingcommands:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('request with ID %d already received') % requestid)
+
+        expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
+        expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
+
+        self._receivingcommands[requestid] = {
+            'command': payload,
+            'args': {},
+            'data': None,
+            'expectingargs': expectingargs,
+            'expectingdata': expectingdata,
+        }
 
         if frameflags & FLAG_COMMAND_NAME_EOS:
-            return self._makeruncommandresult()
-
-        self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
-        self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
+            return self._makeruncommandresult(requestid)
 
-        if self._expectingargs:
-            self._state = 'command-receiving-args'
-            return self._makewantframeresult()
-        elif self._expectingdata:
-            self._activedata = util.bytesio()
-            self._state = 'command-receiving-data'
+        if expectingargs or expectingdata:
+            self._state = 'command-receiving'
             return self._makewantframeresult()
         else:
             self._state = 'errored'
             return self._makeerrorresult(_('missing frame flags on '
                                            'command frame'))
 
-    def _onframereceivingargs(self, requestid, frametype, frameflags, payload):
-        if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
+    def _onframecommandreceiving(self, requestid, frametype, frameflags,
+                                 payload):
+        # It could be a new command request. Process it as such.
+        if frametype == FRAME_TYPE_COMMAND_NAME:
+            return self._onframeidle(requestid, frametype, frameflags, payload)
+
+        # All other frames should be related to a command that is currently
+        # receiving.
+        if requestid not in self._receivingcommands:
             self._state = 'errored'
-            return self._makeerrorresult(_('expected command argument '
-                                           'frame; got %d') % frametype)
+            return self._makeerrorresult(
+                _('received frame for request that is not receiving: %d') %
+                  requestid)
+
+        entry = self._receivingcommands[requestid]
+
+        if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
+            if not entry['expectingargs']:
+                self._state = 'errored'
+                return self._makeerrorresult(_(
+                    'received command argument frame for request that is not '
+                    'expecting arguments: %d') % requestid)
+
+            return self._handlecommandargsframe(requestid, entry, frametype,
+                                                frameflags, payload)
+
+        elif frametype == FRAME_TYPE_COMMAND_DATA:
+            if not entry['expectingdata']:
+                self._state = 'errored'
+                return self._makeerrorresult(_(
+                    'received command data frame for request that is not '
+                    'expecting data: %d') % requestid)
+
+            if entry['data'] is None:
+                entry['data'] = util.bytesio()
+
+            return self._handlecommanddataframe(requestid, entry, frametype,
+                                                frameflags, payload)
+
+    def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
+                                payload):
+        # The frame and state of command should have already been validated.
+        assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
 
         offset = 0
         namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
@@ -483,10 +545,6 @@
         # and wait for the next frame.
         if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
             raise error.ProgrammingError('not yet implemented')
-            self._activeargname = argname
-            self._activeargchunks = [argvalue]
-            self._state = 'command-arg-continuation'
-            return self._makewantframeresult()
 
         # Common case: the argument value is completely contained in this
         # frame.
@@ -496,36 +554,30 @@
             return self._makeerrorresult(_('malformed argument frame: '
                                            'partial argument value'))
 
-        self._activeargs[argname] = argvalue
+        entry['args'][argname] = argvalue
 
         if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
-            if self._expectingdata:
-                self._state = 'command-receiving-data'
-                self._activedata = util.bytesio()
+            if entry['expectingdata']:
                 # TODO signal request to run a command once we don't
                 # buffer data frames.
                 return self._makewantframeresult()
             else:
-                self._state = 'waiting'
-                return self._makeruncommandresult()
+                return self._makeruncommandresult(requestid)
         else:
             return self._makewantframeresult()
 
-    def _onframereceivingdata(self, requestid, frametype, frameflags, payload):
-        if frametype != FRAME_TYPE_COMMAND_DATA:
-            self._state = 'errored'
-            return self._makeerrorresult(_('expected command data frame; '
-                                           'got %d') % frametype)
+    def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
+                                payload):
+        assert frametype == FRAME_TYPE_COMMAND_DATA
 
         # TODO support streaming data instead of buffering it.
-        self._activedata.write(payload)
+        entry['data'].write(payload)
 
         if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
             return self._makewantframeresult()
         elif frameflags & FLAG_COMMAND_DATA_EOS:
-            self._activedata.seek(0)
-            self._state = 'idle'
-            return self._makeruncommandresult()
+            entry['data'].seek(0)
+            return self._makeruncommandresult(requestid)
         else:
             self._state = 'errored'
             return self._makeerrorresult(_('command data frame without '



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list