[PATCH 02 of 14 V3] util: implement varint functions

Boris Feld boris.feld at octobus.net
Fri Jan 19 18:47:07 EST 2018


# HG changeset patch
# User Gregory Szorc <gregory.szorc at gmail.com>
# Date 1516398755 -3600
#      Fri Jan 19 22:52:35 2018 +0100
# Node ID 5dbc3c53c923b8d11b5efcaf0f415b3d8c8c5180
# Parent  15f7795f96a5f9acb3ed2e640fcec82f3ccd6f53
# EXP-Topic b2-stream
# Available At https://bitbucket.org/octobus/mercurial-devel/
#              hg pull https://bitbucket.org/octobus/mercurial-devel/ -r 5dbc3c53c923
util: implement varint functions

This will be useful in an incoming version-2 of the stream format.

diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -3874,3 +3874,73 @@ def readexactly(stream, n):
                            " (got %d bytes, expected %d)")
                           % (len(s), n))
     return s
+
+def uvarintencode(value):
+    """Encode an unsigned integer value to a varint.
+
+    A varint is a variable length integer of 1 or more bytes. Each byte
+    except the last has the most significant bit set. The lower 7 bits of
+    each byte store the 2's complement representation, least significant group
+    first.
+
+    >>> uvarintencode(0)
+    '\\x00'
+    >>> uvarintencode(1)
+    '\\x01'
+    >>> uvarintencode(127)
+    '\\x7f'
+    >>> uvarintencode(1337)
+    '\\xb9\\n'
+    >>> uvarintencode(65536)
+    '\\x80\\x80\\x04'
+    >>> uvarintencode(-1)
+    Traceback (most recent call last):
+        ...
+    ProgrammingError: negative value for uvarint: -1
+    """
+    if value < 0:
+        raise error.ProgrammingError('negative value for uvarint: %d'
+                                     % value)
+    bits = value & 0x7f
+    value >>= 7
+    bytes = []
+    while value:
+        bytes.append(pycompat.bytechr(0x80 | bits))
+        bits = value & 0x7f
+        value >>= 7
+    bytes.append(pycompat.bytechr(bits))
+
+    return ''.join(bytes)
+
+def uvarintdecodestream(fh):
+    """Decode an unsigned variable length integer from a stream.
+
+    The passed argument is anything that has a ``.read(N)`` method.
+
+    >>> try:
+    ...     from StringIO import StringIO as BytesIO
+    ... except ImportError:
+    ...     from io import BytesIO
+    >>> uvarintdecodestream(BytesIO(b'\\x00'))
+    0
+    >>> uvarintdecodestream(BytesIO(b'\\x01'))
+    1
+    >>> uvarintdecodestream(BytesIO(b'\\x7f'))
+    127
+    >>> uvarintdecodestream(BytesIO(b'\\xb9\\n'))
+    1337
+    >>> uvarintdecodestream(BytesIO(b'\\x80\\x80\\x04'))
+    65536
+    >>> uvarintdecodestream(BytesIO(b'\\x80'))
+    Traceback (most recent call last):
+        ...
+    Abort: stream ended unexpectedly (got 0 bytes, expected 1)
+    """
+    result = 0
+    shift = 0
+    while True:
+        byte = ord(readexactly(fh, 1))
+        result |= ((byte & 0x7f) << shift)
+        if not (byte & 0x80):
+            return result
+        shift += 7


More information about the Mercurial-devel mailing list