D7201: fsmonitor: refresh pywatchman with upstream

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Mon Nov 4 11:27:33 EST 2019


Closed by commit rHG6469c23a40a2: fsmonitor: refresh pywatchman with upstream (authored by indygreg).
This revision was automatically updated to reflect the committed changes.
This revision was not accepted when it landed; it landed in state "Needs Review".

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D7201?vs=17499&id=17517

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D7201/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D7201

AFFECTED FILES
  hgext/fsmonitor/pywatchman/__init__.py
  hgext/fsmonitor/pywatchman/bser.c
  hgext/fsmonitor/pywatchman/capabilities.py
  hgext/fsmonitor/pywatchman/compat.py
  hgext/fsmonitor/pywatchman/encoding.py
  hgext/fsmonitor/pywatchman/load.py
  hgext/fsmonitor/pywatchman/pybser.py
  hgext/fsmonitor/watchmanclient.py

CHANGE DETAILS

diff --git a/hgext/fsmonitor/watchmanclient.py b/hgext/fsmonitor/watchmanclient.py
--- a/hgext/fsmonitor/watchmanclient.py
+++ b/hgext/fsmonitor/watchmanclient.py
@@ -10,6 +10,7 @@
 import getpass
 
 from mercurial import util
+from mercurial.utils import procutil
 
 from . import pywatchman
 
@@ -92,7 +93,7 @@
                 self._watchmanclient = pywatchman.client(
                     timeout=self._timeout,
                     useImmutableBser=True,
-                    watchman_exe=watchman_exe,
+                    binpath=procutil.tonativestr(watchman_exe),
                 )
             return self._watchmanclient.query(*watchmanargs)
         except pywatchman.CommandError as ex:
diff --git a/hgext/fsmonitor/pywatchman/pybser.py b/hgext/fsmonitor/pywatchman/pybser.py
--- a/hgext/fsmonitor/pywatchman/pybser.py
+++ b/hgext/fsmonitor/pywatchman/pybser.py
@@ -26,10 +26,8 @@
 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
 # no unicode literals
+from __future__ import absolute_import, division, print_function
 
 import binascii
 import collections
@@ -37,30 +35,31 @@
 import struct
 import sys
 
-from . import (
-    compat,
-)
+from . import compat
+
 
-BSER_ARRAY = b'\x00'
-BSER_OBJECT = b'\x01'
-BSER_BYTESTRING = b'\x02'
-BSER_INT8 = b'\x03'
-BSER_INT16 = b'\x04'
-BSER_INT32 = b'\x05'
-BSER_INT64 = b'\x06'
-BSER_REAL = b'\x07'
-BSER_TRUE = b'\x08'
-BSER_FALSE = b'\x09'
-BSER_NULL = b'\x0a'
-BSER_TEMPLATE = b'\x0b'
-BSER_SKIP = b'\x0c'
-BSER_UTF8STRING = b'\x0d'
+BSER_ARRAY = b"\x00"
+BSER_OBJECT = b"\x01"
+BSER_BYTESTRING = b"\x02"
+BSER_INT8 = b"\x03"
+BSER_INT16 = b"\x04"
+BSER_INT32 = b"\x05"
+BSER_INT64 = b"\x06"
+BSER_REAL = b"\x07"
+BSER_TRUE = b"\x08"
+BSER_FALSE = b"\x09"
+BSER_NULL = b"\x0a"
+BSER_TEMPLATE = b"\x0b"
+BSER_SKIP = b"\x0c"
+BSER_UTF8STRING = b"\x0d"
 
 if compat.PYTHON3:
     STRING_TYPES = (str, bytes)
     unicode = str
+
     def tobytes(i):
-        return str(i).encode('ascii')
+        return str(i).encode("ascii")
+
     long = int
 else:
     STRING_TYPES = (unicode, str)
@@ -72,6 +71,7 @@
 EMPTY_HEADER = b"\x00\x01\x05\x00\x00\x00\x00"
 EMPTY_HEADER_V2 = b"\x00\x02\x00\x00\x00\x00\x05\x00\x00\x00\x00"
 
+
 def _int_size(x):
     """Return the smallest size int that can store the value"""
     if -0x80 <= x <= 0x7F:
@@ -83,29 +83,34 @@
     elif long(-0x8000000000000000) <= x <= long(0x7FFFFFFFFFFFFFFF):
         return 8
     else:
-        raise RuntimeError('Cannot represent value: ' + str(x))
+        raise RuntimeError("Cannot represent value: " + str(x))
+
 
 def _buf_pos(buf, pos):
     ret = buf[pos]
-    # In Python 2, buf is a str array so buf[pos] is a string. In Python 3, buf
-    # is a bytes array and buf[pos] is an integer.
-    if compat.PYTHON3:
+    # Normalize the return type to bytes
+    if compat.PYTHON3 and not isinstance(ret, bytes):
         ret = bytes((ret,))
     return ret
 
+
 class _bser_buffer(object):
-
     def __init__(self, version):
         self.bser_version = version
         self.buf = ctypes.create_string_buffer(8192)
         if self.bser_version == 1:
-            struct.pack_into(tobytes(len(EMPTY_HEADER)) + b's', self.buf, 0,
-                             EMPTY_HEADER)
+            struct.pack_into(
+                tobytes(len(EMPTY_HEADER)) + b"s", self.buf, 0, EMPTY_HEADER
+            )
             self.wpos = len(EMPTY_HEADER)
         else:
             assert self.bser_version == 2
-            struct.pack_into(tobytes(len(EMPTY_HEADER_V2)) + b's', self.buf, 0,
-                             EMPTY_HEADER_V2)
+            struct.pack_into(
+                tobytes(len(EMPTY_HEADER_V2)) + b"s",
+                self.buf,
+                0,
+                EMPTY_HEADER_V2,
+            )
             self.wpos = len(EMPTY_HEADER_V2)
 
     def ensure_size(self, size):
@@ -117,42 +122,68 @@
         to_write = size + 1
         self.ensure_size(to_write)
         if size == 1:
-            struct.pack_into(b'=cb', self.buf, self.wpos, BSER_INT8, val)
+            struct.pack_into(b"=cb", self.buf, self.wpos, BSER_INT8, val)
         elif size == 2:
-            struct.pack_into(b'=ch', self.buf, self.wpos, BSER_INT16, val)
+            struct.pack_into(b"=ch", self.buf, self.wpos, BSER_INT16, val)
         elif size == 4:
-            struct.pack_into(b'=ci', self.buf, self.wpos, BSER_INT32, val)
+            struct.pack_into(b"=ci", self.buf, self.wpos, BSER_INT32, val)
         elif size == 8:
-            struct.pack_into(b'=cq', self.buf, self.wpos, BSER_INT64, val)
+            struct.pack_into(b"=cq", self.buf, self.wpos, BSER_INT64, val)
         else:
-            raise RuntimeError('Cannot represent this long value')
+            raise RuntimeError("Cannot represent this long value")
         self.wpos += to_write
 
-
     def append_string(self, s):
         if isinstance(s, unicode):
-            s = s.encode('utf-8')
+            s = s.encode("utf-8")
         s_len = len(s)
         size = _int_size(s_len)
         to_write = 2 + size + s_len
         self.ensure_size(to_write)
         if size == 1:
-            struct.pack_into(b'=ccb' + tobytes(s_len) + b's', self.buf,
-                self.wpos, BSER_BYTESTRING, BSER_INT8, s_len, s)
+            struct.pack_into(
+                b"=ccb" + tobytes(s_len) + b"s",
+                self.buf,
+                self.wpos,
+                BSER_BYTESTRING,
+                BSER_INT8,
+                s_len,
+                s,
+            )
         elif size == 2:
-            struct.pack_into(b'=cch' + tobytes(s_len) + b's', self.buf,
-                self.wpos, BSER_BYTESTRING, BSER_INT16, s_len, s)
+            struct.pack_into(
+                b"=cch" + tobytes(s_len) + b"s",
+                self.buf,
+                self.wpos,
+                BSER_BYTESTRING,
+                BSER_INT16,
+                s_len,
+                s,
+            )
         elif size == 4:
-            struct.pack_into(b'=cci' + tobytes(s_len) + b's', self.buf,
-                self.wpos, BSER_BYTESTRING, BSER_INT32, s_len, s)
+            struct.pack_into(
+                b"=cci" + tobytes(s_len) + b"s",
+                self.buf,
+                self.wpos,
+                BSER_BYTESTRING,
+                BSER_INT32,
+                s_len,
+                s,
+            )
         elif size == 8:
-            struct.pack_into(b'=ccq' + tobytes(s_len) + b's', self.buf,
-                self.wpos, BSER_BYTESTRING, BSER_INT64, s_len, s)
+            struct.pack_into(
+                b"=ccq" + tobytes(s_len) + b"s",
+                self.buf,
+                self.wpos,
+                BSER_BYTESTRING,
+                BSER_INT64,
+                s_len,
+                s,
+            )
         else:
