D4474: wireprotov2peer: stream decoded responses

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Wed Sep 5 12:23:14 EDT 2018


indygreg updated this revision to Diff 10792.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D4474?vs=10771&id=10792

REVISION DETAIL
  https://phab.mercurial-scm.org/D4474

AFFECTED FILES
  mercurial/debugcommands.py
  mercurial/wireprotov2peer.py
  tests/test-http-api-httpv2.t
  tests/test-wireproto-command-capabilities.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-command-capabilities.t b/tests/test-wireproto-command-capabilities.t
--- a/tests/test-wireproto-command-capabilities.t
+++ b/tests/test-wireproto-command-capabilities.t
@@ -349,10 +349,7 @@
   s>     0\r\n
   s>     \r\n
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  response: [
-    {
-      b'status': b'ok'
-    },
+  response: gen[
     {
       b'commands': {
         b'branchmap': {
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -225,10 +225,7 @@
   s>     0\r\n
   s>     \r\n
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  response: [
-    {
-      b'status': b'ok'
-    },
+  response: gen[
     b'customreadonly bytes response'
   ]
 
diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py
--- a/mercurial/wireprotov2peer.py
+++ b/mercurial/wireprotov2peer.py
@@ -7,11 +7,12 @@
 
 from __future__ import absolute_import
 
+import threading
+
 from .i18n import _
 from . import (
     encoding,
     error,
-    util,
     wireprotoframing,
 )
 from .utils import (
@@ -34,20 +35,101 @@
     return b''.join(chunks)
 
 class commandresponse(object):
-    """Represents the response to a command request."""
+    """Represents the response to a command request.
+
+    Instances track the state of the command and hold its results.
+
+    An external entity is required to update the state of the object when
+    events occur.
+    """
 
     def __init__(self, requestid, command):
         self.requestid = requestid
         self.command = command
 
-        self.b = util.bytesio()
+        # Whether all remote input related to this command has been
+        # received.
+        self._inputcomplete = False
+
+        # We have a lock that is acquired when important object state is
+        # mutated. This is to prevent race conditions between 1 thread
+        # sending us new data and another consuming it.
+        self._lock = threading.RLock()
+
+        # An event is set when state of the object changes. This event
+        # is waited on by the generator emitting objects.
+        self._serviceable = threading.Event()
+
+        self._pendingevents = []
+        self._decoder = cborutil.bufferingdecoder()
+        self._seeninitial = False
+
+    def _oninputcomplete(self):
+        with self._lock:
+            self._inputcomplete = True
+            self._serviceable.set()
+
+    def _onresponsedata(self, data):
+        available, readcount, wanted = self._decoder.decode(data)
+
+        if not available:
+            return
+
+        with self._lock:
+            for o in self._decoder.getavailable():
+                if not self._seeninitial:
+                    self._handleinitial(o)
+                    continue
+
+                self._pendingevents.append(o)
+
+            self._serviceable.set()
 
-    def cborobjects(self):
-        """Obtain decoded CBOR objects from this response."""
-        self.b.seek(0)
+    def _handleinitial(self, o):
+        self._seeninitial = True
+        if o[b'status'] == 'ok':
+            return
+
+        atoms = [{'msg': o[b'error'][b'message']}]
+        if b'args' in o[b'error']:
+            atoms[0]['args'] = o[b'error'][b'args']
+
+        raise error.RepoError(formatrichmessage(atoms))
+
+    def objects(self):
+        """Obtain decoded objects from this response.
+
+        This is a generator of data structures that were decoded from the
+        command response.
+
+        Obtaining the next member of the generator may block due to waiting
+        on external data to become available.
 
-        for v in cborutil.decodeall(self.b.getvalue()):
-            yield v
+        If the server encountered an error in the middle of serving the data
+        or if another error occurred, an exception may be raised when
+        advancing the generator.
+        """
+        while True:
+            # TODO this can infinite loop if self._inputcomplete is never
+            # set. We likely want to tie the lifetime of this object/state
+            # to that of the background thread receiving frames and updating
+            # our state.
+            self._serviceable.wait(1.0)
+
+            with self._lock:
+                self._serviceable.clear()
+
+                # Make copies because objects could be mutated during
+                # iteration.
+                stop = self._inputcomplete
+                pending = list(self._pendingevents)
+                self._pendingevents[:] = []
+
+            for o in pending:
+                yield o
+
+            if stop:
+                break
 
 class clienthandler(object):
     """Object to handle higher-level client activities.
@@ -80,6 +162,8 @@
         rid = request.requestid
         self._requests[rid] = request
         self._futures[rid] = f
+        # TODO we need some kind of lifetime on response instances otherwise
+        # objects() may deadlock.
         self._responses[rid] = commandresponse(rid, command)
 
         return iter(())
@@ -119,8 +203,12 @@
         if action == 'error':
             e = error.RepoError(meta['message'])
 
+            if frame.requestid in self._responses:
+                self._responses[frame.requestid]._oninputcomplete()
+
             if frame.requestid in self._futures:
                 self._futures[frame.requestid].set_exception(e)
+                del self._futures[frame.requestid]
             else:
                 raise e
 
@@ -141,39 +229,32 @@
                 self._processresponsedata(frame, meta, response)
             except BaseException as e:
                 self._futures[frame.requestid].set_exception(e)
+                del self._futures[frame.requestid]
+                response._oninputcomplete()
         else:
             raise error.ProgrammingError(
                 'unhandled action from clientreactor: %s' % action)
 
     def _processresponsedata(self, frame, meta, response):
-        # This buffers all data until end of stream is received. This
-        # is bad for performance.
-        # TODO make response data streamable
-        response.b.write(meta['data'])
+        # This can raise. The caller can handle it.
+        response._onresponsedata(meta['data'])
 
         if meta['eos']:
-            # If the command has a decoder, resolve the future to the
-            # decoded value. Otherwise resolve to the rich response object.
-            decoder = COMMAND_DECODERS.get(response.command)
-
-            # TODO consider always resolving the overall status map.
-            if decoder:
-                objs = response.cborobjects()
-
-                overall = next(objs)
+            response._oninputcomplete()
+            del self._requests[frame.requestid]
 
-                if overall['status'] == 'ok':
-                    self._futures[frame.requestid].set_result(decoder(objs))
-                else:
-                    atoms = [{'msg': overall['error']['message']}]
-                    if 'args' in overall['error']:
-                        atoms[0]['args'] = overall['error']['args']
-                    e = error.RepoError(formatrichmessage(atoms))
-                    self._futures[frame.requestid].set_exception(e)
-            else:
-                self._futures[frame.requestid].set_result(response)
+        # If the command has a decoder, we wait until all input has been
+        # received before resolving the future. Otherwise we resolve the
+        # future immediately.
+        if frame.requestid not in self._futures:
+            return
 
-            del self._requests[frame.requestid]
+        if response.command not in COMMAND_DECODERS:
+            self._futures[frame.requestid].set_result(response.objects())
+            del self._futures[frame.requestid]
+        elif response._inputcomplete:
+            decoded = COMMAND_DECODERS[response.command](response.objects())
+            self._futures[frame.requestid].set_result(decoded)
             del self._futures[frame.requestid]
 
 def decodebranchmap(objs):
diff --git a/mercurial/debugcommands.py b/mercurial/debugcommands.py
--- a/mercurial/debugcommands.py
+++ b/mercurial/debugcommands.py
@@ -3240,7 +3240,7 @@
                     res = e.callcommand(command, args).result()
 
                 if isinstance(res, wireprotov2peer.commandresponse):
-                    val = list(res.cborobjects())
+                    val = res.objects()
                     ui.status(_('response: %s\n') %
                               stringutil.pprint(val, bprefix=True, indent=2))
                 else:



To: indygreg, #hg-reviewers
Cc: mjpieters, mercurial-devel


More information about the Mercurial-devel mailing list