[PATCH] util: introduce ctxmanager, to avoid nested try/finally blocks

Augie Fackler raf at durin42.com
Mon Jan 11 20:51:29 CST 2016


On Mon, Jan 11, 2016 at 03:25:48PM -0800, Bryan O'Sullivan wrote:
> # HG changeset patch
> # User Bryan O'Sullivan <bos at serpentine.com>
> # Date 1452554743 28800
> #      Mon Jan 11 15:25:43 2016 -0800
> # Node ID 018c00fff7fb75891f8771815a865015424eedea
> # Parent  44c0f05935c2b2ad02352c34b6d670dd0b2e5e5b
> util: introduce ctxmanager, to avoid nested try/finally blocks

I'm an enormous fan. Queued with delight.

>
> This is similar in spirit to contextlib.nested in Python <= 2.6,
> but uses an extra level of indirection to avoid its inability to
> clean up if an __enter__ method raises an exception.
>
> Why add this mechanism?  It greatly simplifies scoped resource
> management, and lets us eliminate several hundred lines of try/finally
> blocks.  In many of these cases the "finally" is separated from the
> "try" by hundreds of lines of code, which makes the connection
> between resource acquisition and disposal difficult to follow.
>
> (The preferred mechanism would be the "multi-with" syntax of 2.7+,
> but Mercurial can't move to 2.7 for a while.)
>
> Intended use:
>
> >>> with ctxmanager(lambda: file('foo'), lambda: file('bar')) as c:
> >>>    f1, f2 = c()
>
> This will open both foo and bar when c() is invoked, and will close
> both upon exit from the block.  If the attempt to open bar raises
> an exception, the block will not be entered - but foo will still
> be closed.
>
> diff --git a/mercurial/util.py b/mercurial/util.py
> --- a/mercurial/util.py
> +++ b/mercurial/util.py
> @@ -2632,6 +2632,66 @@ def _makedecompressor(decompcls):
>          return chunkbuffer(generator(fh))
>      return func
>
> +class ctxmanager(object):
> +    '''A context manager for use in 'with' blocks to allow multiple
> +    contexts to be entered at once.  This is both safer and more
> +    flexible than contextlib.nested.
> +
> +    Once Mercurial supports Python 2.7+, this will become mostly
> +    unnecessary.
> +    '''
> +
> +    def __init__(self, *args):
> +        '''Accepts a list of no-argument functions that return context
> +        managers.  These will be invoked at __call__ time.'''
> +        self._pending = args
> +        self._atexit = []
> +
> +    def __enter__(self):
> +        return self
> +
> +    def __call__(self):
> +        '''Create and enter context managers in the order in which they were
> +        passed to the constructor.'''
> +        values = []
> +        for func in self._pending:
> +            obj = func()
> +            values.append(obj.__enter__())
> +            self._atexit.append(obj.__exit__)
> +        del self._pending
> +        return values
> +
> +    def atexit(self, func, *args, **kwargs):
> +        '''Add a function to call when this context manager exits.  The
> +        ordering of multiple atexit calls is unspecified, save that
> +        they will happen before any __exit__ functions.'''
> +        def wrapper(exc_type, exc_val, exc_tb):
> +            func(*args, **kwargs)
> +        self._atexit.append(wrapper)
> +        return func
> +
> +    def __exit__(self, exc_type, exc_val, exc_tb):
> +        '''Context managers are exited in the reverse order from which
> +        they were created.'''
> +        received = exc_type is not None
> +        suppressed = False
> +        pending = None
> +        self._atexit.reverse()
> +        for exitfunc in self._atexit:
> +            try:
> +                if exitfunc(exc_type, exc_val, exc_tb):
> +                    suppressed = True
> +                    exc_type = None
> +                    exc_val = None
> +                    exc_tb = None
> +            except BaseException, e:
> +                pending = sys.exc_info()
> +                exc_type, exc_val, exc_tb = pending = sys.exc_info()
> +        del self._atexit
> +        if pending:
> +            raise exc_val
> +        return received and suppressed
> +
>  def _bz2():
>      d = bz2.BZ2Decompressor()
>      # Bzip2 stream start with BZ, but we stripped it.
> diff --git a/tests/test-ctxmanager.py b/tests/test-ctxmanager.py
> new file mode 100644
> --- /dev/null
> +++ b/tests/test-ctxmanager.py
> @@ -0,0 +1,77 @@
> +from __future__ import absolute_import
> +
> +import silenttestrunner
> +import unittest
> +
> +from mercurial.util import ctxmanager
> +
> +class contextmanager(object):
> +    def __init__(self, name, trace):
> +        self.name = name
> +        self.entered = False
> +        self.exited = False
> +        self.trace = trace
> +
> +    def __enter__(self):
> +        self.entered = True
> +        self.trace(('enter', self.name))
> +        return self
> +
> +    def __exit__(self, exc_type, exc_val, exc_tb):
> +        self.exited = exc_type, exc_val, exc_tb
> +        self.trace(('exit', self.name))
> +
> +    def __repr__(self):
> +        return '<ctx %r>' % self.name
> +
> +class ctxerror(Exception):
> +    pass
> +
> +class raise_on_enter(contextmanager):
> +    def __enter__(self):
> +        self.trace(('raise', self.name))
> +        raise ctxerror(self.name)
> +
> +class raise_on_exit(contextmanager):
> +    def __exit__(self, exc_type, exc_val, exc_tb):
> +        self.trace(('raise', self.name))
> +        raise ctxerror(self.name)
> +
> +def ctxmgr(name, trace):
> +    return lambda: contextmanager(name, trace)
> +
> +class test_ctxmanager(unittest.TestCase):
> +    def test_basics(self):
> +        trace = []
> +        addtrace = trace.append
> +        with ctxmanager(ctxmgr('a', addtrace), ctxmgr('b', addtrace)) as c:
> +            a, b = c()
> +            c.atexit(addtrace, ('atexit', 'x'))
> +            c.atexit(addtrace, ('atexit', 'y'))
> +        self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'),
> +                                 ('atexit', 'y'), ('atexit', 'x'),
> +                                 ('exit', 'b'), ('exit', 'a')])
> +
> +    def test_raise_on_enter(self):
> +        trace = []
> +        addtrace = trace.append
> +        with self.assertRaises(ctxerror):
> +            with ctxmanager(ctxmgr('a', addtrace),
> +                           lambda: raise_on_enter('b', addtrace)) as c:
> +                c()
> +                addtrace('unreachable')
> +        self.assertEqual(trace, [('enter', 'a'), ('raise', 'b'), ('exit', 'a')])
> +
> +    def test_raise_on_exit(self):
> +        trace = []
> +        addtrace = trace.append
> +        with self.assertRaises(ctxerror):
> +            with ctxmanager(ctxmgr('a', addtrace),
> +                           lambda: raise_on_exit('b', addtrace)) as c:
> +                c()
> +                addtrace('running')
> +        self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'), 'running',
> +                                 ('raise', 'b'), ('exit', 'a')])
> +
> +if __name__ == '__main__':
> +    silenttestrunner.main(__name__)
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel at selenic.com
> https://selenic.com/mailman/listinfo/mercurial-devel


More information about the Mercurial-devel mailing list