D4916: wireprotov2: handle sender protocol settings frames
indygreg (Gregory Szorc)
phabricator at mercurial-scm.org
Wed Oct 10 10:57:39 EDT 2018
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG327d40b94bed: wireprotov2: handle sender protocol settings frames (authored by indygreg).
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D4916?vs=11755&id=11780
REVISION DETAIL
https://phab.mercurial-scm.org/D4916
AFFECTED FILES
mercurial/wireprotoframing.py
tests/test-wireproto-serverreactor.py
CHANGE DETAILS
diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -9,6 +9,9 @@
util,
wireprotoframing as framing,
)
+from mercurial.utils import (
+ cborutil,
+)
ffs = framing.makeframefromhumanstring
@@ -193,7 +196,8 @@
ffs(b'1 1 stream-begin command-data 0 ignored'))
self.assertaction(result, b'error')
self.assertEqual(result[1], {
- b'message': b'expected command request frame; got 2',
+ b'message': b'expected sender protocol settings or command request '
+ b'frame; got 2',
})
def testunexpectedcommanddatareceiving(self):
@@ -494,6 +498,105 @@
results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
self.assertaction(results[0], b'runcommand')
+ def testprotocolsettingsnoflags(self):
+ result = self._sendsingleframe(
+ makereactor(),
+ ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
+ self.assertaction(result, b'error')
+ self.assertEqual(result[1], {
+ b'message': b'sender protocol settings frame must have '
+ b'continuation or end of stream flag set',
+ })
+
+ def testprotocolsettingsconflictflags(self):
+ result = self._sendsingleframe(
+ makereactor(),
+ ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
+ self.assertaction(result, b'error')
+ self.assertEqual(result[1], {
+ b'message': b'sender protocol settings frame cannot have both '
+ b'continuation and end of stream flags set',
+ })
+
+ def testprotocolsettingsemptypayload(self):
+ result = self._sendsingleframe(
+ makereactor(),
+ ffs(b'0 1 stream-begin sender-protocol-settings eos '))
+ self.assertaction(result, b'error')
+ self.assertEqual(result[1], {
+ b'message': b'sender protocol settings frame did not contain CBOR '
+ b'data',
+ })
+
+ def testprotocolsettingsmultipleobjects(self):
+ result = self._sendsingleframe(
+ makereactor(),
+ ffs(b'0 1 stream-begin sender-protocol-settings eos '
+ b'\x46foobar\x43foo'))
+ self.assertaction(result, b'error')
+ self.assertEqual(result[1], {
+ b'message': b'sender protocol settings frame contained multiple '
+ b'CBOR values',
+ })
+
+ def testprotocolsettingscontentencodings(self):
+ reactor = makereactor()
+
+ result = self._sendsingleframe(
+ reactor,
+ ffs(b'0 1 stream-begin sender-protocol-settings eos '
+ b'cbor:{b"contentencodings": [b"a", b"b"]}'))
+ self.assertaction(result, b'wantframe')
+
+ self.assertEqual(reactor._state, b'idle')
+ self.assertEqual(reactor._sendersettings[b'contentencodings'],
+ [b'a', b'b'])
+
+ def testprotocolsettingsmultipleframes(self):
+ reactor = makereactor()
+
+ data = b''.join(cborutil.streamencode({
+ b'contentencodings': [b'value1', b'value2'],
+ }))
+
+ results = list(sendframes(reactor, [
+ ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
+ data[0:5]),
+ ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
+ ]))
+
+ self.assertEqual(len(results), 2)
+
+ self.assertaction(results[0], b'wantframe')
+ self.assertaction(results[1], b'wantframe')
+
+ self.assertEqual(reactor._state, b'idle')
+ self.assertEqual(reactor._sendersettings[b'contentencodings'],
+ [b'value1', b'value2'])
+
+ def testprotocolsettingsbadcbor(self):
+ result = self._sendsingleframe(
+ makereactor(),
+ ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
+ self.assertaction(result, b'error')
+
+ def testprotocolsettingsnoninitial(self):
+ # Cannot have protocol settings frames as non-initial frames.
+ reactor = makereactor()
+
+ stream = framing.stream(1)
+ results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
+ self.assertEqual(len(results), 1)
+ self.assertaction(results[0], b'runcommand')
+
+ result = self._sendsingleframe(
+ reactor,
+ ffs(b'0 1 0 sender-protocol-settings eos '))
+ self.assertaction(result, b'error')
+ self.assertEqual(result[1], {
+ b'message': b'expected command request frame; got 8',
+ })
+
if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -674,6 +674,10 @@
'numbered streams; %d is not even' %
stream.streamid)
+DEFAULT_PROTOCOL_SETTINGS = {
+ 'contentencodings': [b'identity'],
+}
+
class serverreactor(object):
"""Holds state of a server handling frame-based protocol requests.
@@ -750,7 +754,7 @@
sender cannot receive until all data has been transmitted.
"""
self._deferoutput = deferoutput
- self._state = 'idle'
+ self._state = 'initial'
self._nextoutgoingstreamid = 2
self._bufferedframegens = []
# stream id -> stream instance for all active streams from the client.
@@ -763,6 +767,11 @@
# set.
self._activecommands = set()
+ self._protocolsettingsdecoder = None
+
+ # Sender protocol settings are optional. Set implied default values.
+ self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
+
def onframerecv(self, frame):
"""Process a frame that has been received off the wire.
@@ -794,6 +803,8 @@
del self._incomingstreams[frame.streamid]
handlers = {
+ 'initial': self._onframeinitial,
+ 'protocol-settings-receiving': self._onframeprotocolsettings,
'idle': self._onframeidle,
'command-receiving': self._onframecommandreceiving,
'errored': self._onframeerrored,
@@ -1062,6 +1073,85 @@
_('received command request frame with neither new nor '
'continuation flags set'))
+ def _onframeinitial(self, frame):
+ # Called when we receive a frame when in the "initial" state.
+ if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
+ self._state = 'protocol-settings-receiving'
+ self._protocolsettingsdecoder = cborutil.bufferingdecoder()
+ return self._onframeprotocolsettings(frame)
+
+ elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
+ self._state = 'idle'
+ return self._onframeidle(frame)
+
+ else:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('expected sender protocol settings or command request '
+ 'frame; got %d') % frame.typeid)
+
+ def _onframeprotocolsettings(self, frame):
+ assert self._state == 'protocol-settings-receiving'
+ assert self._protocolsettingsdecoder is not None
+
+ if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('expected sender protocol settings frame; got %d') %
+ frame.typeid)
+
+ more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
+ eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
+
+ if more and eos:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('sender protocol settings frame cannot have both '
+ 'continuation and end of stream flags set'))
+
+ if not more and not eos:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('sender protocol settings frame must have continuation or '
+ 'end of stream flag set'))
+
+ # TODO establish limits for maximum amount of data that can be
+ # buffered.
+ try:
+ self._protocolsettingsdecoder.decode(frame.payload)
+ except Exception as e:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('error decoding CBOR from sender protocol settings frame: %s')
+ % stringutil.forcebytestr(e))
+
+ if more:
+ return self._makewantframeresult()
+
+ assert eos
+
+ decoded = self._protocolsettingsdecoder.getavailable()
+ self._protocolsettingsdecoder = None
+
+ if not decoded:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('sender protocol settings frame did not contain CBOR data'))
+ elif len(decoded) > 1:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('sender protocol settings frame contained multiple CBOR '
+ 'values'))
+
+ d = decoded[0]
+
+ if b'contentencodings' in d:
+ self._sendersettings['contentencodings'] = d[b'contentencodings']
+
+ self._state = 'idle'
+
+ return self._makewantframeresult()
+
def _onframeidle(self, frame):
# The only frame type that should be received in this state is a
# command request.
To: indygreg, #hg-reviewers
Cc: mercurial-devel
More information about the Mercurial-devel mailing list