[PATCH 1 of 2] bundle2: add an interrupt mechanism

Pierre-Yves David pierre-yves.david at ens-lyon.org
Fri Oct 17 12:50:35 CDT 2014


# HG changeset patch
# User Pierre-Yves David <pierre-yves.david at fb.com>
# Date 1413308867 25200
#      Tue Oct 14 10:47:47 2014 -0700
# Node ID 4ab700f8a545b047ffeb5096e04ce56b67d2808d
# Parent  3d7027385482576bab77d1ca8b4b0b2b3784b7e6
bundle2: add an interrupt mechanism

It is now possible to emit a single part in the middle of a payload's production.
This part is processed with limited capabilities (it only has access to a `ui`
object). The goal is to let the server raise exceptions and emit output while a
part is being processed. The original motivation is to transmit an exception
that occurs while generating a part.

diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
--- a/mercurial/bundle2.py
+++ b/mercurial/bundle2.py
@@ -692,10 +692,65 @@ class bundlepart(object):
                 yield chunk
                 chunk = buff.read(preferedchunksize)
         elif len(self.data):
             yield self.data
 
+
+flaginterrupt = -1
+
+class interrupthandler(unpackermixin):
+    """read one part and process it with restricted capability
+
+    This allows transmitting exceptions raised on the producer side during
+    part iteration while the consumer is reading a part.
+
+    Parts processed in this manner only have access to a ui object."""
+
+    def __init__(self, ui, fp):
+        super(interrupthandler, self).__init__(fp)
+        self.ui = ui
+
+    def _readpartheader(self):
+        """reads a part header size and returns the header as a bytes blob
+
+        returns None if empty"""
+        headersize = self._unpack(_fpartheadersize)[0]
+        if headersize < 0:
+            raise error.BundleValueError('negative part header size: %i'
+                                         % headersize)
+        self.ui.debug('part header size: %i\n' % headersize)
+        if headersize:
+            return self._readexact(headersize)
+        return None
+
+    def __call__(self):
+        self.ui.debug('bundle2 stream interruption, looking for a part.\n')
+        headerblock = self._readpartheader()
+        if headerblock is None:
+            self.ui.debug('no part found during iterruption.\n')
+            return
+        part = unbundlepart(self.ui, headerblock, self._fp)
+        op = interruptoperation(self.ui)
+        _processpart(op, part)
+
+class interruptoperation(object):
+    """A limited operation to be used by part handlers during interruption
+
+    It only has access to a ui object.
+    """
+
+    def __init__(self, ui):
+        self.ui = ui
+        self.reply = None
+
+    @property
+    def repo(self):
+        raise RuntimeError('no repo access from stream interruption')
+
+    def gettransaction(self):
+        raise TransactionUnavailable('no repo access from stream interruption')
+
 class unbundlepart(unpackermixin):
     """a bundle part read from a bundle"""
 
     def __init__(self, ui, header, fp):
         super(unbundlepart, self).__init__(fp)
@@ -769,14 +824,19 @@ class unbundlepart(unpackermixin):
         ## part payload
         def payloadchunks():
             payloadsize = self._unpack(_fpayloadsize)[0]
             self.ui.debug('payload chunk size: %i\n' % payloadsize)
             while payloadsize:
-                if payloadsize < 0:
-                    msg = 'negative payload chunk size: %i' % payloadsize
+                if payloadsize == flaginterrupt:
+                    # interruption detection, the handler will now read a
+                    # single part and process it.
+                    interrupthandler(self.ui, self._fp)()
+                elif payloadsize < 0:
+                    msg = 'negative payload chunk size: %i' %  payloadsize
                     raise error.BundleValueError(msg)
-                yield self._readexact(payloadsize)
+                else:
+                    yield self._readexact(payloadsize)
                 payloadsize = self._unpack(_fpayloadsize)[0]
                 self.ui.debug('payload chunk size: %i\n' % payloadsize)
         self._payloadstream = util.chunkbuffer(payloadchunks())
         # we read the data, tell it
         self._initialized = True


More information about the Mercurial-devel mailing list