D2470: wireproto: allow direct stream processing for unbundle

joerg.sonnenberger (Joerg Sonnenberger) phabricator at mercurial-scm.org
Fri Apr 6 19:09:16 EDT 2018


This revision was automatically updated to reflect the committed changes.
Closed by commit rHG2d965bfeb8f6: wireproto: allow direct stream processing for unbundle (authored by joerg.sonnenberger, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D2470?vs=7816&id=7817

REVISION DETAIL
  https://phab.mercurial-scm.org/D2470

AFFECTED FILES
  hgext/largefiles/proto.py
  mercurial/configitems.py
  mercurial/help/config.txt
  mercurial/wireproto.py
  mercurial/wireprotoserver.py
  mercurial/wireprototypes.py
  tests/test-push-http.t

CHANGE DETAILS

diff --git a/tests/test-push-http.t b/tests/test-push-http.t
--- a/tests/test-push-http.t
+++ b/tests/test-push-http.t
@@ -23,7 +23,7 @@
   $ echo a >> a
   $ hg ci -mb
   $ req() {
-  >     hg serve -p $HGPORT -d --pid-file=hg.pid -E errors.log
+  >     hg $1 serve -p $HGPORT -d --pid-file=hg.pid -E errors.log
   >     cat hg.pid >> $DAEMON_PIDS
   >     hg --cwd ../test2 push http://localhost:$HGPORT/
   >     exitstatus=$?
@@ -70,6 +70,58 @@
   > echo "phase-move: $HG_NODE:  $HG_OLDPHASE -> $HG_PHASE"
   > EOF
 
+#if bundle1
+  $ cat >> .hg/hgrc <<EOF
+  > allow_push = *
+  > [hooks]
+  > changegroup = sh -c "printenv.py changegroup 0"
+  > pushkey = sh -c "printenv.py pushkey 0"
+  > txnclose-phase.test = sh $TESTTMP/hook.sh 
+  > EOF
+  $ req "--debug --config extensions.blackbox="
+  listening at http://localhost:$HGPORT/ (bound to $LOCALIP:$HGPORT)
+  pushing to http://localhost:$HGPORT/
+  searching for changes
+  remote: redirecting incoming bundle to */hg-unbundle-* (glob)
+  remote: adding changesets
+  remote: add changeset ba677d0156c1
+  remote: adding manifests
+  remote: adding file changes
+  remote: adding a revisions
+  remote: added 1 changesets with 1 changes to 1 files
+  remote: updating the branch cache
+  remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
+  remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b:  draft -> public
+  remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
+  remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872:   -> public
+  remote: running hook changegroup: sh -c "printenv.py changegroup 0"
+  remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
+  % serve errors
+  $ hg rollback
+  repository tip rolled back to revision 0 (undo serve)
+  $ req "--debug --config server.streamunbundle=True --config extensions.blackbox="
+  listening at http://localhost:$HGPORT/ (bound to $LOCALIP:$HGPORT)
+  pushing to http://localhost:$HGPORT/
+  searching for changes
+  remote: adding changesets
+  remote: add changeset ba677d0156c1
+  remote: adding manifests
+  remote: adding file changes
+  remote: adding a revisions
+  remote: added 1 changesets with 1 changes to 1 files
+  remote: updating the branch cache
+  remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
+  remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b:  draft -> public
+  remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
+  remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872:   -> public
+  remote: running hook changegroup: sh -c "printenv.py changegroup 0"
+  remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
+  % serve errors
+  $ hg rollback
+  repository tip rolled back to revision 0 (undo serve)
+#endif
+
+#if bundle2
   $ cat >> .hg/hgrc <<EOF
   > allow_push = *
   > [hooks]
@@ -86,11 +138,11 @@
   remote: added 1 changesets with 1 changes to 1 files
   remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b:  draft -> public
   remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872:   -> public
-  remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) (bundle1 !)
-  remote: changegroup hook: HG_BUNDLE2=1 HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) (bundle2 !)
+  remote: changegroup hook: HG_BUNDLE2=1 HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
   % serve errors
   $ hg rollback
   repository tip rolled back to revision 0 (undo serve)
+#endif
 
 expect success, server lacks the httpheader capability
 
diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py
--- a/mercurial/wireprototypes.py
+++ b/mercurial/wireprototypes.py
@@ -123,10 +123,11 @@
         Returns a list of capabilities as declared by the client for
         the current request (or connection for stateful protocol handlers)."""
 
-    def forwardpayload(fp):
-        """Read the raw payload and forward to a file.
+    def getpayload():
+        """Provide a generator for the raw payload.
 
-        The payload is read in full before the function returns.
+        The caller is responsible for ensuring that the full payload is
+        processed.
         """
 
     def mayberedirectstdio():
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -106,15 +106,14 @@
             self._protocaps = set(value.split(' '))
         return self._protocaps
 
