D3303: cborutil: implement support for indefinite length CBOR types

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Fri Apr 13 06:26:44 UTC 2018


indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  The vendored cbor2 package doesn't have support for streaming /
  indefinite length items when encoding. On the decoding side, it
  supported indefinite types. However, it waits for all data to arrive
  before emitting a result. This is kind of unfortunate because
  indefinite length items facilitate streaming without buffering.
  
  This commit implements support for encoding indefinite length
  bytestrings, arrays, and maps. It implements support for decoding
  indefinite length bytestrings.
  
  I strived to use generators for moving data around as much as
  possible because they are much efficient than read()/write()
  because no extra memory copying, allocation, concatenations,
  buffering, etc occur unless the producer/consumer needs it to.
  This helps keep things fast.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3303

AFFECTED FILES
  contrib/import-checker.py
  mercurial/utils/cborutil.py
  tests/test-cbor.py

CHANGE DETAILS

diff --git a/tests/test-cbor.py b/tests/test-cbor.py
new file mode 100644
--- /dev/null
+++ b/tests/test-cbor.py
@@ -0,0 +1,235 @@
+from __future__ import absolute_import
+
+import io
+import unittest
+
+from mercurial.thirdparty import (
+    cbor,
+)
+from mercurial.utils import (
+    cborutil,
+)
+
+class IndefiniteBytestringTests(unittest.TestCase):
+    def testitertoiter(self):
+        # This is the example from RFC 7049 Section 2.2.2.
+        source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']
+
+        it = cborutil.itertoindefinitebytestring(source)
+
+        self.assertEqual(next(it), b'\x5f')
+        self.assertEqual(next(it), b'\x44')
+        self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd')
+        self.assertEqual(next(it), b'\x43')
+        self.assertEqual(next(it), b'\xee\xff\x99')
+        self.assertEqual(next(it), b'\xff')
+
+        with self.assertRaises(StopIteration):
+            next(it)
+
+        dest = b''.join(cborutil.itertoindefinitebytestring(source))
+
+        self.assertEqual(cbor.loads(dest), b''.join(source))
+
+    def testreadtoiter(self):
+        source = io.BytesIO(b'\x5f\x44\xaa\xbb\xcc\xdd\x43\xee\xff\x99\xff')
+
+        it = cborutil.readindefinitebytestringtoiter(source)
+        self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd')
+        self.assertEqual(next(it), b'\xee\xff\x99')
+
+        with self.assertRaises(StopIteration):
+            next(it)
+
+    def testtoiterlarge(self):
+        source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]
+
+        dest = b''.join(cborutil.itertoindefinitebytestring(source))
+
+        self.assertEqual(cbor.loads(dest), b''.join(source))
+
+    def testbuffertoindefinite(self):
+        source = b'\x00\x01\x02\x03' + b'\xff' * 16384
+
+        it = cborutil.buffertoindefinitebytestring(source, chunksize=2)
+
+        self.assertEqual(next(it), b'\x5f')
+        self.assertEqual(next(it), b'\x42')
+        self.assertEqual(next(it), b'\x00\x01')
+        self.assertEqual(next(it), b'\x42')
+        self.assertEqual(next(it), b'\x02\x03')
+        self.assertEqual(next(it), b'\x42')
+        self.assertEqual(next(it), b'\xff\xff')
+
+    def testbuffertoindefiniteroundtrip(self):
+        source = b'x' * 1048576
+
+        chunks = list(cborutil.buffertoindefinitebytestring(source))
+        self.assertEqual(len(chunks), 34)
+
+        self.assertEqual(cbor.loads(b''.join(chunks)), source)
+
+class StreamArrayTests(unittest.TestCase):
+    def testempty(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streamarray(encoder):
+            pass
+
+        self.assertEqual(b.getvalue(), '\x9f\xff')
+        self.assertEqual(cbor.loads(b.getvalue()), [])
+
+    def testone(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streamarray(encoder) as fn:
+            fn(b'foo')
+
+        self.assertEqual(cbor.loads(b.getvalue()), [b'foo'])
+
+    def testmultiple(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streamarray(encoder) as fn:
+            fn(0)
+            fn(True)
+            fn(b'foo')
+            fn(None)
+
+        self.assertEqual(cbor.loads(b.getvalue()), [0, True, b'foo', None])
+
+    def testnested(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streamarray(encoder):
+            with cborutil.streamarray(encoder) as fn:
+                fn(b'foo')
+                fn(b'bar')
+
+        self.assertEqual(cbor.loads(b.getvalue()), [[b'foo', b'bar']])
+
+    def testitemslist(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        orig = [b'foo', b'bar', None, True, 42]
+
+        cborutil.streamarrayitems(encoder, orig)
+        self.assertEqual(cbor.loads(b.getvalue()), orig)
+
+    def testitemsgen(self):
+        def makeitems():
+            yield b'foo'
+            yield b'bar'
+            yield None
+            yield 42
+
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        cborutil.streamarrayitems(encoder, makeitems())
+        self.assertEqual(cbor.loads(b.getvalue()), [b'foo', b'bar', None, 42])
+
+class StreamMapTests(unittest.TestCase):
+    def testempty(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streammap(encoder):
+            pass
+
+        self.assertEqual(b.getvalue(), '\xbf\xff')
+        self.assertEqual(cbor.loads(b.getvalue()), {})
+
+    def testone(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streammap(encoder) as fn:
+            fn(b'key1', b'value1')
+
+        self.assertEqual(cbor.loads(b.getvalue()), {b'key1': b'value1'})
+
+    def testmultiple(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streammap(encoder) as fn:
+            fn(0, 1)
+            fn(b'key1', b'value1')
+            fn(True, None)
+
+        self.assertEqual(cbor.loads(b.getvalue()), {
+            0: 1,
+            b'key1': b'value1',
+            True: None,
+        })
+
+    def testcomplex(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streammap(encoder) as fn:
+            fn(b'key1', b'value1')
+            fn(b'map', {b'inner1key': b'inner1value'})
+            fn(b'array', [0, 1, 2])
+
+        self.assertEqual(cbor.loads(b.getvalue()), {
+            b'key1': b'value1',
+            b'map': {b'inner1key': b'inner1value'},
+            b'array': [0, 1, 2],
+        })
+
+    def testnested(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        with cborutil.streammap(encoder):
+            encoder.encode(b'streamkey')
+            with cborutil.streammap(encoder) as fn2:
+                fn2(b'inner1key', b'inner1value')
+                fn2(0, 1)
+
+        self.assertEqual(cbor.loads(b.getvalue()), {
+            b'streamkey': {
+                b'inner1key': b'inner1value',
+                0: 1,
+            },
+        })
+
+    def testitemsdict(self):
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        orig = [
+            (b'foo', b'bar'),
+            (42, 19),
+            (None, True),
+        ]
+
+        cborutil.streammapitems(encoder, orig)
+        self.assertEqual(cbor.loads(b.getvalue()), dict(orig))
+
+    def testitemsgen(self):
+        def makeitems():
+            yield b'foo', b'bar'
+            yield None, True
+            yield 42, 19
+
+        b = io.BytesIO()
+        encoder = cbor.CBOREncoder(b)
+
+        cborutil.streammapitems(encoder, makeitems())
+        self.assertEqual(cbor.loads(b.getvalue()), {
+            b'foo': b'bar',
+            None: True,
+            42: 19,
+        })
+
+if __name__ == '__main__':
+    import silenttestrunner
+    silenttestrunner.main(__name__)
diff --git a/mercurial/utils/cborutil.py b/mercurial/utils/cborutil.py
new file mode 100644
--- /dev/null
+++ b/mercurial/utils/cborutil.py
@@ -0,0 +1,206 @@
+# cborutil.py - CBOR extensions
+#
+# Copyright 2018 Gregory Szorc <gregory.szorc at gmail.com>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+import contextlib
+import struct
+
+from ..thirdparty.cbor.cbor2 import (
+    decoder as decodermod,
+    encoder as encodermod,
+)
+
+# Very short very of RFC 7049...
+#
+# Each item begins with a byte. The 3 high bits of that byte denote the
+# "major type." The lower 5 bits denote the "subtype." Each major type
+# has its own encoding mechanism.
+#
+# Most types have lengths. However, bytestring, string, array, and map
+# can be indefinite length. These are denotes by a subtype with value 31.
+# Sub-components of those types then come afterwards and are terminated
+# by a "break" byte.
+
+MAJOR_TYPE_UINT = 0
+MAJOR_TYPE_NEGINT = 1
+MAJOR_TYPE_BYTESTRING = 2
+MAJOR_TYPE_STRING = 3
+MAJOR_TYPE_ARRAY = 4
+MAJOR_TYPE_MAP = 5
+MAJOR_TYPE_SEMANTIC = 6
+MAJOR_TYPE_SPECIAL = 7
+
+SUBTYPE_MASK = 0b00011111
+
+SUBTYPE_INDEFINITE = 31
+
+# Indefinite types begin with their major type ORd with information value 31.
+BEGIN_INDEFINITE_BYTESTRING = struct.pack(
+    r'>B', MAJOR_TYPE_BYTESTRING << 5 | SUBTYPE_INDEFINITE)
+BEGIN_INDEFINITE_ARRAY = struct.pack(
+    r'>B', MAJOR_TYPE_ARRAY << 5 | SUBTYPE_INDEFINITE)
+BEGIN_INDEFINITE_MAP = struct.pack(
+    r'>B', MAJOR_TYPE_MAP << 5 | SUBTYPE_INDEFINITE)
+
+# The break ends an indefinite length item.
+BREAK = b'\xff'
+BREAK_INT = 255
+
+def beginindefinitearray(encoder):
+    encoder.write(BEGIN_INDEFINITE_ARRAY)
+
+def beginindefinitemap(encoder):
+    encoder.write(BEGIN_INDEFINITE_MAP)
+
+ at contextlib.contextmanager
+def streamarray(encoder):
+    """Write an array in a streaming manner.
+
+    Used as a context manager, the context manager resolves to a function
+    that should be called for each item to write to the array.
+
+    When the context manager exits, the indefinite length array is ended.
+    """
+    def writeitem(value):
+        encoder.encode(value)
+
+    beginindefinitearray(encoder)
+    yield writeitem
+    encoder.write(BREAK)
+
+def streamarrayitems(encoder, items):
+    """Write out an iterable of items to a streaming array."""
+    with streamarray(encoder) as fn:
+        for value in items:
+            fn(value)
+
+ at contextlib.contextmanager
+def streammap(encoder):
+    """Write a map in a streaming manner.
+
+    Used as a context manager, the context manager resolves to a function
+    that should be called with a key and value of each map item to write.
+
+    When the context manager exits, the indefinite length map is ended.
+
+    If is possible to nest streaming data structures. If the caller writes
+    out 2 values, the first value will be interpreted as a key and the second
+    a value. So a caller could do something like::
+
+        with streammap(encoder):
+            encoder.encode(b'mykey')
+            with streammap(encoder) as fn:
+                fn(b'innerkey', b'value')
+
+    This would decode to ``{b'mykey': {b'innerkey': b'value'}}``.
+    """
+    def writeitem(key, value):
+        encoder.encode(key)
+        encoder.encode(value)
+
+    beginindefinitemap(encoder)
+    yield writeitem
+    encoder.write(BREAK)
+
+def streammapitems(encoder, items):
+    """Write out an iterable of (key, value) items to a streaming map."""
+    with streammap(encoder) as fn:
+        for key, value in items:
+            fn(key, value)
+
+def itertoindefinitebytestring(it):
+    """Convert an iterator of chunks to an indefinite bytestring.
+
+    Given an input that is iterable and each element in the iterator is
+    representable as bytes, emit an indefinite length bytestring.
+    """
+    # Alias for performance.
+    encodelen = encodermod.encode_length
+    bytestringmajor = MAJOR_TYPE_BYTESTRING << 5
+
+    yield BEGIN_INDEFINITE_BYTESTRING
+
+    for chunk in it:
+        yield encodelen(bytestringmajor, len(chunk))
+        yield chunk
+
+    yield BREAK
+
+def buffertoindefinitebytestring(source, chunksize=65536):
+    """Given a large source buffer, emit as an indefinite length bytestring.
+
+    This is a generator of chunks constituting the encoded CBOR data.
+    """
+    # Alias for performance.
+    encodelen = encodermod.encode_length
+    bytestringmajor = MAJOR_TYPE_BYTESTRING << 5
+
+    yield BEGIN_INDEFINITE_BYTESTRING
+
+    i = 0
+    l = len(source)
+
+    while True:
+        chunk = source[i:i + chunksize]
+        i += len(chunk)
+
+        yield encodelen(bytestringmajor, len(chunk))
+        yield chunk
+
+        if i >= l:
+            break
+
+    yield BREAK
+
+def readindefinitebytestringtoiter(fh, expectheader=True):
+    """Read an indefinite bytestring to a generator.
+
+    Receives an object with a ``read(X)`` method to read N bytes.
+
+    If ``expectheader`` is True, it is expected that the first byte read
+    will represent an indefinite length bytestring. Otherwise, we
+    expect the first byte to be part of the first bytestring chunk.
+    """
+    read = fh.read
+    decodeuint = decodermod.decode_uint
+    byteasinteger = decodermod.byte_as_integer
+
+    if expectheader:
+        initial = decodermod.byte_as_integer(read(1))
+
+        majortype = initial >> 5
+        subtype = initial & SUBTYPE_MASK
+
+        if majortype != MAJOR_TYPE_BYTESTRING:
+            raise decodermod.CBORDecodeError(
+                'expected major type %d; got %d' % (MAJOR_TYPE_BYTESTRING,
+                                                    majortype))
+
+        if subtype != SUBTYPE_INDEFINITE:
+            raise decodermod.CBORDecodeError(
+                'expected indefinite subtype; got %d' % subtype)
+
+    # The indefinite bytestring is composed of chunks of normal bytestrings.
+    # Read chunks until we hit a BREAK byte.
+
+    while True:
+        # We need to sniff for the BREAK byte.
+        initial = byteasinteger(read(1))
+
+        if initial == BREAK_INT:
+            break
+
+        length = decodeuint(fh, initial & SUBTYPE_MASK)
+        chunk = read(length)
+
+        if len(chunk) != length:
+            raise decodermod.CBORDecodeError(
+                'failed to read bytestring chunk: got %d bytes; expected %d' % (
+                    len(chunk), length))
+
+        yield chunk
diff --git a/contrib/import-checker.py b/contrib/import-checker.py
--- a/contrib/import-checker.py
+++ b/contrib/import-checker.py
@@ -36,6 +36,8 @@
     'mercurial.pure.parsers',
     # third-party imports should be directly imported
     'mercurial.thirdparty',
+    'mercurial.thirdparty.cbor',
+    'mercurial.thirdparty.cbor.cbor2',
     'mercurial.thirdparty.zope',
     'mercurial.thirdparty.zope.interface',
 )



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list