-            raise RuntimeError('Cannot represent this string value')
+            raise RuntimeError("Cannot represent this string value")
         self.wpos += to_write
 
-
     def append_recursive(self, val):
         if isinstance(val, bool):
             needed = 1
@@ -161,12 +192,12 @@
                 to_encode = BSER_TRUE
             else:
                 to_encode = BSER_FALSE
-            struct.pack_into(b'=c', self.buf, self.wpos, to_encode)
+            struct.pack_into(b"=c", self.buf, self.wpos, to_encode)
             self.wpos += needed
         elif val is None:
             needed = 1
             self.ensure_size(needed)
-            struct.pack_into(b'=c', self.buf, self.wpos, BSER_NULL)
+            struct.pack_into(b"=c", self.buf, self.wpos, BSER_NULL)
             self.wpos += needed
         elif isinstance(val, (int, long)):
             self.append_long(val)
@@ -175,61 +206,106 @@
         elif isinstance(val, float):
             needed = 9
             self.ensure_size(needed)
-            struct.pack_into(b'=cd', self.buf, self.wpos, BSER_REAL, val)
+            struct.pack_into(b"=cd", self.buf, self.wpos, BSER_REAL, val)
             self.wpos += needed
-        elif isinstance(val, collections.Mapping) and \
-            isinstance(val, collections.Sized):
+        elif isinstance(val, collections.Mapping) and isinstance(
+            val, collections.Sized
+        ):
             val_len = len(val)
             size = _int_size(val_len)
             needed = 2 + size
             self.ensure_size(needed)
             if size == 1:
-                struct.pack_into(b'=ccb', self.buf, self.wpos, BSER_OBJECT,
-                    BSER_INT8, val_len)
+                struct.pack_into(
+                    b"=ccb",
+                    self.buf,
+                    self.wpos,
+                    BSER_OBJECT,
+                    BSER_INT8,
+                    val_len,
+                )
             elif size == 2:
-                struct.pack_into(b'=cch', self.buf, self.wpos, BSER_OBJECT,
-                    BSER_INT16, val_len)
+                struct.pack_into(
+                    b"=cch",
+                    self.buf,
+                    self.wpos,
+                    BSER_OBJECT,
+                    BSER_INT16,
+                    val_len,
+                )
             elif size == 4:
-                struct.pack_into(b'=cci', self.buf, self.wpos, BSER_OBJECT,
-                    BSER_INT32, val_len)
+                struct.pack_into(
+                    b"=cci",
+                    self.buf,
+                    self.wpos,
+                    BSER_OBJECT,
+                    BSER_INT32,
+                    val_len,
+                )
             elif size == 8:
-                struct.pack_into(b'=ccq', self.buf, self.wpos, BSER_OBJECT,
-                    BSER_INT64, val_len)
+                struct.pack_into(
+                    b"=ccq",
+                    self.buf,
+                    self.wpos,
+                    BSER_OBJECT,
+                    BSER_INT64,
+                    val_len,
+                )
             else:
-                raise RuntimeError('Cannot represent this mapping value')
+                raise RuntimeError("Cannot represent this mapping value")
             self.wpos += needed
             if compat.PYTHON3:
                 iteritems = val.items()
             else:
-                iteritems = val.iteritems()
+                iteritems = val.iteritems()  # noqa: B301 Checked version above
             for k, v in iteritems:
                 self.append_string(k)
                 self.append_recursive(v)
-        elif isinstance(val, collections.Iterable) and \
-            isinstance(val, collections.Sized):
+        elif isinstance(val, collections.Iterable) and isinstance(
+            val, collections.Sized
+        ):
             val_len = len(val)
             size = _int_size(val_len)
             needed = 2 + size
             self.ensure_size(needed)
             if size == 1:
-                struct.pack_into(b'=ccb', self.buf, self.wpos, BSER_ARRAY,
-                    BSER_INT8, val_len)
+                struct.pack_into(
+                    b"=ccb", self.buf, self.wpos, BSER_ARRAY, BSER_INT8, val_len
+                )
             elif size == 2:
-                struct.pack_into(b'=cch', self.buf, self.wpos, BSER_ARRAY,
-                    BSER_INT16, val_len)
+                struct.pack_into(
+                    b"=cch",
+                    self.buf,
+                    self.wpos,
+                    BSER_ARRAY,
+                    BSER_INT16,
+                    val_len,
+                )
             elif size == 4:
-                struct.pack_into(b'=cci', self.buf, self.wpos, BSER_ARRAY,
-                    BSER_INT32, val_len)
+                struct.pack_into(
+                    b"=cci",
+                    self.buf,
+                    self.wpos,
+                    BSER_ARRAY,
+                    BSER_INT32,
+                    val_len,
+                )
             elif size == 8:
-                struct.pack_into(b'=ccq', self.buf, self.wpos, BSER_ARRAY,
-                    BSER_INT64, val_len)
+                struct.pack_into(
+                    b"=ccq",
+                    self.buf,
+                    self.wpos,
+                    BSER_ARRAY,
+                    BSER_INT64,
+                    val_len,
+                )
             else:
-                raise RuntimeError('Cannot represent this sequence value')
+                raise RuntimeError("Cannot represent this sequence value")
             self.wpos += needed
             for v in val:
                 self.append_recursive(v)
         else:
-            raise RuntimeError('Cannot represent unknown value type')
+            raise RuntimeError("Cannot represent unknown value type")
 
 
 def dumps(obj, version=1, capabilities=0):
@@ -238,18 +314,19 @@
     # Now fill in the overall length
     if version == 1:
         obj_len = bser_buf.wpos - len(EMPTY_HEADER)
-        struct.pack_into(b'=i', bser_buf.buf, 3, obj_len)
+        struct.pack_into(b"=i", bser_buf.buf, 3, obj_len)
     else:
         obj_len = bser_buf.wpos - len(EMPTY_HEADER_V2)
-        struct.pack_into(b'=i', bser_buf.buf, 2, capabilities)
-        struct.pack_into(b'=i', bser_buf.buf, 7, obj_len)
-    return bser_buf.buf.raw[:bser_buf.wpos]
+        struct.pack_into(b"=i", bser_buf.buf, 2, capabilities)
+        struct.pack_into(b"=i", bser_buf.buf, 7, obj_len)
+    return bser_buf.buf.raw[: bser_buf.wpos]
+
 
 # This is a quack-alike with the bserObjectType in bser.c
 # It provides by getattr accessors and getitem for both index
 # and name.
 class _BunserDict(object):
-    __slots__ = ('_keys', '_values')
+    __slots__ = ("_keys", "_values")
 
     def __init__(self, keys, values):
         self._keys = keys
@@ -261,18 +338,19 @@
     def __getitem__(self, key):
         if isinstance(key, (int, long)):
             return self._values[key]
-        elif key.startswith('st_'):
+        elif key.startswith("st_"):
             # hack^Wfeature to allow mercurial to use "st_size" to
             # reference "size"
             key = key[3:]
         try:
             return self._values[self._keys.index(key)]
         except ValueError:
-            raise KeyError('_BunserDict has no key %s' % key)
+            raise KeyError("_BunserDict has no key %s" % key)
 
     def __len__(self):
         return len(self._keys)
 
+
 class Bunser(object):
     def __init__(self, mutable=True, value_encoding=None, value_errors=None):
         self.mutable = mutable
@@ -281,7 +359,7 @@
         if value_encoding is None:
             self.value_errors = None
         elif value_errors is None:
-            self.value_errors = 'strict'
+            self.value_errors = "strict"
         else:
             self.value_errors = value_errors
 
@@ -290,33 +368,35 @@
         try:
             int_type = _buf_pos(buf, pos)
         except IndexError:
-            raise ValueError('Invalid bser int encoding, pos out of range')
+            raise ValueError("Invalid bser int encoding, pos out of range")
         if int_type == BSER_INT8:
             needed = 2
-            fmt = b'=b'
+            fmt = b"=b"
         elif int_type == BSER_INT16:
             needed = 3
-            fmt = b'=h'
+            fmt = b"=h"
         elif int_type == BSER_INT32:
             needed = 5
-            fmt = b'=i'
+            fmt = b"=i"
         elif int_type == BSER_INT64:
             needed = 9
-            fmt = b'=q'
+            fmt = b"=q"
         else:
-            raise ValueError('Invalid bser int encoding 0x%s' %
-                             binascii.hexlify(int_type).decode('ascii'))
+            raise ValueError(
+                "Invalid bser int encoding 0x%s at position %s"
+                % (binascii.hexlify(int_type).decode("ascii"), pos)
+            )
         int_val = struct.unpack_from(fmt, buf, pos + 1)[0]
         return (int_val, pos + needed)
 
     def unser_utf8_string(self, buf, pos):
         str_len, pos = self.unser_int(buf, pos + 1)
