[PATCH 08 of 11] util: add a stream compression API to compression engines
Gregory Szorc
gregory.szorc at gmail.com
Tue Nov 1 20:08:41 EDT 2016
# HG changeset patch
# User Gregory Szorc <gregory.szorc at gmail.com>
# Date 1477159930 25200
# Sat Oct 22 11:12:10 2016 -0700
# Node ID 1d4d111b644453acc4893478528a5f2ecd7ca023
# Parent 289da69280d95f1b983fdf9216739411a9953fb6
util: add a stream compression API to compression engines
It is a common pattern throughout the codebase to compress an iterator
of chunks, yielding an iterator of compressed chunks. Let's formalize
that as part of the compression engine API by adding a compressstream()
method to each engine.
diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -2890,16 +2890,22 @@ class compressormanager(object):
format as used by bundles.
* compressorobj -- Method returning an object with ``compress(data)``
and ``flush()`` methods. This object and these methods are used to
incrementally feed data (presumably uncompressed) chunks into a
compressor. Calls to these methods return compressed bytes, which
may be 0-length if there is no output for the operation.
+ * compressstream -- Compress an iterator of chunks and return an
+ iterator of compressed chunks.
+
+ Optionally accepts an argument defining how to perform compression.
+ Each engine treats this argument differently.
+
* decompressorreader -- Method that is used to perform decompression
on a file object. Argument is an object with a ``read(size)`` method
that returns compressed data. Return value is an object with a
``read(size)`` that returns uncompressed data.
"""
bundletype = getattr(engine, 'bundletype', None)
if bundletype and bundletype in self._bundletypes:
raise error.Abort(_('bundle type %s is already registered') %
@@ -2925,16 +2931,29 @@ compressionengines = compressormanager()
class _zlibengine(object):
@property
def bundletype(self):
return 'GZ'
def compressorobj(self):
return zlib.compressobj()
+    def compressstream(self, it, opts=None):
+        """Compress an iterator of chunks with zlib.
+
+        ``opts`` is an optional dict. Its ``level`` key selects the zlib
+        compression level; when absent, ``zlib.Z_DEFAULT_COMPRESSION`` is
+        used.
+
+        Yields compressed chunks. Empty results from intermediate
+        ``compress()`` calls are not yielded; the output of the final
+        ``flush()`` is always yielded.
+        """
+        opts = opts or {}
+
+        z = zlib.compressobj(opts.get('level', zlib.Z_DEFAULT_COMPRESSION))
+        for chunk in it:
+            data = z.compress(chunk)
+            # Not all calls to compress emit data. It is cheaper to inspect
+            # here than to feed empty chunks through the generator.
+            if data:
+                yield data
+
+        yield z.flush()
+
def decompressorreader(self, fh):
def gen():
d = zlib.decompressobj()
for chunk in filechunkiter(fh):
yield d.decompress(chunk)
return chunkbuffer(gen())
@@ -2943,16 +2962,26 @@ compressionengines.register('zlib', _zli
class _bz2engine(object):
@property
def bundletype(self):
return 'BZ'
def compressorobj(self):
return bz2.BZ2Compressor()
+    def compressstream(self, it, opts=None):
+        """Compress an iterator of chunks with bz2.
+
+        ``opts`` is an optional dict. Its ``level`` key selects the bz2
+        compression level (1-9); when absent, level 9 is used.
+
+        Yields compressed chunks. Empty results from intermediate
+        ``compress()`` calls are not yielded; the output of the final
+        ``flush()`` is always yielded.
+        """
+        opts = opts or {}
+        z = bz2.BZ2Compressor(opts.get('level', 9))
+        for chunk in it:
+            data = z.compress(chunk)
+            # BZ2Compressor buffers input internally and often returns
+            # nothing; skip those empty results rather than yield them.
+            if data:
+                yield data
+
+        yield z.flush()
+
def decompressorreader(self, fh):
def gen():
d = bz2.BZ2Decompressor()
for chunk in filechunkiter(fh):
yield d.decompress(chunk)
return chunkbuffer(gen())
@@ -2987,15 +3016,18 @@ class nocompress(object):
class _noopengine(object):
@property
def bundletype(self):
return 'UN'
def compressorobj(self):
return nocompress()
+    def compressstream(self, it, opts=None):
+        """Return the chunks of ``it`` without compressing them.
+
+        ``opts`` is accepted for interface compatibility and ignored.
+        The input is wrapped with ``iter()`` so that, like the other
+        engines, the result is always a (single-pass) iterator even when
+        ``it`` is a list or other re-iterable container.
+        """
+        return iter(it)
+
def decompressorreader(self, fh):
return fh
compressionengines.register('none', _noopengine())
# convenient shortcut
dst = debugstacktrace
More information about the Mercurial-devel
mailing list