[PATCH 2 of 8] bundle2: add a unbundle part responsible from unbundling part

pierre-yves.david at ens-lyon.org pierre-yves.david at ens-lyon.org
Sat Apr 12 17:08:41 CDT 2014


# HG changeset patch
# User Pierre-Yves David <pierre-yves.david at fb.com>
# Date 1397245396 14400
#      Fri Apr 11 15:43:16 2014 -0400
# Node ID 5259d2fac78125ee7e5417566c7ff04f8efcc6fc
# Parent  24f7528f409dff53d1372df5015e59d331983311
bundle2: add a unbundle part responsible from unbundling part

We have a new `unbundlepart` class, and it is now responsible for extracting its
own data. The top-level unbundler only extracts the part header (to detect the
end-of-stream marker) and then leaves everything else to the `unbundlepart`
class. The ultimate goal is to make `unbundlepart` responsible for lazily
extracting its payload.

The unbundling code is mostly untouched (it has simply moved), but the diff
does not make this obvious.

diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
--- a/mercurial/bundle2.py
+++ b/mercurial/bundle2.py
@@ -446,75 +446,26 @@ class unbundle20(unpackermixing):
     def __iter__(self):
         """yield all parts contained in the stream"""
         # make sure param have been loaded
         self.params
         self.ui.debug('start extraction of bundle2 parts\n')
-        part = self._readpart()
-        while part is not None:
+        headerblock = self._readpartheader()
+        while headerblock is not None:
+            part = unbundlepart(self.ui, headerblock, self._fp)
             yield part
-            part = self._readpart()
+            headerblock = self._readpartheader()
         self.ui.debug('end of bundle2 stream\n')
 
-    def _readpart(self):
-        """return None when an end of stream markers is reach"""
+    def _readpartheader(self):
+        """reads a part header size and return the bytes blob
 
+        returns None if empty"""
         headersize = self._unpack(_fpartheadersize)[0]
         self.ui.debug('part header size: %i\n' % headersize)
