[PATCH 6 of 8 zstd-revlogs] util: compression APIs to support revlog decompression

Gregory Szorc gregory.szorc at gmail.com
Mon Jan 2 18:57:57 EST 2017


# HG changeset patch
# User Gregory Szorc <gregory.szorc at gmail.com>
# Date 1483392440 28800
#      Mon Jan 02 13:27:20 2017 -0800
# Node ID da38ab97f1f4d312966431a16d1f2acb90b75242
# Parent  9079e3d9ed5d7dbb1f108363c4dc37f5cdd4c7b7
util: compression APIs to support revlog decompression

Previously, compression engines had APIs for performing revlog
compression but no mechanism to perform revlog decompression. This
patch changes that.

Revlog decompression is slightly more complicated than compression
because in the compression case there is (currently) only a single
engine that can be used at a time. However, for decompression, a
revlog could contain chunks from multiple compression engines. This
means decompression needs to map to multiple engines and
decompressors. That mapping is outside the scope of this patch,
but it drives the decision for engines to declare a byte header
sequence that identifies revlog data as belonging to an engine, and
for the compression manager to expose an API for obtaining an engine
from a revlog header.

diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -2963,6 +2963,8 @@ class compressormanager(object):
         self._bundlenames = {}
         # Internal bundle identifier to engine name.
         self._bundletypes = {}
+        # Revlog header to engine name.
+        self._revlogheaders = {}
 
     def __getitem__(self, key):
         return self._engines[key]
@@ -3004,6 +3006,14 @@ class compressormanager(object):
 
             self._bundletypes[bundletype] = name
 
+        revlogheader = engine.revlogheader()
+        if revlogheader and revlogheader in self._revlogheaders:
+            raise error.Abort(_('revlog header %s already registered by %s') %
+                              (revlogheader, self._revlogheaders[revlogheader]))
+
+        if revlogheader:
+            self._revlogheaders[revlogheader] = name
+
         self._engines[name] = engine
 
     @property
@@ -3040,6 +3050,13 @@ class compressormanager(object):
                               engine.name())
         return engine
 
+    def forrevlogheader(self, header):
+        """Obtain a compression engine registered to a revlog header.
+
+        Will raise KeyError if the revlog header value isn't registered.
+        """
+        return self._engines[self._revlogheaders[header]]
+
 compengines = compressormanager()
 
 class compressionengine(object):
@@ -3080,6 +3097,16 @@ class compressionengine(object):
         """
         return None
 
+    def revlogheader(self):
+        """Header added to revlog chunks that identifies this engine.
+
+        If this engine can be used to compress revlogs, this method should
+        return the bytes used to identify chunks compressed with this engine.
+        Else, the method should return ``None`` to indicate it does not
+        participate in revlog compression.
+        """
+        return None
+
     def compressstream(self, it, opts=None):
         """Compress an iterator of chunks.
 
@@ -3109,6 +3136,13 @@ class compressionengine(object):
         the data could not be compressed (too small, not compressible, etc).
         The returned data should have a header uniquely identifying this
         compression format so decompression can be routed to this engine.
+        This header should be identified by the ``revlogheader()`` return
+        value.
+
+        The object has a ``decompress(data)`` method that decompresses
+        data. The method will only be called if ``data`` begins with
+        ``revlogheader()``. The method should return the raw, uncompressed
+        data or raise a ``RevlogError``.
 
         The object is reusable but is not thread safe.
         """
@@ -3121,6 +3155,9 @@ class _zlibengine(compressionengine):
     def bundletype(self):
         return 'gzip', 'GZ'
 
+    def revlogheader(self):
+        return 'x'
+
     def compressstream(self, it, opts=None):
         opts = opts or {}
 
@@ -3177,6 +3214,13 @@ class _zlibengine(compressionengine):
                     return ''.join(parts)
                 return None
 
+        def decompress(self, data):
+            try:
+                return zlib.decompress(data)
+            except zlib.error as e:
+                raise error.RevlogError(_('revlog decompress error: %s') %
+                                        str(e))
+
     def revlogcompressor(self, opts=None):
         return self.zlibrevlogcompressor()
 
@@ -3237,6 +3281,9 @@ class _noopengine(compressionengine):
     def bundletype(self):
         return 'none', 'UN'
 
+    # We don't implement revlogheader because it is handled specially
+    # in the revlog class.
+
     def compressstream(self, it, opts=None):
         return it
 
@@ -3274,6 +3321,9 @@ class _zstdengine(compressionengine):
     def bundletype(self):
         return 'zstd', 'ZS'
 
+    def revlogheader(self):
+        return '\x28'
+
     def compressstream(self, it, opts=None):
         opts = opts or {}
         # zstd level 3 is almost always significantly faster than zlib
@@ -3302,7 +3352,9 @@ class _zstdengine(compressionengine):
             # pre-allocate a buffer to hold the result.
             self._cctx = zstd.ZstdCompressor(level=level,
                                              write_content_size=True)
+            self._dctx = zstd.ZstdDecompressor()
             self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
+            self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
 
         def compress(self, data):
             insize = len(data)
@@ -3333,6 +3385,28 @@ class _zstdengine(compressionengine):
                     return ''.join(chunks)
                 return None
 
+        def decompress(self, data):
+            insize = len(data)
+
+            try:
+                # This was measured to be faster than other streaming
+                # decompressors.
+                dobj = self._dctx.decompressobj()
+                chunks = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + self._decompinsize
+                    chunk = dobj.decompress(data[pos:pos2])
+                    if chunk:
+                        chunks.append(chunk)
+                    pos = pos2
+                # Frame should be exhausted, so no finish() API.
+
+                return ''.join(chunks)
+            except Exception as e:
+                raise error.RevlogError(_('revlog decompress error: %s') %
+                                        str(e))
+
     def revlogcompressor(self, opts=None):
         opts = opts or {}
         return self.zstdrevlogcompressor(self._module,


More information about the Mercurial-devel mailing list