[PATCH] util: introduce ctxmanager, to avoid nested try/finally blocks

Bryan O'Sullivan bos at serpentine.com
Mon Jan 11 23:25:48 UTC 2016


# HG changeset patch
# User Bryan O'Sullivan <bos at serpentine.com>
# Date 1452554743 28800
#      Mon Jan 11 15:25:43 2016 -0800
# Node ID 018c00fff7fb75891f8771815a865015424eedea
# Parent  44c0f05935c2b2ad02352c34b6d670dd0b2e5e5b
util: introduce ctxmanager, to avoid nested try/finally blocks

This is similar in spirit to contextlib.nested in Python <= 2.6,
but uses an extra level of indirection to avoid its inability to
clean up if an __enter__ method raises an exception.

Why add this mechanism?  It greatly simplifies scoped resource
management, and lets us eliminate several hundred lines of try/finally
blocks.  In many of these cases the "finally" is separated from the
"try" by hundreds of lines of code, which makes the connection
between resource acquisition and disposal difficult to follow.

(The preferred mechanism would be the "multi-with" syntax of 2.7+,
but Mercurial can't move to 2.7 for a while.)

Intended use:

>>> with ctxmanager(lambda: file('foo'), lambda: file('bar')) as c:
>>>    f1, f2 = c()

This will open both foo and bar when c() is invoked, and will close
both upon exit from the block.  If the attempt to open bar raises
an exception, the block will not be entered - but foo will still
be closed.

diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -2632,6 +2632,66 @@ def _makedecompressor(decompcls):
         return chunkbuffer(generator(fh))
     return func
 
+class ctxmanager(object):
+    '''A context manager for use in 'with' blocks to allow multiple
+    contexts to be entered at once.  This is both safer and more
+    flexible than contextlib.nested.
+
+    Once Mercurial supports Python 2.7+, this will become mostly
+    unnecessary.
+    '''
+
+    def __init__(self, *args):
+        '''Accepts a list of no-argument functions that return context
+        managers.  These will be invoked at __call__ time.'''
+        self._pending = args
+        self._atexit = []
+
+    def __enter__(self):
+        return self
+
+    def __call__(self):
+        '''Create and enter context managers in the order in which they were
+        passed to the constructor.'''
+        values = []
+        for func in self._pending:
+            obj = func()
+            values.append(obj.__enter__())
+            self._atexit.append(obj.__exit__)
+        del self._pending
+        return values
+
+    def atexit(self, func, *args, **kwargs):
+        '''Add a function to call when this context manager exits.  The
+        ordering of multiple atexit calls is unspecified, save that
+        they will happen before any __exit__ functions.'''
+        def wrapper(exc_type, exc_val, exc_tb):
+            func(*args, **kwargs)
+        self._atexit.append(wrapper)
+        return func
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        '''Context managers are exited in the reverse order from which
+        they were created.'''
+        received = exc_type is not None
+        suppressed = False
+        pending = None
+        self._atexit.reverse()
+        for exitfunc in self._atexit:
+            try:
+                if exitfunc(exc_type, exc_val, exc_tb):
+                    suppressed = True
+                    exc_type = None
+                    exc_val = None
+                    exc_tb = None
+            except BaseException, e:
+                pending = sys.exc_info()
+                exc_type, exc_val, exc_tb = pending = sys.exc_info()
+        del self._atexit
+        if pending:
+            raise exc_val
+        return received and suppressed
+
 def _bz2():
     d = bz2.BZ2Decompressor()
     # Bzip2 stream start with BZ, but we stripped it.
diff --git a/tests/test-ctxmanager.py b/tests/test-ctxmanager.py
new file mode 100644
--- /dev/null
+++ b/tests/test-ctxmanager.py
@@ -0,0 +1,77 @@
+from __future__ import absolute_import
+
+import silenttestrunner
+import unittest
+
+from mercurial.util import ctxmanager
+
+class contextmanager(object):
+    def __init__(self, name, trace):
+        self.name = name
+        self.entered = False
+        self.exited = False
+        self.trace = trace
+
+    def __enter__(self):
+        self.entered = True
+        self.trace(('enter', self.name))
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.exited = exc_type, exc_val, exc_tb
+        self.trace(('exit', self.name))
+
+    def __repr__(self):
+        return '<ctx %r>' % self.name
+
+class ctxerror(Exception):
+    pass
+
+class raise_on_enter(contextmanager):
+    def __enter__(self):
+        self.trace(('raise', self.name))
+        raise ctxerror(self.name)
+
+class raise_on_exit(contextmanager):
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.trace(('raise', self.name))
+        raise ctxerror(self.name)
+
+def ctxmgr(name, trace):
+    return lambda: contextmanager(name, trace)
+
+class test_ctxmanager(unittest.TestCase):
+    def test_basics(self):
+        trace = []
+        addtrace = trace.append
+        with ctxmanager(ctxmgr('a', addtrace), ctxmgr('b', addtrace)) as c:
+            a, b = c()
+            c.atexit(addtrace, ('atexit', 'x'))
+            c.atexit(addtrace, ('atexit', 'y'))
+        self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'),
+                                 ('atexit', 'y'), ('atexit', 'x'),
+                                 ('exit', 'b'), ('exit', 'a')])
+
+    def test_raise_on_enter(self):
+        trace = []
+        addtrace = trace.append
+        with self.assertRaises(ctxerror):
+            with ctxmanager(ctxmgr('a', addtrace),
+                           lambda: raise_on_enter('b', addtrace)) as c:
+                c()
+                addtrace('unreachable')
+        self.assertEqual(trace, [('enter', 'a'), ('raise', 'b'), ('exit', 'a')])
+
+    def test_raise_on_exit(self):
+        trace = []
+        addtrace = trace.append
+        with self.assertRaises(ctxerror):
+            with ctxmanager(ctxmgr('a', addtrace),
+                           lambda: raise_on_exit('b', addtrace)) as c:
+                c()
+                addtrace('running')
+        self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'), 'running',
+                                 ('raise', 'b'), ('exit', 'a')])
+
+if __name__ == '__main__':
+    silenttestrunner.main(__name__)


More information about the Mercurial-devel mailing list