D3230: wireproto: extract HTTP version 2 code to own module

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Wed Apr 11 13:00:43 EDT 2018


This revision was automatically updated to reflect the committed changes.
Closed by commit rHG93397c4633f6: wireproto: extract HTTP version 2 code to own module (authored by indygreg, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3230?vs=7948&id=7991

REVISION DETAIL
  https://phab.mercurial-scm.org/D3230

AFFECTED FILES
  mercurial/httppeer.py
  mercurial/wireprotoserver.py
  mercurial/wireprotov2server.py
  tests/test-check-interfaces.py

CHANGE DETAILS

diff --git a/tests/test-check-interfaces.py b/tests/test-check-interfaces.py
--- a/tests/test-check-interfaces.py
+++ b/tests/test-check-interfaces.py
@@ -23,6 +23,7 @@
     vfs as vfsmod,
     wireprotoserver,
     wireprototypes,
+    wireprotov2server,
 )
 
 rootdir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..'))
@@ -125,16 +126,16 @@
     ziverify.verifyClass(wireprototypes.baseprotocolhandler,
                          wireprotoserver.httpv1protocolhandler)
     ziverify.verifyClass(wireprototypes.baseprotocolhandler,
-                         wireprotoserver.httpv2protocolhandler)
+                         wireprotov2server.httpv2protocolhandler)
 
     sshv1 = wireprotoserver.sshv1protocolhandler(None, None, None)
     checkzobject(sshv1)
     sshv2 = wireprotoserver.sshv2protocolhandler(None, None, None)
     checkzobject(sshv2)
 
     httpv1 = wireprotoserver.httpv1protocolhandler(None, None, None)
     checkzobject(httpv1)
-    httpv2 = wireprotoserver.httpv2protocolhandler(None, None)
+    httpv2 = wireprotov2server.httpv2protocolhandler(None, None)
     checkzobject(httpv2)
 
     ziverify.verifyClass(repository.ifilestorage, filelog.filelog)
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotov2server.py
copy from mercurial/wireprotoserver.py
copy to mercurial/wireprotov2server.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotov2server.py
@@ -7,9 +7,6 @@
 from __future__ import absolute_import
 
 import contextlib
-import struct
-import sys
-import threading
 
 from .i18n import _
 from .thirdparty import (
@@ -19,279 +16,18 @@
     interface as zi,
 )
 from . import (
-    encoding,
     error,
-    hook,
     pycompat,
-    util,
     wireproto,
     wireprotoframing,
     wireprototypes,
 )
-from .utils import (
-    procutil,
-)
 
-stringio = util.stringio
-
-urlerr = util.urlerr
-urlreq = util.urlreq
-
-HTTP_OK = 200
-
-HGTYPE = 'application/mercurial-0.1'
-HGTYPE2 = 'application/mercurial-0.2'
-HGERRTYPE = 'application/hg-error'
 FRAMINGTYPE = b'application/mercurial-exp-framing-0003'
 
 HTTPV2 = wireprototypes.HTTPV2
-SSHV1 = wireprototypes.SSHV1
-SSHV2 = wireprototypes.SSHV2
 