-        if not headersize:
-            return None
-        headerblock = self._readexact(headersize)
-        # some utility to help reading from the header block
-        self._offset = 0 # layer violation to have something easy to understand
-        def fromheader(size):
-            """return the next <size> byte from the header"""
-            offset = self._offset
-            data = headerblock[offset:(offset + size)]
-            self._offset = offset + size
-            return data
-        def unpackheader(format):
-            """read given format from header
-
-            This automatically compute the size of the format to read."""
-            data = fromheader(struct.calcsize(format))
-            return _unpack(format, data)
-
-        typesize = unpackheader(_fparttypesize)[0]
-        parttype = fromheader(typesize)
-        self.ui.debug('part type: "%s"\n' % parttype)
-        partid = unpackheader(_fpartid)[0]
-        self.ui.debug('part id: "%s"\n' % partid)
-        ## reading parameters
-        # param count
-        mancount, advcount = unpackheader(_fpartparamcount)
-        self.ui.debug('part parameters: %i\n' % (mancount + advcount))
-        # param size
-        paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
-        # make it a list of couple again
-        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
-        # split mandatory from advisory
-        mansizes = paramsizes[:mancount]
-        advsizes = paramsizes[mancount:]
-        # retrive param value
-        manparams = []
-        for key, value in mansizes:
-            manparams.append((fromheader(key), fromheader(value)))
-        advparams = []
-        for key, value in advsizes:
-            advparams.append((fromheader(key), fromheader(value)))
-        del self._offset # clean up layer, nobody saw anything.
-        ## part payload
-        payload = []
-        payloadsize = self._unpack(_fpayloadsize)[0]
-        self.ui.debug('payload chunk size: %i\n' % payloadsize)
-        while payloadsize:
-            payload.append(self._readexact(payloadsize))
-            payloadsize = self._unpack(_fpayloadsize)[0]
-            self.ui.debug('payload chunk size: %i\n' % payloadsize)
-        payload = ''.join(payload)
-        current = bundlepart(parttype, manparams, advparams, data=payload)
-        current.id = partid
-        return current
+        if headersize:
+            return self._readexact(headersize)
+        return None
 
 
 class bundlepart(object):
     """A bundle2 part contains application level payload
 
@@ -582,10 +533,80 @@ class bundlepart(object):
                 yield chunk
                 chunk = buff.read(preferedchunksize)
         elif len(self.data):
             yield self.data
 
+class unbundlepart(unpackermixing):
+    """a bundle part read from a bundle"""
+
+    def __init__(self, ui, header, fp):
+        super(unbundlepart, self).__init__(fp)
+        self.ui = ui
+        # unbundle state attr
+        self._headerdata = header
+        # part data
+        self.id = None
+        self.type = None
+        self.mandatoryparams = None
+        self.advisoryparams = None
+        self.data = None
+        self._readdata()
+
+    def _readdata(self):
+        """read the header and setup the object"""
+        # some utility to help reading from the header block
+        headerblock = self._headerdata
+        self._offset = 0 # layer violation to have something easy to understand
+        def fromheader(size):
+            """return the next <size> byte from the header"""
+            offset = self._offset
+            data = headerblock[offset:(offset + size)]
+            self._offset = offset + size
+            return data
+        def unpackheader(format):
+            """read given format from header
+
+            This automatically compute the size of the format to read."""
+            data = fromheader(struct.calcsize(format))
+            return _unpack(format, data)
+
+        typesize = unpackheader(_fparttypesize)[0]
+        self.type = fromheader(typesize)
+        self.ui.debug('part type: "%s"\n' % self.type)
+        self.id = unpackheader(_fpartid)[0]
+        self.ui.debug('part id: "%s"\n' % self.id)
+        ## reading parameters
+        # param count
+        mancount, advcount = unpackheader(_fpartparamcount)
+        self.ui.debug('part parameters: %i\n' % (mancount + advcount))
+        # param size
+        paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
+        # make it a list of couple again
+        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
+        # split mandatory from advisory
+        mansizes = paramsizes[:mancount]
+        advsizes = paramsizes[mancount:]
+        # retrive param value
+        manparams = []
+        for key, value in mansizes:
+            manparams.append((fromheader(key), fromheader(value)))
+        advparams = []
+        for key, value in advsizes:
+            advparams.append((fromheader(key), fromheader(value)))
+        del self._offset # clean up layer, nobody saw anything.
+        self.mandatoryparams = manparams
+        self.advisoryparams  = advparams
+        ## part payload
+        payload = []
+        payloadsize = self._unpack(_fpayloadsize)[0]
+        self.ui.debug('payload chunk size: %i\n' % payloadsize)
+        while payloadsize:
+            payload.append(self._readexact(payloadsize))
+            payloadsize = self._unpack(_fpayloadsize)[0]
+            self.ui.debug('payload chunk size: %i\n' % payloadsize)
+        self.data = ''.join(payload)
+
 @parthandler('changegroup')
 def handlechangegroup(op, inpart):
     """apply a changegroup part on the repo
 
     This is a very early implementation that will massive rework before being
@@ -603,13 +624,14 @@ def handlechangegroup(op, inpart):
     ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
     op.records.add('changegroup', {'return': ret})
     if op.reply is not None:
         # This is definitly not the final form of this
         # return. But one need to start somewhere.
-        op.reply.addpart(bundlepart('reply:changegroup', (),
-                                    [('in-reply-to', str(inpart.id)),
-                                     ('return', '%i' % ret)]))
+        part = bundlepart('reply:changegroup', (),
+                           [('in-reply-to', str(inpart.id)),
+                            ('return', '%i' % ret)])
+        op.reply.addpart(part)
 
 @parthandler('reply:changegroup')
 def handlechangegroup(op, inpart):
     p = dict(inpart.advisoryparams)
     ret = int(p['return'])


More information about the Mercurial-devel mailing list