D3269: wireproto: implement batching on peer executor interface

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Fri Apr 13 15:14:36 EDT 2018


indygreg updated this revision to Diff 8122.
indygreg edited the summary of this revision.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3269?vs=8035&id=8122

REVISION DETAIL
  https://phab.mercurial-scm.org/D3269

AFFECTED FILES
  mercurial/setdiscovery.py
  mercurial/wireprotov1peer.py

CHANGE DETAILS

diff --git a/mercurial/wireprotov1peer.py b/mercurial/wireprotov1peer.py
--- a/mercurial/wireprotov1peer.py
+++ b/mercurial/wireprotov1peer.py
@@ -9,6 +9,7 @@
 
 import hashlib
 import sys
+import weakref
 
 from .i18n import _
 from .node import (
@@ -180,13 +181,36 @@
 
     return ';'.join(cmds)
 
+class unsentfuture(pycompat.futures.Future):
+    """A Future variation to represent an unsent command.
+
+    Because we buffer commands and don't submit them immediately, calling
+    ``result()`` on an unsent future could deadlock. Futures for buffered
+    commands are represented by this type, which wraps ``result()`` to
+    call ``sendcommands()``.
+    """
+
+    def result(self, timeout=None):
+        if self.done():
+            return pycompat.futures.Future.result(self, timeout)
+
+        self._peerexecutor.sendcommands()
+
+        # This looks like it will recurse infinitely. However,
+        # sendcommands() should restore __class__ to a plain Future, so the
+        # recursive call resolves normally and doubles as a check that the
+        # class swap occurred.
+        return self.result(timeout)
+
 @zi.implementer(repository.ipeercommandexecutor)
 class peerexecutor(object):
     def __init__(self, peer):
         self._peer = peer
         self._sent = False
         self._closed = False
         self._calls = []
+        self._futures = weakref.WeakSet()
+        self._responseexecutor = None
+        self._responsef = None
 
     def __enter__(self):
         return self
@@ -214,20 +238,35 @@
         # Commands are either batchable or they aren't. If a command
         # isn't batchable, we send it immediately because the executor
         # can no longer accept new commands after a non-batchable command.
-        # If a command is batchable, we queue it for later.
+        # If a command is batchable, we queue it for later. But we have
+        # to account for the case of a non-batchable command arriving after
+        # a batchable one and refuse to service it.
+
+        def addcall():
+            f = pycompat.futures.Future()
+            self._futures.add(f)
+            self._calls.append((command, args, fn, f))
+            return f
 
         if getattr(fn, 'batchable', False):
-            pass
+            f = addcall()
+
+            # But since we don't issue it immediately, we wrap its result()
+            # to trigger sending so we avoid deadlocks.
+            f.__class__ = unsentfuture
+            f._peerexecutor = self
         else:
             if self._calls:
                 raise error.ProgrammingError(
                     '%s is not batchable and cannot be called on a command '
                     'executor along with other commands' % command)
 
-        # We don't support batching yet. So resolve it immediately.
-        f = pycompat.futures.Future()
-        self._calls.append((command, args, fn, f))
-        self.sendcommands()
+            f = addcall()
+
+            # Non-batchable commands can never coexist with another command
+            # in this executor. So send the command immediately.
+            self.sendcommands()
+
         return f
 
     def sendcommands(self):
@@ -239,10 +278,18 @@
 
         self._sent = True
 
+        # Unhack any future types so callers see a clean type and to break
+        # the cycle between us and futures.
+        for f in self._futures:
+            if isinstance(f, unsentfuture):
+                f.__class__ = pycompat.futures.Future
+                f._peerexecutor = None
+
         calls = self._calls
         # Mainly to destroy references to futures.
         self._calls = None
 
+        # Simple case of a single command. We call it synchronously.
         if len(calls) == 1:
             command, args, fn, f = calls[0]
 
@@ -259,14 +306,99 @@
 
             return
 
-        raise error.ProgrammingError('support for multiple commands not '
-                                     'yet implemented')
+        # Batch commands are a bit harder. First, we have to deal with the
+        # @batchable coroutine. That's a bit annoying. Furthermore, we also
+        # need to preserve streaming. i.e. it should be possible for the
+        # futures to resolve as data is coming in off the wire without having
+        # to wait for the final byte of the final response. We do this by
+        # spinning up a thread to read the responses.
+
+        requests = []
+        states = []
+
+        for command, args, fn, f in calls:
+            # Future was cancelled. Ignore it.
+            if not f.set_running_or_notify_cancel():
+                continue
+
+            try:
+                batchable = fn.batchable(fn.__self__,
+                                         **pycompat.strkwargs(args))
+            except Exception:
+                f.set_exception_info(*sys.exc_info()[1:])
+                return
+
+            # Encoded arguments and future holding remote result.
+            try:
+                encodedargs, fremote = next(batchable)
+            except Exception:
+                f.set_exception_info(*sys.exc_info()[1:])
+                return
+
+            requests.append((command, encodedargs))
+            states.append((command, f, batchable, fremote))
+
+        if not requests:
+            return
+
+        # This will emit responses in the order they were executed.
+        wireresults = self._peer._submitbatch(requests)
+
+        # The use of a thread pool executor here is a bit weird for something
+        # that only spins up a single thread. However, thread management is
+        # hard and it is easy to encounter race conditions, deadlocks, etc.
+        # concurrent.futures already solves these problems and its thread pool
+        # executor has minimal overhead. So we use it.
+        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
+        self._responsef = self._responseexecutor.submit(self._readbatchresponse,
+                                                        states, wireresults)
 
     def close(self):
         self.sendcommands()
 
+        if self._closed:
+            return
+
         self._closed = True
 
+        if not self._responsef:
+            return
+
+        # We need to wait on our in-flight response and then shut down the
+        # executor once we have a result.
+        try:
+            self._responsef.result()
+        finally:
+            self._responseexecutor.shutdown(wait=True)
+            self._responsef = None
+            self._responseexecutor = None
+
+            # If any of our futures are still in progress, mark them as
+            # errored. Otherwise a result() could wait indefinitely.
+            for f in self._futures:
+                if not f.done():
+                    f.set_exception(error.ResponseError(
+                        _('unfulfilled batch command response')))
+
+            self._futures = None
+
+    def _readbatchresponse(self, states, wireresults):
+        # Executes in a thread to read data off the wire.
+
+        for command, f, batchable, fremote in states:
+            # Grab raw result off the wire and teach the internal future
+            # about it.
+            remoteresult = next(wireresults)
+            fremote.set(remoteresult)
+
+            # And ask the coroutine to decode that value.
+            try:
+                result = next(batchable)
+            except Exception:
+                f.set_exception_info(*sys.exc_info()[1:])
+            else:
+                f.set_result(result)
+
 class wirepeer(repository.legacypeer):
     """Client-side interface for communicating with a peer repository.
 
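The peerexecutor changes above lean on two tricks that may not be obvious on
first read: rewriting an instance's ``__class__`` so that ``result()`` on a
buffered future flushes the batch instead of deadlocking, and using a
single-thread pool to resolve futures as responses arrive. Below is a
minimal, self-contained sketch of both (the ``toyexecutor`` name and the
simulated responses are illustrative only; this is not the peer code
itself):

  import concurrent.futures
  import time

  class unsentfuture(concurrent.futures.Future):
      """Future for a command that is queued but not yet sent."""

      def result(self, timeout=None):
          if self.done():
              return concurrent.futures.Future.result(self, timeout)

          # Flush the batch. sendcommands() swaps __class__ back to a
          # plain Future, so the recursive call below resolves normally.
          self._executor.sendcommands()
          return self.result(timeout)

  class toyexecutor(object):
      def __init__(self):
          self._calls = []
          self._responsepool = None

      def callcommand(self, name, args):
          f = concurrent.futures.Future()
          # Keep a single Future instance but override result() until the
          # batch is flushed.
          f.__class__ = unsentfuture
          f._executor = self
          self._calls.append((name, args, f))
          return f

      def sendcommands(self):
          calls, self._calls = self._calls, []
          for name, args, f in calls:
              # Restore the plain Future type and break the reference
              # cycle between the executor and its futures.
              f.__class__ = concurrent.futures.Future
              f._executor = None
          # A one-worker pool resolves futures in request order, standing
          # in for the thread that reads batch responses off the wire.
          self._responsepool = concurrent.futures.ThreadPoolExecutor(1)
          self._responsepool.submit(self._readresponses, calls)

      def _readresponses(self, calls):
          for name, args, f in calls:
              time.sleep(0.1)  # pretend to wait for the next response
              f.set_result('response to %s(%r)' % (name, args))

      def close(self):
          if self._responsepool:
              self._responsepool.shutdown(wait=True)

  e = toyexecutor()
  f1 = e.callcommand('heads', {})
  f2 = e.callcommand('known', {'nodes': []})
  # result() on an unsent future triggers sendcommands(); the second
  # future resolves shortly after, once its simulated response arrives.
  print(f1.result())
  print(f2.result())
  e.close()

The one-worker pool mirrors the rationale given in the patch:
concurrent.futures already handles the fiddly thread management, so a pool
of one is easier to reason about than a hand-rolled thread.
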
diff --git a/mercurial/setdiscovery.py b/mercurial/setdiscovery.py
--- a/mercurial/setdiscovery.py
+++ b/mercurial/setdiscovery.py
@@ -155,11 +155,14 @@
     sample = _limitsample(ownheads, initialsamplesize)
     # indices between sample and externalized version must match
     sample = list(sample)
-    batch = remote.iterbatch()
-    batch.heads()
-    batch.known(dag.externalizeall(sample))
-    batch.submit()
-    srvheadhashes, yesno = batch.results()
+
+    with remote.commandexecutor() as e:
+        fheads = e.callcommand('heads', {})
+        fknown = e.callcommand('known', {
+            'nodes': dag.externalizeall(sample),
+        })
+
+    srvheadhashes, yesno = fheads.result(), fknown.result()
 
     if cl.tip() == nullid:
         if srvheadhashes != [nullid]:
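
The setdiscovery hunk above resolves its futures only after leaving the
``with`` block, so the flush happens when the executor is closed. Calling
``result()`` inside the block also works, because a buffered future's
``result()`` triggers the flush itself. A hedged caller-side sketch
(``remote`` stands for any wirepeer and ``nodes`` for the externalized
sample):

  with remote.commandexecutor() as e:
      fheads = e.callcommand('heads', {})
      fknown = e.callcommand('known', {'nodes': nodes})

      # Resolving inside the block is fine too: this sends the buffered
      # batch ('heads' and 'known' in one request) and blocks until the
      # 'heads' response arrives.
      srvheadhashes = fheads.result()

  # close() (run on block exit) waits for the response-reading thread, so
  # by this point the 'known' future is already resolved.
  yesno = fknown.result()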



To: indygreg, #hg-reviewers
Cc: mercurial-devel