-        str_val = struct.unpack_from(tobytes(str_len) + b's', buf, pos)[0]
-        return (str_val.decode('utf-8'), pos + str_len)
+        str_val = struct.unpack_from(tobytes(str_len) + b"s", buf, pos)[0]
+        return (str_val.decode("utf-8"), pos + str_len)
 
     def unser_bytestring(self, buf, pos):
         str_len, pos = self.unser_int(buf, pos + 1)
-        str_val = struct.unpack_from(tobytes(str_len) + b's', buf, pos)[0]
+        str_val = struct.unpack_from(tobytes(str_len) + b"s", buf, pos)[0]
         if self.value_encoding is not None:
             str_val = str_val.decode(self.value_encoding, self.value_errors)
             # str_len stays the same because that's the length in bytes
@@ -325,12 +405,12 @@
     def unser_array(self, buf, pos):
         arr_len, pos = self.unser_int(buf, pos + 1)
         arr = []
-        for i in range(arr_len):
+        for _ in range(arr_len):
             arr_item, pos = self.loads_recursive(buf, pos)
             arr.append(arr_item)
 
         if not self.mutable:
-          arr = tuple(arr)
+            arr = tuple(arr)
 
         return arr, pos
 
@@ -342,7 +422,7 @@
             keys = []
             vals = []
 
-        for i in range(obj_len):
+        for _ in range(obj_len):
             key, pos = self.unser_utf8_string(buf, pos)
             val, pos = self.loads_recursive(buf, pos)
             if self.mutable:
@@ -359,13 +439,13 @@
     def unser_template(self, buf, pos):
         val_type = _buf_pos(buf, pos + 1)
         if val_type != BSER_ARRAY:
-            raise RuntimeError('Expect ARRAY to follow TEMPLATE')
+            raise RuntimeError("Expect ARRAY to follow TEMPLATE")
         # force UTF-8 on keys
-        keys_bunser = Bunser(mutable=self.mutable, value_encoding='utf-8')
+        keys_bunser = Bunser(mutable=self.mutable, value_encoding="utf-8")
         keys, pos = keys_bunser.unser_array(buf, pos + 1)
         nitems, pos = self.unser_int(buf, pos)
         arr = []
-        for i in range(nitems):
+        for _ in range(nitems):
             if self.mutable:
                 obj = {}
             else:
@@ -392,11 +472,15 @@
 
     def loads_recursive(self, buf, pos):
         val_type = _buf_pos(buf, pos)
-        if (val_type == BSER_INT8 or val_type == BSER_INT16 or
-            val_type == BSER_INT32 or val_type == BSER_INT64):
+        if (
+            val_type == BSER_INT8
+            or val_type == BSER_INT16
+            or val_type == BSER_INT32
+            or val_type == BSER_INT64
+        ):
             return self.unser_int(buf, pos)
         elif val_type == BSER_REAL:
-            val = struct.unpack_from(b'=d', buf, pos + 1)[0]
+            val = struct.unpack_from(b"=d", buf, pos + 1)[0]
             return (val, pos + 9)
         elif val_type == BSER_TRUE:
             return (True, pos + 1)
@@ -415,23 +499,26 @@
         elif val_type == BSER_TEMPLATE:
             return self.unser_template(buf, pos)
         else:
-            raise ValueError('unhandled bser opcode 0x%s' %
-                             binascii.hexlify(val_type).decode('ascii'))
+            raise ValueError(
+                "unhandled bser opcode 0x%s"
+                % binascii.hexlify(val_type).decode("ascii")
+            )
 
 
 def _pdu_info_helper(buf):
+    bser_version = -1
     if buf[0:2] == EMPTY_HEADER[0:2]:
         bser_version = 1
         bser_capabilities = 0
         expected_len, pos2 = Bunser.unser_int(buf, 2)
     elif buf[0:2] == EMPTY_HEADER_V2[0:2]:
         if len(buf) < 8:
-            raise ValueError('Invalid BSER header')
+            raise ValueError("Invalid BSER header")
         bser_version = 2
         bser_capabilities = struct.unpack_from("I", buf, 2)[0]
         expected_len, pos2 = Bunser.unser_int(buf, 6)
     else:
-        raise ValueError('Invalid BSER header')
+        raise ValueError("Invalid BSER header")
 
     return bser_version, bser_capabilities, expected_len, pos2
 
@@ -470,14 +557,20 @@
     pos = info[3]
 
     if len(buf) != expected_len + pos:
-        raise ValueError('bser data len != header len')
+        raise ValueError(
+            "bser data len %d != header len %d" % (expected_len + pos, len(buf))
+        )
 
-    bunser = Bunser(mutable=mutable, value_encoding=value_encoding,
-                    value_errors=value_errors)
+    bunser = Bunser(
+        mutable=mutable,
+        value_encoding=value_encoding,
+        value_errors=value_errors,
+    )
 
     return bunser.loads_recursive(buf, pos)[0]
 
 
 def load(fp, mutable=True, value_encoding=None, value_errors=None):
     from . import load
+
     return load.load(fp, mutable, value_encoding, value_errors)
diff --git a/hgext/fsmonitor/pywatchman/load.py b/hgext/fsmonitor/pywatchman/load.py
--- a/hgext/fsmonitor/pywatchman/load.py
+++ b/hgext/fsmonitor/pywatchman/load.py
@@ -26,17 +26,17 @@
 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
 # no unicode literals
+from __future__ import absolute_import, division, print_function
+
+import ctypes
+
 
 try:
     from . import bser
 except ImportError:
     from . import pybser as bser
 
-import ctypes
 
 EMPTY_HEADER = b"\x00\x01\x05\x00\x00\x00\x00"
 
@@ -95,13 +95,15 @@
         ctypes.resize(buf, total_len)
 
     body = (ctypes.c_char * (total_len - len(header))).from_buffer(
-        buf, len(header))
+        buf, len(header)
+    )
     read_len = _read_bytes(fp, body)
     if read_len < len(body):
-        raise RuntimeError('bser data ended early')
+        raise RuntimeError("bser data ended early")
 
     return bser.loads(
         (ctypes.c_char * total_len).from_buffer(buf, 0),
         mutable,
         value_encoding,
-        value_errors)
+        value_errors,
+    )
diff --git a/hgext/fsmonitor/pywatchman/encoding.py b/hgext/fsmonitor/pywatchman/encoding.py
--- a/hgext/fsmonitor/pywatchman/encoding.py
+++ b/hgext/fsmonitor/pywatchman/encoding.py
@@ -26,48 +26,50 @@
 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
 # no unicode literals
-
-'''Module to deal with filename encoding on the local system, as returned by
-Watchman.'''
+from __future__ import absolute_import, division, print_function
 
 import sys
 
-from . import (
-    compat,
-)
+from . import compat
+
+
+"""Module to deal with filename encoding on the local system, as returned by
+Watchman."""
+
 
 if compat.PYTHON3:
-    default_local_errors = 'surrogateescape'
+    default_local_errors = "surrogateescape"
 
     def get_local_encoding():
-        if sys.platform == 'win32':
+        if sys.platform == "win32":
             # Watchman always returns UTF-8 encoded strings on Windows.
-            return 'utf-8'
+            return "utf-8"
         # On the Python 3 versions we support, sys.getfilesystemencoding never
         # returns None.
         return sys.getfilesystemencoding()
+
+
 else:
     # Python 2 doesn't support surrogateescape, so use 'strict' by
     # default. Users can register a custom surrogateescape error handler and use
     # that if they so desire.
-    default_local_errors = 'strict'
+    default_local_errors = "strict"
 
     def get_local_encoding():
-        if sys.platform == 'win32':
+        if sys.platform == "win32":
             # Watchman always returns UTF-8 encoded strings on Windows.
-            return 'utf-8'
+            return "utf-8"
         fsencoding = sys.getfilesystemencoding()
         if fsencoding is None:
             # This is very unlikely to happen, but if it does, just use UTF-8
-            fsencoding = 'utf-8'
+            fsencoding = "utf-8"
         return fsencoding
 
+
 def encode_local(s):
     return s.encode(get_local_encoding(), default_local_errors)
 
+
 def decode_local(bs):
     return bs.decode(get_local_encoding(), default_local_errors)
diff --git a/hgext/fsmonitor/pywatchman/compat.py b/hgext/fsmonitor/pywatchman/compat.py
--- a/hgext/fsmonitor/pywatchman/compat.py
+++ b/hgext/fsmonitor/pywatchman/compat.py
@@ -26,20 +26,22 @@
 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
 # no unicode literals
