D1388: bundle2: implement consume() API on unbundlepart

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Mon Nov 13 23:59:30 EST 2017


indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  We want bundle parts to not be seekable by default. That means
  eliminating the generic seek() method.
  
  A common pattern in bundle2.py is to seek to the end of the part
  data. This is mainly used by the part iteration code to ensure
  the underlying stream is advanced to the next bundle part.
  
  In this commit, we establish a dedicated API for consuming a
  bundle2 part data. We switch users of seek() to it.
  
  The old implementation of seek(0, os.SEEK_END) would effectively
  call self.read(). The new implementation calls self.read(32768)
  in a loop. The old implementation would therefore assemble a
  buffer to hold all remaining data being seeked over. For seeking
  over large bundle parts, this would involve a large allocation and
  a lot of overhead to collect intermediate data! This overhead can
  be seen in the results for `hg perfbundleread`:
  
  ! bundle2 iterparts()
  ! wall 10.891305 comb 10.820000 user 7.990000 sys 2.830000 (best of 3)
  ! wall 8.070791 comb 8.060000 user 7.180000 sys 0.880000 (best of 3)
  ! bundle2 part seek()
  ! wall 12.991478 comb 10.390000 user 7.720000 sys 2.670000 (best of 3)
  ! wall 10.370142 comb 10.350000 user 7.430000 sys 2.920000 (best of 3)
  
  Of course, skipping over large payload data isn't likely very common.
  So I doubt the performance wins will be observed in the wild.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1388

AFFECTED FILES
  mercurial/bundle2.py

CHANGE DETAILS

diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py
--- a/mercurial/bundle2.py
+++ b/mercurial/bundle2.py
@@ -363,7 +363,7 @@
                 self.count = count
                 self.current = p
                 yield p
-                p.seek(0, os.SEEK_END)
+                p.consume()
                 self.current = None
         self.iterator = func()
         return self.iterator
@@ -385,11 +385,11 @@
             try:
                 if self.current:
                     # consume the part content to not corrupt the stream.
-                    self.current.seek(0, os.SEEK_END)
+                    self.current.consume()
 
                 for part in self.iterator:
                     # consume the bundle content
-                    part.seek(0, os.SEEK_END)
+                    part.consume()
             except Exception:
                 seekerror = True
 
@@ -856,10 +856,11 @@
         while headerblock is not None:
             part = seekableunbundlepart(self.ui, headerblock, self._fp)
             yield part
-            # Seek to the end of the part to force it's consumption so the next
-            # part can be read. But then seek back to the beginning so the
-            # code consuming this generator has a part that starts at 0.
-            part.seek(0, os.SEEK_END)
+            # Ensure part is fully consumed so we can start reading the next
+            # part.
+            part.consume()
+            # But then seek back to the beginning so the code consuming this
+            # generator has a part that starts at 0.
             part.seek(0, os.SEEK_SET)
             headerblock = self._readpartheader()
         indebug(self.ui, 'end of bundle2 stream')
@@ -1165,7 +1166,7 @@
             raise
         finally:
             if not hardabort:
-                part.seek(0, os.SEEK_END)
+                part.consume()
         self.ui.debug('bundle2-input-stream-interrupt:'
                       ' closing out of band context\n')
 
@@ -1300,6 +1301,20 @@
         """Generator of decoded chunks in the payload."""
         return decodepayloadchunks(self.ui, self._fp)
 
+    def consume(self):
+        """Read the part payload until completion.
+
+        By consuming the part data, the underlying stream read offset will
+        be advanced to the next part (or end of stream).
+        """
+        if self.consumed:
+            return
+
+        chunk = self.read(32768)
+        while chunk:
+            self._pos += len(chunk)
+            chunk = self.read(32768)
+
     def read(self, size=None):
         """read payload data"""
         if not self._initialized:



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list