D3379: wireprotov2: move response handling out of httppeer
indygreg (Gregory Szorc)
phabricator at mercurial-scm.org
Mon Apr 16 19:09:48 EDT 2018
This revision was automatically updated to reflect the committed changes.
Closed by commit rHGa656cba08a04: wireprotov2: move response handling out of httppeer (authored by indygreg, committed by ).
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D3379?vs=8291&id=8322
REVISION DETAIL
https://phab.mercurial-scm.org/D3379
AFFECTED FILES
mercurial/httppeer.py
mercurial/wireprotov2peer.py
tests/test-wireproto-command-known.t
tests/test-wireproto-command-pushkey.t
CHANGE DETAILS
diff --git a/tests/test-wireproto-command-pushkey.t b/tests/test-wireproto-command-pushkey.t
--- a/tests/test-wireproto-command-pushkey.t
+++ b/tests/test-wireproto-command-pushkey.t
@@ -53,7 +53,7 @@
received frame(size=*; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor) (glob)
s> 0\r\n
s> \r\n
- response: []
+ response: [True]
$ sendhttpv2peer << EOF
> command listkeys
diff --git a/tests/test-wireproto-command-known.t b/tests/test-wireproto-command-known.t
--- a/tests/test-wireproto-command-known.t
+++ b/tests/test-wireproto-command-known.t
@@ -50,7 +50,7 @@
received frame(size=1; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor)
s> 0\r\n
s> \r\n
- response: []
+ response: [b'']
Single known node works
diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py
new file mode 100644
--- /dev/null
+++ b/mercurial/wireprotov2peer.py
@@ -0,0 +1,135 @@
+# wireprotov2peer.py - client side code for wire protocol version 2
+#
+# Copyright 2018 Gregory Szorc <gregory.szorc at gmail.com>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+from .i18n import _
+from .thirdparty import (
+ cbor,
+)
+from . import (
+ error,
+ util,
+ wireprotoframing,
+)
+
+class clienthandler(object):
+ """Object to handle higher-level client activities.
+
+ The ``clientreactor`` is used to hold low-level state about the frame-based
+ protocol, such as which requests and streams are active. This type is used
+ for higher-level operations, such as reading frames from a socket, exposing
+ and managing a higher-level primitive for representing command responses,
+ etc. This class is what peers should probably use to bridge wire activity
+ with the higher-level peer API.
+ """
+
+ def __init__(self, ui, clientreactor):
+ self._ui = ui
+ self._reactor = clientreactor
+ self._requests = {}
+ self._futures = {}
+ self._responses = {}
+
+ def callcommand(self, command, args, f):
+ """Register a request to call a command.
+
+ Returns an iterable of frames that should be sent over the wire.
+ """
+ request, action, meta = self._reactor.callcommand(command, args)
+
+ if action != 'noop':
+ raise error.ProgrammingError('%s not yet supported' % action)
+
+ rid = request.requestid
+ self._requests[rid] = request
+ self._futures[rid] = f
+ self._responses[rid] = {
+ 'cbor': False,
+ 'b': util.bytesio(),
+ }
+
+ return iter(())
+
+ def flushcommands(self):
+ """Flush all queued commands.
+
+ Returns an iterable of frames that should be sent over the wire.
+ """
+ action, meta = self._reactor.flushcommands()
+
+ if action != 'sendframes':
+ raise error.ProgrammingError('%s not yet supported' % action)
+
+ return meta['framegen']
+
+ def readframe(self, fh):
+ """Attempt to read and process a frame.
+
+ Returns None if no frame was read. Presumably this means EOF.
+ """
+ frame = wireprotoframing.readframe(fh)
+ if frame is None:
+ # TODO tell reactor?
+ return
+
+ self._ui.note(_('received %r\n') % frame)
+ self._processframe(frame)
+
+ return True
+
+ def _processframe(self, frame):
+ """Process a single read frame."""
+
+ action, meta = self._reactor.onframerecv(frame)
+
+ if action == 'error':
+ e = error.RepoError(meta['message'])
+
+ if frame.requestid in self._futures:
+ self._futures[frame.requestid].set_exception(e)
+ else:
+ raise e
+
+ if frame.requestid not in self._requests:
+ raise error.ProgrammingError(
+ 'received frame for unknown request; this is either a bug in '
+ 'the clientreactor not screening for this or this instance was '
+ 'never told about this request: %r' % frame)
+
+ response = self._responses[frame.requestid]
+
+ if action == 'responsedata':
+ response['b'].write(meta['data'])
+
+ if meta['cbor']:
+ response['cbor'] = True
+
+ if meta['eos']:
+ if meta['cbor']:
+ # If CBOR, decode every object.
+ b = response['b']
+
+ size = b.tell()
+ b.seek(0)
+
+ decoder = cbor.CBORDecoder(b)
+
+ result = []
+ while b.tell() < size:
+ result.append(decoder.decode())
+ else:
+ result = [response['b'].getvalue()]
+
+ self._futures[frame.requestid].set_result(result)
+
+ del self._requests[frame.requestid]
+ del self._futures[frame.requestid]
+
+ else:
+ raise error.ProgrammingError(
+ 'unhandled action from clientreactor: %s' % action)
diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -13,7 +13,6 @@
import os
import socket
import struct
-import sys
import tempfile
import weakref
@@ -36,6 +35,7 @@
wireprotoframing,
wireprototypes,
wireprotov1peer,
+ wireprotov2peer,
wireprotov2server,
)
@@ -522,27 +522,20 @@
reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
buffersends=True)
+ handler = wireprotov2peer.clienthandler(ui, reactor)
+
url = '%s/%s' % (apiurl, permission)
if len(requests) > 1:
url += '/multirequest'
else:
url += '/%s' % requests[0][0]
- # Request ID to (request, future)
- requestmap = {}
-
for command, args, f in requests:
- request, action, meta = reactor.callcommand(command, args)
- assert action == 'noop'
-
- requestmap[request.requestid] = (request, f)
-
- action, meta = reactor.flushcommands()
- assert action == 'sendframes'
+ assert not list(handler.callcommand(command, args, f))
# TODO stream this.
- body = b''.join(map(bytes, meta['framegen']))
+ body = b''.join(map(bytes, handler.flushcommands()))
# TODO modify user-agent to reflect v2
headers = {
@@ -564,7 +557,7 @@
ui.traceback()
raise IOError(None, e)
- return reactor, requestmap, res
+ return handler, res
class queuedcommandfuture(pycompat.futures.Future):
"""Wraps result() on command futures to trigger submission on call."""
@@ -684,17 +677,15 @@
'pull': 'ro',
}[permissions.pop()]
- reactor, requests, resp = sendv2request(
+ handler, resp = sendv2request(
self._ui, self._opener, self._requestbuilder, self._apiurl,
permission, calls)
# TODO we probably want to validate the HTTP code, media type, etc.
self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
self._responsef = self._responseexecutor.submit(self._handleresponse,
- reactor,
- requests,
- resp)
+ handler, resp)
def close(self):
if self._closed:
@@ -723,62 +714,11 @@
self._futures = None
- def _handleresponse(self, reactor, requests, resp):
+ def _handleresponse(self, handler, resp):
# Called in a thread to read the response.
- results = {k: [] for k in requests}
-
- while True:
- frame = wireprotoframing.readframe(resp)
- if frame is None:
- break
-
- self._ui.note(_('received %r\n') % frame)
-
- # Guard against receiving a frame with a request ID that we
- # didn't issue. This should never happen.
- request, f = requests.get(frame.requestid, [None, None])
-
- action, meta = reactor.onframerecv(frame)
-
- if action == 'responsedata':
- assert request.requestid == meta['request'].requestid
-
- result = results[request.requestid]
-
- if meta['cbor']:
- payload = util.bytesio(meta['data'])
-
- decoder = cbor.CBORDecoder(payload)
- while payload.tell() + 1 < len(meta['data']):
- try:
- result.append(decoder.decode())
- except Exception:
- pycompat.future_set_exception_info(
- f, sys.exc_info()[1:])
- continue
- else:
- result.append(meta['data'])
-
- if meta['eos']:
- f.set_result(result)
- del results[request.requestid]
-
- elif action == 'error':
- e = error.RepoError(meta['message'])
-
- if f:
- f.set_exception(e)
- else:
- raise e
-
- else:
- e = error.ProgrammingError('unhandled action: %s' % action)
-
- if f:
- f.set_exception(e)
- else:
- raise e
+ while handler.readframe(resp):
+ pass
# TODO implement interface for version 2 peers
@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
To: indygreg, #hg-reviewers, durin42
Cc: mercurial-devel
More information about the Mercurial-devel
mailing list