-
-'''Compatibility module across Python 2 and 3.'''
+from __future__ import absolute_import, division, print_function
 
 import sys
 
+
+"""Compatibility module across Python 2 and 3."""
+
+
+PYTHON2 = sys.version_info < (3, 0)
 PYTHON3 = sys.version_info >= (3, 0)
 
 # This is adapted from https://bitbucket.org/gutworth/six, and used under the
 # MIT license. See LICENSE for a full copyright notice.
 if PYTHON3:
+
     def reraise(tp, value, tb=None):
         try:
             if value is None:
@@ -50,16 +52,20 @@
         finally:
             value = None
             tb = None
+
+
 else:
-    exec('''
+    exec(
+        """
 def reraise(tp, value, tb=None):
     try:
         raise tp, value, tb
     finally:
         tb = None
-'''.strip())
+""".strip()
+    )
 
 if PYTHON3:
     UNICODE = str
 else:
-    UNICODE = unicode
+    UNICODE = unicode  # noqa: F821 We handled versioning above
diff --git a/hgext/fsmonitor/pywatchman/capabilities.py b/hgext/fsmonitor/pywatchman/capabilities.py
--- a/hgext/fsmonitor/pywatchman/capabilities.py
+++ b/hgext/fsmonitor/pywatchman/capabilities.py
@@ -26,20 +26,20 @@
 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
 # no unicode literals
+from __future__ import absolute_import, division, print_function
 
 import re
 
+
 def parse_version(vstr):
     res = 0
-    for n in vstr.split('.'):
+    for n in vstr.split("."):
         res = res * 1000
         res = res + int(n)
     return res
 
+
 cap_versions = {
     "cmd-watch-del-all": "3.1.1",
     "cmd-watch-project": "3.1",
@@ -49,23 +49,29 @@
     "wildmatch": "3.7",
 }
 
+
 def check(version, name):
     if name in cap_versions:
         return version >= parse_version(cap_versions[name])
     return False
 
+
 def synthesize(vers, opts):
     """ Synthesize a capability enabled version response
         This is a very limited emulation for relatively recent feature sets
     """
-    parsed_version = parse_version(vers['version'])
-    vers['capabilities'] = {}
-    for name in opts['optional']:
-        vers['capabilities'][name] = check(parsed_version, name)
-    for name in opts['required']:
+    parsed_version = parse_version(vers["version"])
+    vers["capabilities"] = {}
+    for name in opts["optional"]:
+        vers["capabilities"][name] = check(parsed_version, name)
+    failed = False  # noqa: F841 T25377293 Grandfathered in
+    for name in opts["required"]:
         have = check(parsed_version, name)
-        vers['capabilities'][name] = have
+        vers["capabilities"][name] = have
         if not have:
-            vers['error'] = 'client required capability `' + name + \
-                            '` is not supported by this server'
+            vers["error"] = (
+                "client required capability `"
+                + name
+                + "` is not supported by this server"
+            )
     return vers
diff --git a/hgext/fsmonitor/pywatchman/bser.c b/hgext/fsmonitor/pywatchman/bser.c
--- a/hgext/fsmonitor/pywatchman/bser.c
+++ b/hgext/fsmonitor/pywatchman/bser.c
@@ -128,38 +128,27 @@
   Py_ssize_t i, n;
   PyObject* name_bytes = NULL;
   PyObject* ret = NULL;
-  const char* namestr = NULL;
+  const char* namestr;
 
   if (PyIndex_Check(name)) {
     i = PyNumber_AsSsize_t(name, PyExc_IndexError);
     if (i == -1 && PyErr_Occurred()) {
       goto bail;
     }
+    ret = PySequence_GetItem(obj->values, i);
+    goto bail;
+  }
 
-    if (i == 8 && PySequence_Size(obj->values) < 9) {
-      // Hack alert: Python 3 removed support for os.stat().st_mtime
-      // being an integer.Instead, if you need an integer, you have to
-      // use os.stat()[stat.ST_MTIME] instead. stat.ST_MTIME is 8, and
-      // our stat tuples are shorter than that, so we can detect
-      // requests for index 8 on tuples shorter than that and return
-      // st_mtime instead.
-      namestr = "st_mtime";
-    } else {
-      ret = PySequence_GetItem(obj->values, i);
+  // We can be passed in Unicode objects here -- we don't support anything other
+  // than UTF-8 for keys.
+  if (PyUnicode_Check(name)) {
+    name_bytes = PyUnicode_AsUTF8String(name);
+    if (name_bytes == NULL) {
       goto bail;
     }
+    namestr = PyBytes_AsString(name_bytes);
   } else {
-    // We can be passed in Unicode objects here -- we don't support anything other
-    // than UTF-8 for keys.
-    if (PyUnicode_Check(name)) {
-      name_bytes = PyUnicode_AsUTF8String(name);
-      if (name_bytes == NULL) {
-        goto bail;
-      }
-      namestr = PyBytes_AsString(name_bytes);
-    } else {
-      namestr = PyBytes_AsString(name);
-    }
+    namestr = PyBytes_AsString(name);
   }
 
   if (namestr == NULL) {
@@ -1147,11 +1136,15 @@
 }
 
 static PyObject* bser_load(PyObject* self, PyObject* args, PyObject* kw) {
-  PyObject *load, *string;
+  PyObject* load;
+  PyObject* load_method;
+  PyObject* string;
+  PyObject* load_method_args;
+  PyObject* load_method_kwargs;
   PyObject* fp = NULL;
   PyObject* mutable_obj = NULL;
-  const char* value_encoding = NULL;
-  const char* value_errors = NULL;
+  PyObject* value_encoding = NULL;
+  PyObject* value_errors = NULL;
 
   static char* kw_list[] = {
       "fp", "mutable", "value_encoding", "value_errors", NULL};
@@ -1159,7 +1152,7 @@
   if (!PyArg_ParseTupleAndKeywords(
           args,
           kw,
-          "OOzz:load",
+          "O|OOO:load",
           kw_list,
           &fp,
           &mutable_obj,
@@ -1172,8 +1165,33 @@
   if (load == NULL) {
     return NULL;
   }
-  string = PyObject_CallMethod(
-      load, "load", "OOzz", fp, mutable_obj, value_encoding, value_errors);
+  load_method = PyObject_GetAttrString(load, "load");
+  if (load_method == NULL) {
+    return NULL;
+  }
+  // Mandatory method arguments
+  load_method_args = Py_BuildValue("(O)", fp);
+  if (load_method_args == NULL) {
+    return NULL;
+  }
+  // Optional method arguments
+  load_method_kwargs = PyDict_New();
+  if (load_method_kwargs == NULL) {
+    return NULL;
+  }
+  if (mutable_obj) {
+    PyDict_SetItemString(load_method_kwargs, "mutable", mutable_obj);
+  }
+  if (value_encoding) {
+    PyDict_SetItemString(load_method_kwargs, "value_encoding", value_encoding);
+  }
+  if (value_errors) {
+    PyDict_SetItemString(load_method_kwargs, "value_errors", value_errors);
+  }
+  string = PyObject_Call(load_method, load_method_args, load_method_kwargs);
+  Py_DECREF(load_method_kwargs);
+  Py_DECREF(load_method_args);
+  Py_DECREF(load_method);
   Py_DECREF(load);
   return string;
 }
diff --git a/hgext/fsmonitor/pywatchman/__init__.py b/hgext/fsmonitor/pywatchman/__init__.py
--- a/hgext/fsmonitor/pywatchman/__init__.py
+++ b/hgext/fsmonitor/pywatchman/__init__.py
@@ -26,10 +26,8 @@
 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
 # no unicode literals
+from __future__ import absolute_import, division, print_function
 
 import inspect
 import math
@@ -38,33 +36,22 @@
 import subprocess
 import time
 
+from . import capabilities, compat, encoding, load
+
+
 # Sometimes it's really hard to get Python extensions to compile,
 # so fall back to a pure Python implementation.
 try:
     from . import bser
+
     # Demandimport causes modules to be loaded lazily. Force the load now
     # so that we can fall back on pybser if bser doesn't exist
     bser.pdu_info
 except ImportError:
     from . import pybser as bser
 
-from mercurial.utils import (
-    procutil,
-)
 
-from mercurial import (
-    pycompat,
-)
-
-from . import (
-    capabilities,
-    compat,
-    encoding,
-    load,
-)
-
-
-if os.name == 'nt':
+if os.name == "nt":
     import ctypes
     import ctypes.wintypes
 
@@ -73,7 +60,7 @@
     GENERIC_WRITE = 0x40000000
     FILE_FLAG_OVERLAPPED = 0x40000000
     OPEN_EXISTING = 3
-    INVALID_HANDLE_VALUE = -1
+    INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
     FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
     FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
     FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
@@ -92,9 +79,11 @@
 
     class OVERLAPPED(ctypes.Structure):
         _fields_ = [
-            ("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR),
-            ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
-            ("hEvent", wintypes.HANDLE)
+            ("Internal", ULONG_PTR),
+            ("InternalHigh", ULONG_PTR),
+            ("Offset", wintypes.DWORD),
+            ("OffsetHigh", wintypes.DWORD),
+            ("hEvent", wintypes.HANDLE),
         ]
 
         def __init__(self):
@@ -107,9 +96,15 @@
     LPDWORD = ctypes.POINTER(wintypes.DWORD)
 
     CreateFile = ctypes.windll.kernel32.CreateFileA
-    CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD,
-                           wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
-                           wintypes.HANDLE]
+    CreateFile.argtypes = [
+        wintypes.LPSTR,
+        wintypes.DWORD,
+        wintypes.DWORD,
+        wintypes.LPVOID,
+        wintypes.DWORD,
+        wintypes.DWORD,
+        wintypes.HANDLE,
+    ]
     CreateFile.restype = wintypes.HANDLE
 
     CloseHandle = ctypes.windll.kernel32.CloseHandle
@@ -117,13 +112,23 @@
     CloseHandle.restype = wintypes.BOOL
 
     ReadFile = ctypes.windll.kernel32.ReadFile
-    ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
-                         LPDWORD, ctypes.POINTER(OVERLAPPED)]
+    ReadFile.argtypes = [
+        wintypes.HANDLE,
+        wintypes.LPVOID,
+        wintypes.DWORD,
+        LPDWORD,
+        ctypes.POINTER(OVERLAPPED),
+    ]
     ReadFile.restype = wintypes.BOOL
 
     WriteFile = ctypes.windll.kernel32.WriteFile
-    WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
-                          LPDWORD, ctypes.POINTER(OVERLAPPED)]
+    WriteFile.argtypes = [
+        wintypes.HANDLE,
+        wintypes.LPVOID,
+        wintypes.DWORD,
+        LPDWORD,
+        ctypes.POINTER(OVERLAPPED),
+    ]
     WriteFile.restype = wintypes.BOOL
 
     GetLastError = ctypes.windll.kernel32.GetLastError
@@ -135,34 +140,56 @@
     SetLastError.restype = None
 
     FormatMessage = ctypes.windll.kernel32.FormatMessageA
-    FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
-                              wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
-                              wintypes.DWORD, wintypes.LPVOID]
+    FormatMessage.argtypes = [
+        wintypes.DWORD,
+        wintypes.LPVOID,
+        wintypes.DWORD,
+        wintypes.DWORD,
+        ctypes.POINTER(wintypes.LPSTR),
+        wintypes.DWORD,
+        wintypes.LPVOID,
+    ]
     FormatMessage.restype = wintypes.DWORD
 
     LocalFree = ctypes.windll.kernel32.LocalFree
 
     GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
-    GetOverlappedResult.argtypes = [wintypes.HANDLE,
-                                    ctypes.POINTER(OVERLAPPED), LPDWORD,
-                                    wintypes.BOOL]
+    GetOverlappedResult.argtypes = [
+        wintypes.HANDLE,
+        ctypes.POINTER(OVERLAPPED),
+        LPDWORD,
+        wintypes.BOOL,
+    ]
     GetOverlappedResult.restype = wintypes.BOOL
 
-    GetOverlappedResultEx = getattr(ctypes.windll.kernel32,
-                                    'GetOverlappedResultEx', None)
+    GetOverlappedResultEx = getattr(
+        ctypes.windll.kernel32, "GetOverlappedResultEx", None
+    )
     if GetOverlappedResultEx is not None:
-        GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
-                                          ctypes.POINTER(OVERLAPPED), LPDWORD,
-                                          wintypes.DWORD, wintypes.BOOL]
+        GetOverlappedResultEx.argtypes = [
+            wintypes.HANDLE,
+            ctypes.POINTER(OVERLAPPED),
+            LPDWORD,
+            wintypes.DWORD,
+            wintypes.BOOL,
+        ]
         GetOverlappedResultEx.restype = wintypes.BOOL
 
     WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
-    WaitForSingleObjectEx.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.BOOL]
+    WaitForSingleObjectEx.argtypes = [
+        wintypes.HANDLE,
+        wintypes.DWORD,
+        wintypes.BOOL,
+    ]
     WaitForSingleObjectEx.restype = wintypes.DWORD
 
     CreateEvent = ctypes.windll.kernel32.CreateEventA
-    CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL,
-                            wintypes.LPSTR]
+    CreateEvent.argtypes = [
+        LPDWORD,
+        wintypes.BOOL,
+        wintypes.BOOL,
+        wintypes.LPSTR,
+    ]
     CreateEvent.restype = wintypes.HANDLE
 
     # Windows Vista is the minimum supported client for CancelIoEx.
@@ -178,9 +205,15 @@
 if _debugging:
 
     def log(fmt, *args):
-        print('[%s] %s' %
-              (time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
-               fmt % args[:]))
+        print(
+            "[%s] %s"
+            % (
+                time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
+                fmt % args[:],
+            )
+        )
+
+
 else:
 
     def log(fmt, *args):
@@ -193,8 +226,16 @@
     # FormatMessage will allocate memory and assign it here
     buf = ctypes.c_char_p()
     FormatMessage(
-        FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
-        | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
+        FORMAT_MESSAGE_FROM_SYSTEM
+        | FORMAT_MESSAGE_ALLOCATE_BUFFER
+        | FORMAT_MESSAGE_IGNORE_INSERTS,
+        None,
+        err,
+        0,
+        buf,
+        0,
+        None,
+    )
     try:
         return buf.value
     finally:
@@ -211,21 +252,30 @@
 
     def __str__(self):
         if self.cmd:
-            return '%s, while executing %s' % (self.msg, self.cmd)
+            return "%s, while executing %s" % (self.msg, self.cmd)
         return self.msg
 
 
+class BSERv1Unsupported(WatchmanError):
+    pass
+
+
+class UseAfterFork(WatchmanError):
+    pass
+
+
 class WatchmanEnvironmentError(WatchmanError):
     def __init__(self, msg, errno, errmsg, cmd=None):
         super(WatchmanEnvironmentError, self).__init__(
-            '{0}: errno={1} errmsg={2}'.format(msg, errno, errmsg),
-            cmd)
+            "{0}: errno={1} errmsg={2}".format(msg, errno, errmsg), cmd
+        )
 
 
 class SocketConnectError(WatchmanError):
     def __init__(self, sockpath, exc):
         super(SocketConnectError, self).__init__(
-            'unable to connect to %s: %s' % (sockpath, exc))
+            "unable to connect to %s: %s" % (sockpath, exc)
+        )
         self.sockpath = sockpath
         self.exc = exc
 
@@ -245,15 +295,16 @@
 
     self.msg is the message returned by watchman.
     """
