[PATCH] util: introduce ctxmanager, to avoid nested try/finally blocks
Augie Fackler
raf at durin42.com
Mon Jan 11 20:51:29 CST 2016
On Mon, Jan 11, 2016 at 03:25:48PM -0800, Bryan O'Sullivan wrote:
> # HG changeset patch
> # User Bryan O'Sullivan <bos at serpentine.com>
> # Date 1452554743 28800
> # Mon Jan 11 15:25:43 2016 -0800
> # Node ID 018c00fff7fb75891f8771815a865015424eedea
> # Parent 44c0f05935c2b2ad02352c34b6d670dd0b2e5e5b
> util: introduce ctxmanager, to avoid nested try/finally blocks
I'm an enormous fan. Queued with delight.
>
> This is similar in spirit to contextlib.nested in Python <= 2.6,
> but uses an extra level of indirection to avoid its inability to
> clean up if an __enter__ method raises an exception.
>
> Why add this mechanism? It greatly simplifies scoped resource
> management, and lets us eliminate several hundred lines of try/finally
> blocks. In many of these cases the "finally" is separated from the
> "try" by hundreds of lines of code, which makes the connection
> between resource acquisition and disposal difficult to follow.
>
> (The preferred mechanism would be the "multi-with" syntax of 2.7+,
> but Mercurial can't move to 2.7 for a while.)
>
> Intended use:
>
> >>> with ctxmanager(lambda: file('foo'), lambda: file('bar')) as c:
> ...     f1, f2 = c()
>
> This will open both foo and bar when c() is invoked, and will close
> both upon exit from the block. If the attempt to open bar raises
> an exception, the rest of the block will not run - but foo will
> still be closed.
>
> diff --git a/mercurial/util.py b/mercurial/util.py
> --- a/mercurial/util.py
> +++ b/mercurial/util.py
> @@ -2632,6 +2632,66 @@ def _makedecompressor(decompcls):
> return chunkbuffer(generator(fh))
> return func
>
> +class ctxmanager(object):
> + '''A context manager for use in 'with' blocks to allow multiple
> + contexts to be entered at once. This is both safer and more
> + flexible than contextlib.nested.
> +
> + Once Mercurial supports Python 2.7+, this will become mostly
> + unnecessary.
> + '''
> +
> + def __init__(self, *args):
> + '''Accepts a list of no-argument functions that return context
> + managers. These will be invoked at __call__ time.'''
> + self._pending = args
> + self._atexit = []
> +
> + def __enter__(self):
> + return self
> +
> + def __call__(self):
> + '''Create and enter context managers in the order in which they were
> + passed to the constructor.'''
> + values = []
> + for func in self._pending:
> + obj = func()
> + values.append(obj.__enter__())
> + self._atexit.append(obj.__exit__)
> + del self._pending
> + return values
> +
> + def atexit(self, func, *args, **kwargs):
> + '''Add a function to call when this context manager exits. The
> + ordering of multiple atexit calls is unspecified, save that
> + they will happen before any __exit__ functions.'''
> + def wrapper(exc_type, exc_val, exc_tb):
> + func(*args, **kwargs)
> + self._atexit.append(wrapper)
> + return func
> +
> + def __exit__(self, exc_type, exc_val, exc_tb):
> + '''Context managers are exited in the reverse order from which
> + they were created.'''
> + received = exc_type is not None
> + suppressed = False
> + pending = None
> + self._atexit.reverse()
> + for exitfunc in self._atexit:
> + try:
> + if exitfunc(exc_type, exc_val, exc_tb):
> + suppressed = True
> + exc_type = None
> + exc_val = None
> + exc_tb = None
> + except BaseException, e:
> + pending = sys.exc_info()
> + exc_type, exc_val, exc_tb = pending = sys.exc_info()
> + del self._atexit
> + if pending:
> + raise exc_val
> + return received and suppressed
> +
> def _bz2():
> d = bz2.BZ2Decompressor()
> # Bzip2 stream start with BZ, but we stripped it.
> diff --git a/tests/test-ctxmanager.py b/tests/test-ctxmanager.py
> new file mode 100644
> --- /dev/null
> +++ b/tests/test-ctxmanager.py
> @@ -0,0 +1,77 @@
> +from __future__ import absolute_import
> +
> +import silenttestrunner
> +import unittest
> +
> +from mercurial.util import ctxmanager
> +
> +class contextmanager(object):
> + def __init__(self, name, trace):
> + self.name = name
> + self.entered = False
> + self.exited = False
> + self.trace = trace
> +
> + def __enter__(self):
> + self.entered = True
> + self.trace(('enter', self.name))
> + return self
> +
> + def __exit__(self, exc_type, exc_val, exc_tb):
> + self.exited = exc_type, exc_val, exc_tb
> + self.trace(('exit', self.name))
> +
> + def __repr__(self):
> + return '<ctx %r>' % self.name
> +
> +class ctxerror(Exception):
> + pass
> +
> +class raise_on_enter(contextmanager):
> + def __enter__(self):
> + self.trace(('raise', self.name))
> + raise ctxerror(self.name)
> +
> +class raise_on_exit(contextmanager):
> + def __exit__(self, exc_type, exc_val, exc_tb):
> + self.trace(('raise', self.name))
> + raise ctxerror(self.name)
> +
> +def ctxmgr(name, trace):
> + return lambda: contextmanager(name, trace)
> +
> +class test_ctxmanager(unittest.TestCase):
> + def test_basics(self):
> + trace = []
> + addtrace = trace.append
> + with ctxmanager(ctxmgr('a', addtrace), ctxmgr('b', addtrace)) as c:
> + a, b = c()
> + c.atexit(addtrace, ('atexit', 'x'))
> + c.atexit(addtrace, ('atexit', 'y'))
> + self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'),
> + ('atexit', 'y'), ('atexit', 'x'),
> + ('exit', 'b'), ('exit', 'a')])
> +
> + def test_raise_on_enter(self):
> + trace = []
> + addtrace = trace.append
> + with self.assertRaises(ctxerror):
> + with ctxmanager(ctxmgr('a', addtrace),
> + lambda: raise_on_enter('b', addtrace)) as c:
> + c()
> + addtrace('unreachable')
> + self.assertEqual(trace, [('enter', 'a'), ('raise', 'b'), ('exit', 'a')])
> +
> + def test_raise_on_exit(self):
> + trace = []
> + addtrace = trace.append
> + with self.assertRaises(ctxerror):
> + with ctxmanager(ctxmgr('a', addtrace),
> + lambda: raise_on_exit('b', addtrace)) as c:
> + c()
> + addtrace('running')
> + self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'), 'running',
> + ('raise', 'b'), ('exit', 'a')])
> +
> +if __name__ == '__main__':
> + silenttestrunner.main(__name__)
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel at selenic.com
> https://selenic.com/mailman/listinfo/mercurial-devel
More information about the Mercurial-devel mailing list