-    def forwardpayload(self, fp):
+    def getpayload(self):
         # Existing clients *always* send Content-Length.
         length = int(self._req.headers[b'Content-Length'])
 
         # If httppostargs is used, we need to read Content-Length
         # minus the amount that was consumed by args.
         length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
-        for s in util.filechunkiter(self._req.bodyfh, limit=length):
-            fp.write(s)
+        return util.filechunkiter(self._req.bodyfh, limit=length)
 
     @contextlib.contextmanager
     def mayberedirectstdio(self):
@@ -610,7 +609,7 @@
         # Protocol capabilities are currently not implemented for HTTP V2.
         return set()
 
-    def forwardpayload(self, fp):
+    def getpayload(self):
         raise NotImplementedError
 
     @contextlib.contextmanager
@@ -783,7 +782,7 @@
     def getprotocaps(self):
         return self._protocaps
 
-    def forwardpayload(self, fpout):
+    def getpayload(self):
         # We initially send an empty response. This tells the client it is
         # OK to start sending data. If a client sees any other response, it
         # interprets it as an error.
@@ -796,7 +795,7 @@
         # 0\n
         count = int(self._fin.readline())
         while count:
-            fpout.write(self._fin.read(count))
+            yield self._fin.read(count)
             count = int(self._fin.readline())
 
     @contextlib.contextmanager
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -1082,14 +1082,33 @@
     with proto.mayberedirectstdio() as output:
         try:
             exchange.check_heads(repo, their_heads, 'preparing changes')
+            cleanup = lambda: None
+            try:
+                payload = proto.getpayload()
+                if repo.ui.configbool('server', 'streamunbundle'):
+                    def cleanup():
+                        # Ensure that the full payload is consumed, so
+                        # that the connection doesn't contain trailing garbage.
+                        for p in payload:
+                            pass
+                    fp = util.chunkbuffer(payload)
+                else:
+                    # write bundle data to temporary file as it can be big
+                    fp, tempname = None, None
+                    def cleanup():
+                        if fp:
+                            fp.close()
+                        if tempname:
+                            os.unlink(tempname)
+                    fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
+                    repo.ui.debug('redirecting incoming bundle to %s\n' %
+                        tempname)
+                    fp = os.fdopen(fd, pycompat.sysstr('wb+'))
+                    r = 0
+                    for p in payload:
+                        fp.write(p)
+                    fp.seek(0)
 
-            # write bundle data to temporary file because it can be big
-            fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
-            fp = os.fdopen(fd, r'wb+')
-            r = 0
-            try:
-                proto.forwardpayload(fp)
-                fp.seek(0)
                 gen = exchange.readbundle(repo.ui, fp, None)
                 if (isinstance(gen, changegroupmod.cg1unpacker)
                     and not bundle1allowed(repo, 'push')):
@@ -1112,8 +1131,7 @@
                     r, output.getvalue() if output else '')
 
             finally:
-                fp.close()
-                os.unlink(tempname)
+                cleanup()
 
         except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
             # handle non-bundle2 case first
diff --git a/mercurial/help/config.txt b/mercurial/help/config.txt
--- a/mercurial/help/config.txt
+++ b/mercurial/help/config.txt
@@ -1791,6 +1791,11 @@
     are highly recommended. Partial clones will still be allowed.
     (default: False)
 
+``streamunbundle``
+    When set, servers apply data sent from the client directly instead of
+    first writing it to a temporary file. This option effectively prevents
+    concurrent pushes. (default: False)
+
 ``concurrent-push-mode``
     Level of allowed race condition between two pushing clients.
 
diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -917,6 +917,9 @@
 coreconfigitem('server', 'disablefullbundle',
     default=False,
 )
+coreconfigitem('server', 'streamunbundle',
+    default=False,
+)
 coreconfigitem('server', 'maxhttpheaderlen',
     default=1024,
 )
diff --git a/hgext/largefiles/proto.py b/hgext/largefiles/proto.py
--- a/hgext/largefiles/proto.py
+++ b/hgext/largefiles/proto.py
@@ -41,7 +41,8 @@
         tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
 
         try:
-            proto.forwardpayload(tmpfp)
+            for p in proto.getpayload():
+                tmpfp.write(p)
             tmpfp._fp.seek(0)
             if sha != lfutil.hexsha1(tmpfp._fp):
                 raise IOError(0, _('largefile contents do not match hash'))



To: joerg.sonnenberger, #hg-reviewers, lothiraldan, indygreg
Cc: indygreg, lothiraldan, mercurial-devel


More information about the Mercurial-devel mailing list