D4778: wireprotov2: client support for following content redirects

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Thu Sep 27 01:10:34 UTC 2018


indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  And with the server actually sending content redirects, it is finally
  time to implement client support for following them!
  
  When a redirect response is seen, we wait until all data for that
  request has been received (it should be nearly immediate since no
  data is expected to follow the redirect message). Then we use
  a URL opener to make a request. We stuff that response into the
  client handler and construct a new response object to track it.
  
  When readdata() is called for servicing requests, we attempt to
  read data from the first redirected response. During data reading,
  data is processed similarly to as if it came from a frame payload.
  
  The existing test for the functionality demonstrates the client
  transparently following the redirect and obtaining the command
  response data from an alternate URL!
  
  There is still plenty of work to do here, including shoring up
  testing. I'm not convinced things will work in the presence of
  multiple redirect responses. And we don't yet implement support
  for integrity verification or configuring server certificates
  to validate the connection. But it's a start. And it should enable
  us to start experimenting with "real" caches.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D4778

AFFECTED FILES
  mercurial/httppeer.py
  mercurial/wireprotov2peer.py
  tests/test-wireproto-content-redirects.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-content-redirects.t b/tests/test-wireproto-content-redirects.t
--- a/tests/test-wireproto-content-redirects.t
+++ b/tests/test-wireproto-content-redirects.t
@@ -1354,8 +1354,33 @@
   s>     0\r\n
   s>     \r\n
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  abort: redirect responses not yet supported
-  [255]
+  (following redirect to http://*:$HGPORT/api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0) (glob)
+  s>     GET /api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0 HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     accept: application/mercurial-cbor\r\n
+  s>     host: *:$HGPORT\r\n (glob)
+  s>     user-agent: Mercurial debugwireproto\r\n
+  s>     \r\n
+  s> makefile('rb', None)
+  s>     HTTP/1.1 200 OK\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: application/mercurial-cbor\r\n
+  s>     Content-Length: 91\r\n
+  s>     \r\n
+  s>     \xa1Jtotalitems\x01\xa2DnodeT\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&AGparents\x82T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
+  response: gen[
+    {
+      b'totalitems': 1
+    },
+    {
+      b'node': b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A',
+      b'parents': [
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+      ]
+    }
+  ]
 
   $ cat error.log
   $ killdaemons.py
diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py
--- a/mercurial/wireprotov2peer.py
+++ b/mercurial/wireprotov2peer.py
@@ -13,8 +13,12 @@
 from . import (
     encoding,
     error,
+    pycompat,
     sslutil,
+    url as urlmod,
+    util,
     wireprotoframing,
+    wireprototypes,
 )
 from .utils import (
     cborutil,
@@ -112,9 +116,10 @@
     events occur.
     """
 
-    def __init__(self, requestid, command):
+    def __init__(self, requestid, command, fromredirect=False):
         self.requestid = requestid
         self.command = command
+        self.fromredirect = fromredirect
 
         # Whether all remote input related to this command has been
         # received.
@@ -132,6 +137,7 @@
         self._pendingevents = []
         self._decoder = cborutil.bufferingdecoder()
         self._seeninitial = False
+        self._redirect = None
 
     def _oninputcomplete(self):
         with self._lock:
@@ -146,10 +152,19 @@
 
         with self._lock:
             for o in self._decoder.getavailable():
-                if not self._seeninitial:
+                if not self._seeninitial and not self.fromredirect:
                     self._handleinitial(o)
                     continue
 
+                # We should never see an object after a content redirect,
+                # as the spec says the main status object containing the
+                # content redirect is the only object in the stream. Fail
+                # if we see a misbehaving server.
+                if self._redirect:
+                    raise error.Abort(_('received unexpected response data '
+                                        'after content redirect; the remote is '
+                                        'buggy'))
+
                 self._pendingevents.append(o)
 
             self._serviceable.set()
@@ -160,7 +175,16 @@
             return
 
         elif o[b'status'] == b'redirect':
-            raise error.Abort(_('redirect responses not yet supported'))
+            l = o[b'location']
+            self._redirect = wireprototypes.alternatelocationresponse(
+                url=l[b'url'],
+                mediatype=l[b'mediatype'],
+                size=l.get(b'size'),
+                fullhashes=l.get(b'fullhashes'),
+                fullhashseed=l.get(b'fullhashseed'),
+                serverdercerts=l.get(b'serverdercerts'),
+                servercadercerts=l.get(b'servercadercerts'))
+            return
 
         atoms = [{'msg': o[b'error'][b'message']}]
         if b'args' in o[b'error']:
@@ -214,13 +238,17 @@
     with the higher-level peer API.
     """
 
-    def __init__(self, ui, clientreactor):
+    def __init__(self, ui, clientreactor, opener=None,
+                 requestbuilder=util.urlreq.request):
         self._ui = ui
         self._reactor = clientreactor
         self._requests = {}
         self._futures = {}
         self._responses = {}
+        self._redirects = []
         self._frameseof = False
+        self._opener = opener or urlmod.opener(ui)
+        self._requestbuilder = requestbuilder
 
     def callcommand(self, command, args, f, redirect=None):
         """Register a request to call a command.
@@ -269,7 +297,12 @@
                 self._ui.note(_('received %r\n') % frame)
                 self._processframe(frame)
 
-        if self._frameseof:
+        # Also try to read the first redirect.
+        if self._redirects:
+            if not self._processredirect(*self._redirects[0]):
+                self._redirects.pop(0)
+
+        if self._frameseof and not self._redirects:
             return None
 
         return True
@@ -318,10 +351,27 @@
         # This can raise. The caller can handle it.
         response._onresponsedata(meta['data'])
 
+        # If we got a content redirect response, we want to fetch it and
+        # expose the data as if we received it inline. But we also want to
+        # keep our internal request accounting in order. Our strategy is to
+        # basically put meaningful response handling on pause until EOS occurs
+        # and the stream accounting is in a good state. At that point, we follow
+        # the redirect and replace the response object with its data.
+
+        redirect = response._redirect
+        handlefuture = False if redirect else True
+
         if meta['eos']:
             response._oninputcomplete()
             del self._requests[frame.requestid]
 
+            if redirect:
+                self._followredirect(frame.requestid, redirect)
+                return
+
+        if not handlefuture:
+            return
+
         # If the command has a decoder, we wait until all input has been
         # received before resolving the future. Otherwise we resolve the
         # future immediately.
@@ -336,6 +386,82 @@
             self._futures[frame.requestid].set_result(decoded)
             del self._futures[frame.requestid]
 
+    def _followredirect(self, requestid, redirect):
+        """Called to initiate redirect following for a request."""
+        self._ui.note(_('(following redirect to %s)\n') % redirect.url)
+
+        # TODO handle framed responses.
+        if redirect.mediatype != b'application/mercurial-cbor':
+            raise error.Abort(_('cannot handle redirects for the %s media type')
+                              % redirect.mediatype)
+
+        if redirect.fullhashes:
+            self._ui.warn(_('(support for validating hashes on content '
+                            'redirects not supported)\n'))
+
+        if redirect.serverdercerts or redirect.servercadercerts:
+            self._ui.warn(_('(support for pinning server certificates on '
+                            'content redirects not supported)\n'))
+
+        headers = {
+            r'Accept': redirect.mediatype,
+        }
+
+        req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
+
+        try:
+            res = self._opener.open(req)
+        except util.urlerr.httperror as e:
+            if e.code == 401:
+                raise error.Abort(_('authorization failed'))
+            raise
+        except util.httplib.HTTPException as e:
+            self._ui.debug('http error requesting %s\n' % req.get_full_url())
+            self._ui.traceback()
+            raise IOError(None, e)
+
+        urlmod.wrapresponse(res)
+
+        # The existing response object is associated with frame data. Rather
+        # than try to normalize its state, just create a new object.
+        oldresponse = self._responses[requestid]
+        self._responses[requestid] = commandresponse(requestid,
+                                                     oldresponse.command,
+                                                     fromredirect=True)
+
+        self._redirects.append((requestid, res))
+
+    def _processredirect(self, rid, res):
+        """Called to continue processing a response from a redirect."""
+        response = self._responses[rid]
+
+        try:
+            data = res.read(32768)
+            response._onresponsedata(data)
+
+            # We're at end of stream.
+            if not data:
+                response._oninputcomplete()
+
+            if rid not in self._futures:
+                return
+
+            if response.command not in COMMAND_DECODERS:
+                self._futures[rid].set_result(response.objects())
+                del self._futures[rid]
+            elif response._inputcomplete:
+                decoded = COMMAND_DECODERS[response.command](response.objects())
+                self._futures[rid].set_result(decoded)
+                del self._futures[rid]
+
+            return bool(data)
+
+        except BaseException as e:
+            self._futures[rid].set_exception(e)
+            del self._futures[rid]
+            response._oninputcomplete()
+            return False
+
 def decodebranchmap(objs):
     # Response should be a single CBOR map of branch name to array of nodes.
     bm = next(objs)
diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -513,7 +513,9 @@
     reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
                                              buffersends=True)
 
-    handler = wireprotov2peer.clienthandler(ui, reactor)
+    handler = wireprotov2peer.clienthandler(ui, reactor,
+                                            opener=opener,
+                                            requestbuilder=requestbuilder)
 
     url = '%s/%s' % (apiurl, permission)
 



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list