[PATCH 1 of 2] bundle2: add a interrupt mechanism

Pierre-Yves David pierre-yves.david at ens-lyon.org
Tue Oct 21 20:08:45 UTC 2014


# HG changeset patch
# User Pierre-Yves David <pierre-yves.david at fb.com>
# Date 1413308867 25200
#      Tue Oct 14 10:47:47 2014 -0700
# Branch stable
# Node ID 1c1b98ea77615d601d8234f70728da9a21692c93
# Parent  c1ae0b2c1719f56b906472efea8b20ca0774c968
bundle2: add a interrupt mechanism

It is now possible to emit a single part in the middle of a payload production.
This part will be processed with limitation (only access to a `ui` object). The
goal is to let the server raise exception and output while a part is being
processed. The source motivation is to transmit exception that occurs while
generating a part.

This change is was the motivation to bump the bundle2 format from HG2X to HG2Y.
Somehow, the format bump made it into 3.2 without it. So this change go on
stable. It is low risk as bundle2 is still disabled by default.

diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
--- a/mercurial/bundle2.py
+++ b/mercurial/bundle2.py
@@ -693,10 +693,65 @@ class bundlepart(object):
                 yield chunk
                 chunk = buff.read(preferedchunksize)
         elif len(self.data):
             yield self.data
 
+
+flaginterrupt = -1
+
+class interrupthandler(unpackermixin):
+    """read one part and process it with restricted capability
+
+    This allows to transmit exception raised on the producer size during part
+    iteration while the consumer is reading a part.
+
+    Part processed in this manner only have access to a ui object,"""
+
+    def __init__(self, ui, fp):
+        super(interrupthandler, self).__init__(fp)
+        self.ui = ui
+
+    def _readpartheader(self):
+        """reads a part header size and return the bytes blob
+
+        returns None if empty"""
+        headersize = self._unpack(_fpartheadersize)[0]
+        if headersize < 0:
+            raise error.BundleValueError('negative part header size: %i'
+                                         % headersize)
+        self.ui.debug('part header size: %i\n' % headersize)
+        if headersize:
+            return self._readexact(headersize)
+        return None
+
+    def __call__(self):
+        self.ui.debug('bundle2 stream interruption, looking for a part.\n')
+        headerblock = self._readpartheader()
+        if headerblock is None:
+            self.ui.debug('no part found during iterruption.\n')
+            return
+        part = unbundlepart(self.ui, headerblock, self._fp)
+        op = interruptoperation(self.ui)
+        _processpart(op, part)
+
+class interruptoperation(object):
+    """A limited operation to be use by part handler during interruption
+
+    It only have access to an ui object.
+    """
+
+    def __init__(self, ui):
+        self.ui = ui
+        self.reply = None
+
+    @property
+    def repo(self):
+        raise RuntimeError('no repo access from stream interruption')
+
+    def gettransaction(self):
+        raise TransactionUnavailable('no repo access from stream interruption')
+
 class unbundlepart(unpackermixin):
     """a bundle part read from a bundle"""
 
     def __init__(self, ui, header, fp):
         super(unbundlepart, self).__init__(fp)
@@ -770,14 +825,19 @@ class unbundlepart(unpackermixin):
         ## part payload
         def payloadchunks():
             payloadsize = self._unpack(_fpayloadsize)[0]
             self.ui.debug('payload chunk size: %i\n' % payloadsize)
             while payloadsize:
-                if payloadsize < 0:
-                    msg = 'negative payload chunk size: %i' % payloadsize
+                if payloadsize == flaginterrupt:
+                    # interruption detection, the handler will now read a
+                    # single part and process it.
+                    interrupthandler(self.ui, self._fp)()
+                elif payloadsize < 0:
+                    msg = 'negative payload chunk size: %i' %  payloadsize
                     raise error.BundleValueError(msg)
-                yield self._readexact(payloadsize)
+                else:
+                    yield self._readexact(payloadsize)
                 payloadsize = self._unpack(_fpayloadsize)[0]
                 self.ui.debug('payload chunk size: %i\n' % payloadsize)
         self._payloadstream = util.chunkbuffer(payloadchunks())
         # we read the data, tell it
         self._initialized = True


More information about the Mercurial-devel mailing list