[PATCH 08 of 21 V2] speedy: client/server communication over tcp sockets
Tomasz Kleczek
tkleczek at fb.com
Thu Dec 13 20:52:20 CST 2012
# HG changeset patch
# User Tomasz Kleczek <tkleczek at fb.com>
# Date 1355422154 28800
# Node ID 31118161d007e4a274b151d9403640275667a070
# Parent 2020a29e9e1100521b2a70ab8ce704a2b754c2a0
speedy: client/server communication over tcp sockets
Adds `metaserve` command which starts the history server listening
for incoming client connections and ready to handle requests
(answer queries).
Port on which the server should run may be specified by the `speedy.port`
config option.
Also adds a client proxy that is responsible for client-side communication.
Network address of the server may be specified by the `speedy.host`
config option.
Both server and client use wireprotocol to (de)serialize query parameters.
diff --git a/hgext/speedy/__init__.py b/hgext/speedy/__init__.py
--- a/hgext/speedy/__init__.py
+++ b/hgext/speedy/__init__.py
@@ -4,6 +4,9 @@
# GNU General Public License version 2 or any later version.
import client
+import server
+
+cmdtable = server.cmdtable
def uisetup(ui):
if not ui.configbool('speedy', 'server', False):
diff --git a/hgext/speedy/client.py b/hgext/speedy/client.py
--- a/hgext/speedy/client.py
+++ b/hgext/speedy/client.py
@@ -9,6 +9,8 @@
from mercurial import localrepo
from mercurial.i18n import _
import server
+import protocol
+import tcptransport
import localtransport
def nodestorevs(repo, nodes):
@@ -89,12 +91,28 @@
return [ r for r in subset if r in revs ]
def _speedysetup(ui, repo):
- """Initialize speedy client."""
+ """Initialize speedy client.
- serverrepopath = ui.config('speedy', 'serverrepo', repo.root)
- serverrepo = localrepo.localrepository(ui, path=serverrepopath)
- mserver = server.makeserver(serverrepo)
- proxy = localtransport.localclient(mserver)
+ If client is run in nonlocal mode the initialization time is
+ negligible.
+
+ Also, there is no extra overhead if the user runs a query which
+ is not supported by the extension.
+ """
+
+ serverrepopath = ui.config('speedy', 'serverrepo', '')
+ host = ui.config('speedy', 'host', '')
+
+ if host:
+ if not serverrepopath:
+ raise util.Abort(_("config option 'serverrepo' required by option 'host'"))
+ proxy = tcptransport.tcpclient(host, protocol.wireprotocol)
+ else:
+ if not serverrepopath:
+ serverrepopath = repo.root
+ serverrepo = localrepo.localrepository(ui, path=serverrepopath)
+ mserver = server.makeserver(serverrepo)
+ proxy = localtransport.localclient(mserver)
mpeer = metapeer(proxy, repo, serverrepopath)
diff --git a/hgext/speedy/server.py b/hgext/speedy/server.py
--- a/hgext/speedy/server.py
+++ b/hgext/speedy/server.py
@@ -8,9 +8,18 @@
metaserver: contains all the logic behind query acceleration
"""
+import collections
from mercurial import revset
from mercurial import encoding
+from mercurial import cmdutil
+from mercurial.i18n import _
+from mercurial import util
import index
+import protocol
+import tcptransport
+
+cmdtable = {}
+command = cmdutil.command(cmdtable)
class metaserver(object):
"""Contains all the logic behind the query acceleration."""
@@ -41,11 +50,22 @@
}
def makeserver(repo):
- """Return an initialized metaserver instance.
-
- Update the indices to the most recent revision along the way.
- """
+ """Return an initialized metaserver instance."""
ctxs = [repo[r] for r in xrange(0, len(repo))]
indices = dict([(name, create(ctxs)) for name, create in
indicecfg.iteritems()])
return metaserver(repo, indices)
+
+@command('metaserve', [], _(''))
+def metaserve(ui, repo, **opts):
+    """Start a metadata server on `speedy.port` (default 8999)."""
+    meta = makeserver(repo)
+    port = util.getport(ui.config('speedy', 'port', 8999))
+    server = tcptransport.tcpserver(port, meta, protocol.wireprotocol)
+
+    # No service options are exposed yet: every lookup yields ''.
+    serviceopts = collections.defaultdict(lambda: '')
+    # Translate the message template first, then interpolate the port.
+    ui.status(_('listening on port %d\n') % port)
+    cmdutil.service(serviceopts, runfn=server.serve_forever)
+
diff --git a/hgext/speedy/tcptransport.py b/hgext/speedy/tcptransport.py
new file mode 100644
--- /dev/null
+++ b/hgext/speedy/tcptransport.py
@@ -0,0 +1,91 @@
+# Copyright 2012 Facebook
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+"""Client/server communication over TCP sockets."""
+
+import SocketServer
+import socket
+import cStringIO
+import urlparse
+import transport
+
+def exactreader(read):
+    """Return a function that reads and returns a string of the specified
+    length.
+
+    read: a blocking function that takes an integer `sz` then reads and
+    returns a string with at most `sz` characters.
+
+    Used to wrap the socket.recv function.
+    """
+    # Maximum number of bytes to request in one follow-up read call.
+    maxread = 4096
+    def readexactly(sz):
+        """Read and return a string of length `sz` or raise socket.error."""
+        data = read(sz)
+        if len(data) == sz:
+            # Fast path, nothing to reassemble.
+            return data
+        # Collect pieces in a list: cStringIO.StringIO(data) would be a
+        # read-only object without a write() method.
+        chunks = [data]
+        sz -= len(data)
+        while sz > 0:
+            data = read(min(sz, maxread))
+            if not data:
+                raise socket.error("read too few bytes")
+            sz -= len(data)
+            chunks.append(data)
+        return ''.join(chunks)
+    return readexactly
+
+class tcpclient(transport.clientproxy):
+    """Sends queries to server using TCP sockets directly."""
+
+    def __init__(self, uri, protoclass):
+        # urlparse only sees a netloc after '//'; anchor bare 'host:port'.
+        if '//' not in uri:
+            uri = '//' + uri
+        parsed = urlparse.urlparse(uri, scheme='http')
+        self.port = parsed.port
+        self.host = parsed.hostname
+        self.protoclass = protoclass
+
+    def request(self, queryname, args):
+        """Send a single query to the server and return the response.
+
+        One connection is opened per call, since a closed socket cannot be
+        reused. Blocks until the complete response is returned.
+        """
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            sock.connect((self.host, self.port))
+            proto = self.protoclass(exactreader(sock.recv), sock.sendall)
+            proto.serialize([queryname, args])
+            return proto.deserialize()
+        finally:
+            sock.close()
+
+class tcphandler(SocketServer.StreamRequestHandler):
+    """Handles single connection to the tcpserver."""
+
+    def handle(self):
+        """Answer one query received on this connection.
+
+        The protocol supplied by the server decodes the query name with
+        its arguments and encodes the value returned by the metaserver.
+        """
+        reader = exactreader(self.rfile.read)
+        proto = self.server._protoclass(reader, self.wfile.write)
+        queryname, args = proto.deserialize()
+        # NOTE(review): the method name comes straight from the peer.
+        proto.serialize(getattr(self.server._mserver, queryname)(*args))
+
+class tcpserver(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
+    """Threaded TCP server answering history queries on all interfaces."""
+    def __init__(self, port, mserver, protoclass):
+        # Handlers reach these two attributes through self.server.
+        self._mserver, self._protoclass = mserver, protoclass
+        SocketServer.TCPServer.__init__(self, ('', port), tcphandler)
diff --git a/tests/test-speedy.t b/tests/test-speedy.t
--- a/tests/test-speedy.t
+++ b/tests/test-speedy.t
@@ -134,3 +134,48 @@
chg1
chg0
+Testing socket server
+
+Writing server config file
+ $ cat >> $TESTTMP/serverrepo/.hg/hgrc <<EOF_END
+ > [speedy]
+ > port = 8123
+ > EOF_END
+
+Writing local config file
+ $ cat >> $TESTTMP/localrepo/.hg/hgrc <<EOF_END
+ > [speedy]
+ > host = localhost:8123
+ > EOF_END
+
+ $ cd $TESTTMP/serverrepo
+
+ $ (
+ > hg metaserve > out 2>&1 &
+ > METASERVER_PID=$!
+ > echo $METASERVER_PID 1> pidfile
+ > ) > /dev/null 2>&1
+ $ sleep 1
+
+ $ cd $TESTTMP/localrepo
+
+ $ hg log -r "reverse(user(testuser1))"
+ chg8
+ chgl6
+ chg5
+ chg4
+ chg3
+ chg2
+ chg0
+
+ $ cat >> $TESTTMP/localrepo/.hg/hgrc <<EOF_END
+ > [speedy]
+ > client = False
+ > EOF_END
+
+ $ cd $TESTTMP/serverrepo
+
+ $ kill `cat pidfile`
+ $ cat out
+ listening on port 8123
+ killed!
More information about the Mercurial-devel
mailing list