D3223: wireproto: introduce a reactor for client-side state

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Wed Apr 11 01:26:40 UTC 2018


indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  We have a nice state machine of sorts for reacting to server-side
  events. Now it is time to implement the client equivalent.
  
  We introduce a "clientreactor." It allows callers to request
  that commands be issued. It has multiple modes of operation to
  reflect what the underlying transport supports. For example, with
  SSH we can perform wire sends immediately, but for HTTP we need to
  buffer sends until all command requests are received. In addition,
  SSH allows sending multiple requests as long as the connection is
  open. But HTTP/1.1 only allows sending request data once.
  
  For SSH, we'll have one reactor per connection. For HTTP, we'll
  have one reactor per HTTP request. But because code that calls
  wire protocol commands should not be aware of how the underlying
  transport works, this will all be abstracted away by the peer
  interface.
  
  Our crude HTTP peer has been updated to use the reactor instead
  of formulating frames directly. No behavior should have changed
  here and tests seem to confirm that.
  
  Basic unit tests for the reactor behavior have been added.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3223

AFFECTED FILES
  mercurial/httppeer.py
  mercurial/wireprotoframing.py
  tests/test-wireproto-clientreactor.py

CHANGE DETAILS

diff --git a/tests/test-wireproto-clientreactor.py b/tests/test-wireproto-clientreactor.py
new file mode 100644
--- /dev/null
+++ b/tests/test-wireproto-clientreactor.py
@@ -0,0 +1,66 @@
+from __future__ import absolute_import
+
+import unittest
+
+from mercurial import (
+    error,
+    wireprotoframing as framing,
+)
+
+class SingleSendTests(unittest.TestCase):
+    """A reactor that can only send once rejects subsequent sends."""
+    def testbasic(self):
+        reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
+
+        request, action, meta = reactor.callcommand(b'foo', {})
+        self.assertEqual(request.state, 'pending')
+        self.assertEqual(action, 'noop')
+
+        action, meta = reactor.flushcommands()
+        self.assertEqual(action, 'sendframes')
+
+        for frame in meta['framegen']:
+            self.assertEqual(request.state, 'sending')
+
+        self.assertEqual(request.state, 'sent')
+
+        with self.assertRaisesRegexp(error.ProgrammingError,
+                                     'cannot issue new commands'):
+            reactor.callcommand(b'foo', {})
+
+        with self.assertRaisesRegexp(error.ProgrammingError,
+                                     'cannot issue new commands'):
+            reactor.callcommand(b'foo', {})
+
+class NoBufferTests(unittest.TestCase):
+    """A reactor without send buffering sends requests immediately."""
+    def testbasic(self):
+        reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
+
+        request, action, meta = reactor.callcommand(b'command1', {})
+        self.assertEqual(request.requestid, 1)
+        self.assertEqual(action, 'sendframes')
+
+        self.assertEqual(request.state, 'pending')
+
+        for frame in meta['framegen']:
+            self.assertEqual(request.state, 'sending')
+
+        self.assertEqual(request.state, 'sent')
+
+        action, meta = reactor.flushcommands()
+        self.assertEqual(action, 'noop')
+
+        # And we can send another command.
+        request, action, meta = reactor.callcommand(b'command2', {})
+        self.assertEqual(request.requestid, 3)
+        self.assertEqual(action, 'sendframes')
+
+        for frame in meta['framegen']:
+            self.assertEqual(request.state, 'sending')
+
+        self.assertEqual(request.state, 'sent')
+
+if __name__ == '__main__':
+    import silenttestrunner
+    silenttestrunner.main(__name__)
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -11,6 +11,7 @@
 
 from __future__ import absolute_import
 
+import collections
 import struct
 
 from .i18n import _
@@ -876,3 +877,133 @@
 
     def _onframeerrored(self, frame):
         return self._makeerrorresult(_('server already errored'))
