D3297: httppeer: implement command executor for version 2 peer

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Fri Apr 13 17:36:59 EDT 2018


indygreg updated this revision to Diff 8153.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3297?vs=8102&id=8153

REVISION DETAIL
  https://phab.mercurial-scm.org/D3297

AFFECTED FILES
  mercurial/httppeer.py

CHANGE DETAILS

diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -13,7 +13,9 @@
 import os
 import socket
 import struct
+import sys
 import tempfile
+import weakref
 
 from .i18n import _
 from .thirdparty import (
@@ -31,7 +33,6 @@
     statichttprepo,
     url as urlmod,
     util,
-    wireproto,
     wireprotoframing,
     wireprototypes,
     wireprotov1peer,
@@ -517,8 +518,262 @@
     def _abort(self, exception):
         raise exception
 
+def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
+    reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
+                                             buffersends=True)
+
+    url = '%s/%s' % (apiurl, permission)
+
+    if len(requests) > 1:
+        url += '/multirequest'
+    else:
+        url += '/%s' % requests[0][0]
+
+    # Request ID to (request, future)
+    requestmap = {}
+
+    for command, args, f in requests:
+        request, action, meta = reactor.callcommand(command, args)
+        assert action == 'noop'
+
+        requestmap[request.requestid] = (request, f)
+
+    action, meta = reactor.flushcommands()
+    assert action == 'sendframes'
+
+    # TODO stream this.
+    body = b''.join(map(bytes, meta['framegen']))
+
+    # TODO modify user-agent to reflect v2
+    headers = {
+        r'Accept': wireprotov2server.FRAMINGTYPE,
+        r'Content-Type': wireprotov2server.FRAMINGTYPE,
+    }
+
+    req = requestbuilder(pycompat.strurl(url), body, headers)
+    req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
+
+    try:
+        res = opener.open(req)
+    except urlerr.httperror as e:
+        if e.code == 401:
+            raise error.Abort(_('authorization failed'))
+
+        raise
+    except httplib.HTTPException as e:
+        ui.traceback()
+        raise IOError(None, e)
+
+    return reactor, requestmap, res
+
+class queuedcommandfuture(pycompat.futures.Future):
+    """Wraps result() on command futures to trigger submission on call."""
+
+    def result(self, timeout=None):
+        if self.done():
+            return pycompat.futures.Future.result(self, timeout)
+
+        self._peerexecutor.sendcommands()
+
+        # sendcommands() will restore the original __class__ and self.result
+        # will resolve to Future.result.
+        return self.result(timeout)
+
+@zi.implementer(repository.ipeercommandexecutor)
+class httpv2executor(object):
+    def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
+        self._ui = ui
+        self._opener = opener
+        self._requestbuilder = requestbuilder
+        self._apiurl = apiurl
+        self._descriptor = descriptor
+        self._sent = False
+        self._closed = False
+        self._neededpermissions = set()
+        self._calls = []
+        self._futures = weakref.WeakSet()
+        self._responseexecutor = None
+        self._responsef = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excvalue, exctb):
+        self.close()
+
+    def callcommand(self, command, args):
+        if self._sent:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'commands are sent')
+
+        if self._closed:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'close()')
+
+        # The service advertises which commands are available. So if we attempt
+        # to call an unknown command or pass an unknown argument, we can screen
+        # for this.
+        if command not in self._descriptor['commands']:
+            raise error.ProgrammingError(
+                'wire protocol command %s is not available' % command)
+
+        cmdinfo = self._descriptor['commands'][command]
+        unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
+
+        if unknownargs:
+            raise error.ProgrammingError(
+                'wire protocol command %s does not accept argument: %s' % (
+                    command, ', '.join(sorted(unknownargs))))
+
+        self._neededpermissions |= set(cmdinfo['permissions'])
+
+        # TODO we /could/ also validate types here, since the API descriptor
+        # includes types...
+
+        f = pycompat.futures.Future()
+
+        # Monkeypatch it so result() triggers sendcommands(), otherwise result()
+        # could deadlock.
+        f.__class__ = queuedcommandfuture
+        f._peerexecutor = self
+
+        self._futures.add(f)
+        self._calls.append((command, args, f))
+
+        return f
+
+    def sendcommands(self):
+        if self._sent:
+            return
+
+        if not self._calls:
+            return
+
+        self._sent = True
+
+        # Unhack any future types so caller sees a clean type and so we
+        # break reference cycle.
+        for f in self._futures:
+            if isinstance(f, queuedcommandfuture):
+                f.__class__ = pycompat.futures.Future
+                f._peerexecutor = None
+
+        # Mark the future as running and filter out cancelled futures.
+        calls = [(command, args, f)
+                 for command, args, f in self._calls
+                 if f.set_running_or_notify_cancel()]
+
+        # Clear out references, prevent improper object usage.
+        self._calls = None
+
+        if not calls:
+            return
+
+        permissions = set(self._neededpermissions)
+
+        if 'push' in permissions and 'pull' in permissions:
+            permissions.remove('pull')
+
+        if len(permissions) > 1:
+            raise error.RepoError(_('cannot make request requiring multiple '
+                                    'permissions: %s') %
+                                  _(', ').join(sorted(permissions)))
+
+        permission = {
+            'push': 'rw',
+            'pull': 'ro',
+        }[permissions.pop()]
+
+        reactor, requests, resp = sendv2request(
+            self._ui, self._opener, self._requestbuilder, self._apiurl,
+            permission, calls)
+
+        # TODO we probably want to validate the HTTP code, media type, etc.
+
+        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
+        self._responsef = self._responseexecutor.submit(self._handleresponse,
+                                                        reactor,
+                                                        requests,
+                                                        resp)
+
+    def close(self):
+        if self._closed:
+            return
+
+        self.sendcommands()
+
+        self._closed = True
+
+        if not self._responsef:
+            return
+
+        try:
+            self._responsef.result()
+        finally:
+            self._responseexecutor.shutdown(wait=True)
+            self._responsef = None
+            self._responseexecutor = None
+
+            # If any of our futures are still in progress, mark them as
+            # errored, otherwise a result() could wait indefinitely.
+            for f in self._futures:
+                if not f.done():
+                    f.set_exception(error.ResponseError(
+                        _('unfulfilled command response')))
+
+            self._futures = None
+
+    def _handleresponse(self, reactor, requests, resp):
+        # Called in a thread to read the response.
+
+        results = {k: [] for k in requests}
+
+        while True:
+            frame = wireprotoframing.readframe(resp)
+            if frame is None:
+                break
+
+            self._ui.note(_('received %r\n') % frame)
+
+            # Guard against receiving a frame with a request ID that we
+            # didn't issue. This should never happen.
+            request, f = requests.get(frame.requestid, [None, None])
+
+            action, meta = reactor.onframerecv(frame)
+
+            if action == 'responsedata':
+                assert request.requestid == meta['request'].requestid
+
+                result = results[request.requestid]
+
+                if meta['cbor']:
+                    payload = util.bytesio(meta['data'])
+
+                    decoder = cbor.CBORDecoder(payload)
+                    while payload.tell() + 1 < len(meta['data']):
+                        try:
+                            result.append(decoder.decode())
+                        except Exception:
+                            f.set_exception_info(*sys.exc_info()[1:])
+                            continue
+                else:
+                    result.append(meta['data'])
+
+                if meta['eos']:
+                    f.set_result(result)
+                    del results[request.requestid]
+
+            else:
+                e = error.ProgrammingError('unhandled action: %s' % action)
+
+                if f:
+                    f.set_exception(e)
+                else:
+                    raise e
+
 # TODO implement interface for version 2 peers
