[PATCH 02 of 11] util: create new abstraction for compression engines

Gregory Szorc gregory.szorc at gmail.com
Tue Nov 1 20:08:35 EDT 2016


# HG changeset patch
# User Gregory Szorc <gregory.szorc at gmail.com>
# Date 1477966026 25200
#      Mon Oct 31 19:07:06 2016 -0700
# Node ID 4015d575d311cd7ebc923d1320e55a76c655c485
# Parent  60f180c9a030ebcee6c6f4f8584fdb94c73ac337
util: create new abstraction for compression engines

Currently, util.py has "compressors" and "decompressors" dicts
mapping compression algorithms to callables returning objects that
perform well-defined operations. In addition, revlog.py has code
for calling into a compressor or decompressor explicitly, and there
is code in the wire protocol for performing zlib compression.

The third-party lz4revlog extension has demonstrated the utility of
supporting alternative compression formats for revlog storage, but
it stops short of supporting lz4 for bundles and the wire protocol.

There are also plans to support zstd as a general compression
replacement.

So, there appears to be a market for a unified API for registering
compression engines. This commit starts the process of establishing
one. It establishes a new container class for holding registered
compression engine objects. Each object declares and supports common
operations via attributes.

The built-in zlib, bz2, truncated bz2, and no-op compression engines
are registered with a singleton instance of this class.

It's worth stating that I'm no fan of the "decompressorreader" API,
but it is what existing consumers expect. My plan is to get
consumers using the new "engines" API and then transition them to a
better decompression primitive. This partially explains why I don't
care about the duplicated code pattern used for decompressors
(it is abstracted into _makedecompressor in the existing code).

diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -2851,21 +2851,174 @@ class ctxmanager(object):
                 exc_type, exc_val, exc_tb = pending = sys.exc_info()
         del self._atexit
         if pending:
             raise exc_val
         return received and suppressed
 
 # compression utility
 
+class compressormanager(object):
+    """Holds registrations of various compression engines.
+
+    This class essentially abstracts the differences between compression
+    engines to allow new compression formats to be added easily, possibly from
+    extensions.
+
+    Compressors are registered against the global instance by calling its
+    ``register()`` method.
+    """
+    def __init__(self):
+        # Engine name -> engine object.
+        self._engines = {}
+        # Bundle type identifier -> name of the engine registered for it.
+        self._bundletypes = {}
+
+    def __getitem__(self, key):
+        return self._engines[key]
+
+    def __contains__(self, key):
+        return key in self._engines
+
+    def __iter__(self):
+        return iter(self._engines)
+
+    def register(self, name, engine):
+        """Register a compression format with the manager.
+
+        The passed compression engine is an object with attributes describing
+        behavior and methods performing well-defined actions. The following
+        attributes are recognized (all are optional):
+
+        * bundletype -- Attribute containing the identifier of this compression
+          format as used by bundles.
+
+        * compressorobj -- Method returning an object with ``compress(data)``
+          and ``flush()`` methods. This object and these methods are used to
+          incrementally feed data (presumably uncompressed) chunks into a
+          compressor. Calls to these methods return compressed bytes, which
+          may be 0-length if there is no output for the operation.
+
+        * decompressorreader -- Method that is used to perform decompression
+          on a file object. Argument is an object with a ``read(size)`` method
+          that returns compressed data. Return value is an object with a
+          ``read(size)`` that returns uncompressed data.
+
+        Raises ``error.Abort`` if an engine is already registered under
+        ``name`` or if another engine already claims the same bundle type.
+        Both checks run before any state is modified, so a failed
+        registration leaves the manager unchanged.
+        """
+        # Replacing an engine in place would leave any bundle type claimed by
+        # the old engine mapped to the new one, which may not support it.
+        if name in self._engines:
+            raise error.Abort(_('compression engine %s already registered') %
+                              name)
+
+        bundletype = getattr(engine, 'bundletype', None)
+        if bundletype and bundletype in self._bundletypes:
+            raise error.Abort(_('bundle type %s is already registered') %
+                              bundletype)
+
+        self._engines[name] = engine
+        if bundletype:
+            self._bundletypes[bundletype] = name
+
+    @property
+    def supportedbundletypes(self):
+        """Set of bundle types that have a registered engine."""
+        return set(self._bundletypes)
+
+    def forbundletype(self, bundletype):
+        """Obtain a compression engine registered to a bundle type.
+
+        Will raise KeyError if the bundle type isn't registered.
+        """
+        return self._engines[self._bundletypes[bundletype]]
+
+compressionengines = compressormanager()
+
+class _zlibengine(object):
+    """zlib compression; bundles identify it as ``GZ``."""
+    @property
+    def bundletype(self):
+        return 'GZ'
+
+    def compressorobj(self):
+        return zlib.compressobj()
+
+    def decompressorreader(self, fh):
+        def gen():
+            d = zlib.decompressobj()
+            for chunk in filechunkiter(fh):
+                yield d.decompress(chunk)
+
+        return chunkbuffer(gen())
+
+compressionengines.register('zlib', _zlibengine())
+
+class _bz2engine(object):
+    """bzip2 compression; bundles identify it as ``BZ``."""
+    @property
+    def bundletype(self):
+        return 'BZ'
+
+    def compressorobj(self):
+        return bz2.BZ2Compressor()
+
+    def decompressorreader(self, fh):
+        def gen():
+            d = bz2.BZ2Decompressor()
+            for chunk in filechunkiter(fh):
+                yield d.decompress(chunk)
+
+        return chunkbuffer(gen())
+
+compressionengines.register('bz2', _bz2engine())
+
+class _truncatedbz2engine(object):
+    """Decompression-only engine for bzip2 streams missing the 'BZ' header."""
+    @property
+    def bundletype(self):
+        return '_truncatedBZ'
+
+    # We don't implement compressorobj because it is hackily handled elsewhere.
+
+    def decompressorreader(self, fh):
+        def gen():
+            # The input stream doesn't have the 'BZ' header. So add it back.
+            d = bz2.BZ2Decompressor()
+            d.decompress('BZ')
+            for chunk in filechunkiter(fh):
+                yield d.decompress(chunk)
+
+        return chunkbuffer(gen())
+
+compressionengines.register('bz2truncated', _truncatedbz2engine())
+
 class nocompress(object):
     def compress(self, x):
         return x
+
     def flush(self):
-        return ""
+        return ''
+
+class _noopengine(object):
+    """Pass-through engine: data is neither compressed nor decompressed."""
+    @property
+    def bundletype(self):
+        return 'UN'
+
+    def compressorobj(self):
+        return nocompress()
+
+    def decompressorreader(self, fh):
+        return fh
+
+compressionengines.register('none', _noopengine())
 
 compressors = {
     None: nocompress,
     # lambda to prevent early import
     'BZ': lambda: bz2.BZ2Compressor(),
     'GZ': lambda: zlib.compressobj(),
      }
 # also support the old form by courtesies


More information about the Mercurial-devel mailing list