D3379: wireprotov2: move response handling out of httppeer

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Mon Apr 16 19:09:48 EDT 2018


This revision was automatically updated to reflect the committed changes.
Closed by commit rHGa656cba08a04: wireprotov2: move response handling out of httppeer (authored by indygreg).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3379?vs=8291&id=8322

REVISION DETAIL
  https://phab.mercurial-scm.org/D3379

AFFECTED FILES
  mercurial/httppeer.py
  mercurial/wireprotov2peer.py
  tests/test-wireproto-command-known.t
  tests/test-wireproto-command-pushkey.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-command-pushkey.t b/tests/test-wireproto-command-pushkey.t
--- a/tests/test-wireproto-command-pushkey.t
+++ b/tests/test-wireproto-command-pushkey.t
@@ -53,7 +53,7 @@
   received frame(size=*; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor) (glob)
   s>     0\r\n
   s>     \r\n
-  response: []
+  response: [True]
 
   $ sendhttpv2peer << EOF
   > command listkeys
diff --git a/tests/test-wireproto-command-known.t b/tests/test-wireproto-command-known.t
--- a/tests/test-wireproto-command-known.t
+++ b/tests/test-wireproto-command-known.t
@@ -50,7 +50,7 @@
   received frame(size=1; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor)
   s>     0\r\n
   s>     \r\n
-  response: []
+  response: [b'']
 
 Single known node works
 
diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py
new file mode 100644
--- /dev/null
+++ b/mercurial/wireprotov2peer.py
@@ -0,0 +1,135 @@
+# wireprotov2peer.py - client side code for wire protocol version 2
+#
+# Copyright 2018 Gregory Szorc <gregory.szorc at gmail.com>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+from .i18n import _
+from .thirdparty import (
+    cbor,
+)
+from . import (
+    error,
+    util,
+    wireprotoframing,
+)
+
+class clienthandler(object):
+    """Object to handle higher-level client activities.
+
+    The ``clientreactor`` is used to hold low-level state about the frame-based
+    protocol, such as which requests and streams are active. This type is used
+    for higher-level operations, such as reading frames from a socket, exposing
+    and managing a higher-level primitive for representing command responses,
+    etc. This class is what peers should probably use to bridge wire activity
+    with the higher-level peer API.
+    """
+
+    def __init__(self, ui, clientreactor):
+        self._ui = ui
+        self._reactor = clientreactor
+        self._requests = {}
+        self._futures = {}
+        self._responses = {}
+
+    def callcommand(self, command, args, f):
+        """Register a request to call a command.
+
+        Returns an iterable of frames that should be sent over the wire.
+        """
+        request, action, meta = self._reactor.callcommand(command, args)
+
+        if action != 'noop':
+            raise error.ProgrammingError('%s not yet supported' % action)
+
+        rid = request.requestid
+        self._requests[rid] = request
+        self._futures[rid] = f
+        self._responses[rid] = {
+            'cbor': False,
+            'b': util.bytesio(),
+        }
+
+        return iter(())
+
+    def flushcommands(self):
+        """Flush all queued commands.
+
+        Returns an iterable of frames that should be sent over the wire.
+        """
+        action, meta = self._reactor.flushcommands()
+
+        if action != 'sendframes':
+            raise error.ProgrammingError('%s not yet supported' % action)
+
+        return meta['framegen']
+
+    def readframe(self, fh):
+        """Attempt to read and process a frame.
+
+        Returns None if no frame was read. Presumably this means EOF.
+        """
+        frame = wireprotoframing.readframe(fh)
+        if frame is None:
+            # TODO tell reactor?
+            return
+
+        self._ui.note(_('received %r\n') % frame)
+        self._processframe(frame)
+
+        return True
+
+    def _processframe(self, frame):
+        """Process a single read frame."""
+
+        action, meta = self._reactor.onframerecv(frame)
+
+        if action == 'error':
+            e = error.RepoError(meta['message'])
+
+            if frame.requestid in self._futures:
+                self._futures[frame.requestid].set_exception(e)
+            else:
+                raise e
+
+        if frame.requestid not in self._requests:
+            raise error.ProgrammingError(
+                'received frame for unknown request; this is either a bug in '
+                'the clientreactor not screening for this or this instance was '
+                'never told about this request: %r' % frame)
+
+        response = self._responses[frame.requestid]
+
+        if action == 'responsedata':
+            response['b'].write(meta['data'])
+
+            if meta['cbor']:
+                response['cbor'] = True
+
+            if meta['eos']:
+                if meta['cbor']:
+                    # If CBOR, decode every object.
+                    b = response['b']
+
+                    size = b.tell()
+                    b.seek(0)
+
+                    decoder = cbor.CBORDecoder(b)
+
+                    result = []
+                    while b.tell() < size:
+                        result.append(decoder.decode())
+                else:
+                    result = [response['b'].getvalue()]
+
+                self._futures[frame.requestid].set_result(result)
+
+                del self._requests[frame.requestid]
+                del self._futures[frame.requestid]
+
+        else:
+            raise error.ProgrammingError(
+                'unhandled action from clientreactor: %s' % action)
diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -13,7 +13,6 @@
 import os
 import socket
 import struct
