[PATCH 08 of 21 V2] speedy: client/server communication over tcp sockets

Tomasz Kleczek tkleczek at fb.com
Thu Dec 13 20:52:20 CST 2012


# HG changeset patch
# User Tomasz Kleczek <tkleczek at fb.com>
# Date 1355422154 28800
# Node ID 31118161d007e4a274b151d9403640275667a070
# Parent  2020a29e9e1100521b2a70ab8ce704a2b754c2a0
speedy: client/server communication over tcp sockets

Adds `metaserve` command which starts the history server listening
for incoming client connections and ready to handle requests
(answer queries).

Port on which the server should run may be specified by the `speedy.port`
config option.


Also added client proxy that is responsible for communication client-side.

Network address of the server may be specified by the `speedy.host`
config option.

Both server and client use wireprotocol to (de)serialize query parametes.

diff --git a/hgext/speedy/__init__.py b/hgext/speedy/__init__.py
--- a/hgext/speedy/__init__.py
+++ b/hgext/speedy/__init__.py
@@ -4,6 +4,9 @@
 # GNU General Public License version 2 or any later version.
 
 import client
+import server
+
+cmdtable = server.cmdtable
 
 def uisetup(ui):
     if not ui.configbool('speedy', 'server', False):
diff --git a/hgext/speedy/client.py b/hgext/speedy/client.py
--- a/hgext/speedy/client.py
+++ b/hgext/speedy/client.py
@@ -9,6 +9,8 @@
 from mercurial import localrepo
 from mercurial.i18n import _
 import server
+import protocol
+import tcptransport
 import localtransport
 
 def nodestorevs(repo, nodes):
@@ -89,12 +91,28 @@
     return [ r for r in subset if r in revs ]
 
 def _speedysetup(ui, repo):
-    """Initialize speedy client."""
+    """Initialize speedy client.
 
-    serverrepopath = ui.config('speedy', 'serverrepo', repo.root)
-    serverrepo = localrepo.localrepository(ui, path=serverrepopath)
-    mserver = server.makeserver(serverrepo)
-    proxy = localtransport.localclient(mserver)
+    If client is run in nonlocal mode the initialization time is
+    negligible.
+
+    Also, there is no extra overhead if the user runs a query which
+    is not supported by the extension.
+    """
+
+    serverrepopath = ui.config('speedy', 'serverrepo', '')
+    host = ui.config('speedy', 'host', '')
+
+    if host:
+        if not serverrepopath:
+            raise util.Abort(_("config option 'serverrepo' required by option 'host'"))
+        proxy = tcptransport.tcpclient(host, protocol.wireprotocol)
+    else:
+        if not serverrepopath:
+            serverrepopath = repo.root
+        serverrepo = localrepo.localrepository(ui, path=serverrepopath)
+        mserver = server.makeserver(serverrepo)
+        proxy = localtransport.localclient(mserver)
 
     mpeer = metapeer(proxy, repo, serverrepopath)
 
