D1387: bundle2: implement generic part payload decoder

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Tue Nov 14 04:59:26 UTC 2017


indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  The previous commit extracted _payloadchunks() to a new derived class.
  There was still a reference to this method in unbundlepart, making
  unbundlepart unusable on its own.
  
  This commit implements a generic version of a bundle2 part payload
  decoder, without offset tracking. seekableunbundlepart._payloadchunks()
  has been refactored to consume it, adding offset tracking like before.
  We also implement unbundlepart._payloadchunks(), which is a thin
  wrapper for it. Since we never instantiate unbundlepart directly,
  this new method is not used. This will be changed in subsequent
  commits.
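
  As a rough illustration of the split described above, the sketch below
  decodes size-prefixed chunks from a file handle and then layers offset
  tracking on top in a separate generator. It is not the code from this
  patch: the header format (a 4-byte big-endian signed integer), the
  helper names, and the framing helper are assumptions made for the
  example, and interrupt handling is left out.

```python
import io
import struct

# Assumption: a 4-byte big-endian signed size header. The patch only
# refers to the format by the name _fpayloadsize.
PAYLOAD_SIZE_FORMAT = ">i"
HEADER_SIZE = struct.calcsize(PAYLOAD_SIZE_FORMAT)


def read_exactly(fh, n):
    """Read exactly n bytes or raise (hypothetical stand-in helper)."""
    data = fh.read(n)
    if len(data) < n:
        raise EOFError("stream ended: wanted %d bytes, got %d" % (n, len(data)))
    return data


def decode_payload_chunks(fh):
    """Yield each framed chunk until a zero-size header ends the payload."""
    while True:
        (size,) = struct.unpack(PAYLOAD_SIZE_FORMAT, read_exactly(fh, HEADER_SIZE))
        if size == 0:
            return
        if size < 0:
            # The real decoder treats one negative value as an interrupt
            # signal; this sketch simply rejects every negative size.
            raise ValueError("negative payload chunk size: %d" % size)
        yield read_exactly(fh, size)


def chunks_with_offsets(fh):
    """Wrap the generic decoder, adding payload offset tracking."""
    pos = 0
    for chunk in decode_payload_chunks(fh):
        pos += len(chunk)
        # pos is the payload offset just past this chunk.
        yield pos, chunk


def frame(chunks):
    """Encode chunks as size-prefixed frames plus a zero-size terminator."""
    out = b"".join(struct.pack(PAYLOAD_SIZE_FORMAT, len(c)) + c for c in chunks)
    return out + struct.pack(PAYLOAD_SIZE_FORMAT, 0)


stream = io.BytesIO(frame([b"abc", b"defgh"]))
decoded = list(chunks_with_offsets(stream))
```

  Keeping the offset bookkeeping in the wrapper means a caller that only
  streams the payload can use the plain decoder and skip that work.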
  
  The new implementation also inlines some simple code from unpackermixin
  and adds some local variables to avoid extra function calls and
  attribute lookups. `hg perfbundleread` on an uncompressed Firefox
  bundle seems to show a minor win (for each benchmark, the first line
  is before this change and the second is after):
  
  ! bundle2 iterparts()
  ! wall 12.593258 comb 12.250000 user 8.870000 sys 3.380000 (best of 3)
  ! wall 10.891305 comb 10.820000 user 7.990000 sys 2.830000 (best of 3)
  ! bundle2 part seek()
  ! wall 13.173163 comb 11.100000 user 8.390000 sys 2.710000 (best of 3)
  ! wall 12.991478 comb 10.390000 user 7.720000 sys 2.670000 (best of 3)
  ! bundle2 part read(8k)
  ! wall 9.483612 comb 9.480000 user 8.420000 sys 1.060000 (best of 3)
  ! wall 8.599892 comb 8.580000 user 7.720000 sys 0.860000 (best of 3)
  ! bundle2 part read(16k)
  ! wall 9.159815 comb 9.150000 user 8.220000 sys 0.930000 (best of 3)
  ! wall 8.265361 comb 8.250000 user 7.360000 sys 0.890000 (best of 3)
  ! bundle2 part read(32k)
  ! wall 9.141308 comb 9.130000 user 8.220000 sys 0.910000 (best of 3)
  ! wall 8.290308 comb 8.280000 user 7.330000 sys 0.950000 (best of 3)
  ! bundle2 part read(128k)
  ! wall 8.880587 comb 8.850000 user 7.960000 sys 0.890000 (best of 3)
  ! wall 8.204900 comb 8.150000 user 7.210000 sys 0.940000 (best of 3)
  
  Function call overhead in Python strikes again!
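
  The kind of change that produces this effect can be sketched as
  follows. The class and method names are hypothetical and are not taken
  from bundle2.py; the point is only that binding the callables to local
  names once removes a method call and several attribute lookups from
  each loop iteration while producing the same result.

```python
import io
import struct


class Reader:
    """Hypothetical reader used only to contrast the two loop styles."""

    def __init__(self, data):
        self._fp = io.BytesIO(data)

    def _unpack(self, fmt):
        # Helper method: convenient, but costs a call per use.
        return struct.unpack(fmt, self._fp.read(struct.calcsize(fmt)))

    def sizes_via_methods(self, count):
        # One extra method call plus attribute lookups per iteration.
        return [self._unpack(">i")[0] for _ in range(count)]

    def sizes_via_locals(self, count):
        # Bind everything the loop needs to local names once.
        read = self._fp.read
        unpack = struct.unpack
        headersize = struct.calcsize(">i")
        return [unpack(">i", read(headersize))[0] for _ in range(count)]


data = struct.pack(">5i", 1, 2, 3, 4, 5)
slow = Reader(data).sizes_via_methods(5)
fast = Reader(data).sizes_via_locals(5)
```

  Both methods return the same list; the difference in cost can be
  measured with the standard `timeit` module on a larger input.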
  
  Of course, bundle2 decoding CPU overhead is likely small compared to
  decompression and changegroup application. But every little bit helps.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1387

AFFECTED FILES
  mercurial/bundle2.py

CHANGE DETAILS

diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
--- a/mercurial/bundle2.py
+++ b/mercurial/bundle2.py
@@ -1187,6 +1187,32 @@
     def gettransaction(self):
         raise TransactionUnavailable('no repo access from stream interruption')
 
+def decodepayloadchunks(ui, fh):
+    """Reads bundle2 part payload data into chunks.
+
+    Part payload data consists of framed chunks. This function takes
+    a file handle and emits those chunks.
+    """
+    headersize = struct.calcsize(_fpayloadsize)
+    readexactly = changegroup.readexactly
+
+    chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
+    indebug(ui, 'payload chunk size: %i' % chunksize)
+
+    while chunksize:
+        if chunksize >= 0:
+            yield readexactly(fh, chunksize)
+        elif chunksize == flaginterrupt:
+            # Interrupt "signal" detected. The regular stream is interrupted
+            # and a bundle2 part follows. Consume it.
+            interrupthandler(ui, fh)()
+        else:
+            raise error.BundleValueError(
+                'negative payload chunk size: %s' % chunksize)
+
+        chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
+        indebug(ui, 'payload chunk size: %i' % chunksize)
+
 class unbundlepart(unpackermixin):
     """a bundle part read from a bundle"""
 
@@ -1270,6 +1296,10 @@
         # we read the data, tell it
         self._initialized = True
 
+    def _payloadchunks(self):
+        """Generator of decoded chunks in the payload."""
+        return decodepayloadchunks(self.ui, self._fp)
+
     def read(self, size=None):
         """read payload data"""
         if not self._initialized:
@@ -1320,25 +1350,14 @@
             self._seekfp(self._chunkindex[chunknum][1])
 
         pos = self._chunkindex[chunknum][0]
-        payloadsize = self._unpack(_fpayloadsize)[0]
-        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
-        while payloadsize:
-            if payloadsize == flaginterrupt:
-                # interruption detection, the handler will now read a
-                # single part and process it.
-                interrupthandler(self.ui, self._fp)()
-            elif payloadsize < 0:
-                msg = 'negative payload chunk size: %i' %  payloadsize
-                raise error.BundleValueError(msg)
-            else:
-                result = self._readexact(payloadsize)
-                chunknum += 1
-                pos += payloadsize
-                if chunknum == len(self._chunkindex):
-                    self._chunkindex.append((pos, self._tellfp()))
-                yield result
-            payloadsize = self._unpack(_fpayloadsize)[0]
-            indebug(self.ui, 'payload chunk size: %i' % payloadsize)
+
+        for chunk in decodepayloadchunks(self.ui, self._fp):
+            chunknum += 1
+            pos += len(chunk)
+            if chunknum == len(self._chunkindex):
+                self._chunkindex.append((pos, self._tellfp()))
+
+            yield chunk
 
     def _findchunk(self, pos):
         '''for a given payload position, return a chunk number and offset'''



To: indygreg, #hg-reviewers
Cc: mercurial-devel