+
     def __init__(self, msg, cmd=None):
         super(CommandError, self).__init__(
-            'watchman command error: %s' % (msg, ),
-            cmd,
+            "watchman command error: %s" % (msg,), cmd
         )
 
 
 class Transport(object):
     """ communication transport to the watchman server """
+
     buf = None
 
     def close(self):
@@ -289,7 +340,7 @@
         while True:
             b = self.readBytes(4096)
             if b"\n" in b:
-                result = b''.join(self.buf)
+                result = b"".join(self.buf)
                 (line, b) = b.split(b"\n", 1)
                 self.buf = [b]
                 return result + line
@@ -298,6 +349,7 @@
 
 class Codec(object):
     """ communication encoding for the watchman server """
+
     transport = None
 
     def __init__(self, transport):
@@ -315,9 +367,10 @@
 
 class UnixSocketTransport(Transport):
     """ local unix domain socket transport """
+
     sock = None
 
-    def __init__(self, sockpath, timeout, watchman_exe):
+    def __init__(self, sockpath, timeout):
         self.sockpath = sockpath
         self.timeout = timeout
 
@@ -331,8 +384,9 @@
             raise SocketConnectError(self.sockpath, e)
 
     def close(self):
-        self.sock.close()
-        self.sock = None
+        if self.sock:
+            self.sock.close()
+            self.sock = None
 
     def setTimeout(self, value):
         self.timeout = value
@@ -342,16 +396,16 @@
         try:
             buf = [self.sock.recv(size)]
             if not buf[0]:
-                raise WatchmanError('empty watchman response')
+                raise WatchmanError("empty watchman response")
             return buf[0]
         except socket.timeout:
-            raise SocketTimeout('timed out waiting for response')
+            raise SocketTimeout("timed out waiting for response")
 
     def write(self, data):
         try:
             self.sock.sendall(data)
         except socket.timeout:
-            raise SocketTimeout('timed out sending query command')
+            raise SocketTimeout("timed out sending query command")
 
 
 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
@@ -364,7 +418,7 @@
     source code (see get_overlapped_result_ex_impl in stream_win.c). This
     way, maintenance should be simplified.
     """
-    log('Preparing to wait for maximum %dms', millis )
+    log("Preparing to wait for maximum %dms", millis)
     if millis != 0:
         waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
         if waitReturnCode == WAIT_OBJECT_0:
@@ -383,12 +437,12 @@
         elif waitReturnCode == WAIT_FAILED:
             # something went wrong calling WaitForSingleObjectEx
             err = GetLastError()
-            log('WaitForSingleObjectEx failed: %s', _win32_strerror(err))
+            log("WaitForSingleObjectEx failed: %s", _win32_strerror(err))
             return False
         else:
             # unexpected situation deserving investigation.
             err = GetLastError()
-            log('Unexpected error: %s', _win32_strerror(err))
+            log("Unexpected error: %s", _win32_strerror(err))
             return False
 
     return GetOverlappedResult(pipe, olap, nbytes, False)
@@ -397,36 +451,52 @@
 class WindowsNamedPipeTransport(Transport):
     """ connect to a named pipe """
 
-    def __init__(self, sockpath, timeout, watchman_exe):
+    def __init__(self, sockpath, timeout):
         self.sockpath = sockpath
         self.timeout = int(math.ceil(timeout * 1000))
         self._iobuf = None
 
-        self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None,
-                               OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None)
+        if compat.PYTHON3:
+            sockpath = os.fsencode(sockpath)
+        self.pipe = CreateFile(
+            sockpath,
+            GENERIC_READ | GENERIC_WRITE,
+            0,
+            None,
+            OPEN_EXISTING,
+            FILE_FLAG_OVERLAPPED,
+            None,
+        )
 
-        if self.pipe == INVALID_HANDLE_VALUE:
+        err = GetLastError()
+        if self.pipe == INVALID_HANDLE_VALUE or self.pipe == 0:
             self.pipe = None
-            self._raise_win_err('failed to open pipe %s' % sockpath,
-                                GetLastError())
+            raise SocketConnectError(self.sockpath, self._make_win_err("", err))
 
         # event for the overlapped I/O operations
         self._waitable = CreateEvent(None, True, False, None)
+        err = GetLastError()
         if self._waitable is None:
-            self._raise_win_err('CreateEvent failed', GetLastError())
+            self._raise_win_err("CreateEvent failed", err)
 
         self._get_overlapped_result_ex = GetOverlappedResultEx
-        if (os.getenv('WATCHMAN_WIN7_COMPAT') == '1' or
-            self._get_overlapped_result_ex is None):
+        if (
+            os.getenv("WATCHMAN_WIN7_COMPAT") == "1"
+            or self._get_overlapped_result_ex is None
+        ):
             self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
 
     def _raise_win_err(self, msg, err):
-        raise IOError('%s win32 error code: %d %s' %
-                      (msg, err, _win32_strerror(err)))
+        raise self._make_win_err(msg, err)
+
+    def _make_win_err(self, msg, err):
+        return IOError(
+            "%s win32 error code: %d %s" % (msg, err, _win32_strerror(err))
+        )
 
     def close(self):
         if self.pipe:
-            log('Closing pipe')
+            log("Closing pipe")
             CloseHandle(self.pipe)
         self.pipe = None
 
@@ -460,7 +530,7 @@
         olap = OVERLAPPED()
         olap.hEvent = self._waitable
 
-        log('made read buff of size %d', size)
+        log("made read buff of size %d", size)
 
         # ReadFile docs warn against sending in the nread parameter for async
         # operations, so we always collect it via GetOverlappedResultEx
@@ -469,23 +539,23 @@
         if not immediate:
             err = GetLastError()
             if err != ERROR_IO_PENDING:
-                self._raise_win_err('failed to read %d bytes' % size,
-                                    GetLastError())
+                self._raise_win_err("failed to read %d bytes" % size, err)
 
         nread = wintypes.DWORD()
-        if not self._get_overlapped_result_ex(self.pipe, olap, nread,
-                                              0 if immediate else self.timeout,
-                                              True):
+        if not self._get_overlapped_result_ex(
+            self.pipe, olap, nread, 0 if immediate else self.timeout, True
+        ):
             err = GetLastError()
             CancelIoEx(self.pipe, olap)
 
             if err == WAIT_TIMEOUT:
-                log('GetOverlappedResultEx timedout')
-                raise SocketTimeout('timed out after waiting %dms for read' %
-                                    self.timeout)
+                log("GetOverlappedResultEx timedout")
+                raise SocketTimeout(
+                    "timed out after waiting %dms for read" % self.timeout
+                )
 
-            log('GetOverlappedResultEx reports error %d', err)
-            self._raise_win_err('error while waiting for read', err)
+            log("GetOverlappedResultEx reports error %d", err)
+            self._raise_win_err("error while waiting for read", err)
 
         nread = nread.value
         if nread == 0:
@@ -494,7 +564,7 @@
             # other way this shows up is if the client has gotten in a weird
             # state, so let's bail out
             CancelIoEx(self.pipe, olap)
-            raise IOError('Async read yielded 0 bytes; unpossible!')
+            raise IOError("Async read yielded 0 bytes; unpossible!")
 
         # Holds precisely the bytes that we read from the prior request
         buf = buf[:nread]
@@ -511,21 +581,25 @@
         olap = OVERLAPPED()
         olap.hEvent = self._waitable
 
-        immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
-                              None, olap)
+        immediate = WriteFile(
+            self.pipe, ctypes.c_char_p(data), len(data), None, olap
+        )
 
         if not immediate:
             err = GetLastError()
             if err != ERROR_IO_PENDING:
-                self._raise_win_err('failed to write %d bytes' % len(data),
-                                    GetLastError())
+                self._raise_win_err(
+                    "failed to write %d bytes to handle %r"
+                    % (len(data), self.pipe),
+                    err,
+                )
 
         # Obtain results, waiting if needed
         nwrote = wintypes.DWORD()
-        if self._get_overlapped_result_ex(self.pipe, olap, nwrote,
-                                          0 if immediate else self.timeout,
-                                          True):
-            log('made write of %d bytes', nwrote.value)
+        if self._get_overlapped_result_ex(
+            self.pipe, olap, nwrote, 0 if immediate else self.timeout, True
+        ):
+            log("made write of %d bytes", nwrote.value)
             return nwrote.value
 
         err = GetLastError()
@@ -535,10 +609,21 @@
         CancelIoEx(self.pipe, olap)
 
         if err == WAIT_TIMEOUT:
-            raise SocketTimeout('timed out after waiting %dms for write' %
-                                self.timeout)
-        self._raise_win_err('error while waiting for write of %d bytes' %
-                            len(data), err)
+            raise SocketTimeout(
+                "timed out after waiting %dms for write" % self.timeout
+            )
+        self._raise_win_err(
+            "error while waiting for write of %d bytes" % len(data), err
+        )
+
+
+def _default_binpath(binpath=None):
+    if binpath:
+        return binpath
+    # The test harness sets WATCHMAN_BINARY to the binary under test,
+    # so we use that by default; otherwise, allow resolving watchman
+    # from the user's PATH.
+    return os.environ.get("WATCHMAN_BINARY", "watchman")
 
 
 class CLIProcessTransport(Transport):
@@ -560,13 +645,14 @@
     It is the responsibility of the caller to set the send and
     receive codecs appropriately.
     """
+
     proc = None
     closed = True
 
-    def __init__(self, sockpath, timeout, watchman_exe):
+    def __init__(self, sockpath, timeout, binpath=None):
         self.sockpath = sockpath
         self.timeout = timeout
-        self.watchman_exe = watchman_exe
+        self.binpath = _default_binpath(binpath)
 
     def close(self):
         if self.proc:
@@ -574,32 +660,32 @@
                 self.proc.kill()
             self.proc.stdin.close()
             self.proc.stdout.close()
+            self.proc.wait()
             self.proc = None
 
     def _connect(self):
         if self.proc:
             return self.proc
         args = [
-            self.watchman_exe,
-            '--sockname={0}'.format(self.sockpath),
-            '--logfile=/BOGUS',
-            '--statefile=/BOGUS',
-            '--no-spawn',
-            '--no-local',
-            '--no-pretty',
-            '-j',
+            self.binpath,
+            "--sockname={0}".format(self.sockpath),
+            "--logfile=/BOGUS",
+            "--statefile=/BOGUS",
+            "--no-spawn",
+            "--no-local",
+            "--no-pretty",
+            "-j",
         ]