-import sys
 import tempfile
 import weakref
 
@@ -36,6 +35,7 @@
     wireprotoframing,
     wireprototypes,
     wireprotov1peer,
+    wireprotov2peer,
     wireprotov2server,
 )
 
@@ -522,27 +522,20 @@
     reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
                                              buffersends=True)
 
+    handler = wireprotov2peer.clienthandler(ui, reactor)
+
     url = '%s/%s' % (apiurl, permission)
 
     if len(requests) > 1:
         url += '/multirequest'
     else:
         url += '/%s' % requests[0][0]
 
-    # Request ID to (request, future)
-    requestmap = {}
-
     for command, args, f in requests:
-        request, action, meta = reactor.callcommand(command, args)
-        assert action == 'noop'
-
-        requestmap[request.requestid] = (request, f)
-
-    action, meta = reactor.flushcommands()
-    assert action == 'sendframes'
+        assert not list(handler.callcommand(command, args, f))
 
     # TODO stream this.
-    body = b''.join(map(bytes, meta['framegen']))
+    body = b''.join(map(bytes, handler.flushcommands()))
 
     # TODO modify user-agent to reflect v2
     headers = {
@@ -564,7 +557,7 @@
         ui.traceback()
         raise IOError(None, e)
 
-    return reactor, requestmap, res
+    return handler, res
 
 class queuedcommandfuture(pycompat.futures.Future):
     """Wraps result() on command futures to trigger submission on call."""
@@ -684,17 +677,15 @@
             'pull': 'ro',
         }[permissions.pop()]
 
-        reactor, requests, resp = sendv2request(
+        handler, resp = sendv2request(
             self._ui, self._opener, self._requestbuilder, self._apiurl,
             permission, calls)
 
         # TODO we probably want to validate the HTTP code, media type, etc.
 
         self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
         self._responsef = self._responseexecutor.submit(self._handleresponse,
-                                                        reactor,
-                                                        requests,
-                                                        resp)
+                                                        handler, resp)
 
     def close(self):
         if self._closed:
@@ -723,62 +714,11 @@
 
             self._futures = None
 
-    def _handleresponse(self, reactor, requests, resp):
+    def _handleresponse(self, handler, resp):
         # Called in a thread to read the response.
 
-        results = {k: [] for k in requests}
-
-        while True:
-            frame = wireprotoframing.readframe(resp)
-            if frame is None:
-                break
-
-            self._ui.note(_('received %r\n') % frame)
-
-            # Guard against receiving a frame with a request ID that we
-            # didn't issue. This should never happen.
-            request, f = requests.get(frame.requestid, [None, None])
-
-            action, meta = reactor.onframerecv(frame)
-
-            if action == 'responsedata':
-                assert request.requestid == meta['request'].requestid
-
-                result = results[request.requestid]
-
-                if meta['cbor']:
-                    payload = util.bytesio(meta['data'])
-
-                    decoder = cbor.CBORDecoder(payload)
-                    while payload.tell() + 1 < len(meta['data']):
-                        try:
-                            result.append(decoder.decode())
-                        except Exception:
-                            pycompat.future_set_exception_info(
-                                f, sys.exc_info()[1:])
-                            continue
-                else:
-                    result.append(meta['data'])
-
-                if meta['eos']:
-                    f.set_result(result)
-                    del results[request.requestid]
-
-            elif action == 'error':
-                e = error.RepoError(meta['message'])
-
-                if f:
-                    f.set_exception(e)
-                else:
-                    raise e
-
-            else:
-                e = error.ProgrammingError('unhandled action: %s' % action)
-
-                if f:
-                    f.set_exception(e)
-                else:
-                    raise e
+        while handler.readframe(resp):
+            pass
 
 # TODO implement interface for version 2 peers
 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,



To: indygreg, #hg-reviewers, durin42
Cc: mercurial-devel


More information about the Mercurial-devel mailing list