diff --git a/hgext/speedy/server.py b/hgext/speedy/server.py
--- a/hgext/speedy/server.py
+++ b/hgext/speedy/server.py
@@ -8,9 +8,18 @@
 metaserver: contains all the logic behind query acceleration
 """
 
+import collections
 from mercurial import revset
 from mercurial import encoding
+from mercurial import cmdutil
+from mercurial.i18n import _
+from mercurial import util
 import index
+import protocol
+import tcptransport
+
+cmdtable = {}
+command = cmdutil.command(cmdtable)
 
 class metaserver(object):
     """Contains all the logic behind the query acceleration."""
@@ -41,11 +50,22 @@
 }
 
 def makeserver(repo):
-    """Return an initialized metaserver instance.
-
-    Update the indices to the most recent revision along the way.
-    """
+    """Return an initialized metaserver instance."""
     ctxs = [repo[r] for r in xrange(0, len(repo))]
     indices = dict([(name, create(ctxs)) for name, create in
         indicecfg.iteritems()])
     return metaserver(repo, indices)
+
+ at command('metaserve', [], _(''))
+def metaserve(ui, repo, **opts):
+    """Starts a metadata server."""
+    meta = makeserver(repo)
+
+    port = util.getport(ui.config('speedy', 'port', 8999))
+
+    server = tcptransport.tcpserver(port, meta, protocol.wireprotocol)
+
+    serviceopts = collections.defaultdict(lambda: '')
+    ui.status(_('listening on port %d\n' % port))
+    cmdutil.service(serviceopts, runfn=server.serve_forever)
+
diff --git a/hgext/speedy/tcptransport.py b/hgext/speedy/tcptransport.py
new file mode 100644
--- /dev/null
+++ b/hgext/speedy/tcptransport.py
@@ -0,0 +1,91 @@
+# Copyright 2012 Facebook
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+"""Client/server communication over TCP sockets."""
+
+import SocketServer
+import socket
+import cStringIO
+import urlparse
+import transport
+
+def exactreader(read):
+    """Return a function that reads and returns a string of the specified
+    length.
+
+    read: a blocking function that takes an integer `sz` then reads and
+        returns a string with at most `sz` characters.
+
+    Used to wrap the socket.recv function.
+    """
+    # Maximum number of bytes to read from socket in one call.
+    maxread = 4096
+    def readexactly(sz):
+        """Read and return a string of length `sz`."""
+        data = read(sz)
+        if len(data) == sz:
+            # Fast path, no additional copying to/from buffer
+            return data
+        else:
+            buf = cStringIO.StringIO(data)
+            sz -= len(data)
+            while sz > 0:
+                # It is recommended
+                data = read(min(sz, maxread))
+                if not data:
+                    raise socket.error("read too few bytes")
+                sz -= len(data)
+                buf.write(data)
+            return buf.getvalue()
+    return readexactly
+
+class tcpclient(transport.clientproxy):
+    """Sends queries to server using TCP sockets directly."""
+
+    def __init__(self, uri, protoclass):
+        parsed = urlparse.urlparse(uri, scheme='http')
+        self.port = parsed.port
+        self.host = parsed.hostname
+        self.protoclass = protoclass
+        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+    def request(self, queryname, args):
+        """Send a single query to the server and return the response.
+
+        The arguments (de)serialization is done behind the scenes by the
+        provided protocol.
+
+        Blocks until the complete response is returned.
+        """
+        self._sock.connect((self.host, self.port))
+        try:
+            proto = self.protoclass(exactreader(self._sock.recv),
+                    self._sock.sendall)
+            proto.serialize([queryname, args])
+            return proto.deserialize()
+        finally:
+            self._sock.close()
+
+class tcphandler(SocketServer.StreamRequestHandler):
+    """Handles single connection to the tcpserver."""
+
+    def handle(self):
+        """Read the query and return history server response.
+
+        The arguments (de)serialization is done behind the scences by the
+        provided protocol.
+        """
+        proto = self.server._protoclass(exactreader(self.rfile.read),
+            self.wfile.write)
+        queryname, args = proto.deserialize()
+        ret = getattr(self.server._mserver, queryname)(*args)
+        proto.serialize(ret)
+
+class tcpserver(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
+    """Server for the history queries."""
+    def __init__(self, port, mserver, protoclass):
+        SocketServer.TCPServer.__init__(self, ('', port), tcphandler)
+        self._mserver = mserver
+        self._protoclass = protoclass
diff --git a/tests/test-speedy.t b/tests/test-speedy.t
--- a/tests/test-speedy.t
+++ b/tests/test-speedy.t
@@ -134,3 +134,48 @@
   chg1
   chg0
 
+Testing socket server
+
+Writing server config file
+  $ cat >> $TESTTMP/serverrepo/.hg/hgrc <<EOF_END
+  > [speedy]
+  > port = 8123
+  > EOF_END
+
+Writing local config file
+  $ cat >> $TESTTMP/localrepo/.hg/hgrc <<EOF_END
+  > [speedy]
+  > host = localhost:8123
+  > EOF_END
+
+  $ cd $TESTTMP/serverrepo
+
+  $ (
+  > hg metaserve > out 2>&1 & 
+  > METASERVER_PID=$!
+  > echo $METASERVER_PID 1> pidfile
+  > ) > /dev/null 2>&1
+  $ sleep 1
+
+  $ cd $TESTTMP/localrepo
+
+  $ hg log -r "reverse(user(testuser1))"
+  chg8
+  chgl6
+  chg5
+  chg4
+  chg3
+  chg2
+  chg0
+
+  $ cat >> $TESTTMP/localrepo/.hg/hgrc <<EOF_END
+  > [speedy]
+  > client = False
+  > EOF_END
+
+  $ cd $TESTTMP/serverrepo
+
+  $ kill `cat pidfile`
+  $ cat out
+  listening on port 8123
+  killed!


More information about the Mercurial-devel mailing list