[PATCH 1 of 3 RFC V2] scmutil: support background file closing

Gregory Szorc gregory.szorc at gmail.com
Fri Jan 8 18:39:22 UTC 2016


# HG changeset patch
# User Gregory Szorc <gregory.szorc at gmail.com>
# Date 1452278344 28800
#      Fri Jan 08 10:39:04 2016 -0800
# Node ID e687c551d27e079e7146aaa2e56df0b07d4f9e47
# Parent  667ae51eaa4cf74ab456565a57d32d434349a3f4
scmutil: support background file closing

Closing files that have been appended to is relatively slow on
Windows/NTFS. This makes several Mercurial operations slower on
Windows.

The workaround is conceptually simple: use multiple threads for I/O.
Unfortunately, Python doesn't scale well to multiple threads because of
the GIL, and refactoring our code to use threads everywhere would be a
huge undertaking. So we've decided to start small: establish a thread
pool dedicated to closing files.

This patch establishes a mechanism for closing file handles on separate
threads. The coordinator object is basically a queue of file handles to
operate on and a thread pool consuming from the queue.
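
In outline, the pattern looks roughly like this (a simplified sketch
with illustrative names such as closequeue and closeworker; the real
implementation is the backgroundfilecloser class in the diff below):

    import Queue
    import threading

    # Bounded queue so producers can't race too far ahead of the closers.
    closequeue = Queue.Queue(maxsize=384)
    keeprunning = [True]  # shared shutdown flag (illustration only)

    def closeworker():
        # Pull file handles off the queue and close them until told to stop.
        while True:
            try:
                closequeue.get(block=True, timeout=0.1).close()
            except Queue.Empty:
                if not keeprunning[0]:
                    break

    threads = [threading.Thread(target=closeworker) for i in range(4)]
    for t in threads:
        t.start()

    # ... producer code puts open file handles on closequeue ...

    keeprunning[0] = False  # signal shutdown once producing is done
    for t in threads:
        t.join()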

When files are opened through the VFS layer, the caller can specify
that delayed closing is allowed.
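
For example, a caller writing many files could use it roughly like this
(a sketch of the intended calling convention; ui, filenames, and data
are placeholders for whatever the real caller has in hand):

    with vfs.backgroundclosing(ui):
        for name in filenames:
            with vfs(name, 'wb', backgroundclose=True) as fp:
                fp.write(data)

On exiting each inner "with" block, the proxy hands the file handle to
the background closer instead of closing it on the calling thread.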

A proxy class for file handles has been added. We must use a proxy
because it isn't possible to modify __class__ on built-in types. This
adds some overhead. But as future patches will show, this overhead
is cancelled out by the benefit of closing file handles on background
threads.
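
To illustrate the constraint (Python 2 sketch; 'somefile' and
trackedfile are placeholder names), reassigning __class__ on a handle
returned by open() fails because the built-in file type is not a heap
type:

    fh = open('somefile', 'wb')

    class trackedfile(file):
        pass

    try:
        fh.__class__ = trackedfile
    except TypeError:
        # "__class__ assignment: only for heap types"
        pass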

diff --git a/mercurial/scmutil.py b/mercurial/scmutil.py
--- a/mercurial/scmutil.py
+++ b/mercurial/scmutil.py
@@ -2,23 +2,26 @@
 #
 #  Copyright Matt Mackall <mpm at selenic.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
+import Queue
+import contextlib
 import errno
 import glob
 import os
 import re
 import shutil
 import stat
 import tempfile
+import threading
 
 from .i18n import _
 from .node import wdirrev
 from . import (
     encoding,
     error,
     match as matchmod,
     osutil,
@@ -249,25 +252,26 @@ class abstractvfs(object):
         try:
             return self.readlines(path, mode=mode)
         except IOError as inst:
             if inst.errno != errno.ENOENT:
                 raise
         return []
 
     def open(self, path, mode="r", text=False, atomictemp=False,
-             notindexed=False):
+             notindexed=False, backgroundclose=False):
         '''Open ``path`` file, which is relative to vfs root.
 
         Newly created directories are marked as "not to be indexed by
         the content indexing service", if ``notindexed`` is specified
         for "write" mode access.
         '''
         self.open = self.__call__
-        return self.__call__(path, mode, text, atomictemp, notindexed)
+        return self.__call__(path, mode, text, atomictemp, notindexed,
+                             backgroundclose=backgroundclose)
 
     def read(self, path):
         with self(path, 'rb') as fp:
             return fp.read()
 
     def readlines(self, path, mode='rb'):
         with self(path, mode=mode) as fp:
             return fp.readlines()
@@ -431,16 +435,37 @@ class abstractvfs(object):
         """
         root = os.path.normpath(self.join(None))
         # when dirpath == root, dirpath[prefixlen:] becomes empty
         # because len(dirpath) < prefixlen.
         prefixlen = len(pathutil.normasprefix(root))
         for dirpath, dirs, files in os.walk(self.join(path), onerror=onerror):
             yield (dirpath[prefixlen:], dirs, files)
 
+    @contextlib.contextmanager
+    def backgroundclosing(self, ui):
+        """Allow files to be closed asynchronously.
+
+        When this context manager is active, ``backgroundclose`` can be passed
+        to ``__call__``/``open`` to result in the file possibly being closed
+        asynchronously, on a background thread.
+        """
+        # This is an arbitrary restriction and could be changed if we ever
+        # have a use case.
+        vfs = getattr(self, 'vfs', self)
+        if getattr(vfs, '_backgroundfilecloser', None):
+            raise Exception('can only have 1 active background file closer')
+
+        with backgroundfilecloser(ui) as bfc:
+            try:
+                vfs._backgroundfilecloser = bfc
+                yield bfc
+            finally:
+                vfs._backgroundfilecloser = None
+
 class vfs(abstractvfs):
     '''Operate files relative to a base directory
 
     This class is used to hide the details of COW semantics and
     remote file access from higher level code.
     '''
     def __init__(self, base, audit=True, expandpath=False, realpath=False):
         if expandpath:
@@ -473,22 +498,35 @@ class vfs(abstractvfs):
         return util.checkexec(self.base)
 
     def _fixfilemode(self, name):
         if self.createmode is None or not self._chmod:
             return
         os.chmod(name, self.createmode & 0o666)
 
     def __call__(self, path, mode="r", text=False, atomictemp=False,
-                 notindexed=False):
+                 notindexed=False, backgroundclose=False):
         '''Open ``path`` file, which is relative to vfs root.
 
         Newly created directories are marked as "not to be indexed by
         the content indexing service", if ``notindexed`` is specified
         for "write" mode access.
+
+        If ``backgroundclose`` is passed, the file may be closed asynchronously.
+        It can only be used if the ``self.backgroundclosing()`` context manager
+        is active. This should only be specified if the following criteria hold:
+
+        1. There is a potential for writing thousands of files. Unless you
+           are writing thousands of files, the performance benefits of
+           asynchronously closing files are not realized.
+        2. Files are opened exactly once while ``backgroundclosing`` is
+           active and are therefore free of race conditions between
+           closing a file on a background thread and reopening it. (If the
+           file were opened multiple times, there could be unflushed data
+           because the original file handle hasn't been flushed/closed yet.)
         '''
         if self._audit:
             r = util.checkosfilename(path)
             if r:
                 raise error.Abort("%s: %r" % (r, path))
         self.audit(path)
         f = self.join(path)
 
@@ -523,16 +561,24 @@ class vfs(abstractvfs):
                 if nlink > 0:
                     if self._trustnlink is None:
                         self._trustnlink = nlink > 1 or util.checknlink(f)
                     if nlink > 1 or not self._trustnlink:
                         util.rename(util.mktempcopy(f), f)
         fp = util.posixfile(f, mode)
         if nlink == 0:
             self._fixfilemode(f)
+
+        if backgroundclose:
+            if not self._backgroundfilecloser:
+                raise Exception('backgroundclose can only be used when a '
+                                'backgroundclosing context manager is active')
+
+            fp = delayclosedfile(fp, self._backgroundfilecloser)
+
         return fp
 
     def symlink(self, src, dst):
         self.audit(dst)
         linkname = self.join(dst)
         try:
             os.unlink(linkname)
         except OSError:
@@ -1209,8 +1255,96 @@ def gdinitconfig(ui):
     return (ui.configbool('format', 'generaldelta', False)
             or ui.configbool('format', 'usegeneraldelta', True))
 
 def gddeltaconfig(ui):
     """helper function to know if incoming delta should be optimised
     """
     # experimental config: format.generaldelta
     return ui.configbool('format', 'generaldelta', False)
+
+class delayclosedfile(object):
+    """Proxy for a file object whose close is delayed.
+
+    Do not instantiate outside of the vfs layer.
+    """
+
+    def __init__(self, fh, closer):
+        object.__setattr__(self, '_origfh', fh)
+        object.__setattr__(self, '_closer', closer)
+
+    def __getattr__(self, attr):
+        return getattr(self._origfh, attr)
+
+    def __setattr__(self, attr, value):
+        return setattr(self._origfh, attr, value)
+
+    def __delattr__(self, attr):
+        return delattr(self._origfh, attr)
+
+    def __enter__(self):
+        return self._origfh.__enter__()
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        self._closer.close(self._origfh)
+
+class backgroundfilecloser(object):
+    """Coordinates background closing of file handles on multiple threads."""
+    def __init__(self, ui):
+        self._running = False
+        self._entered = False
+        self._threads = []
+
+        # Only Windows/NTFS has slow file closing. So only enable by default
+        # on that platform. But allow to be enabled elsewhere for testing.
+        # Windows defaults to a limit of 512 open files. A buffer of 128
+        # should give us enough headway.
+        defaultenabled = os.name == 'nt'
+        enabled = ui.configbool('worker', 'backgroundfileclose', defaultenabled)
+
+        if not enabled:
+            return
+
+        maxqueue = ui.configint('worker', 'backgroundclosemaxqueue', 384)
+        threadcount = ui.configint('worker', 'backgroundclosethreads', 4)
+
+        self._queue = Queue.Queue(maxsize=maxqueue)
+        self._running = True
+
+        for i in range(threadcount):
+            t = threading.Thread(target=self._worker, name='backgroundcloser')
+            self._threads.append(t)
+            t.start()
+
+    def __enter__(self):
+        self._entered = True
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        self._running = False
+
+    def _worker(self):
+        """Main routine for worker thread."""
+        while True:
+            try:
+                fh = self._queue.get(block=True, timeout=0.100)
+                try:
+                    fh.close()
+                except Exception:
+                    # Need to catch or the thread will terminate and
+                    # we could orphan file descriptors.
+                    pass
+            except Queue.Empty:
+                if not self._running:
+                    break
+
+    def close(self, fh):
+        """Schedule a file for closing."""
+        if not self._entered:
+            raise Exception('can only call close() when context manager active')
+
+        # If we're not actively running, close synchronously.
+        if not self._running:
+            fh.close()
+            return
+
+        self._queue.put(fh, block=True, timeout=None)
+


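For reference, the behavior added above is controlled by new
configuration options read in backgroundfilecloser.__init__. On
non-Windows platforms it can be enabled explicitly for testing, e.g.
(a sketch; the integer values shown are the in-code defaults):

    [worker]
    backgroundfileclose = true
    backgroundclosethreads = 4
    backgroundclosemaxqueue = 384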