-@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities)
+@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
+                repository.ipeerrequests)
 class httpv2peer(object):
     def __init__(self, ui, repourl, apipath, opener, requestbuilder,
                  apidescriptor):
@@ -529,6 +784,7 @@
 
         self._url = repourl
         self._apipath = apipath
+        self._apiurl = '%s/%s' % (repourl, apipath)
         self._opener = opener
         self._requestbuilder = requestbuilder
         self._descriptor = apidescriptor
@@ -580,85 +836,13 @@
 
     # End of ipeercapabilities.
 
-    # TODO require to be part of a batched primitive, use futures.
     def _call(self, name, **args):
-        """Call a wire protocol command with arguments."""
-
-        # Having this early has a side-effect of importing wireprotov2server,
-        # which has the side-effect of ensuring commands are registered.
-
-        # TODO modify user-agent to reflect v2.
-        headers = {
-            r'Accept': wireprotov2server.FRAMINGTYPE,
-            r'Content-Type': wireprotov2server.FRAMINGTYPE,
-        }
-
-        # TODO permissions should come from capabilities results.
-        permission = wireproto.commandsv2[name].permission
-        if permission not in ('push', 'pull'):
-            raise error.ProgrammingError('unknown permission type: %s' %
-                                         permission)
-
-        permission = {
-            'push': 'rw',
-            'pull': 'ro',
-        }[permission]
-
-        url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name)
-
-        # TODO this should be part of a generic peer for the frame-based
-        # protocol.
-        reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
-                                                 buffersends=True)
-
-        request, action, meta = reactor.callcommand(name, args)
-        assert action == 'noop'
-
-        action, meta = reactor.flushcommands()
-        assert action == 'sendframes'
+        with self.commandexecutor() as e:
+            return e.callcommand(name, args).result()
 
-        body = b''.join(map(bytes, meta['framegen']))
-        req = self._requestbuilder(pycompat.strurl(url), body, headers)
-        req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
-
-        # TODO unify this code with httppeer.
-        try:
-            res = self._opener.open(req)
-        except urlerr.httperror as e:
-            if e.code == 401:
-                raise error.Abort(_('authorization failed'))
-
-            raise
-        except httplib.HTTPException as e:
-            self.ui.traceback()
-            raise IOError(None, e)
-
-        # TODO validate response type, wrap response to handle I/O errors.
-        # TODO more robust frame receiver.
-        results = []
-
-        while True:
-            frame = wireprotoframing.readframe(res)
-            if frame is None:
-                break
-
-            self.ui.note(_('received %r\n') % frame)
-
-            action, meta = reactor.onframerecv(frame)
-
-            if action == 'responsedata':
-                if meta['cbor']:
-                    payload = util.bytesio(meta['data'])
-
-                    decoder = cbor.CBORDecoder(payload)
-                    while payload.tell() + 1 < len(meta['data']):
-                        results.append(decoder.decode())
-                else:
-                    results.append(meta['data'])
-            else:
-                error.ProgrammingError('unhandled action: %s' % action)
-
-        return results
+    def commandexecutor(self):
+        return httpv2executor(self.ui, self._opener, self._requestbuilder,
+                              self._apiurl, self._descriptor)
 
 # Registry of API service names to metadata about peers that handle it.
 #



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list