D3224: wireproto: client reactor support for receiving frames
indygreg (Gregory Szorc)
phabricator at mercurial-scm.org
Wed Apr 11 13:00:19 EDT 2018
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG55b5ba8d4e68: wireproto: client reactor support for receiving frames (authored by indygreg, committed by ).
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D3224?vs=7942&id=7990
REVISION DETAIL
https://phab.mercurial-scm.org/D3224
AFFECTED FILES
mercurial/httppeer.py
mercurial/wireprotoframing.py
tests/test-wireproto-clientreactor.py
CHANGE DETAILS
diff --git a/tests/test-wireproto-clientreactor.py b/tests/test-wireproto-clientreactor.py
--- a/tests/test-wireproto-clientreactor.py
+++ b/tests/test-wireproto-clientreactor.py
@@ -7,6 +7,21 @@
wireprotoframing as framing,
)
+ffs = framing.makeframefromhumanstring
+
+def sendframe(reactor, frame):
+ """Send a frame bytearray to a reactor."""
+ header = framing.parseheader(frame)
+ payload = frame[framing.FRAME_HEADER_SIZE:]
+ assert len(payload) == header.length
+
+ return reactor.onframerecv(framing.frame(header.requestid,
+ header.streamid,
+ header.streamflags,
+ header.typeid,
+ header.flags,
+ payload))
+
class SingleSendTests(unittest.TestCase):
"""A reactor that can only send once rejects subsequent sends."""
def testbasic(self):
@@ -61,6 +76,35 @@
self.assertEqual(request.state, 'sent')
+class BadFrameRecvTests(unittest.TestCase):
+ def testoddstream(self):
+ reactor = framing.clientreactor()
+
+ action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
+ self.assertEqual(action, 'error')
+ self.assertEqual(meta['message'],
+ 'received frame with odd numbered stream ID: 1')
+
+ def testunknownstream(self):
+ reactor = framing.clientreactor()
+
+ action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
+ self.assertEqual(action, 'error')
+ self.assertEqual(meta['message'],
+ 'received frame on unknown stream without beginning '
+ 'of stream flag set')
+
+ def testunhandledframetype(self):
+ reactor = framing.clientreactor(buffersends=False)
+
+ request, action, meta = reactor.callcommand(b'foo', {})
+ for frame in meta['framegen']:
+ pass
+
+ with self.assertRaisesRegexp(error.ProgrammingError,
+ 'unhandled frame type'):
+ sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
+
if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -922,6 +922,7 @@
self._outgoingstream = stream(1)
self._pendingrequests = collections.deque()
self._activerequests = {}
+ self._incomingstreams = {}
def callcommand(self, name, args, datafh=None):
"""Request that a command be executed.
@@ -1007,3 +1008,63 @@
yield frame
request.state = 'sent'
+
+ def onframerecv(self, frame):
+ """Process a frame that has been received off the wire.
+
+ Returns a 2-tuple of (action, meta) describing further action the
+ caller needs to take as a result of receiving this frame.
+ """
+ if frame.streamid % 2:
+ return 'error', {
+ 'message': (
+ _('received frame with odd numbered stream ID: %d') %
+ frame.streamid),
+ }
+
+ if frame.streamid not in self._incomingstreams:
+ if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
+ return 'error', {
+ 'message': _('received frame on unknown stream '
+ 'without beginning of stream flag set'),
+ }
+
+ if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
+ raise error.ProgrammingError('support for decoding stream '
+ 'payloads not yet implemneted')
+
+ if frame.streamflags & STREAM_FLAG_END_STREAM:
+ del self._incomingstreams[frame.streamid]
+
+ if frame.requestid not in self._activerequests:
+ return 'error', {
+ 'message': (_('received frame for inactive request ID: %d') %
+ frame.requestid),
+ }
+
+ request = self._activerequests[frame.requestid]
+ request.state = 'receiving'
+
+ handlers = {
+ FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
+ }
+
+ meth = handlers.get(frame.typeid)
+ if not meth:
+ raise error.ProgrammingError('unhandled frame type: %d' %
+ frame.typeid)
+
+ return meth(request, frame)
+
+ def _onbytesresponseframe(self, request, frame):
+ if frame.flags & FLAG_BYTES_RESPONSE_EOS:
+ request.state = 'received'
+ del self._activerequests[request.requestid]
+
+ return 'responsedata', {
+ 'request': request,
+ 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION,
+ 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS,
+ 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR,
+ 'data': frame.payload,
+ }
diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -551,18 +551,19 @@
self.ui.note(_('received %r\n') % frame)
- if frame.typeid == wireprotoframing.FRAME_TYPE_BYTES_RESPONSE:
- if frame.flags & wireprotoframing.FLAG_BYTES_RESPONSE_CBOR:
- payload = util.bytesio(frame.payload)
+ action, meta = reactor.onframerecv(frame)
+
+ if action == 'responsedata':
+ if meta['cbor']:
+ payload = util.bytesio(meta['data'])
decoder = cbor.CBORDecoder(payload)
- while payload.tell() + 1 < len(frame.payload):
+ while payload.tell() + 1 < len(meta['data']):
results.append(decoder.decode())
else:
- results.append(frame.payload)
+ results.append(meta['data'])
else:
- error.ProgrammingError('unhandled frame type: %d' %
- frame.typeid)
+ error.ProgrammingError('unhandled action: %s' % action)
return results
To: indygreg, #hg-reviewers, durin42
Cc: mercurial-devel
More information about the Mercurial-devel
mailing list