[PATCH default] fsmonitor: fix for windows 7 and earlier not supporting GetOverlappedResultEx

Olivier Trempe oliviertrempe at gmail.com
Fri Jul 15 17:53:07 UTC 2016


# HG changeset patch
# User Olivier Trempe <otrempe>
# Date 1468268295 14400
#      Mon Jul 11 16:18:15 2016 -0400
# Node ID c1706a40f3a1be09ca10de2485b2340a3de8f331
# Parent  9d02bed8477bec7f679d6aeb5b1dd8bcdb80f64d
fsmonitor: fix for windows 7 and earlier not supporting GetOverlappedResultEx

diff -r 9d02bed8477b -r c1706a40f3a1 hgext/fsmonitor/pywatchman/__init__.py
--- a/hgext/fsmonitor/pywatchman/__init__.py	Tue Jul 12 22:26:04 2016 -0700
+++ b/hgext/fsmonitor/pywatchman/__init__.py	Mon Jul 11 16:18:15 2016 -0400
@@ -43,6 +43,7 @@
 import capabilities
 
 if os.name == 'nt':
+    import sys
     import ctypes
     import ctypes.wintypes
 
@@ -57,20 +58,33 @@
     FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
     WAIT_TIMEOUT = 0x00000102
     WAIT_OBJECT_0 = 0x00000000
-    ERROR_IO_PENDING = 997
+
+    # Overlapped I/O event is not in a signaled state. (996)
+    ERROR_IO_INCOMPLETE = 0x000003E4
+    # Overlapped I/O operation is in progress. (997)
+    ERROR_IO_PENDING = 0x000003E5
+
+
+    # The pointer size follows the architecture
+    # We use WPARAM since this type is already conditionally defined
+    ULONG_PTR = ctypes.wintypes.WPARAM
+
 
     class OVERLAPPED(ctypes.Structure):
         _fields_ = [
-            ("Internal", wintypes.ULONG), ("InternalHigh", wintypes.ULONG),
+            ("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR),
             ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
             ("hEvent", wintypes.HANDLE)
         ]
 
         def __init__(self):
+            self.Internal = 0
+            self.InternalHigh = 0
             self.Offset = 0
             self.OffsetHigh = 0
             self.hEvent = 0
 
+
     LPDWORD = ctypes.POINTER(wintypes.DWORD)
 
     CreateFile = ctypes.windll.kernel32.CreateFileA
@@ -105,16 +119,128 @@
 
     LocalFree = ctypes.windll.kernel32.LocalFree
 
-    GetOverlappedResultEx = ctypes.windll.kernel32.GetOverlappedResultEx
-    GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
-                                      ctypes.POINTER(OVERLAPPED), LPDWORD,
-                                      wintypes.DWORD, wintypes.BOOL]
-    GetOverlappedResultEx.restype = wintypes.BOOL
+    # Windows 7 and earlier does not support GetOverlappedResultEx
+    # The alternative is to use GetOverlappedResult and wait
+    # for read or write operation to complete. This is
+    # done by using CreateEvent and WaitForSingleObject.
+    # Windows 8 is version 6.2 - see the Operating System Version
+    # page on MSDN
+    WINDOWS_8_AND_LATER = sys.getwindowsversion() >= (6, 2)
+    if not WINDOWS_8_AND_LATER:
+        CreateEvent = ctypes.windll.kernel32.CreateEventA
+        CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL,
+                                wintypes.LPSTR]
+        CreateEvent.restype = wintypes.HANDLE
+
+        GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
+        GetOverlappedResult.argtypes = [wintypes.HANDLE,
+                                        ctypes.POINTER(OVERLAPPED), LPDWORD,
+                                        wintypes.BOOL]
+        GetOverlappedResult.restype = wintypes.BOOL
+
+        WaitForSingleObject = ctypes.windll.kernel32.WaitForSingleObject
+        WaitForSingleObject.argtypes = [wintypes.HANDLE, wintypes.DWORD]
+        WaitForSingleObject.restype = wintypes.DWORD
+    else:
+        GetOverlappedResultEx = ctypes.windll.kernel32.GetOverlappedResultEx
+        GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
+                                          ctypes.POINTER(OVERLAPPED), LPDWORD,
+                                          wintypes.DWORD, wintypes.BOOL]
+        GetOverlappedResultEx.restype = wintypes.BOOL
 
     CancelIoEx = ctypes.windll.kernel32.CancelIoEx
     CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
     CancelIoEx.restype = wintypes.BOOL
 
