[PATCH 3 of 9] test-commandserver: split helper functions to new hgclient module

Yuya Nishihara yuya at tcha.org
Sun Sep 28 09:17:18 CDT 2014


# HG changeset patch
# User Yuya Nishihara <yuya at tcha.org>
# Date 1411878676 -32400
#      Sun Sep 28 13:31:16 2014 +0900
# Node ID f2f3fb7f326245ba50e333200a918bc95566472e
# Parent  b665907351a96e88e84841fd97b9ddceec9c476e
test-commandserver: split helper functions to new hgclient module

This prepares for porting test-commandserver.py to .t test.

Though command-server test needs many Python codes, .t test will be more
readable than .py test thanks to inlined output.

diff --git a/contrib/hgclient.py b/contrib/hgclient.py
new file mode 100644
--- /dev/null
+++ b/contrib/hgclient.py
@@ -0,0 +1,75 @@
+# A minimal client for Mercurial's command server
+
+import sys, struct, subprocess, cStringIO
+
+def connect(path=None):
+    cmdline = ['hg', 'serve', '--cmdserver', 'pipe']
+    if path:
+        cmdline += ['-R', path]
+
+    server = subprocess.Popen(cmdline, stdin=subprocess.PIPE,
+                              stdout=subprocess.PIPE)
+
+    return server
+
+def writeblock(server, data):
+    server.stdin.write(struct.pack('>I', len(data)))
+    server.stdin.write(data)
+    server.stdin.flush()
+
+def readchannel(server):
+    data = server.stdout.read(5)
+    if not data:
+        raise EOFError
+    channel, length = struct.unpack('>cI', data)
+    if channel in 'IL':
+        return channel, length
+    else:
+        return channel, server.stdout.read(length)
+
+def sep(text):
+    return text.replace('\\', '/')
+
+def runcommand(server, args, output=sys.stdout, error=sys.stderr, input=None,
+               outfilter=lambda x: x):
+    print ' runcommand', ' '.join(args)
+    sys.stdout.flush()
+    server.stdin.write('runcommand\n')
+    writeblock(server, '\0'.join(args))
+
+    if not input:
+        input = cStringIO.StringIO()
+
+    while True:
+        ch, data = readchannel(server)
+        if ch == 'o':
+            output.write(outfilter(data))
+            output.flush()
+        elif ch == 'e':
+            error.write(data)
+            error.flush()
+        elif ch == 'I':
+            writeblock(server, input.read(data))
+        elif ch == 'L':
+            writeblock(server, input.readline(data))
+        elif ch == 'r':
+            ret, = struct.unpack('>i', data)
+            if ret != 0:
+                print ' [%d]' % ret
+            return ret
+        else:
+            print "unexpected channel %c: %r" % (ch, data)
+            if ch.isupper():
+                return
+
+def check(func, repopath=None):
+    print
+    print 'testing %s:' % func.__name__
+    print
+    sys.stdout.flush()
+    server = connect(repopath)
+    try:
+        return func(server)
+    finally:
+        server.stdin.close()
+        server.wait()
diff --git a/tests/test-commandserver.py b/tests/test-commandserver.py
--- a/tests/test-commandserver.py
+++ b/tests/test-commandserver.py
@@ -1,76 +1,7 @@
-import sys, os, struct, subprocess, cStringIO, re, shutil
+import sys, os, cStringIO, re, shutil
 
-def connect(path=None):
-    cmdline = ['hg', 'serve', '--cmdserver', 'pipe']
-    if path:
-        cmdline += ['-R', path]
-
-    server = subprocess.Popen(cmdline, stdin=subprocess.PIPE,
-                              stdout=subprocess.PIPE)
-
-    return server
-
-def writeblock(server, data):
-    server.stdin.write(struct.pack('>I', len(data)))
-    server.stdin.write(data)
-    server.stdin.flush()
-
-def readchannel(server):
-    data = server.stdout.read(5)
-    if not data:
-        raise EOFError
-    channel, length = struct.unpack('>cI', data)
-    if channel in 'IL':
-        return channel, length
-    else:
-        return channel, server.stdout.read(length)
-
-def sep(text):
-    return text.replace('\\', '/')
-
-def runcommand(server, args, output=sys.stdout, error=sys.stderr, input=None,
-               outfilter=lambda x: x):
-    print ' runcommand', ' '.join(args)
-    sys.stdout.flush()
-    server.stdin.write('runcommand\n')
-    writeblock(server, '\0'.join(args))
-
-    if not input:
-        input = cStringIO.StringIO()
-
-    while True:
-        ch, data = readchannel(server)
-        if ch == 'o':
-            output.write(outfilter(data))
-            output.flush()
-        elif ch == 'e':
-            error.write(data)
-            error.flush()
-        elif ch == 'I':
-            writeblock(server, input.read(data))
-        elif ch == 'L':
-            writeblock(server, input.readline(data))
-        elif ch == 'r':
-            ret, = struct.unpack('>i', data)
-            if ret != 0:
-                print ' [%d]' % ret
-            return ret
-        else:
-            print "unexpected channel %c: %r" % (ch, data)
-            if ch.isupper():
-                return
-
-def check(func, repopath=None):
-    print
-    print 'testing %s:' % func.__name__
-    print
-    sys.stdout.flush()
-    server = connect(repopath)
-    try:
-        return func(server)
-    finally:
-        server.stdin.close()
-        server.wait()
+sys.path.insert(0, os.path.join(os.environ['TESTDIR'], '..', 'contrib'))
+from hgclient import readchannel, sep, runcommand, check
 
 def unknowncommand(server):
     server.stdin.write('unknowncommand\n')


More information about the Mercurial-devel mailing list