-def decodevaluefromheaders(req, headerprefix):
-    """Decode a long value from multiple HTTP request headers.
-
-    Returns the value as a bytes, not a str.
-    """
-    chunks = []
-    i = 1
-    while True:
-        v = req.headers.get(b'%s-%d' % (headerprefix, i))
-        if v is None:
-            break
-        chunks.append(pycompat.bytesurl(v))
-        i += 1
-
-    return ''.join(chunks)
-
-@zi.implementer(wireprototypes.baseprotocolhandler)
-class httpv1protocolhandler(object):
-    def __init__(self, req, ui, checkperm):
-        self._req = req
-        self._ui = ui
-        self._checkperm = checkperm
-        self._protocaps = None
-
-    @property
-    def name(self):
-        return 'http-v1'
-
-    def getargs(self, args):
-        knownargs = self._args()
-        data = {}
-        keys = args.split()
-        for k in keys:
-            if k == '*':
-                star = {}
-                for key in knownargs.keys():
-                    if key != 'cmd' and key not in keys:
-                        star[key] = knownargs[key][0]
-                data['*'] = star
-            else:
-                data[k] = knownargs[k][0]
-        return [data[k] for k in keys]
-
-    def _args(self):
-        args = self._req.qsparams.asdictoflists()
-        postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
-        if postlen:
-            args.update(urlreq.parseqs(
-                self._req.bodyfh.read(postlen), keep_blank_values=True))
-            return args
-
-        argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
-        args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
-        return args
-
-    def getprotocaps(self):
-        if self._protocaps is None:
-            value = decodevaluefromheaders(self._req, r'X-HgProto')
-            self._protocaps = set(value.split(' '))
-        return self._protocaps
-
-    def getpayload(self):
-        # Existing clients *always* send Content-Length.
-        length = int(self._req.headers[b'Content-Length'])
-
-        # If httppostargs is used, we need to read Content-Length
-        # minus the amount that was consumed by args.
-        length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
-        return util.filechunkiter(self._req.bodyfh, limit=length)
-
-    @contextlib.contextmanager
-    def mayberedirectstdio(self):
-        oldout = self._ui.fout
-        olderr = self._ui.ferr
-
-        out = util.stringio()
-
-        try:
-            self._ui.fout = out
-            self._ui.ferr = out
-            yield out
-        finally:
-            self._ui.fout = oldout
-            self._ui.ferr = olderr
-
-    def client(self):
-        return 'remote:%s:%s:%s' % (
-            self._req.urlscheme,
-            urlreq.quote(self._req.remotehost or ''),
-            urlreq.quote(self._req.remoteuser or ''))
-
-    def addcapabilities(self, repo, caps):
-        caps.append(b'batch')
-
-        caps.append('httpheader=%d' %
-                    repo.ui.configint('server', 'maxhttpheaderlen'))
-        if repo.ui.configbool('experimental', 'httppostargs'):
-            caps.append('httppostargs')
-
-        # FUTURE advertise 0.2rx once support is implemented
-        # FUTURE advertise minrx and mintx after consulting config option
-        caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
-
-        compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
-        if compengines:
-            comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
-                                 for e in compengines)
-            caps.append('compression=%s' % comptypes)
-
-        return caps
-
-    def checkperm(self, perm):
-        return self._checkperm(perm)
-
-# This method exists mostly so that extensions like remotefilelog can
-# disable a kludgey legacy method only over http. As of early 2018,
-# there are no other known users, so with any luck we can discard this
-# hook if remotefilelog becomes a first-party extension.
-def iscmd(cmd):
-    return cmd in wireproto.commands
-
-def handlewsgirequest(rctx, req, res, checkperm):
-    """Possibly process a wire protocol request.
-
-    If the current request is a wire protocol request, the request is
-    processed by this function.
-
-    ``req`` is a ``parsedrequest`` instance.
-    ``res`` is a ``wsgiresponse`` instance.
-
-    Returns a bool indicating if the request was serviced. If set, the caller
-    should stop processing the request, as a response has already been issued.
-    """
-    # Avoid cycle involving hg module.
-    from .hgweb import common as hgwebcommon
-
-    repo = rctx.repo
-
-    # HTTP version 1 wire protocol requests are denoted by a "cmd" query
-    # string parameter. If it isn't present, this isn't a wire protocol
-    # request.
-    if 'cmd' not in req.qsparams:
-        return False
-
-    cmd = req.qsparams['cmd']
-
-    # The "cmd" request parameter is used by both the wire protocol and hgweb.
-    # While not all wire protocol commands are available for all transports,
-    # if we see a "cmd" value that resembles a known wire protocol command, we
-    # route it to a protocol handler. This is better than routing possible
-    # wire protocol requests to hgweb because it prevents hgweb from using
-    # known wire protocol commands and it is less confusing for machine
-    # clients.
-    if not iscmd(cmd):
-        return False
-
-    # The "cmd" query string argument is only valid on the root path of the
-    # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
-    # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
-    # in this case. We send an HTTP 404 for backwards compatibility reasons.
-    if req.dispatchpath:
-        res.status = hgwebcommon.statusmessage(404)
-        res.headers['Content-Type'] = HGTYPE
-        # TODO This is not a good response to issue for this request. This
-        # is mostly for BC for now.
-        res.setbodybytes('0\n%s\n' % b'Not Found')
-        return True
-
-    proto = httpv1protocolhandler(req, repo.ui,
-                                  lambda perm: checkperm(rctx, req, perm))
-
-    # The permissions checker should be the only thing that can raise an
-    # ErrorResponse. It is kind of a layer violation to catch an hgweb
-    # exception here. So consider refactoring into a exception type that
-    # is associated with the wire protocol.
-    try:
-        _callhttp(repo, req, res, proto, cmd)
-    except hgwebcommon.ErrorResponse as e:
-        for k, v in e.headers:
-            res.headers[k] = v
-        res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
-        # TODO This response body assumes the failed command was
-        # "unbundle." That assumption is not always valid.
-        res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
-
-    return True
-
-def handlewsgiapirequest(rctx, req, res, checkperm):
-    """Handle requests to /api/*."""
-    assert req.dispatchparts[0] == b'api'
-
-    repo = rctx.repo
-
-    # This whole URL space is experimental for now. But we want to
-    # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
-    if not repo.ui.configbool('experimental', 'web.apiserver'):
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('Experimental API server endpoint not enabled'))
-        return
-
-    # The URL space is /api/<protocol>/*. The structure of URLs under varies
-    # by <protocol>.
-
-    # Registered APIs are made available via config options of the name of
-    # the protocol.
-    availableapis = set()
-    for k, v in API_HANDLERS.items():
-        section, option = v['config']
-        if repo.ui.configbool(section, option):
-            availableapis.add(k)
-
-    # Requests to /api/ list available APIs.
-    if req.dispatchparts == [b'api']:
-        res.status = b'200 OK'
-        res.headers[b'Content-Type'] = b'text/plain'
-        lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
-                   'one of the following:\n')]
-        if availableapis:
-            lines.extend(sorted(availableapis))
-        else:
-            lines.append(_('(no available APIs)\n'))
-        res.setbodybytes(b'\n'.join(lines))
-        return
-
-    proto = req.dispatchparts[1]
-
-    if proto not in API_HANDLERS:
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
-            proto, b', '.join(sorted(availableapis))))
-        return
-
-    if proto not in availableapis:
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('API %s not enabled\n') % proto)
-        return
-
-    API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
-                                   req.dispatchparts[2:])
-
-def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
+def handlehttpv2request(rctx, req, res, checkperm, urlparts):
     from .hgweb import common as hgwebcommon
 
     # URL space looks like: <permissions>/<command>, where <permission> can