+
+    class OverlappedStructWrapper(object):
+        """We act differently on WindowsNamedPipe operations depending on the
+           os version. In Windows 7 and earlier, we have to wait explicitly for
+           the async ReadFile or WriteFile to complete before retrieving the
+           result using GetOverlappedResult. The os takes care of everything in
+           Windows 8 and later using GetOverlappedResultEx with a timeout.
+        """
+
+        def __init__(self, _raisewinerrorfct):
+            self._raisewinerror = _raisewinerrorfct  # callback
+            self.olap = OVERLAPPED()  # instance of an OVERLAPPED struct
+
+        def __enter__(self):
+            # Zero mem on the OVERLAPPED struct.
+            ctypes.memset(ctypes.addressof(self.olap), 0,
+                                           ctypes.sizeof(self.olap))
+
+            if not WINDOWS_8_AND_LATER:
+                self.olap.hEvent = CreateEvent(None, True, False, None)
+
+                if self.olap.hEvent is None:
+                    self._raisewinerror('CreateEvent failed', GetLastError())
+
+            return self
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            if self.olap.hEvent is not None:
+                # We release the handle for the event
+                CloseHandle(self.olap.hEvent)
+
+        def GetOverlappedResult_(self, immediate, timeout, pipe, nbytes):
+            """ Called right after an asynchronous ReadFile or WriteFile, so a
+                pending IO operation is expected. Returns True on success;
+                failures go through the error callback or SocketTimeout.
+            """
+            if not WINDOWS_8_AND_LATER and not immediate:
+                if GetLastError() == ERROR_IO_PENDING:
+                    waitErr = WaitForSingleObject(self.olap.hEvent, timeout)
+                    if waitErr != WAIT_OBJECT_0:
+                        self._raisewinerror('WaitForSingleObject failed with'
+                                            ' code %d' % waitErr,
+                                            GetLastError())
+
+            getOverlappedResultSucceed = wintypes.BOOL()
+            if WINDOWS_8_AND_LATER:
+                dwms = 0 if immediate else timeout
+                getOverlappedResultSucceed = GetOverlappedResultEx(pipe,
+                                                                   self.olap,
+                                                                   nbytes,
+                                                                   dwms,
+                                                                   True)
+            else:
+                getOverlappedResultSucceed = GetOverlappedResult(pipe,
+                                                                 self.olap,
+                                                                 nbytes,
+                                                                 False)
+
+            if not getOverlappedResultSucceed:
+                getOverlappedErr = GetLastError()
+
+                # It's potentially unsafe to allow the IO to continue after
+                # we unwind, so let's make a best effort to avoid that
+                # happening.
+                # We cancel the operation (read or write) on any fail
+                if not CancelIoEx(pipe, self.olap):
+                    self._raisewinerror('CancelIoEx failed',
+                                        GetLastError())
+
+                if WINDOWS_8_AND_LATER:
+                    if getOverlappedErr == WAIT_TIMEOUT:
+                        log('GetOverlappedResultEx timedout')
+                        raise SocketTimeout('timed out after waiting %dms for'
+                                            ' read or write' % timeout)
+                    log('GetOverlappedResultEx reports error %d',
+                        getOverlappedErr)
+                else:
+                    if getOverlappedErr == ERROR_IO_INCOMPLETE:
+                        self._raisewinerror('GetOverlappedResult: IO operation'
+                                            ' still pending.', getOverlappedErr)
+                    log('GetOverlappedResult reports error %d',
+                        getOverlappedErr)
+
+                self._raisewinerror('error while waiting for read or write',
+                                    getOverlappedErr)
+                return False
+
+            return True
+
 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
 sniff_len = 13
 
@@ -281,7 +407,7 @@
 
         if self.pipe == INVALID_HANDLE_VALUE:
             self.pipe = None
