D3388: wireprotov2: add support for more response types

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Sun Apr 15 13:37:40 EDT 2018


indygreg updated this revision to Diff 8304.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3388?vs=8300&id=8304

REVISION DETAIL
  https://phab.mercurial-scm.org/D3388

AFFECTED FILES
  mercurial/wireprotoframing.py
  mercurial/wireprototypes.py
  mercurial/wireprotov2server.py

CHANGE DETAILS

diff --git a/mercurial/wireprotov2server.py b/mercurial/wireprotov2server.py
--- a/mercurial/wireprotov2server.py
+++ b/mercurial/wireprotov2server.py
@@ -306,6 +306,15 @@
         action, meta = reactor.oncommandresponseready(outstream,
                                                       command['requestid'],
                                                       encoded)
+    elif isinstance(rsp, wireprototypes.v2streamingresponse):
+        action, meta = reactor.oncommandresponsereadygen(outstream,
+                                                         command['requestid'],
+                                                         rsp.gen)
+    elif isinstance(rsp, wireprototypes.v2errorresponse):
+        action, meta = reactor.oncommanderror(outstream,
+                                              command['requestid'],
+                                              rsp.message,
+                                              rsp.args)
     else:
         action, meta = reactor.onservererror(
             _('unhandled response type from wire proto command'))
diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py
--- a/mercurial/wireprototypes.py
+++ b/mercurial/wireprototypes.py
@@ -106,6 +106,22 @@
     def __init__(self, v):
         self.value = v
 
+class v2errorresponse(object):
+    """Represents a command error for version 2 transports."""
+    def __init__(self, message, args=None):
+        self.message = message
+        self.args = args
+
+class v2streamingresponse(object):
+    """A response whose data is supplied by a generator.
+
+    The generator can either consist of data structures to CBOR
+    encode or a stream of already-encoded bytes.
+    """
+    def __init__(self, gen, compressible=True):
+        self.gen = gen
+        self.compressible = compressible
+
 # list of nodes encoding / decoding
 def decodelist(l, sep=' '):
     if l:
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -386,6 +386,56 @@
         if done:
             break
 
+def createbytesresponseframesfromgen(stream, requestid, gen,
+                                     maxframesize=DEFAULT_MAX_FRAME_SIZE):
+    overall = cbor.dumps({b'status': b'ok'}, canonical=True)
+
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                           flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+                           payload=overall)
+
+    cb = util.chunkbuffer(gen)
+
+    flags = 0
+
+    while True:
+        chunk = cb.read(maxframesize)
+        if not chunk:
+            break
+
+        yield stream.makeframe(requestid=requestid,
+                               typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                               flags=flags,
+                               payload=chunk)
+
+        flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
+
+    flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
+    flags |= FLAG_COMMAND_RESPONSE_EOS
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                           flags=flags,
+                           payload=b'')
+
+def createcommanderrorresponse(stream, requestid, message, args=None):
+    m = {
+        b'status': b'error',
+        b'error': {
+            b'message': message,
+        }
+    }
+
+    if args:
+        m[b'error'][b'args'] = args
+
+    overall = cbor.dumps(m, canonical=True)
+
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                           flags=FLAG_COMMAND_RESPONSE_EOS,
+                           payload=overall)
+
 def createerrorframe(stream, requestid, msg, errtype):
     # TODO properly handle frame size limits.
     assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
@@ -634,6 +684,19 @@
                 'framegen': result,
             }
 
+    def oncommandresponsereadygen(self, stream, requestid, gen):
+        """Signal that a bytes response is ready, with data as a generator."""
+        ensureserverstream(stream)
+
+        def sendframes():
+            for frame in createbytesresponseframesfromgen(stream, requestid,
+                                                          gen):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        return self._handlesendframes(sendframes())
+
     def oninputeof(self):
         """Signals that end of input has been received.
 
@@ -655,13 +718,39 @@
             'framegen': makegen(),
         }
 
+    def _handlesendframes(self, framegen):
+        if self._deferoutput:
+            self._bufferedframegens.append(framegen)
+            return 'noop', {}
+        else:
+            return 'sendframes', {
+                'framegen': framegen,
+            }
+
     def onservererror(self, stream, requestid, msg):
         ensureserverstream(stream)
 
-        return 'sendframes', {
-            'framegen': createerrorframe(stream, requestid, msg,
-                                         errtype='server'),
-        }
+        def sendframes():
+            for frame in createerrorframe(stream, requestid, msg,
+                                          errtype='server'):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        return self._handlesendframes(sendframes())
+
+    def oncommanderror(self, stream, requestid, message, args=None):
+        """Called when a command encountered an error before sending output."""
+        ensureserverstream(stream)
+
+        def sendframes():
+            for frame in createcommanderrorresponse(stream, requestid, message,
+                                                    args):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        return self._handlesendframes(sendframes())
 
     def makeoutputstream(self):
         """Create a stream to be used for sending data to the client."""



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list