[PATCH 3 of 7 iterbatch] peer: add an iterbatcher interface

Augie Fackler raf at durin42.com
Mon Mar 7 23:25:39 EST 2016


# HG changeset patch
# User Augie Fackler <augie at google.com>
# Date 1456875565 18000
#      Tue Mar 01 18:39:25 2016 -0500
# Node ID 409accaf401bbad8a3a3d1523bb455a02f6a4560
# Parent  071d05cce9156434dee4696287119f284631701b
# EXP-Topic batch
peer: add an iterbatcher interface

This is very much like ordinary batch(), but it will let me add a mode
for batch where we have pathologically large requests which are then
handled streamily. This will be a significant improvement for things
like remotefilelog, which may want to request thousands of entities at
once.

diff --git a/mercurial/peer.py b/mercurial/peer.py
--- a/mercurial/peer.py
+++ b/mercurial/peer.py
@@ -41,6 +41,14 @@ class batcher(object):
     def submit(self):
         raise NotImplementedError()
 
+class iterbatcher(batcher):
+
+    def submit(self):
+        raise NotImplementedError()
+
+    def results(self):
+        raise NotImplementedError()
+
 class localbatch(batcher):
     '''performs the queued calls directly'''
     def __init__(self, local):
@@ -50,6 +58,19 @@ class localbatch(batcher):
         for name, args, opts, resref in self.calls:
             resref.set(getattr(self.local, name)(*args, **opts))
 
+class localiterbatcher(iterbatcher):
+    def __init__(self, local):
+        super(iterbatcher, self).__init__()
+        self.local = local
+
+    def submit(self):
+        # submit for a local iter batcher is a noop
+        pass
+
+    def results(self):
+        for name, args, opts, resref in self.calls:
+            yield getattr(self.local, name)(*args, **opts)
+
 def batchable(f):
     '''annotation for batchable methods
 
@@ -91,6 +112,14 @@ class peerrepository(object):
     def batch(self):
         return localbatch(self)
 
+    def iterbatch(self):
+        """Batch requests but allow iterating over the results.
+
+        This is to allow interleaving responses with things like
+        progress updates for clients.
+        """
+        return localiterbatcher(self)
+
     def capable(self, name):
         '''tell whether repo supports named capability.
         return False if not supported.
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -114,6 +114,25 @@ class remotebatch(peer.batcher):
             encresref.set(encres)
             resref.set(batchable.next())
 
+class remoteiterbatcher(peer.iterbatcher):
+    def __init__(self, remote):
+        super(remoteiterbatcher, self).__init__()
+        self._remote = remote
+
+    def submit(self):
+        """Break the batch request into many patch calls and pipeline them.
+
+        This is mostly valuable over http where request sizes can be
+        limited, but can be used in other places as well.
+        """
+        rb = self._remote.batch()
+        rb.calls = self.calls
+        rb.submit()
+
+    def results(self):
+        for name, args, opts, resref in self.calls:
+            yield resref.value
+
 # Forward a couple of names from peer to make wireproto interactions
 # slightly more sensible.
 batchable = peer.batchable
@@ -193,6 +212,9 @@ class wirepeer(peer.peerrepository):
     def _submitone(self, op, args):
         return self._call(op, **args)
 
+    def iterbatch(self):
+        return remoteiterbatcher(self)
+
     @batchable
     def lookup(self, key):
         self.requirecap('lookup', _('look up remote revision'))


More information about the Mercurial-devel mailing list