D3223: wireproto: introduce a reactor for client-side state

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Wed Apr 11 13:00:09 EDT 2018


This revision was automatically updated to reflect the committed changes.
Closed by commit rHG01361be9e2dc: wireproto: introduce a reactor for client-side state (authored by indygreg, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3223?vs=7941&id=7989

REVISION DETAIL
  https://phab.mercurial-scm.org/D3223

AFFECTED FILES
  mercurial/httppeer.py
  mercurial/wireprotoframing.py
  tests/test-wireproto-clientreactor.py

CHANGE DETAILS

diff --git a/tests/test-wireproto-clientreactor.py b/tests/test-wireproto-clientreactor.py
new file mode 100644
--- /dev/null
+++ b/tests/test-wireproto-clientreactor.py
@@ -0,0 +1,66 @@
+from __future__ import absolute_import
+
+import unittest
+
+from mercurial import (
+    error,
+    wireprotoframing as framing,
+)
+
+class SingleSendTests(unittest.TestCase):
+    """A reactor that can only send once rejects subsequent sends."""
+    def testbasic(self):
+        reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
+
+        request, action, meta = reactor.callcommand(b'foo', {})
+        self.assertEqual(request.state, 'pending')
+        self.assertEqual(action, 'noop')
+
+        action, meta = reactor.flushcommands()
+        self.assertEqual(action, 'sendframes')
+
+        for frame in meta['framegen']:
+            self.assertEqual(request.state, 'sending')
+
+        self.assertEqual(request.state, 'sent')
+
+        with self.assertRaisesRegexp(error.ProgrammingError,
+                                     'cannot issue new commands'):
+            reactor.callcommand(b'foo', {})
+
+        with self.assertRaisesRegexp(error.ProgrammingError,
+                                     'cannot issue new commands'):
+            reactor.callcommand(b'foo', {})
+
+class NoBufferTests(unittest.TestCase):
+    """A reactor without send buffering sends requests immediately."""
+    def testbasic(self):
+        reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
+
+        request, action, meta = reactor.callcommand(b'command1', {})
+        self.assertEqual(request.requestid, 1)
+        self.assertEqual(action, 'sendframes')
+
+        self.assertEqual(request.state, 'pending')
+
+        for frame in meta['framegen']:
+            self.assertEqual(request.state, 'sending')
+
+        self.assertEqual(request.state, 'sent')
+
+        action, meta = reactor.flushcommands()
+        self.assertEqual(action, 'noop')
+
+        # And we can send another command.
+        request, action, meta = reactor.callcommand(b'command2', {})
+        self.assertEqual(request.requestid, 3)
+        self.assertEqual(action, 'sendframes')
+
+        for frame in meta['framegen']:
+            self.assertEqual(request.state, 'sending')
+
+        self.assertEqual(request.state, 'sent')
+
+if __name__ == '__main__':
+    import silenttestrunner
+    silenttestrunner.main(__name__)
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -11,6 +11,7 @@
 
 from __future__ import absolute_import
 
+import collections
 import struct
 
 from .i18n import _
@@ -876,3 +877,133 @@
 
     def _onframeerrored(self, frame):
         return self._makeerrorresult(_('server already errored'))
+
+class commandrequest(object):
+    """Represents a request to run a command."""
+
+    def __init__(self, requestid, name, args, datafh=None):
+        self.requestid = requestid
+        self.name = name
+        self.args = args
+        self.datafh = datafh
+        self.state = 'pending'
+
+class clientreactor(object):
+    """Holds state of a client issuing frame-based protocol requests.
+
+    This is like ``serverreactor`` but for client-side state.
+
+    Each instance is bound to the lifetime of a connection. For persistent
+    connection transports using e.g. TCP sockets and speaking the raw
+    framing protocol, there will be a single instance for the lifetime of
+    the TCP socket. For transports where there are multiple discrete
+    interactions (say tunneled within an HTTP request), there will be a
+    separate instance for each distinct interaction.
+    """
+    def __init__(self, hasmultiplesend=False, buffersends=True):
+        """Create a new instance.
+
+        ``hasmultiplesend`` indicates whether multiple sends are supported
+        by the transport. When True, it is possible to send commands immediately
+        instead of buffering until the caller signals an intent to finish a
+        send operation.
+
+        ``buffersends`` indicates whether sends should be buffered until the
+        last request has been issued.
+        """
+        self._hasmultiplesend = hasmultiplesend
+        self._buffersends = buffersends
+
+        self._canissuecommands = True
+        self._cansend = True
+
+        self._nextrequestid = 1
+        # We only support a single outgoing stream for now.
+        self._outgoingstream = stream(1)
+        self._pendingrequests = collections.deque()
+        self._activerequests = {}
+
+    def callcommand(self, name, args, datafh=None):
+        """Request that a command be executed.
+
+        Receives the command name, a dict of arguments to pass to the command,
+        and an optional file object containing the raw data for the command.
+
+        Returns a 3-tuple of (request, action, action data).
+        """
+        if not self._canissuecommands:
+            raise error.ProgrammingError('cannot issue new commands')
+
+        requestid = self._nextrequestid
+        self._nextrequestid += 2
+
+        request = commandrequest(requestid, name, args, datafh=datafh)
+
+        if self._buffersends:
+            self._pendingrequests.append(request)
+            return request, 'noop', {}
+        else:
+            if not self._cansend:
+                raise error.ProgrammingError('sends cannot be performed on '
+                                             'this instance')
+
+            if not self._hasmultiplesend:
+                self._cansend = False
+                self._canissuecommands = False
+
+            return request, 'sendframes', {
+                'framegen': self._makecommandframes(request),
+            }
+
+    def flushcommands(self):
+        """Request that all queued commands be sent.
+
+        If any commands are buffered, this will instruct the caller to send
+        them over the wire. If no commands are buffered it instructs the client
+        to no-op.
+
+        If instances aren't configured for multiple sends, no new command
+        requests are allowed after this is called.
+        """
+        if not self._pendingrequests:
+            return 'noop', {}
+
+        if not self._cansend:
+            raise error.ProgrammingError('sends cannot be performed on this '
+                                         'instance')
+
+        # If the instance only allows sending once, mark that we have fired
+        # our one shot.
+        if not self._hasmultiplesend:
+            self._canissuecommands = False
+            self._cansend = False
+
+        def makeframes():
+            while self._pendingrequests:
+                request = self._pendingrequests.popleft()
+                for frame in self._makecommandframes(request):
+                    yield frame
+
+        return 'sendframes', {
+            'framegen': makeframes(),
+        }
+
+    def _makecommandframes(self, request):
+        """Emit frames to issue a command request.
+
+        As a side-effect, update request accounting to reflect its changed
+        state.
+        """
+        self._activerequests[request.requestid] = request
+        request.state = 'sending'
+
+        res = createcommandframes(self._outgoingstream,
+                                  request.requestid,
+                                  request.name,
+                                  request.args,
+                                  request.datafh)
+
+        for frame in res:
+            yield frame
+
+        request.state = 'sent'
diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -515,11 +515,16 @@
 
         # TODO this should be part of a generic peer for the frame-based
         # protocol.
-        stream = wireprotoframing.stream(1)
-        frames = wireprotoframing.createcommandframes(stream, 1,
-                                                      name, args)
+        reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
+                                                 buffersends=True)
 
-        body = b''.join(map(bytes, frames))
+        request, action, meta = reactor.callcommand(name, args)
+        assert action == 'noop'
+
+        action, meta = reactor.flushcommands()
+        assert action == 'sendframes'
+
+        body = b''.join(map(bytes, meta['framegen']))
         req = self._requestbuilder(pycompat.strurl(url), body, headers)
         req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
 



To: indygreg, #hg-reviewers, durin42
Cc: mercurial-devel


More information about the Mercurial-devel mailing list