[PATCH 08 of 11] util: add a stream compression API to compression engines

Gregory Szorc gregory.szorc at gmail.com
Tue Nov 1 20:08:41 EDT 2016


# HG changeset patch
# User Gregory Szorc <gregory.szorc at gmail.com>
# Date 1477159930 25200
#      Sat Oct 22 11:12:10 2016 -0700
# Node ID 1d4d111b644453acc4893478528a5f2ecd7ca023
# Parent  289da69280d95f1b983fdf9216739411a9953fb6
util: add a stream compression API to compression engines

It is a common pattern throughout the code to perform compression
on an iterator of chunks, yielding an iterator of compressed chunks.
Let's formalize that as part of the compression engine API.

diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -2890,16 +2890,22 @@ class compressormanager(object):
           format as used by bundles.
 
         * compressorobj -- Method returning an object with ``compress(data)``
           and ``flush()`` methods. This object and these methods are used to
           incrementally feed data (presumably uncompressed) chunks into a
           compressor. Calls to these methods return compressed bytes, which
           may be 0-length if there is no output for the operation.
 
+        * compressstream -- Compress an iterator of chunks and return an
+          iterator of compressed chunks.
+
+          Optionally accepts an argument defining how to perform compression.
+          Each engine treats this argument differently.
+
         * decompressorreader -- Method that is used to perform decompression
           on a file object. Argument is an object with a ``read(size)`` method
           that returns compressed data. Return value is an object with a
           ``read(size)`` that returns uncompressed data.
         """
         bundletype = getattr(engine, 'bundletype', None)
         if bundletype and bundletype in self._bundletypes:
             raise error.Abort(_('bundle type %s is already registered') %
@@ -2925,16 +2931,29 @@ compressionengines = compressormanager()
 class _zlibengine(object):
     @property
     def bundletype(self):
         return 'GZ'
 
     def compressorobj(self):
         return zlib.compressobj()
 
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+
+        z = zlib.compressobj(opts.get('level', -1))
+        for chunk in it:
+            data = z.compress(chunk)
+            # Not all calls to compress emit data. It is cheaper to inspect
+            # here than to feed empty chunks through generator.
+            if data:
+                yield data
+
+        yield z.flush()
+
     def decompressorreader(self, fh):
         def gen():
             d = zlib.decompressobj()
             for chunk in filechunkiter(fh):
                 yield d.decompress(chunk)
 
         return chunkbuffer(gen())
 
@@ -2943,16 +2962,26 @@ compressionengines.register('zlib', _zli
 class _bz2engine(object):
     @property
     def bundletype(self):
         return 'BZ'
 
     def compressorobj(self):
         return bz2.BZ2Compressor()
 
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+        z = bz2.BZ2Compressor(opts.get('level', 9))
+        for chunk in it:
+            data = z.compress(chunk)
+            if data:
+                yield data
+
+        yield z.flush()
+
     def decompressorreader(self, fh):
         def gen():
             d = bz2.BZ2Decompressor()
             for chunk in filechunkiter(fh):
                 yield d.decompress(chunk)
 
         return chunkbuffer(gen())
 
@@ -2987,15 +3016,18 @@ class nocompress(object):
 class _noopengine(object):
     @property
     def bundletype(self):
         return 'UN'
 
     def compressorobj(self):
         return nocompress()
 
+    def compressstream(self, it, opts=None):
+        return it
+
     def decompressorreader(self, fh):
         return fh
 
 compressionengines.register('none', _noopengine())
 
 # convenient shortcut
 dst = debugstacktrace


More information about the Mercurial-devel mailing list