@@ -585,14 +321,6 @@
         raise error.ProgrammingError('unhandled event from reactor: %s' %
                                      action)
 
-# Maps API name to metadata so custom API can be registered.
-API_HANDLERS = {
-    HTTPV2: {
-        'config': ('experimental', 'web.api.http-v2'),
-        'handler': _handlehttpv2request,
-    },
-}
-
 @zi.implementer(wireprototypes.baseprotocolhandler)
 class httpv2protocolhandler(object):
     def __init__(self, req, ui, args=None):
@@ -634,445 +362,3 @@
 
     def checkperm(self, perm):
         raise NotImplementedError
-
-def _httpresponsetype(ui, proto, prefer_uncompressed):
-    """Determine the appropriate response type and compression settings.
-
-    Returns a tuple of (mediatype, compengine, engineopts).
-    """
-    # Determine the response media type and compression engine based
-    # on the request parameters.
-
-    if '0.2' in proto.getprotocaps():
-        # All clients are expected to support uncompressed data.
-        if prefer_uncompressed:
-            return HGTYPE2, util._noopengine(), {}
-
-        # Now find an agreed upon compression format.
-        compformats = wireproto.clientcompressionsupport(proto)
-        for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
-            if engine.wireprotosupport().name in compformats:
-                opts = {}
-                level = ui.configint('server', '%slevel' % engine.name())
-                if level is not None:
-                    opts['level'] = level
-
-                return HGTYPE2, engine, opts
-
-        # No mutually supported compression format. Fall back to the
-        # legacy protocol.
-
-    # Don't allow untrusted settings because disabling compression or
-    # setting a very high compression level could lead to flooding
-    # the server's network or CPU.
-    opts = {'level': ui.configint('server', 'zliblevel')}
-    return HGTYPE, util.compengines['zlib'], opts
-
-def _callhttp(repo, req, res, proto, cmd):
-    # Avoid cycle involving hg module.
-    from .hgweb import common as hgwebcommon
-
-    def genversion2(gen, engine, engineopts):
-        # application/mercurial-0.2 always sends a payload header
-        # identifying the compression engine.
-        name = engine.wireprotosupport().name
-        assert 0 < len(name) < 256
-        yield struct.pack('B', len(name))
-        yield name
-
-        for chunk in gen:
-            yield chunk
-
-    def setresponse(code, contenttype, bodybytes=None, bodygen=None):
-        if code == HTTP_OK:
-            res.status = '200 Script output follows'
-        else:
-            res.status = hgwebcommon.statusmessage(code)
-
-        res.headers['Content-Type'] = contenttype
-
-        if bodybytes is not None:
-            res.setbodybytes(bodybytes)
-        if bodygen is not None:
-            res.setbodygen(bodygen)
-
-    if not wireproto.commands.commandavailable(cmd, proto):
-        setresponse(HTTP_OK, HGERRTYPE,
-                    _('requested wire protocol command is not available over '
-                      'HTTP'))
-        return
-
-    proto.checkperm(wireproto.commands[cmd].permission)
-
-    rsp = wireproto.dispatch(repo, proto, cmd)
-
-    if isinstance(rsp, bytes):
-        setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
-    elif isinstance(rsp, wireprototypes.bytesresponse):
-        setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
-    elif isinstance(rsp, wireprototypes.streamreslegacy):
-        setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
-    elif isinstance(rsp, wireprototypes.streamres):
-        gen = rsp.gen
-
-        # This code for compression should not be streamres specific. It
-        # is here because we only compress streamres at the moment.
-        mediatype, engine, engineopts = _httpresponsetype(
-            repo.ui, proto, rsp.prefer_uncompressed)
-        gen = engine.compressstream(gen, engineopts)
-
-        if mediatype == HGTYPE2:
-            gen = genversion2(gen, engine, engineopts)
-
-        setresponse(HTTP_OK, mediatype, bodygen=gen)
-    elif isinstance(rsp, wireprototypes.pushres):
-        rsp = '%d\n%s' % (rsp.res, rsp.output)
-        setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
-    elif isinstance(rsp, wireprototypes.pusherr):
-        rsp = '0\n%s\n' % rsp.res
-        res.drain = True
-        setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
-    elif isinstance(rsp, wireprototypes.ooberror):
-        setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
-    else:
-        raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
-
-def _sshv1respondbytes(fout, value):
-    """Send a bytes response for protocol version 1."""
-    fout.write('%d\n' % len(value))
-    fout.write(value)
-    fout.flush()
-
-def _sshv1respondstream(fout, source):
-    write = fout.write
-    for chunk in source.gen:
-        write(chunk)
-    fout.flush()
-
-def _sshv1respondooberror(fout, ferr, rsp):
-    ferr.write(b'%s\n-\n' % rsp)
-    ferr.flush()
-    fout.write(b'\n')
-    fout.flush()
-
-@zi.implementer(wireprototypes.baseprotocolhandler)
-class sshv1protocolhandler(object):
-    """Handler for requests services via version 1 of SSH protocol."""
-    def __init__(self, ui, fin, fout):
-        self._ui = ui
-        self._fin = fin
-        self._fout = fout
-        self._protocaps = set()
-
-    @property
-    def name(self):
-        return wireprototypes.SSHV1
-
-    def getargs(self, args):
-        data = {}
-        keys = args.split()
-        for n in xrange(len(keys)):
-            argline = self._fin.readline()[:-1]
-            arg, l = argline.split()
-            if arg not in keys:
-                raise error.Abort(_("unexpected parameter %r") % arg)
-            if arg == '*':
-                star = {}
-                for k in xrange(int(l)):
-                    argline = self._fin.readline()[:-1]
-                    arg, l = argline.split()
-                    val = self._fin.read(int(l))
-                    star[arg] = val
-                data['*'] = star
-            else:
-                val = self._fin.read(int(l))
-                data[arg] = val
-        return [data[k] for k in keys]
-
-    def getprotocaps(self):
-        return self._protocaps
-
-    def getpayload(self):
-        # We initially send an empty response. This tells the client it is
-        # OK to start sending data. If a client sees any other response, it
-        # interprets it as an error.
-        _sshv1respondbytes(self._fout, b'')
-
-        # The file is in the form:
-        #
-        # <chunk size>\n<chunk>
-        # ...
-        # 0\n
-        count = int(self._fin.readline())
-        while count:
-            yield self._fin.read(count)
-            count = int(self._fin.readline())
-
-    @contextlib.contextmanager
-    def mayberedirectstdio(self):
-        yield None
-
-    def client(self):
-        client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
-        return 'remote:ssh:' + client
-
-    def addcapabilities(self, repo, caps):
-        if self.name == wireprototypes.SSHV1:
-            caps.append(b'protocaps')
-        caps.append(b'batch')
-        return caps
-
-    def checkperm(self, perm):
-        pass
-
-class sshv2protocolhandler(sshv1protocolhandler):
-    """Protocol handler for version 2 of the SSH protocol."""
-
-    @property
-    def name(self):
-        return wireprototypes.SSHV2
-
-    def addcapabilities(self, repo, caps):
-        return caps
-
-def _runsshserver(ui, repo, fin, fout, ev):
-    # This function operates like a state machine of sorts. The following
-    # states are defined:
-    #
-    # protov1-serving
-    #    Server is in protocol version 1 serving mode. Commands arrive on
-    #    new lines. These commands are processed in this state, one command
-    #    after the other.
-    #
-    # protov2-serving
-    #    Server is in protocol version 2 serving mode.
-    #
-    # upgrade-initial
-    #    The server is going to process an upgrade request.
-    #
-    # upgrade-v2-filter-legacy-handshake
-    #    The protocol is being upgraded to version 2. The server is expecting
-    #    the legacy handshake from version 1.
-    #
-    # upgrade-v2-finish
-    #    The upgrade to version 2 of the protocol is imminent.
-    #
-    # shutdown
-    #    The server is shutting down, possibly in reaction to a client event.
-    #
-    # And here are their transitions:
-    #
-    # protov1-serving -> shutdown
-    #    When server receives an empty request or encounters another
-    #    error.
-    #
-    # protov1-serving -> upgrade-initial
-    #    An upgrade request line was seen.
-    #
-    # upgrade-initial -> upgrade-v2-filter-legacy-handshake
-    #    Upgrade to version 2 in progress. Server is expecting to
-    #    process a legacy handshake.
-    #
-    # upgrade-v2-filter-legacy-handshake -> shutdown
-    #    Client did not fulfill upgrade handshake requirements.
-    #
-    # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
-    #    Client fulfilled version 2 upgrade requirements. Finishing that
-    #    upgrade.
-    #
-    # upgrade-v2-finish -> protov2-serving
-    #    Protocol upgrade to version 2 complete. Server can now speak protocol
-    #    version 2.
-    #
-    # protov2-serving -> protov1-serving
-    #    Ths happens by default since protocol version 2 is the same as
-    #    version 1 except for the handshake.
-
-    state = 'protov1-serving'
-    proto = sshv1protocolhandler(ui, fin, fout)
-    protoswitched = False
-
-    while not ev.is_set():
-        if state == 'protov1-serving':
-            # Commands are issued on new lines.
-            request = fin.readline()[:-1]
-
-            # Empty lines signal to terminate the connection.
-            if not request:
-                state = 'shutdown'
-                continue
-
-            # It looks like a protocol upgrade request. Transition state to
-            # handle it.
-            if request.startswith(b'upgrade '):
-                if protoswitched:
-                    _sshv1respondooberror(fout, ui.ferr,
-                                          b'cannot upgrade protocols multiple '
-                                          b'times')
-                    state = 'shutdown'
-                    continue
-
-                state = 'upgrade-initial'
-                continue
-
-            available = wireproto.commands.commandavailable(request, proto)
-
-            # This command isn't available. Send an empty response and go
-            # back to waiting for a new command.
-            if not available:
-                _sshv1respondbytes(fout, b'')
-                continue
-
-            rsp = wireproto.dispatch(repo, proto, request)
-
-            if isinstance(rsp, bytes):
-                _sshv1respondbytes(fout, rsp)
-            elif isinstance(rsp, wireprototypes.bytesresponse):
-                _sshv1respondbytes(fout, rsp.data)
-            elif isinstance(rsp, wireprototypes.streamres):
-                _sshv1respondstream(fout, rsp)
-            elif isinstance(rsp, wireprototypes.streamreslegacy):
-                _sshv1respondstream(fout, rsp)
-            elif isinstance(rsp, wireprototypes.pushres):
-                _sshv1respondbytes(fout, b'')
-                _sshv1respondbytes(fout, b'%d' % rsp.res)
-            elif isinstance(rsp, wireprototypes.pusherr):
-                _sshv1respondbytes(fout, rsp.res)
-            elif isinstance(rsp, wireprototypes.ooberror):
-                _sshv1respondooberror(fout, ui.ferr, rsp.message)
-            else:
-                raise error.ProgrammingError('unhandled response type from '
-                                             'wire protocol command: %s' % rsp)
-
-        # For now, protocol version 2 serving just goes back to version 1.
-        elif state == 'protov2-serving':
-            state = 'protov1-serving'
-            continue
-
-        elif state == 'upgrade-initial':
-            # We should never transition into this state if we've switched
-            # protocols.
-            assert not protoswitched
-            assert proto.name == wireprototypes.SSHV1
-
-            # Expected: upgrade <token> <capabilities>
-            # If we get something else, the request is malformed. It could be
-            # from a future client that has altered the upgrade line content.
-            # We treat this as an unknown command.
-            try:
-                token, caps = request.split(b' ')[1:]
-            except ValueError:
-                _sshv1respondbytes(fout, b'')
-                state = 'protov1-serving'
-                continue
-
-            # Send empty response if we don't support upgrading protocols.
-            if not ui.configbool('experimental', 'sshserver.support-v2'):
-                _sshv1respondbytes(fout, b'')
-                state = 'protov1-serving'
-                continue
-
-            try:
-                caps = urlreq.parseqs(caps)
-            except ValueError:
-                _sshv1respondbytes(fout, b'')
-                state = 'protov1-serving'
-                continue
-
-            # We don't see an upgrade request to protocol version 2. Ignore
-            # the upgrade request.
-            wantedprotos = caps.get(b'proto', [b''])[0]
-            if SSHV2 not in wantedprotos:
-                _sshv1respondbytes(fout, b'')
-                state = 'protov1-serving'
-                continue
-
-            # It looks like we can honor this upgrade request to protocol 2.
-            # Filter the rest of the handshake protocol request lines.
-            state = 'upgrade-v2-filter-legacy-handshake'
-            continue
-
-        elif state == 'upgrade-v2-filter-legacy-handshake':
-            # Client should have sent legacy handshake after an ``upgrade``
-            # request. Expected lines:
-            #
-            #    hello
-            #    between
-            #    pairs 81
-            #    0000...-0000...
-
-            ok = True
-            for line in (b'hello', b'between', b'pairs 81'):
-                request = fin.readline()[:-1]
-
-                if request != line:
-                    _sshv1respondooberror(fout, ui.ferr,
-                                          b'malformed handshake protocol: '
-                                          b'missing %s' % line)
-                    ok = False
-                    state = 'shutdown'
-                    break
-
-            if not ok:
-                continue
-
-            request = fin.read(81)
-            if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
-                _sshv1respondooberror(fout, ui.ferr,
-                                      b'malformed handshake protocol: '
-                                      b'missing between argument value')
-                state = 'shutdown'
-                continue
-
-            state = 'upgrade-v2-finish'
-            continue
-
-        elif state == 'upgrade-v2-finish':
-            # Send the upgrade response.
-            fout.write(b'upgraded %s %s\n' % (token, SSHV2))
-            servercaps = wireproto.capabilities(repo, proto)
-            rsp = b'capabilities: %s' % servercaps.data
-            fout.write(b'%d\n%s\n' % (len(rsp), rsp))
-            fout.flush()
-
-            proto = sshv2protocolhandler(ui, fin, fout)
-            protoswitched = True
-
-            state = 'protov2-serving'
-            continue
-
-        elif state == 'shutdown':
-            break
-
-        else:
-            raise error.ProgrammingError('unhandled ssh server state: %s' %
-                                         state)
-
-class sshserver(object):
-    def __init__(self, ui, repo, logfh=None):
-        self._ui = ui
-        self._repo = repo
-        self._fin = ui.fin
-        self._fout = ui.fout
-
-        # Log write I/O to stdout and stderr if configured.
-        if logfh:
-            self._fout = util.makeloggingfileobject(
-                logfh, self._fout, 'o', logdata=True)
-            ui.ferr = util.makeloggingfileobject(
-                logfh, ui.ferr, 'e', logdata=True)
-
-        hook.redirect(True)
-        ui.fout = repo.ui.fout = ui.ferr
-
-        # Prevent insertion/deletion of CRs
-        procutil.setbinary(self._fin)
-        procutil.setbinary(self._fout)
-
-    def serve_forever(self):
-        self.serveuntil(threading.Event())
-        sys.exit(0)
-
-    def serveuntil(self, ev):
-        """Serve until a threading.Event is set."""
-        _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -12,9 +12,6 @@
 import threading
 
 from .i18n import _
