D4474: wireprotov2peer: stream decoded responses

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Tue Sep 4 18:30:25 UTC 2018


indygreg created this revision.
Herald added subscribers: mercurial-devel, mjpieters.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  Previously, wire protocol version 2 would buffer all response data.
  Only once all data was received did we CBOR decode it and resolve
  the future associated with the command. This was obviously not
  desirable. With the large response payloads introduced in upcoming
  commits, this would cause significant memory bloat and slow down client
  operations, because the client had to wait for the entire response
  before it could process any of it.
  
  This commit refactors the response handling code so that response
  data can be streamed.
  
  Command response objects now contain a buffering CBOR decoder. As
  new data arrives, it is fed into the decoder. Decoded objects are
  made available to the objects() generator as soon as they are decoded.
  
  Because there is a separate thread processing incoming frames and
  feeding data into the response object, there is the potential for
  race conditions when mutating response objects. So a lock has been
  added to guard access to critical state variables.
  
  Because the generator emitting decoded objects needs to wait on
  those objects to become available, we've added an Event for the
  generator to wait on so it doesn't busy loop. This does mean
  there is the potential for deadlocks, and I'm pretty sure they can
  occur in some scenarios. We already have a handful of TODOs around
  this, and I've added some more. Fixing this will likely require
  moving the background thread receiving frames into clienthandler.
  We likely would have done this anyway when implementing the client
  bits for the SSH transport.
  
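  Test output changes because the initial CBOR map holding the overall
  response state (e.g. {b'status': b'ok'}) is now always handled
  internally by the response object and is no longer emitted to
  consumers. Responses are also now printed as generators (gen[...])
  rather than lists.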

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D4474

AFFECTED FILES
  mercurial/debugcommands.py
  mercurial/wireprotov2peer.py
  tests/test-http-api-httpv2.t
  tests/test-wireproto-command-capabilities.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-command-capabilities.t b/tests/test-wireproto-command-capabilities.t
--- a/tests/test-wireproto-command-capabilities.t
+++ b/tests/test-wireproto-command-capabilities.t
@@ -349,10 +349,7 @@
   s>     0\r\n
   s>     \r\n
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  response: [
-    {
-      b'status': b'ok'
-    },
+  response: gen[
     {
       b'commands': {
         b'branchmap': {
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -225,10 +225,7 @@
   s>     0\r\n
   s>     \r\n
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  response: [
-    {
-      b'status': b'ok'
-    },
+  response: gen[
     b'customreadonly bytes response'
   ]
 
diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py
--- a/mercurial/wireprotov2peer.py
+++ b/mercurial/wireprotov2peer.py
@@ -7,11 +7,12 @@
 
 from __future__ import absolute_import
 
+import threading
+
 from .i18n import _
 from . import (
     encoding,
     error,
-    util,
     wireprotoframing,
 )
 from .utils import (
@@ -34,20 +35,101 @@
     return b''.join(chunks)
 
 class commandresponse(object):
-    """Represents the response to a command request."""
+    """Represents the response to a command request.
+
+    Instances track the state of the command and hold its results.
+
+    An external entity is required to update the state of the object when
+    events occur.
+    """
 
     def __init__(self, requestid, command):
         self.requestid = requestid
         self.command = command
 
-        self.b = util.bytesio()
+        # Whether all remote input related to this command has been
+        # received.
+        self._inputcomplete = False
+
+        # We have a lock that is acquired when important object state is
+        # mutated. This is to prevent race conditions between 1 thread
+        # sending us new data and another consuming it.
+        self._lock = threading.RLock()
+
+        # An event is set when state of the object changes. This event
+        # is waited on by the generator emitting objects.
+        self._serviceable = threading.Event()
+
+        self._pendingevents = []
+        self._decoder = cborutil.bufferingdecoder()
+        self._seeninitial = False
+
+    def _oninputcomplete(self):
+        with self._lock:
+            self._inputcomplete = True
+            self._serviceable.set()
+
+    def _onresponsedata(self, data):
+        available, readcount, wanted = self._decoder.decode(data)
+
+        if not available:
+            return
+
+        with self._lock:
+            for o in self._decoder.getavailable():
+                if not self._seeninitial:
+                    self._handleinitial(o)
+                    continue
+
+                self._pendingevents.append(o)
+
+            self._serviceable.set()
 
-    def cborobjects(self):
-        """Obtain decoded CBOR objects from this response."""
-        self.b.seek(0)
+    def _handleinitial(self, o):
+        self._seeninitial = True
+        if o[b'status'] == 'ok':
+            return
+
+        atoms = [{'msg': o[b'error'][b'message']}]
+        if b'args' in o[b'error']:
+            atoms[0]['args'] = o[b'error'][b'args']
+
+        raise error.RepoError(formatrichmessage(atoms))
+
+    def objects(self):
+        """Obtained decoded objects from this response.
+
+        This is a generator of data structures that were decoded from the
+        command response.
+
+        Obtaining the next member of the generator may block due to waiting
+        on external data to become available.
 
-        for v in cborutil.decodeall(self.b.getvalue()):
-            yield v
+        If the server encountered an error in the middle of serving the data
+        or if another error occurred, an exception may be raised when
+        advancing the generator.
+        """
+        while True:
+            # TODO this can infinite loop if self._inputcomplete is never
+            # set. We likely want to tie the lifetime of this object/state
+            # to that of the background thread receiving frames and updating
+            # our state.
+            self._serviceable.wait(1.0)
+
+            with self._lock:
+                self._serviceable.clear()
+
+                # Make copies because objects could be mutated during
+                # iteration.
+                stop = self._inputcomplete
+                pending = list(self._pendingevents)
+                self._pendingevents[:] = []
+
+            for o in pending:
+                yield o
+
+            if stop:
+                break
 
 class clienthandler(object):
     """Object to handle higher-level client activities.
@@ -80,6 +162,8 @@
         rid = request.requestid
         self._requests[rid] = request
         self._futures[rid] = f
+        # TODO we need some kind of lifetime on response instances otherwise
+        # objects() may deadlock.
         self._responses[rid] = commandresponse(rid, command)
 
         return iter(())
@@ -119,8 +203,12 @@
         if action == 'error':
             e = error.RepoError(meta['message'])
 
+            if frame.requestid in self._responses:
+                self._responses[frame.requestid]._oninputcomplete()
+
             if frame.requestid in self._futures:
                 self._futures[frame.requestid].set_exception(e)
+                del self._futures[frame.requestid]
             else:
                 raise e
 
@@ -141,39 +229,32 @@
                 self._processresponsedata(frame, meta, response)
             except BaseException as e:
                 self._futures[frame.requestid].set_exception(e)
+                del self._futures[frame.requestid]
+                response._oninputcomplete()
         else:
             raise error.ProgrammingError(
                 'unhandled action from clientreactor: %s' % action)
 
     def _processresponsedata(self, frame, meta, response):
-        # This buffers all data until end of stream is received. This
-        # is bad for performance.
-        # TODO make response data streamable
-        response.b.write(meta['data'])
+        # This can raise. The caller can handle it.
+        response._onresponsedata(meta['data'])
 
         if meta['eos']:
-            # If the command has a decoder, resolve the future to the
-            # decoded value. Otherwise resolve to the rich response object.
-            decoder = COMMAND_DECODERS.get(response.command)
-
-            # TODO consider always resolving the overall status map.
-            if decoder:
-                objs = response.cborobjects()
-
-                overall = next(objs)
+            response._oninputcomplete()
+            del self._requests[frame.requestid]
 
-                if overall['status'] == 'ok':
-                    self._futures[frame.requestid].set_result(decoder(objs))
-                else:
-                    atoms = [{'msg': overall['error']['message']}]
-                    if 'args' in overall['error']:
-                        atoms[0]['args'] = overall['error']['args']
-                    e = error.RepoError(formatrichmessage(atoms))
-                    self._futures[frame.requestid].set_exception(e)
-            else:
-                self._futures[frame.requestid].set_result(response)
+        # If the command has a decoder, we wait until all input has been
+        # received before resolving the future. Otherwise we resolve the
+        # future immediately.
+        if frame.requestid not in self._futures:
+            return
 
-            del self._requests[frame.requestid]
+        if response.command not in COMMAND_DECODERS:
+            self._futures[frame.requestid].set_result(response.objects())
+            del self._futures[frame.requestid]
+        elif response._inputcomplete:
+            decoded = COMMAND_DECODERS[response.command](response.objects())
+            self._futures[frame.requestid].set_result(decoded)
             del self._futures[frame.requestid]
 
 def decodebranchmap(objs):
diff --git a/mercurial/debugcommands.py b/mercurial/debugcommands.py
--- a/mercurial/debugcommands.py
+++ b/mercurial/debugcommands.py
@@ -3240,7 +3240,7 @@
                     res = e.callcommand(command, args).result()
 
                 if isinstance(res, wireprotov2peer.commandresponse):
-                    val = list(res.cborobjects())
+                    val = res.objects()
                     ui.status(_('response: %s\n') %
                               stringutil.pprint(val, bprefix=True, indent=2))
                 else:



To: indygreg, #hg-reviewers
Cc: mjpieters, mercurial-devel


More information about the Mercurial-devel mailing list