-            self._raise_win_err('failed to open pipe %s' % sockpath,
+            self._raisewinerror('failed to open pipe %s' % sockpath,
                                 GetLastError())
 
     def _win32_strerror(self, err):
@@ -297,7 +423,7 @@
         finally:
             LocalFree(buf)
 
-    def _raise_win_err(self, msg, err):
+    def _raisewinerror(self, msg, err):
         raise IOError('%s win32 error code: %d %s' %
                       (msg, err, self._win32_strerror(err)))
 
@@ -324,48 +450,38 @@
 
         # We need to initiate a read
         buf = ctypes.create_string_buffer(size)
-        olap = OVERLAPPED()
-
-        log('made read buff of size %d', size)
 
-        # ReadFile docs warn against sending in the nread parameter for async
-        # operations, so we always collect it via GetOverlappedResultEx
-        immediate = ReadFile(self.pipe, buf, size, None, olap)
+        with OverlappedStructWrapper(self._raisewinerror) as olapwrapper:
+            log('made read buff of size %d', size)
 
-        if not immediate:
-            err = GetLastError()
-            if err != ERROR_IO_PENDING:
-                self._raise_win_err('failed to read %d bytes' % size,
+            # ReadFile docs warn against sending in the nbytes parameter for
+            # async operations, so we always collect it via
+            # GetOverlappedResultEx
+            immediate = ReadFile(self.pipe, buf, size, None, olapwrapper.olap)
+
+            if (WINDOWS_8_AND_LATER and not immediate
+                and GetLastError() != ERROR_IO_PENDING):
+                self._raisewinerror('failed to read %d bytes' % size,
                                     GetLastError())
 
-        nread = wintypes.DWORD()
-        if not GetOverlappedResultEx(self.pipe, olap, nread,
-                                     0 if immediate else self.timeout, True):
-            err = GetLastError()
-            CancelIoEx(self.pipe, olap)
-
-            if err == WAIT_TIMEOUT:
-                log('GetOverlappedResultEx timedout')
-                raise SocketTimeout('timed out after waiting %dms for read' %
-                                    self.timeout)
+            nbytes = wintypes.DWORD()
+            olapwrapper.GetOverlappedResult_(immediate, self.timeout,
+                                             self.pipe, nbytes)
 
-            log('GetOverlappedResultEx reports error %d', err)
-            self._raise_win_err('error while waiting for read', err)
-
-        nread = nread.value
-        if nread == 0:
-            # Docs say that named pipes return 0 byte when the other end did
-            # a zero byte write.  Since we don't ever do that, the only
-            # other way this shows up is if the client has gotten in a weird
-            # state, so let's bail out
-            CancelIoEx(self.pipe, olap)
-            raise IOError('Async read yielded 0 bytes; unpossible!')
+            nbytes = nbytes.value
+            if nbytes == 0:
+                # Docs say that named pipes return 0 byte when the other end did
+                # a zero byte write.  Since we don't ever do that, the only
+                # other way this shows up is if the client has gotten in a weird
+                # state, so let's bail out
+                CancelIoEx(self.pipe, olapwrapper.olap)
+                raise IOError('Async read yielded 0 bytes; unpossible!')
 
         # Holds precisely the bytes that we read from the prior request
-        buf = buf[:nread]
+        buf = buf[:nbytes]
 
-        returned_size = min(nread, size)
-        if returned_size == nread:
+        returned_size = min(nbytes, size)
+        if returned_size == nbytes:
             return buf
 
         # keep any left-overs around for a later read to consume
@@ -373,33 +489,23 @@
         return buf[:returned_size]
 
     def write(self, data):
-        olap = OVERLAPPED()
-        immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
-                              None, olap)
-
-        if not immediate:
-            err = GetLastError()
-            if err != ERROR_IO_PENDING:
-                self._raise_win_err('failed to write %d bytes' % len(data),
-                                    GetLastError())
+        with OverlappedStructWrapper(self._raisewinerror) as olapwrapper:
+            immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
+                                  None, olapwrapper.olap)
 
-        # Obtain results, waiting if needed
-        nwrote = wintypes.DWORD()
-        if GetOverlappedResultEx(self.pipe, olap, nwrote, 0 if immediate else
-                                 self.timeout, True):
-            return nwrote.value
-
-        err = GetLastError()
+            if (WINDOWS_8_AND_LATER and not immediate
+                and GetLastError() != ERROR_IO_PENDING):
+                self._raisewinerror('failed to write %d bytes'
+                                    % len(data), GetLastError())
 
-        # It's potentially unsafe to allow the write to continue after
-        # we unwind, so let's make a best effort to avoid that happening
-        CancelIoEx(self.pipe, olap)
+            # Obtain results, waiting if needed
+            nbytes = wintypes.DWORD()
+            if olapwrapper.GetOverlappedResult_(immediate, self.timeout,
+                                                self.pipe, nbytes):
+                return nbytes.value
 
-        if err == WAIT_TIMEOUT:
-            raise SocketTimeout('timed out after waiting %dms for write' %
-                                self.timeout)
-        self._raise_win_err('error while waiting for write of %d bytes' %
-                            len(data), err)
+        self._raisewinerror('error while waiting for write of %d bytes' %
+                            len(data), GetLastError())
 
 
 class CLIProcessTransport(Transport):


More information about the Mercurial-devel mailing list