-        self.proc = subprocess.Popen(pycompat.rapply(procutil.tonativestr,
-                                                     args),
-                                     stdin=subprocess.PIPE,
-                                     stdout=subprocess.PIPE)
+        self.proc = subprocess.Popen(
+            args, stdin=subprocess.PIPE, stdout=subprocess.PIPE
+        )
         return self.proc
 
     def readBytes(self, size):
         self._connect()
         res = self.proc.stdout.read(size)
-        if res == '':
-            raise WatchmanError('EOF on CLI process transport')
+        if not res:
+            raise WatchmanError("EOF on CLI process transport")
         return res
 
     def write(self, data):
@@ -616,13 +702,22 @@
 class BserCodec(Codec):
     """ use the BSER encoding.  This is the default, preferred codec """
 
+    def __init__(self, transport, value_encoding, value_errors):
+        super(BserCodec, self).__init__(transport)
+        self._value_encoding = value_encoding
+        self._value_errors = value_errors
+
     def _loads(self, response):
-        return bser.loads(response) # Defaults to BSER v1
+        return bser.loads(
+            response,
+            value_encoding=self._value_encoding,
+            value_errors=self._value_errors,
+        )
 
     def receive(self):
         buf = [self.transport.readBytes(sniff_len)]
         if not buf[0]:
-            raise WatchmanError('empty watchman response')
+            raise WatchmanError("empty watchman response")
 
         _1, _2, elen = bser.pdu_info(buf[0])
 
@@ -631,15 +726,15 @@
             buf.append(self.transport.readBytes(elen - rlen))
             rlen += len(buf[-1])
 
-        response = b''.join(buf)
+        response = b"".join(buf)
         try:
             res = self._loads(response)
             return res
         except ValueError as e:
-            raise WatchmanError('watchman response decode error: %s' % e)
+            raise WatchmanError("watchman response decode error: %s" % e)
 
     def send(self, *args):
-        cmd = bser.dumps(*args) # Defaults to BSER v1
+        cmd = bser.dumps(*args)  # Defaults to BSER v1
         self.transport.write(cmd)
 
 
@@ -648,74 +743,96 @@
         immutable object support """
 
     def _loads(self, response):
-        return bser.loads(response, False) # Defaults to BSER v1
+        return bser.loads(
+            response,
+            False,
+            value_encoding=self._value_encoding,
+            value_errors=self._value_errors,
+        )
 
 
 class Bser2WithFallbackCodec(BserCodec):
     """ use BSER v2 encoding """
 
-    def __init__(self, transport):
-        super(Bser2WithFallbackCodec, self).__init__(transport)
-        # Once the server advertises support for bser-v2 we should switch this
-        # to 'required' on Python 3.
-        self.send(["version", {"optional": ["bser-v2"]}])
+    def __init__(self, transport, value_encoding, value_errors):
+        super(Bser2WithFallbackCodec, self).__init__(
+            transport, value_encoding, value_errors
+        )
+        if compat.PYTHON3:
+            bserv2_key = "required"
+        else:
+            bserv2_key = "optional"
+
+        self.send(["version", {bserv2_key: ["bser-v2"]}])
 
         capabilities = self.receive()
 
-        if 'error' in capabilities:
-          raise Exception('Unsupported BSER version')
+        if "error" in capabilities:
+            raise BSERv1Unsupported(
+                "The watchman server version does not support Python 3. Please "
+                "upgrade your watchman server."
+            )
 
-        if capabilities['capabilities']['bser-v2']:
+        if capabilities["capabilities"]["bser-v2"]:
             self.bser_version = 2
             self.bser_capabilities = 0
         else:
             self.bser_version = 1
             self.bser_capabilities = 0
 
-    def _loads(self, response):
-        return bser.loads(response)
-
     def receive(self):
         buf = [self.transport.readBytes(sniff_len)]
         if not buf[0]:
-            raise WatchmanError('empty watchman response')
+            raise WatchmanError("empty watchman response")
 
         recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
 
-        if hasattr(self, 'bser_version'):
-          # Readjust BSER version and capabilities if necessary
-          self.bser_version = max(self.bser_version, recv_bser_version)
-          self.capabilities = self.bser_capabilities & recv_bser_capabilities
+        if hasattr(self, "bser_version"):
+            # Readjust BSER version and capabilities if necessary
+            self.bser_version = max(self.bser_version, recv_bser_version)
+            self.capabilities = self.bser_capabilities & recv_bser_capabilities
 
         rlen = len(buf[0])
         while elen > rlen:
             buf.append(self.transport.readBytes(elen - rlen))
             rlen += len(buf[-1])
 
-        response = b''.join(buf)
+        response = b"".join(buf)
         try:
             res = self._loads(response)
             return res
         except ValueError as e:
-            raise WatchmanError('watchman response decode error: %s' % e)
+            raise WatchmanError("watchman response decode error: %s" % e)
 
     def send(self, *args):
-        if hasattr(self, 'bser_version'):
-            cmd = bser.dumps(*args, version=self.bser_version,
-                capabilities=self.bser_capabilities)
+        if hasattr(self, "bser_version"):
+            cmd = bser.dumps(
+                *args,
+                version=self.bser_version,
+                capabilities=self.bser_capabilities
+            )
         else:
             cmd = bser.dumps(*args)
         self.transport.write(cmd)
 
 
+class ImmutableBser2Codec(Bser2WithFallbackCodec, ImmutableBserCodec):
+    """ use the BSER encoding, decoding values using the newer
+        immutable object support """
+
+    pass
+
+
 class JsonCodec(Codec):
     """ Use json codec.  This is here primarily for testing purposes """
+
     json = None
 
     def __init__(self, transport):
         super(JsonCodec, self).__init__(transport)
         # optional dep on json, only if JsonCodec is used
         import json
+
         self.json = json
 
     def receive(self):
@@ -727,7 +844,7 @@
             # but it's possible we might get non-ASCII bytes that are valid
             # UTF-8.
             if compat.PYTHON3:
-                line = line.decode('utf-8')
+                line = line.decode("utf-8")
             return self.json.loads(line)
         except Exception as e:
             print(e, line)
@@ -739,12 +856,13 @@
         # containing Unicode strings to Unicode string. Even with (the default)
         # ensure_ascii=True, dumps returns a Unicode string.
         if compat.PYTHON3:
-            cmd = cmd.encode('ascii')
+            cmd = cmd.encode("ascii")
         self.transport.write(cmd + b"\n")
 
 
 class client(object):
     """ Handles the communication with the watchman service """
+
     sockpath = None
     transport = None
     sendCodec = None
@@ -754,60 +872,100 @@
     subs = {}  # Keyed by subscription name
     sub_by_root = {}  # Keyed by root, then by subscription name
     logs = []  # When log level is raised
-    unilateral = ['log', 'subscription']
+    unilateral = ["log", "subscription"]
     tport = None
     useImmutableBser = None
-    watchman_exe = None
+    pid = None
 
-    def __init__(self,
-                 sockpath=None,
-                 timeout=1.0,
-                 transport=None,
-                 sendEncoding=None,
-                 recvEncoding=None,
-                 useImmutableBser=False,
-                 watchman_exe=None):
+    def __init__(
+        self,
+        sockpath=None,
+        timeout=1.0,
+        transport=None,
+        sendEncoding=None,
+        recvEncoding=None,
+        useImmutableBser=False,
+        # use False for these two because None has a special
+        # meaning
+        valueEncoding=False,
+        valueErrors=False,
+        binpath=None,
+    ):
         self.sockpath = sockpath
         self.timeout = timeout
         self.useImmutableBser = useImmutableBser
-        self.watchman_exe = watchman_exe
+        self.binpath = _default_binpath(binpath)
 
         if inspect.isclass(transport) and issubclass(transport, Transport):
             self.transport = transport
         else:
-            transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
-            if transport == 'local' and os.name == 'nt':
+            transport = transport or os.getenv("WATCHMAN_TRANSPORT") or "local"
+            if transport == "local" and os.name == "nt":
                 self.transport = WindowsNamedPipeTransport
-            elif transport == 'local':
+            elif transport == "local":
                 self.transport = UnixSocketTransport
-            elif transport == 'cli':
+            elif transport == "cli":
                 self.transport = CLIProcessTransport
                 if sendEncoding is None:
-                    sendEncoding = 'json'
+                    sendEncoding = "json"
                 if recvEncoding is None:
                     recvEncoding = sendEncoding
             else:
-                raise WatchmanError('invalid transport %s' % transport)
+                raise WatchmanError("invalid transport %s" % transport)
 
-        sendEncoding = str(sendEncoding or os.getenv('WATCHMAN_ENCODING') or
-                           'bser')
-        recvEncoding = str(recvEncoding or os.getenv('WATCHMAN_ENCODING') or
-                           'bser')
+        sendEncoding = str(
+            sendEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
+        )
+        recvEncoding = str(
+            recvEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
+        )
 
         self.recvCodec = self._parseEncoding(recvEncoding)
         self.sendCodec = self._parseEncoding(sendEncoding)
 
+        # We want to act like the native OS methods as much as possible. This
+        # means returning bytestrings on Python 2 by default and Unicode
+        # strings on Python 3. However we take an optional argument that lets
+        # users override this.
+        if valueEncoding is False:
+            if compat.PYTHON3:
+                self.valueEncoding = encoding.get_local_encoding()
+                self.valueErrors = encoding.default_local_errors
+            else:
+                self.valueEncoding = None
+                self.valueErrors = None
+        else:
+            self.valueEncoding = valueEncoding
+            if valueErrors is False:
+                self.valueErrors = encoding.default_local_errors
+            else:
+                self.valueErrors = valueErrors
+
+    def _makeBSERCodec(self, codec):
+        def make_codec(transport):
+            return codec(transport, self.valueEncoding, self.valueErrors)
+
+        return make_codec
+
     def _parseEncoding(self, enc):
-        if enc == 'bser':
+        if enc == "bser":
             if self.useImmutableBser:
-                return ImmutableBserCodec
-            return BserCodec
-        elif enc == 'experimental-bser-v2':
-          return Bser2WithFallbackCodec
-        elif enc == 'json':
+                return self._makeBSERCodec(ImmutableBser2Codec)
+            return self._makeBSERCodec(Bser2WithFallbackCodec)
+        elif enc == "bser-v1":
+            if compat.PYTHON3:
+                raise BSERv1Unsupported(
+                    "Python 3 does not support the BSER v1 encoding: specify "
+                    '"bser" or omit the sendEncoding and recvEncoding '
+                    "arguments"
+                )
+            if self.useImmutableBser:
+                return self._makeBSERCodec(ImmutableBserCodec)
+            return self._makeBSERCodec(BserCodec)
+        elif enc == "json":
             return JsonCodec
         else:
-            raise WatchmanError('invalid encoding %s' % enc)
+            raise WatchmanError("invalid encoding %s" % enc)
 
     def _hasprop(self, result, name):
         if self.useImmutableBser:
@@ -817,29 +975,28 @@
     def _resolvesockname(self):
         # if invoked via a trigger, watchman will set this env var; we
         # should use it unless explicitly set otherwise
-        path = os.getenv('WATCHMAN_SOCK')
+        path = os.getenv("WATCHMAN_SOCK")
         if path:
             return path
 
-        cmd = [self.watchman_exe, '--output-encoding=bser', 'get-sockname']
+        cmd = [self.binpath, "--output-encoding=bser", "get-sockname"]
         try:
-            args = dict(stdout=subprocess.PIPE,
-                        stderr=subprocess.PIPE,
-                        close_fds=os.name != 'nt')
+            args = dict(
+                stdout=subprocess.PIPE, stderr=subprocess.PIPE
+            )  # noqa: C408
 
-            if os.name == 'nt':
+            if os.name == "nt":
                 # if invoked via an application with graphical user interface,
                 # this call will cause a brief command window pop-up.
                 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
                 startupinfo = subprocess.STARTUPINFO()
                 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
-                args['startupinfo'] = startupinfo
+                args["startupinfo"] = startupinfo
 
-            p = subprocess.Popen(pycompat.rapply(procutil.tonativestr, cmd),
-                                 **args)
+            p = subprocess.Popen(cmd, **args)
 
         except OSError as e:
-            raise WatchmanError('"watchman" executable not in PATH (%s)' % e)
+            raise WatchmanError('"watchman" executable not in PATH (%s)', e)
 
         stdout, stderr = p.communicate()
         exitcode = p.poll()
@@ -848,27 +1005,43 @@
             raise WatchmanError("watchman exited with code %d" % exitcode)
 
         result = bser.loads(stdout)
-        if b'error' in result:
-            raise WatchmanError('get-sockname error: %s' % result['error'])
+        if "error" in result:
+            raise WatchmanError("get-sockname error: %s" % result["error"])
 
-        return result[b'sockname']
+        return result["sockname"]
 
     def _connect(self):
         """ establish transport connection """
 
         if self.recvConn:
+            if self.pid != os.getpid():
+                raise UseAfterFork(
+                    "do not re-use a connection after fork; open a new client instead"
+                )
             return
 
         if self.sockpath is None:
             self.sockpath = self._resolvesockname()
 
-        self.tport = self.transport(self.sockpath, self.timeout, self.watchman_exe)
+        kwargs = {}
+        if self.transport == CLIProcessTransport:
+            kwargs["binpath"] = self.binpath
+
+        self.tport = self.transport(self.sockpath, self.timeout, **kwargs)
         self.sendConn = self.sendCodec(self.tport)
         self.recvConn = self.recvCodec(self.tport)
+        self.pid = os.getpid()
 
     def __del__(self):
         self.close()
 
+    def __enter__(self):
+        self._connect()
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_traceback):
+        self.close()
+
     def close(self):
         if self.tport:
             self.tport.close()
@@ -893,26 +1066,20 @@
 
         self._connect()
         result = self.recvConn.receive()
-        if self._hasprop(result, 'error'):
-            error = result['error']
-            if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
-                error = result['error'].decode('utf-8', 'surrogateescape')
-            raise CommandError(error)
+        if self._hasprop(result, "error"):
+            raise CommandError(result["error"])
 
-        if self._hasprop(result, 'log'):
-            log = result['log']
-            if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
-                log = log.decode('utf-8', 'surrogateescape')
-            self.logs.append(log)
+        if self._hasprop(result, "log"):
+            self.logs.append(result["log"])
 
-        if self._hasprop(result, 'subscription'):
-            sub = result['subscription']
+        if self._hasprop(result, "subscription"):
+            sub = result["subscription"]
             if not (sub in self.subs):
                 self.subs[sub] = []
             self.subs[sub].append(result)
 
             # also accumulate in {root,sub} keyed store
-            root = os.path.normcase(result['root'])
+            root = os.path.normpath(os.path.normcase(result["root"]))
             if not root in self.sub_by_root:
                 self.sub_by_root[root] = {}
             if not sub in self.sub_by_root[root]:
@@ -922,7 +1089,7 @@
         return result
 
     def isUnilateralResponse(self, res):
-        if 'unilateral' in res and res['unilateral']:
+        if "unilateral" in res and res["unilateral"]:
             return True
         # Fall back to checking for known unilateral responses
         for k in self.unilateral:
@@ -955,18 +1122,11 @@
         remove processing impacts both the unscoped and scoped stores
         for the subscription data.
         """
