D2640: cbor: remove test files and fix core's test-check*

pulkit (Pulkit Goyal) phabricator at mercurial-scm.org
Sun Mar 4 16:11:26 UTC 2018


pulkit created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This patch fixes the test-check* failures caused by the new thirdparty cbor
  library that was moved into core. It also deletes the cbor test files.

  The next patch will reorder the files so they can be used inside mercurial/.
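
  For context, the vendored package exposes the usual loads/dumps entry points,
  so a minimal round-trip would look roughly like the sketch below. The import
  path shown is hypothetical and depends on how the follow-up patch reorders
  the files; today the code still lives under mercurial/thirdparty/cbor/cbor/.

    # hypothetical import path after the planned reordering
    from mercurial.thirdparty.cbor import dumps, loads

    # encode a Python object to CBOR bytes and decode it back
    blob = dumps({b'command': b'heads', b'args': [1, 2, 3]})
    assert loads(blob) == {b'command': b'heads', b'args': [1, 2, 3]}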

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D2640

AFFECTED FILES
  mercurial/thirdparty/cbor/cbor/__init__.py
  mercurial/thirdparty/cbor/cbor/cbor.py
  mercurial/thirdparty/cbor/cbor/tests/__init__.py
  mercurial/thirdparty/cbor/cbor/tests/test_cbor.py
  mercurial/thirdparty/cbor/cbor/tests/test_objects.py
  mercurial/thirdparty/cbor/cbor/tests/test_usage.py
  mercurial/thirdparty/cbor/cbor/tests/test_vectors.py
  mercurial/thirdparty/cbor/setup.py
  mercurial/thirdparty/cbor/utest.sh
  tests/test-check-py3-compat.t
  tests/test-check-pyflakes.t

CHANGE DETAILS

diff --git a/tests/test-check-pyflakes.t b/tests/test-check-pyflakes.t
--- a/tests/test-check-pyflakes.t
+++ b/tests/test-check-pyflakes.t
@@ -16,6 +16,7 @@
   $ testrepohg locate 'set:**.py or grep("^#!.*python")' \
   > -X hgext/fsmonitor/pywatchman \
   > -X mercurial/pycompat.py -X contrib/python-zstandard \
+  > -X mercurial/thirdparty/cbor \
   > 2>/dev/null \
   > | xargs pyflakes 2>/dev/null | "$TESTDIR/filterpyflakes.py"
   
diff --git a/tests/test-check-py3-compat.t b/tests/test-check-py3-compat.t
--- a/tests/test-check-py3-compat.t
+++ b/tests/test-check-py3-compat.t
@@ -5,6 +5,7 @@
 
   $ testrepohg files 'set:(**.py)' \
   > -X hgdemandimport/demandimportpy2.py \
+  > -X mercurial/thirdparty/cbor \
   > | sed 's|\\|/|g' | xargs $PYTHON contrib/check-py3-compat.py
   contrib/python-zstandard/setup.py not using absolute_import
   contrib/python-zstandard/setup_zstd.py not using absolute_import
diff --git a/mercurial/thirdparty/cbor/utest.sh b/mercurial/thirdparty/cbor/utest.sh
deleted file mode 100755
--- a/mercurial/thirdparty/cbor/utest.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/bin/sh -x
-
-python -m cbor.tests.test_cbor
-python -m cbor.tests.test_objects
-python -m cbor.tests.test_usage
-python -m cbor.tests.test_vectors
-
-#python cbor/tests/test_cbor.py
-#python cbor/tests/test_objects.py
-#python cbor/tests/test_usage.py
-#python cbor/tests/test_vectors.py
diff --git a/mercurial/thirdparty/cbor/setup.py b/mercurial/thirdparty/cbor/setup.py
--- a/mercurial/thirdparty/cbor/setup.py
+++ b/mercurial/thirdparty/cbor/setup.py
@@ -1,4 +1,3 @@
-#! /usr/bin/env python
 # Copyright 2014 Brian Olson
 # 
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/mercurial/thirdparty/cbor/cbor/tests/test_vectors.py b/mercurial/thirdparty/cbor/cbor/tests/test_vectors.py
deleted file mode 100644
--- a/mercurial/thirdparty/cbor/cbor/tests/test_vectors.py
+++ /dev/null
@@ -1,142 +0,0 @@
-#!/usr/bin/env python
-
-"""
-Test CBOR implementation against common "test vectors" set from
-https://github.com/cbor/test-vectors/
-"""
-
-import base64
-import json
-import logging
-import math
-import os
-import sys
-import unittest
-
-
-_IS_PY3 = sys.version_info[0] >= 3
-
-
-logger = logging.getLogger(__name__)
-
-
-#from cbor.cbor import dumps as pydumps
-from cbor.cbor import loads as pyloads
-try:
-    #from cbor._cbor import dumps as cdumps
-    from cbor._cbor import loads as cloads
-except ImportError:
-    # still test what we can without C fast mode
-    logger.warn('testing without C accelerated CBOR', exc_info=True)
-    #cdumps, cloads = None, None
-    cloads = None
-from cbor import Tag
-
-
-# Accomodate several test vectors that have diagnostic descriptors but not JSON
-_DIAGNOSTIC_TESTS = {
-    'Infinity': lambda x: x == float('Inf'),
-    '-Infinity': lambda x: x == float('-Inf'),
-    'NaN': math.isnan,
-    'undefined': lambda x: x is None,
-
-    # TODO: parse into datetime.datetime()
-    '0("2013-03-21T20:04:00Z")': lambda x: isinstance(x, Tag) and (x.tag == 0) and (x.value == '2013-03-21T20:04:00Z'),
-
-    "h''": lambda x: x == b'',
-    "(_ h'0102', h'030405')": lambda x: x == b'\x01\x02\x03\x04\x05',
-    '{1: 2, 3: 4}': lambda x: x == {1: 2, 3: 4},
-    "h'01020304'": lambda x: x == b'\x01\x02\x03\x04',
-}
-
-
-# We expect these to raise exception because they encode reserved/unused codes in the spec.
-# ['hex'] values of tests we expect to raise
-_EXPECT_EXCEPTION = set(['f0', 'f818', 'f8ff'])
-
-
-def _check(row, decoded):
-    cbdata = base64.b64decode(row['cbor'])
-    if cloads is not None:
-        cb = cloads(cbdata)
-        if cb != decoded:
-            anyerr = True
-            sys.stderr.write('expected {0!r} got {1!r} c failed to decode cbor {2}\n'.format(decoded, cb, base64.b16encode(cbdata)))
-
-    cb = pyloads(cbdata)
-    if cb != decoded:
-        anyerr = True
-        sys.stderr.write('expected {0!r} got {1!r} py failed to decode cbor {2}\n'.format(decoded, cb, base64.b16encode(cbdata)))
-
-
-def _check_foo(row, checkf):
-    cbdata = base64.b64decode(row['cbor'])
-    if cloads is not None:
-        cb = cloads(cbdata)
-        if not checkf(cb):
-            anyerr = True
-            sys.stderr.write('expected {0!r} got {1!r} c failed to decode cbor {2}\n'.format(decoded, cb, base64.b16encode(cbdata)))
-
-    cb = pyloads(cbdata)
-    if not checkf(cb):
-        anyerr = True
-        sys.stderr.write('expected {0!r} got {1!r} py failed to decode cbor {2}\n'.format(decoded, cb, base64.b16encode(cbdata)))
-
-
-class TestVectors(unittest.TestCase):
-        def test_vectors(self):
-            here = os.path.dirname(__file__)
-            jf = os.path.abspath(os.path.join(here, '../../../test-vectors/appendix_a.json'))
-            if not os.path.exists(jf):
-                logging.warning('cannot find test-vectors/appendix_a.json, tried: %r', jf)
-                return
-
-            if _IS_PY3:
-                testfile = open(jf, 'r')
-                tv = json.load(testfile)
-            else:
-                testfile = open(jf, 'rb')
-                tv = json.load(testfile)
-            anyerr = False
-            for row in tv:
-                rhex = row.get('hex')
-                if 'decoded' in row:
-                    decoded = row['decoded']
-                    _check(row, decoded)
-                    continue
-                elif 'diagnostic' in row:
-                    diag = row['diagnostic']
-                    checkf = _DIAGNOSTIC_TESTS.get(diag)
-                    if checkf is not None:
-                        _check_foo(row, checkf)
-                        continue
-
-                # variously verbose log of what we're not testing:
-                cbdata = base64.b64decode(row['cbor'])
-                try:
-                    pd = pyloads(cbdata)
-                except:
-                    if rhex and (rhex in _EXPECT_EXCEPTION):
-                        pass
-                    else:
-                        logging.error('failed to py load hex=%s diag=%r', rhex, row.get('diagnostic'), exc_info=True)
-                    pd = ''
-                cd = None
-                if cloads is not None:
-                    try:
-                        cd = cloads(cbdata)
-                    except:
-                        if rhex and (rhex in _EXPECT_EXCEPTION):
-                            pass
-                        else:
-                            logging.error('failed to c load hex=%s diag=%r', rhex, row.get('diagnostic'), exc_info=True)
-                        cd = ''
-                logging.warning('skipping hex=%s diag=%r py=%s c=%s', rhex, row.get('diagnostic'), pd, cd)
-            testfile.close()
-
-            assert not anyerr
-
-
-if __name__ == '__main__':
-    logging.basicConfig(level=logging.DEBUG)
-    unittest.main()
diff --git a/mercurial/thirdparty/cbor/cbor/tests/test_usage.py b/mercurial/thirdparty/cbor/cbor/tests/test_usage.py
deleted file mode 100644
--- a/mercurial/thirdparty/cbor/cbor/tests/test_usage.py
+++ /dev/null
@@ -1,241 +0,0 @@
-#!python
-from __future__ import absolute_import
-from __future__ import division  # / => float
-import gc
-import logging
-import os
-import resource
-import sys
-import tempfile
-import unittest
-
-from cbor.tests.test_cbor import _randob
-
-
-logger = logging.getLogger(__name__)
-
-
-try:
-    from cbor._cbor import dumps as cdumps
-    from cbor._cbor import loads as cloads
-    from cbor._cbor import dump as cdump
-    from cbor._cbor import load as cload
-except ImportError:
-    # still test what we can without C fast mode
-    logger.warn('testing without C accelerated CBOR', exc_info=True)
-    cdumps, cloads, cdump, cload = None, None, None, None
-
-
-
-_TEST_COUNT = 100000
-_TEST_OUTER = 5
-
-
-_IS_PY3 = sys.version_info[0] >= 3
-
-
-if _IS_PY3:
-    _range = range
-    from io import BytesIO as StringIO
-else:
-    _range = xrange
-    from cStringIO import StringIO
-
-
-class TestUsage(unittest.TestCase):
-    def test_dumps_usage(self):
-        '''
-        repeatedly serialize, check that usage doesn't go up
-        '''
-        if cdumps is None:
-            logger.warn('no C dumps(), skipping test_dumps_usage')
-            return
-        start_usage = resource.getrusage(resource.RUSAGE_SELF)
-        usage_history = [start_usage]
-        for o in _range(_TEST_OUTER):
-            for i in _range(_TEST_COUNT):
-                ob = _randob()
-                blob = cdumps(ob)
-                # and silently drop the result. I hope the garbage collector works!
-            t_usage = resource.getrusage(resource.RUSAGE_SELF)
-            usage_history.append(t_usage)
-        end_usage = usage_history[-1]
-        dmaxrss = end_usage.ru_maxrss - start_usage.ru_maxrss
-        didrss = end_usage.ru_idrss - start_usage.ru_idrss
-        dmaxrsspct = ((end_usage.ru_maxrss != 0) and (dmaxrss / end_usage.ru_maxrss)) or 0
-        didrsspct = ((end_usage.ru_idrss != 0) and (didrss / end_usage.ru_idrss)) or 0
-
-        sys.stderr.write('maxrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_maxrss, end_usage.ru_maxrss, dmaxrss, dmaxrsspct * 100.0))
-        sys.stderr.write('idrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_idrss, end_usage.ru_idrss, didrss, didrsspct * 100.0))
-
-        assert (dmaxrsspct) < 0.05, [x.ru_maxrss for x in usage_history]
-        assert (didrsspct) < 0.05, [x.ru_idrss for x in usage_history]
-
-    def test_loads_usage(self):
-        '''
-        repeatedly serialize, check that usage doesn't go up
-        '''
-        if (cdumps is None) or (cloads is None):
-            logger.warn('no C fast CBOR, skipping test_loads_usage')
-            return
-        ## Just a string passes!
-        #ob = 'sntaoheusnatoheusnaotehuasnoetuhaosentuhaoesnth'
-        ## Just an array passes!
-        #ob = [1,2,3,4,5,6,7,8,9,12,12,13]
-        ## Just a dict passes!
-        #ob = {'a':'b', 'c':'d', 'e':'f', 'g':'h'}
-        # dict of dict is doom!
-        #ob = {'a':{'b':'c', 'd':'e', 'f':'g'}, 'x':'p'}
-        ob = {'aoeu':[1,2,3,4],'foo':'bar','pants':{'foo':0xb44, 'pi':3.14}, 'flubber': [{'x':'y', 'z':[None, 2, []]}, 2, 'hello']}
-        blob = cdumps(ob)
-        start_usage = resource.getrusage(resource.RUSAGE_SELF)
-        usage_history = [start_usage]
-        for o in _range(_TEST_OUTER):
-            for i in _range(_TEST_COUNT):
-                dob = cloads(blob)
-                # and silently drop the result. I hope the garbage collector works!
-            t_usage = resource.getrusage(resource.RUSAGE_SELF)
-            usage_history.append(t_usage)
-        end_usage = usage_history[-1]
-        dmaxrss = end_usage.ru_maxrss - start_usage.ru_maxrss
-        didrss = end_usage.ru_idrss - start_usage.ru_idrss
-        dmaxrsspct = ((end_usage.ru_maxrss != 0) and (dmaxrss / end_usage.ru_maxrss)) or 0
-        didrsspct = ((end_usage.ru_idrss != 0) and (didrss / end_usage.ru_idrss)) or 0
-
-        sys.stderr.write('maxrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_maxrss, end_usage.ru_maxrss, dmaxrss, dmaxrsspct * 100.0))
-        sys.stderr.write('idrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_idrss, end_usage.ru_idrss, didrss, didrsspct * 100.0))
-
-        assert (dmaxrsspct) < 0.05, [x.ru_maxrss for x in usage_history]
-        assert (didrsspct) < 0.05, [x.ru_idrss for x in usage_history]
-
-    def test_tempfile(self):
-        '''repeatedly seralize to temp file, then repeatedly deserialize from
-        it, checking usage all along the way.
-        '''
-        if cdump is None:
-            logger.warn('no C dump(), skipping test_tempfile')
-            return
-        with tempfile.NamedTemporaryFile() as ntf:
-            # first, write a bunch to temp file
-            with open(ntf.name, 'wb') as fout:
-                sys.stderr.write('write {!r} {}\n'.format(ntf.name, fout))
-                start_usage = resource.getrusage(resource.RUSAGE_SELF)
-                usage_history = [start_usage]
-                for o in _range(_TEST_OUTER):
-                    for i in _range(_TEST_COUNT):
-                        ob = _randob()
-                        cdump(ob, fout)
-                    t_usage = resource.getrusage(resource.RUSAGE_SELF)
-                    usage_history.append(t_usage)
-                end_usage = usage_history[-1]
-                dmaxrss = end_usage.ru_maxrss - start_usage.ru_maxrss
-                didrss = end_usage.ru_idrss - start_usage.ru_idrss
-                dmaxrsspct = ((end_usage.ru_maxrss != 0) and (dmaxrss / end_usage.ru_maxrss)) or 0
-                didrsspct = ((end_usage.ru_idrss != 0) and (didrss / end_usage.ru_idrss)) or 0
-
-                sys.stderr.write('maxrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_maxrss, end_usage.ru_maxrss, dmaxrss, dmaxrsspct * 100.0))
-                sys.stderr.write('idrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_idrss, end_usage.ru_idrss, didrss, didrsspct * 100.0))
-
-                assert (dmaxrsspct) < 0.05, [x.ru_maxrss for x in usage_history]
-                assert (didrsspct) < 0.05, [x.ru_idrss for x in usage_history]
-
-            sys.stderr.write('{!r} is {} bytes\n'.format(ntf.name, os.path.getsize(ntf.name)))
-
-            # now, read a bunch back from temp file.
-            with open(ntf.name, 'rb') as fin:
-                sys.stderr.write('read {!r} {}\n'.format(ntf.name, fin))
-                start_usage = resource.getrusage(resource.RUSAGE_SELF)
-                usage_history = [start_usage]
-                for o in _range(_TEST_OUTER):
-                    for i in _range(_TEST_COUNT):
-                        dob = cload(fin)
-                        # and silently drop the result. I hope the garbage collector works!
-                    gc.collect()
-                    t_usage = resource.getrusage(resource.RUSAGE_SELF)
-                    usage_history.append(t_usage)
-                end_usage = usage_history[-1]
-                dmaxrss = end_usage.ru_maxrss - start_usage.ru_maxrss
-                didrss = end_usage.ru_idrss - start_usage.ru_idrss
-                dmaxrsspct = ((end_usage.ru_maxrss != 0) and (dmaxrss / end_usage.ru_maxrss)) or 0
-                didrsspct = ((end_usage.ru_idrss != 0) and (didrss / end_usage.ru_idrss)) or 0
-
-                sys.stderr.write('maxrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_maxrss, end_usage.ru_maxrss, dmaxrss, dmaxrsspct * 100.0))
-                sys.stderr.write('idrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_idrss, end_usage.ru_idrss, didrss, didrsspct * 100.0))
-
-                assert (dmaxrsspct) < 0.05, [x.ru_maxrss for x in usage_history]
-                assert (didrsspct) < 0.05, [x.ru_idrss for x in usage_history]
-
-    def test_stringio_usage(self):
-        '''serialize data to StringIO, read it back'''
-        if cdump is None:
-            logger.warn('no C dump(), skipping test_tempfile')
-            return
-
-        # warmup the rusage, allocate everything!
-        fout = StringIO()
-        sys.stderr.write('write 1 to StringIO\n')
-        oblist = []
-        for o in _range(_TEST_OUTER):
-            for i in _range(_TEST_COUNT):
-                ob = _randob()
-                oblist.append(ob)
-                cdump(ob, fout)
-
-        # position at start to overwrite, but leave allocated
-        fout.seek(0)
-
-        sys.stderr.write('write 2 to StringIO\n')
-        start_usage = resource.getrusage(resource.RUSAGE_SELF)
-        usage_history = [start_usage]
-        pos = 0
-        for o in _range(_TEST_OUTER):
-            for i in _range(_TEST_COUNT):
-                ob = oblist[pos]
-                pos += 1
-                cdump(ob, fout)
-            gc.collect()
-            t_usage = resource.getrusage(resource.RUSAGE_SELF)
-            usage_history.append(t_usage)
-        end_usage = usage_history[-1]
-        dmaxrss = end_usage.ru_maxrss - start_usage.ru_maxrss
-        didrss = end_usage.ru_idrss - start_usage.ru_idrss
-        dmaxrsspct = ((end_usage.ru_maxrss != 0) and (dmaxrss / end_usage.ru_maxrss)) or 0
-        didrsspct = ((end_usage.ru_idrss != 0) and (didrss / end_usage.ru_idrss)) or 0
-
-        sys.stderr.write('maxrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_maxrss, end_usage.ru_maxrss, dmaxrss, dmaxrsspct * 100.0))
-        sys.stderr.write('idrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_idrss, end_usage.ru_idrss, didrss, didrsspct * 100.0))
-
-        assert (dmaxrsspct) < 0.05, [x.ru_maxrss for x in usage_history]
-        assert (didrsspct) < 0.05, [x.ru_idrss for x in usage_history]
-
-        sys.stderr.write('StringIO is {} bytes\n'.format(fout.tell()))
-        fout.seek(0)
-
-        fin = fout
-        sys.stderr.write('read StringIO\n')
-        start_usage = resource.getrusage(resource.RUSAGE_SELF)
-        usage_history = [start_usage]
-        for o in _range(_TEST_OUTER):
-            for i in _range(_TEST_COUNT):
-                dob = cload(fin)
-                # and silently drop the result. I hope the garbage collector works!
-            gc.collect()
-            t_usage = resource.getrusage(resource.RUSAGE_SELF)
-            usage_history.append(t_usage)
-        end_usage = usage_history[-1]
-        dmaxrss = end_usage.ru_maxrss - start_usage.ru_maxrss
-        didrss = end_usage.ru_idrss - start_usage.ru_idrss
-        dmaxrsspct = ((end_usage.ru_maxrss != 0) and (dmaxrss / end_usage.ru_maxrss)) or 0
-        didrsspct = ((end_usage.ru_idrss != 0) and (didrss / end_usage.ru_idrss)) or 0
-
-        sys.stderr.write('maxrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_maxrss, end_usage.ru_maxrss, dmaxrss, dmaxrsspct * 100.0))
-        sys.stderr.write('idrss: {} - {}, d={} ({:.2f}%)\n'.format(start_usage.ru_idrss, end_usage.ru_idrss, didrss, didrsspct * 100.0))
-
-        assert (dmaxrsspct) < 0.05, [x.ru_maxrss for x in usage_history]
-        assert (didrsspct) < 0.05, [x.ru_idrss for x in usage_history]
-
-
-
-if __name__ == '__main__':
-    logging.basicConfig(level=logging.DEBUG)
-    unittest.main()
diff --git a/mercurial/thirdparty/cbor/cbor/tests/test_objects.py b/mercurial/thirdparty/cbor/cbor/tests/test_objects.py
deleted file mode 100644
--- a/mercurial/thirdparty/cbor/cbor/tests/test_objects.py
+++ /dev/null
@@ -1,82 +0,0 @@
-import base64
-import sys
-import unittest
-
-
-from cbor.tagmap import ClassTag, TagMapper, Tag, UnknownTagException
-
-#try:
-from cbor.tests.test_cbor import TestPyPy, hexstr
-#except ImportError:
-#    from .test_cbor import TestPyPy, hexstr
-
-
-class SomeType(object):
-    "target type for translator tests"
-    def __init__(self, a, b):
-        self.a = a
-        self.b = b
-
-    @staticmethod
-    def to_cbor(ob):
-        assert isinstance(ob, SomeType)
-        return (ob.a, ob.b)
-
-    @staticmethod
-    def from_cbor(data):
-        return SomeType(*data)
-
-    def __eq__(self, other):
-        # why isn't this just the default implementation in the object class?
-        return isinstance(other, type(self)) and (self.__dict__ == other.__dict__)
-
-
-class UnknownType(object):
-    pass
-
-
-known_tags = [
-    ClassTag(4325, SomeType, SomeType.to_cbor, SomeType.from_cbor)
-]
-
-
-class TestObjects(unittest.TestCase):
-    def setUp(self):
-        self.tx = TagMapper(known_tags)
-
-    def _oso(self, ob):
-        ser = self.tx.dumps(ob)
-        try:
-            o2 = self.tx.loads(ser)
-            assert ob == o2, '%r != %r from %s' % (ob, o2, base64.b16encode(ser))
-        except Exception as e:
-            sys.stderr.write('failure on buf len={0} {1!r} ob={2!r} {3!r}; {4}\n'.format(len(ser), hexstr(ser), ob, ser, e))
-            raise
-
-    def test_basic(self):
-        self._oso(SomeType(1,2))
-
-    def test_unk_fail(self):
-        ok = False
-        try:
-            self.tx.dumps(UnknownType())
-        except:
-            ok = True
-        assert ok
-
-    def test_tag_passthrough(self):
-        self.tx.raise_on_unknown_tag = False
-        self._oso(Tag(1234, 'aoeu'))
-
-    def test_unk_tag_fail(self):
-        ok = False
-        self.tx.raise_on_unknown_tag = True
-        try:
-            self._oso(Tag(1234, 'aoeu'))
-        except UnknownTagException as ute:
-            ok = True
-        ok = False
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/mercurial/thirdparty/cbor/cbor/tests/test_cbor.py b/mercurial/thirdparty/cbor/cbor/tests/test_cbor.py
deleted file mode 100644
--- a/mercurial/thirdparty/cbor/cbor/tests/test_cbor.py
+++ /dev/null
@@ -1,442 +0,0 @@
-#!python
-# -*- coding: utf-8 -*-
-
-import base64
-import datetime
-import json
-import logging
-import random
-import sys
-import time
-import unittest
-import zlib
-
-
-logger = logging.getLogger(__name__)
-
-
-from cbor.cbor import dumps as pydumps
-from cbor.cbor import loads as pyloads
-from cbor.cbor import dump as pydump
-from cbor.cbor import load as pyload
-from cbor.cbor import Tag
-try:
-    from cbor._cbor import dumps as cdumps
-    from cbor._cbor import loads as cloads
-    from cbor._cbor import dump as cdump
-    from cbor._cbor import load as cload
-except ImportError:
-    # still test what we can without C fast mode
-    logger.warn('testing without C accelerated CBOR', exc_info=True)
-    cdumps, cloads, cdump, cload = None, None, None, None
-
-
-_IS_PY3 = sys.version_info[0] >= 3
-
-
-if _IS_PY3:
-    _range = range
-    from io import BytesIO as StringIO
-else:
-    _range = xrange
-    from cStringIO import StringIO
-
-
-class TestRoot(object):
-    @classmethod
-    def loads(cls, *args):
-        return cls._ld[0](*args)
-    @classmethod
-    def dumps(cls, *args, **kwargs):
-        return cls._ld[1](*args, **kwargs)
-    @classmethod
-    def speediterations(cls):
-        return cls._ld[2]
-    @classmethod
-    def load(cls, *args):
-        return cls._ld[3](*args)
-    @classmethod
-    def dump(cls, *args, **kwargs):
-        return cls._ld[4](*args, **kwargs)
-    @classmethod
-    def testable(cls):
-        ok = (cls._ld[0] is not None) and (cls._ld[1] is not None) and (cls._ld[3] is not None) and (cls._ld[4] is not None)
-        if not ok:
-            logger.warn('non-testable case %s skipped', cls.__name__)
-        return ok
-
-# Can't set class level function pointers, because then they expect a
-# (cls) first argument. So, toss them in a list to hide them.
-class TestPyPy(TestRoot):
-    _ld = [pyloads, pydumps, 1000, pyload, pydump]
-
-class TestPyC(TestRoot):
-    _ld = [pyloads, cdumps, 2000, pyload, cdump]
-
-class TestCPy(TestRoot):
-    _ld = [cloads, pydumps, 2000, cload, pydump]
-
-class TestCC(TestRoot):
-    _ld = [cloads, cdumps, 150000, cload, cdump]
-
-
-if _IS_PY3:
-    def _join_jsers(jsers):
-        return (''.join(jsers)).encode('utf8')
-    def hexstr(bs):
-        return ' '.join(map(lambda x: '{0:02x}'.format(x), bs))
-else:
-    def _join_jsers(jsers):
-        return b''.join(jsers)
-    def hexstr(bs):
-        return ' '.join(map(lambda x: '{0:02x}'.format(ord(x)), bs))
-
-
-class XTestCBOR(object):
-    def _oso(self, ob):
-        ser = self.dumps(ob)
-        try:
-            o2 = self.loads(ser)
-            assert ob == o2, '%r != %r from %s' % (ob, o2, base64.b16encode(ser))
-        except Exception as e:
-            sys.stderr.write('failure on buf len={0} {1!r} ob={2!r} {3!r}; {4}\n'.format(len(ser), hexstr(ser), ob, ser, e))
-            raise
-
-    def _osos(self, ob):
-        obs = self.dumps(ob)
-        o2 = self.loads(obs)
-        o2s = self.dumps(o2)
-        assert obs == o2s
-
-    def _oso_bytearray(self, ob):
-        ser = self.dumps(ob)
-        try:
-            o2 = self.loads(bytearray(ser))
-            assert ob == o2, '%r != %r from %s' % (ob, o2, base64.b16encode(ser))
-        except Exception as e:
-            sys.stderr.write('failure on buf len={0} {1!r} ob={2!r} {3!r}; {4}\n'.format(len(ser), hexstr(ser), ob, ser, e))
-            raise
-
-    test_objects = [
-        1,
-        0,
-        True,
-        False,
-        None,
-        -1,
-        -1.5,
-        1.5,
-        1000,
-        -1000,
-        1000000000,
-        2376030000,
-        -1000000000,
-        1000000000000000,
-        -1000000000000000,
-        [],
-        [1,2,3],
-        {},
-        b'aoeu1234\x00\xff',
-        u'åöéûのかめ亀',
-        b'',
-        u'',
-        Tag(1234, 'aoeu'),
-    ]
-
-    def test_basic(self):
-        if not self.testable(): return
-        for ob in self.test_objects:
-            self._oso(ob)
-
-    def test_basic_bytearray(self):
-        if not self.testable(): return
-        xoso = self._oso
-        self._oso = self._oso_bytearray
-        try:
-            self.test_basic()
-        finally:
-            self._oso = xoso
-
-    def test_random_ints(self):
-        if not self.testable(): return
-        icount = self.speediterations()
-        for i in _range(icount):
-            v = random.randint(-4294967295, 0xffffffff)
-            self._oso(v)
-        oldv = []
-        for i in _range(int(icount / 10)):
-            v = random.randint(-1000000000000000000000, 1000000000000000000000)
-            self._oso(v)
-            oldv.append(v)
-
-    def test_randobs(self):
-        if not self.testable(): return
-        icount = self.speediterations()
-        for i in _range(icount):
-            ob = _randob()
-            self._oso(ob)
-
-    def test_tuple(self):
-        if not self.testable(): return
-        l = [1,2,3]
-        t = tuple(l)
-        ser = self.dumps(t)
-        o2 = self.loads(ser)
-        assert l == o2
-
-    def test_speed_vs_json(self):
-        if not self.testable(): return
-        # It should be noted that the python standard library has a C implementation of key parts of json encoding and decoding
-        icount = self.speediterations()
-        obs = [_randob_notag() for x in _range(icount)]
-        st = time.time()
-        bsers = [self.dumps(o) for o in obs]
-        nt = time.time()
-        cbor_ser_time = nt - st
-        jsers = [json.dumps(o) for o in obs]
-        jt = time.time()
-        json_ser_time = jt - nt
-        cbor_byte_count = sum(map(len, bsers))
-        json_byte_count = sum(map(len, jsers))
-        sys.stderr.write(
-            'serialized {nobs} objects into {cb} cbor bytes in {ct:.2f} seconds ({cops:.2f}/s, {cbps:.1f}B/s) and {jb} json bytes in {jt:.2f} seconds ({jops:.2f}/s, {jbps:.1f}B/s)\n'.format(
-            nobs=len(obs),
-            cb=cbor_byte_count,
-            ct=cbor_ser_time,
-            cops=len(obs) / cbor_ser_time,
-            cbps=cbor_byte_count / cbor_ser_time,
-            jb=json_byte_count,
-            jt=json_ser_time,
-            jops=len(obs) / json_ser_time,
-            jbps=json_byte_count / json_ser_time))
-        bsersz = zlib.compress(b''.join(bsers))
-        jsersz = zlib.compress(_join_jsers(jsers))
-        sys.stderr.write('compress to {0} bytes cbor.gz and {1} bytes json.gz\n'.format(
-            len(bsersz), len(jsersz)))
-
-        st = time.time()
-        bo2 = [self.loads(b) for b in bsers]
-        bt = time.time()
-        cbor_load_time = bt - st
-        jo2 = [json.loads(b) for b in jsers]
-        jt = time.time()
-        json_load_time = jt - bt
-        sys.stderr.write('load {nobs} objects from cbor in {ct:.2f} secs ({cops:.2f}/sec, {cbps:.1f}B/s) and json in {jt:.2f} ({jops:.2f}/sec, {jbps:.1f}B/s)\n'.format(
-            nobs=len(obs),
-            ct=cbor_load_time,
-            cops=len(obs) / cbor_load_time,
-            cbps=cbor_byte_count / cbor_load_time,
-            jt=json_load_time,
-            jops=len(obs) / json_load_time,
-            jbps=json_byte_count / json_load_time
-        ))
-
-    def test_loads_none(self):
-        if not self.testable(): return
-        try:
-            ob = self.loads(None)
-            assert False, "expected ValueError when passing in None"
-        except ValueError:
-            pass
-
-    def test_concat(self):
-        "Test that we can concatenate output and retrieve the objects back out."
-        if not self.testable(): return
-        self._oso(self.test_objects)
-        fob = StringIO()
-
-        for ob in self.test_objects:
-            self.dump(ob, fob)
-        fob.seek(0)
-        obs2 = []
-        try:
-            while True:
-                obs2.append(self.load(fob))
-        except EOFError:
-            pass
-        assert obs2 == self.test_objects
-
-    # TODO: find more bad strings with which to fuzz CBOR
-    def test_badread(self):
-        if not self.testable(): return
-        try:
-            ob = self.loads(b'\xff')
-            assert False, 'badread should have failed'
-        except ValueError as ve:
-            #logger.info('error', exc_info=True)
-            pass
-        except Exception as ex:
-            logger.info('unexpected error!', exc_info=True)
-            assert False, 'unexpected error' + str(ex)
-
-    def test_datetime(self):
-        if not self.testable(): return
-        # right now we're just testing that it's possible to dumps()
-        # Tag(0,...) because there was a bug around that.
-        xb = self.dumps(Tag(0, datetime.datetime(1984,1,24,23,22,21).isoformat()))
-
-    def test_sortkeys(self):
-        if not self.testable(): return
-        obytes = []
-        xbytes = []
-        for n in _range(2, 27):
-            ob = {u'{:02x}'.format(x):x for x in _range(n)}
-            obytes.append(self.dumps(ob, sort_keys=True))
-            xbytes.append(self.dumps(ob, sort_keys=False))
-        allOGood = True
-        someXMiss = False
-        for i, g in enumerate(_GOLDEN_SORTED_KEYS_BYTES):
-            if g != obytes[i]:
-                logger.error('bad sorted result, wanted %r got %r', g, obytes[i])
-                allOGood = False
-            if g != xbytes[i]:
-                someXMiss = True
-
-        assert allOGood
-        assert someXMiss
-
-
-_GOLDEN_SORTED_KEYS_BYTES = [
-b'\xa2b00\x00b01\x01',
-b'\xa3b00\x00b01\x01b02\x02',
-b'\xa4b00\x00b01\x01b02\x02b03\x03',
-b'\xa5b00\x00b01\x01b02\x02b03\x03b04\x04',
-b'\xa6b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05',
-b'\xa7b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06',
-b'\xa8b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07',
-b'\xa9b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08',
-b'\xaab00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\t',
-b'\xabb00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\n',
-b'\xacb00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0b',
-b'\xadb00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0c',
-b'\xaeb00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\r',
-b'\xafb00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0e',
-b'\xb0b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0f',
-b'\xb1b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0fb10\x10',
-b'\xb2b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0fb10\x10b11\x11',
-b'\xb3b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0fb10\x10b11\x11b12\x12',
-b'\xb4b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0fb10\x10b11\x11b12\x12b13\x13',
-b'\xb5b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0fb10\x10b11\x11b12\x12b13\x13b14\x14',
-b'\xb6b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0fb10\x10b11\x11b12\x12b13\x13b14\x14b15\x15',
-b'\xb7b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0fb10\x10b11\x11b12\x12b13\x13b14\x14b15\x15b16\x16',
-b'\xb8\x18b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0fb10\x10b11\x11b12\x12b13\x13b14\x14b15\x15b16\x16b17\x17',
-b'\xb8\x19b00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0fb10\x10b11\x11b12\x12b13\x13b14\x14b15\x15b16\x16b17\x17b18\x18\x18',
-b'\xb8\x1ab00\x00b01\x01b02\x02b03\x03b04\x04b05\x05b06\x06b07\x07b08\x08b09\tb0a\nb0b\x0bb0c\x0cb0d\rb0e\x0eb0f\x0fb10\x10b11\x11b12\x12b13\x13b14\x14b15\x15b16\x16b17\x17b18\x18\x18b19\x18\x19',
-]
-
-def gen_sorted_bytes():
-    for n in _range(2, 27):
-        sys.stdout.write(repr(cbor.dumps({u'{:02x}'.format(x):x for x in _range(n)}, sort_keys=True)) + ',\n')
-
-def gen_unsorted_bytes():
-    for n in _range(2, 27):
-        sys.stdout.write(repr(cbor.dumps({u'{:02x}'.format(x):x for x in _range(n)}, sort_keys=False)) + ',\n')
-
-
-class TestCBORPyPy(unittest.TestCase, XTestCBOR, TestPyPy):
-    pass
-
-class TestCBORCPy(unittest.TestCase, XTestCBOR, TestCPy):
-    pass
-
-class TestCBORPyC(unittest.TestCase, XTestCBOR, TestPyC):
-    pass
-
-class TestCBORCC(unittest.TestCase, XTestCBOR, TestCC):
-    pass
-
-
-def _randob():
-    return _randob_x(_randob_probabilities, _randob_probsum, _randob)
-
-def _randob_notag():
-    return _randob_x(_randob_probabilities_notag, _randob_notag_probsum, _randob_notag)
-
-def _randArray(randob=_randob):
-    return [randob() for x in _range(random.randint(0,5))]
-
-_chars = [chr(x) for x in _range(ord(' '), ord('~'))]
-
-def _randStringOrBytes(randob=_randob):
-    tstr = ''.join([random.choice(_chars) for x in _range(random.randint(1,10))])
-    if random.randint(0,1) == 1:
-        if _IS_PY3:
-            # default str is unicode
-            # sometimes squash to bytes
-            return tstr.encode('utf8')
-        else:
-            # default str is bytes
-            # sometimes promote to unicode string
-            return tstr.decode('utf8')
-    return tstr
-
-def _randString(randob=_randob):
-    return ''.join([random.choice(_chars) for x in _range(random.randint(1,10))])
-
-def _randDict(randob=_randob):
-    ob = {}
-    for x in _range(random.randint(0,5)):
-        ob[_randString()] = randob()
-    return ob
-
-
-def _randTag(randob=_randob):
-    t = Tag()
-    # Tags 0..36 are know standard things we might implement special
-    # decoding for. This number will grow over time, and this test
-    # need to be adjusted to only assign unclaimed tags for Tag<->Tag
-    # encode-decode testing.
-    t.tag = random.randint(37, 1000000)
-    t.value = randob()
-    return t
-
-def _randInt(randob=_randob):
-    return random.randint(-4294967295, 4294967295)
-
-def _randBignum(randob=_randob):
-    return random.randint(-1000000000000000000000, 1000000000000000000000)
-
-def _randFloat(randob=_randob):
-    return random.random()
-
-_CONSTANTS = (True, False, None)
-def _randConst(randob=_randob):
-    return random.choice(_CONSTANTS)
-
-_randob_probabilities = [
-    (0.1, _randDict),
-    (0.1, _randTag),
-    (0.2, _randArray),
-    (0.3, _randStringOrBytes),
-    (0.3, _randInt),
-    (0.2, _randBignum),
-    (0.2, _randFloat),
-    (0.2, _randConst),
-]
-
-_randob_probsum = sum([x[0] for x in _randob_probabilities])
-
-_randob_probabilities_notag = [
-    (0.1, _randDict),
-    (0.2, _randArray),
-    (0.3, _randString),
-    (0.3, _randInt),
-    (0.2, _randBignum),
-    (0.2, _randFloat),
-    (0.2, _randConst),
-]
-
-_randob_notag_probsum = sum([x[0] for x in _randob_probabilities_notag])
-
-def _randob_x(probs=_randob_probabilities, probsum=_randob_probsum, randob=_randob):
-    pos = random.uniform(0, probsum)
-    for p, op in probs:
-        if pos < p:
-            return op(randob)
-        pos -= p
-    return None
-
-
-if __name__ == '__main__':
-    logging.basicConfig(level=logging.INFO)
-    unittest.main()
diff --git a/mercurial/thirdparty/cbor/cbor/tests/__init__.py b/mercurial/thirdparty/cbor/cbor/tests/__init__.py
deleted file mode 100644
diff --git a/mercurial/thirdparty/cbor/cbor/cbor.py b/mercurial/thirdparty/cbor/cbor/cbor.py
--- a/mercurial/thirdparty/cbor/cbor/cbor.py
+++ b/mercurial/thirdparty/cbor/cbor/cbor.py
@@ -1,6 +1,3 @@
-#!python
-# -*- Python -*-
-
 import datetime
 import re
 import struct
diff --git a/mercurial/thirdparty/cbor/cbor/__init__.py b/mercurial/thirdparty/cbor/cbor/__init__.py
--- a/mercurial/thirdparty/cbor/cbor/__init__.py
+++ b/mercurial/thirdparty/cbor/cbor/__init__.py
@@ -1,5 +1,3 @@
-#!python
-
 try:
     # try C library _cbor.so
     from ._cbor import loads, dumps, load, dump



To: pulkit, #hg-reviewers
Cc: mercurial-devel