-from .thirdparty import (
-    cbor,
-)
 from .thirdparty.zope import (
     interface as zi,
 )
@@ -25,8 +22,8 @@
     pycompat,
     util,
     wireproto,
-    wireprotoframing,
     wireprototypes,
+    wireprotov2server,
 )
 from .utils import (
     procutil,
@@ -42,9 +39,7 @@
 HGTYPE = 'application/mercurial-0.1'
 HGTYPE2 = 'application/mercurial-0.2'
 HGERRTYPE = 'application/hg-error'
-FRAMINGTYPE = b'application/mercurial-exp-framing-0003'
 
-HTTPV2 = wireprototypes.HTTPV2
 SSHV1 = wireprototypes.SSHV1
 SSHV2 = wireprototypes.SSHV2
 
@@ -291,350 +286,14 @@
     API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
                                    req.dispatchparts[2:])
 
-def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
-    from .hgweb import common as hgwebcommon
-
-    # URL space looks like: <permissions>/<command>, where <permission> can
-    # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
-
-    # Root URL does nothing meaningful... yet.
-    if not urlparts:
-        res.status = b'200 OK'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('HTTP version 2 API handler'))
-        return
-
-    if len(urlparts) == 1:
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('do not know how to process %s\n') %
-                         req.dispatchpath)
-        return
-
-    permission, command = urlparts[0:2]
-
-    if permission not in (b'ro', b'rw'):
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('unknown permission: %s') % permission)
-        return
-
-    if req.method != 'POST':
-        res.status = b'405 Method Not Allowed'
-        res.headers[b'Allow'] = b'POST'
-        res.setbodybytes(_('commands require POST requests'))
-        return
-
-    # At some point we'll want to use our own API instead of recycling the
-    # behavior of version 1 of the wire protocol...
-    # TODO return reasonable responses - not responses that overload the
-    # HTTP status line message for error reporting.
-    try:
-        checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
-    except hgwebcommon.ErrorResponse as e:
-        res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
-        for k, v in e.headers:
-            res.headers[k] = v
-        res.setbodybytes('permission denied')
-        return
-
-    # We have a special endpoint to reflect the request back at the client.
-    if command == b'debugreflect':
-        _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
-        return
-
-    # Extra commands that we handle that aren't really wire protocol
-    # commands. Think extra hard before making this hackery available to
-    # extension.
-    extracommands = {'multirequest'}
-
-    if command not in wireproto.commandsv2 and command not in extracommands:
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
-        return
-
-    repo = rctx.repo
-    ui = repo.ui
-
-    proto = httpv2protocolhandler(req, ui)
-
-    if (not wireproto.commandsv2.commandavailable(command, proto)
-        and command not in extracommands):
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('invalid wire protocol command: %s') % command)
-        return
-
-    # TODO consider cases where proxies may add additional Accept headers.
-    if req.headers.get(b'Accept') != FRAMINGTYPE:
-        res.status = b'406 Not Acceptable'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
-                           % FRAMINGTYPE)
-        return
-
-    if req.headers.get(b'Content-Type') != FRAMINGTYPE:
-        res.status = b'415 Unsupported Media Type'
-        # TODO we should send a response with appropriate media type,
-        # since client does Accept it.
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('client MUST send Content-Type header with '
-                           'value: %s\n') % FRAMINGTYPE)
-        return
-
-    _processhttpv2request(ui, repo, req, res, permission, command, proto)
-
-def _processhttpv2reflectrequest(ui, repo, req, res):
-    """Reads unified frame protocol request and dumps out state to client.
-
-    This special endpoint can be used to help debug the wire protocol.
-
-    Instead of routing the request through the normal dispatch mechanism,
-    we instead read all frames, decode them, and feed them into our state
-    tracker. We then dump the log of all that activity back out to the
-    client.
-    """
-    import json
-
-    # Reflection APIs have a history of being abused, accidentally disclosing
-    # sensitive data, etc. So we have a config knob.
-    if not ui.configbool('experimental', 'web.api.debugreflect'):
-        res.status = b'404 Not Found'
-        res.headers[b'Content-Type'] = b'text/plain'
-        res.setbodybytes(_('debugreflect service not available'))
-        return
-
-    # We assume we have a unified framing protocol request body.
-
-    reactor = wireprotoframing.serverreactor()
-    states = []
-
-    while True:
-        frame = wireprotoframing.readframe(req.bodyfh)
-
-        if not frame:
-            states.append(b'received: <no frame>')
-            break
-
-        states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
-                                                  frame.requestid,
-                                                  frame.payload))
-
-        action, meta = reactor.onframerecv(frame)
-        states.append(json.dumps((action, meta), sort_keys=True,
-                                 separators=(', ', ': ')))
-
-    action, meta = reactor.oninputeof()
-    meta['action'] = action
-    states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
-
-    res.status = b'200 OK'
-    res.headers[b'Content-Type'] = b'text/plain'
-    res.setbodybytes(b'\n'.join(states))
-
-def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
-    """Post-validation handler for HTTPv2 requests.
-
-    Called when the HTTP request contains unified frame-based protocol
-    frames for evaluation.
-    """
-    # TODO Some HTTP clients are full duplex and can receive data before
-    # the entire request is transmitted. Figure out a way to indicate support
-    # for that so we can opt into full duplex mode.
-    reactor = wireprotoframing.serverreactor(deferoutput=True)
-    seencommand = False
-
-    outstream = reactor.makeoutputstream()
-
-    while True:
-        frame = wireprotoframing.readframe(req.bodyfh)
-        if not frame:
-            break
-
-        action, meta = reactor.onframerecv(frame)
-
-        if action == 'wantframe':
-            # Need more data before we can do anything.
-            continue
-        elif action == 'runcommand':
-            sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
-                                           reqcommand, reactor, outstream,
-                                           meta, issubsequent=seencommand)
-
-            if sentoutput:
-                return
-
-            seencommand = True
-
-        elif action == 'error':
-            # TODO define proper error mechanism.
-            res.status = b'200 OK'
-            res.headers[b'Content-Type'] = b'text/plain'
-            res.setbodybytes(meta['message'] + b'\n')
-            return
-        else:
-            raise error.ProgrammingError(
-                'unhandled action from frame processor: %s' % action)
-
-    action, meta = reactor.oninputeof()
-    if action == 'sendframes':
-        # We assume we haven't started sending the response yet. If we're
-        # wrong, the response type will raise an exception.
-        res.status = b'200 OK'
-        res.headers[b'Content-Type'] = FRAMINGTYPE
-        res.setbodygen(meta['framegen'])
-    elif action == 'noop':
-        pass
-    else:
-        raise error.ProgrammingError('unhandled action from frame processor: %s'
-                                     % action)
-
-def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
-                      outstream, command, issubsequent):
-    """Dispatch a wire protocol command made from HTTPv2 requests.
-
-    The authenticated permission (``authedperm``) along with the original
-    command from the URL (``reqcommand``) are passed in.
-    """
-    # We already validated that the session has permissions to perform the
-    # actions in ``authedperm``. In the unified frame protocol, the canonical
-    # command to run is expressed in a frame. However, the URL also requested
-    # to run a specific command. We need to be careful that the command we
-    # run doesn't have permissions requirements greater than what was granted
-    # by ``authedperm``.
-    #
-    # Our rule for this is we only allow one command per HTTP request and
-    # that command must match the command in the URL. However, we make
-    # an exception for the ``multirequest`` URL. This URL is allowed to
-    # execute multiple commands. We double check permissions of each command
-    # as it is invoked to ensure there is no privilege escalation.
-    # TODO consider allowing multiple commands to regular command URLs
-    # iff each command is the same.
-
-    proto = httpv2protocolhandler(req, ui, args=command['args'])
-
-    if reqcommand == b'multirequest':
-        if not wireproto.commandsv2.commandavailable(command['command'], proto):
-            # TODO proper error mechanism
-            res.status = b'200 OK'
-            res.headers[b'Content-Type'] = b'text/plain'
-            res.setbodybytes(_('wire protocol command not available: %s') %
-                             command['command'])
-            return True
-
-        # TODO don't use assert here, since it may be elided by -O.
-        assert authedperm in (b'ro', b'rw')
-        wirecommand = wireproto.commandsv2[command['command']]
-        assert wirecommand.permission in ('push', 'pull')
-
-        if authedperm == b'ro' and wirecommand.permission != 'pull':
-            # TODO proper error mechanism
-            res.status = b'403 Forbidden'
-            res.headers[b'Content-Type'] = b'text/plain'
-            res.setbodybytes(_('insufficient permissions to execute '
-                               'command: %s') % command['command'])
-            return True
-
-        # TODO should we also call checkperm() here? Maybe not if we're going
-        # to overhaul that API. The granted scope from the URL check should
-        # be good enough.
-
-    else:
-        # Don't allow multiple commands outside of ``multirequest`` URL.
-        if issubsequent:
-            # TODO proper error mechanism
-            res.status = b'200 OK'
-            res.headers[b'Content-Type'] = b'text/plain'
-            res.setbodybytes(_('multiple commands cannot be issued to this '
-                               'URL'))
-            return True
-
-        if reqcommand != command['command']:
-            # TODO define proper error mechanism
-            res.status = b'200 OK'
-            res.headers[b'Content-Type'] = b'text/plain'
-            res.setbodybytes(_('command in frame must match command in URL'))
-            return True
-
-    rsp = wireproto.dispatch(repo, proto, command['command'])
-
-    res.status = b'200 OK'
-    res.headers[b'Content-Type'] = FRAMINGTYPE
-
-    if isinstance(rsp, wireprototypes.bytesresponse):
-        action, meta = reactor.onbytesresponseready(outstream,
-                                                    command['requestid'],
-                                                    rsp.data)
-    elif isinstance(rsp, wireprototypes.cborresponse):
-        encoded = cbor.dumps(rsp.value, canonical=True)
-        action, meta = reactor.onbytesresponseready(outstream,
-                                                    command['requestid'],
-                                                    encoded,
-                                                    iscbor=True)
-    else:
-        action, meta = reactor.onapplicationerror(
-            _('unhandled response type from wire proto command'))
-
-    if action == 'sendframes':
-        res.setbodygen(meta['framegen'])
-        return True
-    elif action == 'noop':
-        return False
-    else:
-        raise error.ProgrammingError('unhandled event from reactor: %s' %
-                                     action)
-
 # Maps API name to metadata so custom API can be registered.
 API_HANDLERS = {
-    HTTPV2: {
+    wireprotov2server.HTTPV2: {
         'config': ('experimental', 'web.api.http-v2'),
-        'handler': _handlehttpv2request,
+        'handler': wireprotov2server.handlehttpv2request,
     },
 }
 
-@zi.implementer(wireprototypes.baseprotocolhandler)
-class httpv2protocolhandler(object):
-    def __init__(self, req, ui, args=None):
-        self._req = req
-        self._ui = ui
-        self._args = args
-
-    @property
-    def name(self):
-        return HTTPV2
-
-    def getargs(self, args):
-        data = {}
-        for k, typ in args.items():
-            if k == '*':
-                raise NotImplementedError('do not support * args')
-            elif k in self._args:
-                # TODO consider validating value types.
-                data[k] = self._args[k]
-
-        return data
-
-    def getprotocaps(self):
-        # Protocol capabilities are currently not implemented for HTTP V2.
-        return set()
-
-    def getpayload(self):
-        raise NotImplementedError
-
-    @contextlib.contextmanager
-    def mayberedirectstdio(self):
-        raise NotImplementedError
-
-    def client(self):
-        raise NotImplementedError
-
-    def addcapabilities(self, repo, caps):
-        return caps
-
-    def checkperm(self, perm):
-        raise NotImplementedError
-
 def _httpresponsetype(ui, proto, prefer_uncompressed):
     """Determine the appropriate response type and compression settings.
 
diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -29,7 +29,7 @@
     util,
     wireproto,
     wireprotoframing,
-    wireprotoserver,
+    wireprotov2server,
 )
 
 httplib = util.httplib
@@ -504,13 +504,13 @@
             'pull': 'ro',
         }[permission]
 
-        url = '%s/api/%s/%s/%s' % (self.url, wireprotoserver.HTTPV2, permission,
-                                   name)
+        url = '%s/api/%s/%s/%s' % (self.url, wireprotov2server.HTTPV2,
+                                   permission, name)
 
         # TODO modify user-agent to reflect v2.
         headers = {
-            r'Accept': wireprotoserver.FRAMINGTYPE,
-            r'Content-Type': wireprotoserver.FRAMINGTYPE,
+            r'Accept': wireprotov2server.FRAMINGTYPE,
+            r'Content-Type': wireprotov2server.FRAMINGTYPE,
         }
 
         # TODO this should be part of a generic peer for the frame-based



To: indygreg, #hg-reviewers, durin42
Cc: durin42, mercurial-devel


More information about the Mercurial-devel mailing list