-        if compat.PYTHON3 and issubclass(self.recvCodec, BserCodec):
-            # People may pass in Unicode strings here -- but currently BSER only
-            # returns bytestrings. Deal with that.
-            if isinstance(root, str):
-                root = encoding.encode_local(root)
-            if isinstance(name, str):
-                name = name.encode('utf-8')
-
         if root is not None:
-            if not root in self.sub_by_root:
+            root = os.path.normpath(os.path.normcase(root))
+            if root not in self.sub_by_root:
                 return None
-            if not name in self.sub_by_root[root]:
+            if name not in self.sub_by_root[root]:
                 return None
             sub = self.sub_by_root[root][name]
             if remove:
@@ -976,7 +1136,7 @@
                     del self.subs[name]
             return sub
 
-        if not (name in self.subs):
+        if name not in self.subs:
             return None
         sub = self.subs[name]
         if remove:
@@ -992,7 +1152,7 @@
         and NOT returned via this method.
         """
 
-        log('calling client.query')
+        log("calling client.query")
         self._connect()
         try:
             self.sendConn.send(args)
@@ -1006,27 +1166,27 @@
             # When we can depend on Python 3, we can use PEP 3134
             # exception chaining here.
             raise WatchmanEnvironmentError(
-                'I/O error communicating with watchman daemon',
+                "I/O error communicating with watchman daemon",
                 ee.errno,
                 ee.strerror,
-                args)
+                args,
+            )
         except WatchmanError as ex:
             ex.setCommand(args)
             raise
 
     def capabilityCheck(self, optional=None, required=None):
         """ Perform a server capability check """
-        res = self.query('version', {
-            'optional': optional or [],
-            'required': required or []
-        })
+        res = self.query(
+            "version", {"optional": optional or [], "required": required or []}
+        )
 
-        if not self._hasprop(res, 'capabilities'):
+        if not self._hasprop(res, "capabilities"):
             # Server doesn't support capabilities, so we need to
             # synthesize the results based on the version
             capabilities.synthesize(res, optional)
-            if 'error' in res:
-                raise CommandError(res['error'])
+            if "error" in res:
+                raise CommandError(res["error"])
 
         return res
 



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list