+
+class commandrequest(object):
+    """Represents a request to run a command."""
+
+    def __init__(self, requestid, name, args, datafh=None):
+        self.requestid = requestid
+        self.name = name
+        self.args = args
+        self.datafh = datafh
+        self.state = 'pending'
+
+class clientreactor(object):
+    """Holds state of a client issuing frame-based protocol requests.
+
+    This is like ``serverreactor`` but for client-side state.
+
+    Each instance is bound to the lifetime of a connection. For persistent
+    connection transports using e.g. TCP sockets and speaking the raw
+    framing protocol, there will be a single instance for the lifetime of
+    the TCP socket. For transports where there are multiple discrete
+    interactions (say tunneled within an HTTP request), there will be a
+    separate instance for each distinct interaction.
+    """
+    def __init__(self, hasmultiplesend=False, buffersends=True):
+        """Create a new instance.
+
+        ``hasmultiplesend`` indicates whether multiple sends are supported
+        by the transport. When True, it is possible to send commands immediately
+        instead of buffering until the caller signals an intent to finish a
+        send operation.
+
+        ``buffersends`` indicates whether sends should be buffered until the
+        last request has been issued.
+        """
+        self._hasmultiplesend = hasmultiplesend
+        self._buffersends = buffersends
+
+        self._canissuecommands = True
+        self._cansend = True
+
+        self._nextrequestid = 1
+        # We only support a single outgoing stream for now.
+        self._outgoingstream = stream(1)
+        self._pendingrequests = collections.deque()
+        self._activerequests = {}
+
+    def callcommand(self, name, args, datafh=None):
+        """Request that a command be executed.
+
+        Receives the command name, a dict of arguments to pass to the command,
+        and an optional file object containing the raw data for the command.
+
+        Returns a 3-tuple of (request, action, action data).
+        """
+        if not self._canissuecommands:
+            raise error.ProgrammingError('cannot issue new commands')
+
+        requestid = self._nextrequestid
+        self._nextrequestid += 2
+
+        request = commandrequest(requestid, name, args, datafh=datafh)
+
+        if self._buffersends:
+            self._pendingrequests.append(request)
+            return request, 'noop', {}
+        else:
+            if not self._cansend:
+                raise error.ProgrammingError('sends cannot be performed on '
+                                             'this instance')
+
+            if not self._hasmultiplesend:
+                self._cansend = False
+                self._canissuecommands = False
+
+            return request, 'sendframes', {
+                'framegen': self._makecommandframes(request),
+            }
+
+    def flushcommands(self):
+        """Request that all queued commands be sent.
+
+        If any commands are buffered, this will instruct the caller to send
+        them over the wire. If no commands are buffered it instructs the caller
+        to no-op.
+
+        If instances aren't configured for multiple sends, no new command
+        requests are allowed after this is called.
+        """
+        if not self._pendingrequests:
+            return 'noop', {}
+
+        if not self._cansend:
+            raise error.ProgrammingError('sends cannot be performed on this '
+                                         'instance')
+
+        # If the instance only allows sending once, mark that we have fired
+        # our one shot.
+        if not self._hasmultiplesend:
+            self._canissuecommands = False
+            self._cansend = False
+
+        def makeframes():
+            while self._pendingrequests:
+                request = self._pendingrequests.popleft()
+                for frame in self._makecommandframes(request):
+                    yield frame
+
+        return 'sendframes', {
+            'framegen': makeframes(),
+        }
+
+    def _makecommandframes(self, request):
+        """Emit frames to issue a command request.
+
+        As a side-effect, update request accounting to reflect its changed
+        state.
+        """
+        self._activerequests[request.requestid] = request
+        request.state = 'sending'
+
+        res = createcommandframes(self._outgoingstream,
+                                  request.requestid,
+                                  request.name,
+                                  request.args,
+                                  request.datafh)
+
+        for frame in res:
+            yield frame
+
+        request.state = 'sent'
diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -515,11 +515,16 @@
 
         # TODO this should be part of a generic peer for the frame-based
         # protocol.
-        stream = wireprotoframing.stream(1)
-        frames = wireprotoframing.createcommandframes(stream, 1,
-                                                      name, args)
+        reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
+                                                 buffersends=True)
 
-        body = b''.join(map(bytes, frames))
+        request, action, meta = reactor.callcommand(name, args)
+        assert action == 'noop'
+
+        action, meta = reactor.flushcommands()
+        assert action == 'sendframes'
+
+        body = b''.join(map(bytes, meta['framegen']))
         req = self._requestbuilder(pycompat.strurl(url), body, headers)
         req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
 



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list