D2470: wireproto: allow direct stream processing for unbundle
joerg.sonnenberger (Joerg Sonnenberger)
phabricator at mercurial-scm.org
Fri Apr 6 19:09:16 EDT 2018
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG2d965bfeb8f6: wireproto: allow direct stream processing for unbundle (authored by joerg.sonnenberger, committed by ).
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D2470?vs=7816&id=7817
REVISION DETAIL
https://phab.mercurial-scm.org/D2470
AFFECTED FILES
hgext/largefiles/proto.py
mercurial/configitems.py
mercurial/help/config.txt
mercurial/wireproto.py
mercurial/wireprotoserver.py
mercurial/wireprototypes.py
tests/test-push-http.t
CHANGE DETAILS
diff --git a/tests/test-push-http.t b/tests/test-push-http.t
--- a/tests/test-push-http.t
+++ b/tests/test-push-http.t
@@ -23,7 +23,7 @@
$ echo a >> a
$ hg ci -mb
$ req() {
- > hg serve -p $HGPORT -d --pid-file=hg.pid -E errors.log
+ > hg $1 serve -p $HGPORT -d --pid-file=hg.pid -E errors.log
> cat hg.pid >> $DAEMON_PIDS
> hg --cwd ../test2 push http://localhost:$HGPORT/
> exitstatus=$?
@@ -70,6 +70,58 @@
> echo "phase-move: $HG_NODE: $HG_OLDPHASE -> $HG_PHASE"
> EOF
+#if bundle1
+ $ cat >> .hg/hgrc <<EOF
+ > allow_push = *
+ > [hooks]
+ > changegroup = sh -c "printenv.py changegroup 0"
+ > pushkey = sh -c "printenv.py pushkey 0"
+ > txnclose-phase.test = sh $TESTTMP/hook.sh
+ > EOF
+ $ req "--debug --config extensions.blackbox="
+ listening at http://localhost:$HGPORT/ (bound to $LOCALIP:$HGPORT)
+ pushing to http://localhost:$HGPORT/
+ searching for changes
+ remote: redirecting incoming bundle to */hg-unbundle-* (glob)
+ remote: adding changesets
+ remote: add changeset ba677d0156c1
+ remote: adding manifests
+ remote: adding file changes
+ remote: adding a revisions
+ remote: added 1 changesets with 1 changes to 1 files
+ remote: updating the branch cache
+ remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
+ remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public
+ remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
+ remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public
+ remote: running hook changegroup: sh -c "printenv.py changegroup 0"
+ remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
+ % serve errors
+ $ hg rollback
+ repository tip rolled back to revision 0 (undo serve)
+ $ req "--debug --config server.streamunbundle=True --config extensions.blackbox="
+ listening at http://localhost:$HGPORT/ (bound to $LOCALIP:$HGPORT)
+ pushing to http://localhost:$HGPORT/
+ searching for changes
+ remote: adding changesets
+ remote: add changeset ba677d0156c1
+ remote: adding manifests
+ remote: adding file changes
+ remote: adding a revisions
+ remote: added 1 changesets with 1 changes to 1 files
+ remote: updating the branch cache
+ remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
+ remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public
+ remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
+ remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public
+ remote: running hook changegroup: sh -c "printenv.py changegroup 0"
+ remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
+ % serve errors
+ $ hg rollback
+ repository tip rolled back to revision 0 (undo serve)
+#endif
+
+#if bundle2
$ cat >> .hg/hgrc <<EOF
> allow_push = *
> [hooks]
@@ -86,11 +138,11 @@
remote: added 1 changesets with 1 changes to 1 files
remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public
remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public
- remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) (bundle1 !)
- remote: changegroup hook: HG_BUNDLE2=1 HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) (bundle2 !)
+ remote: changegroup hook: HG_BUNDLE2=1 HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
% serve errors
$ hg rollback
repository tip rolled back to revision 0 (undo serve)
+#endif
expect success, server lacks the httpheader capability
diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py
--- a/mercurial/wireprototypes.py
+++ b/mercurial/wireprototypes.py
@@ -123,10 +123,11 @@
Returns a list of capabilities as declared by the client for
the current request (or connection for stateful protocol handlers)."""
- def forwardpayload(fp):
- """Read the raw payload and forward to a file.
+ def getpayload():
+ """Provide a generator for the raw payload.
- The payload is read in full before the function returns.
+ The caller is responsible for ensuring that the full payload is
+ processed.
"""
def mayberedirectstdio():
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -106,15 +106,14 @@
self._protocaps = set(value.split(' '))
return self._protocaps
- def forwardpayload(self, fp):
+ def getpayload(self):
# Existing clients *always* send Content-Length.
length = int(self._req.headers[b'Content-Length'])
# If httppostargs is used, we need to read Content-Length
# minus the amount that was consumed by args.
length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
- for s in util.filechunkiter(self._req.bodyfh, limit=length):
- fp.write(s)
+ return util.filechunkiter(self._req.bodyfh, limit=length)
@contextlib.contextmanager
def mayberedirectstdio(self):
@@ -610,7 +609,7 @@
# Protocol capabilities are currently not implemented for HTTP V2.
return set()
- def forwardpayload(self, fp):
+ def getpayload(self):
raise NotImplementedError
@contextlib.contextmanager
@@ -783,7 +782,7 @@
def getprotocaps(self):
return self._protocaps
- def forwardpayload(self, fpout):
+ def getpayload(self):
# We initially send an empty response. This tells the client it is
# OK to start sending data. If a client sees any other response, it
# interprets it as an error.
@@ -796,7 +795,7 @@
# 0\n
count = int(self._fin.readline())
while count:
- fpout.write(self._fin.read(count))
+ yield self._fin.read(count)
count = int(self._fin.readline())
@contextlib.contextmanager
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -1082,14 +1082,33 @@
with proto.mayberedirectstdio() as output:
try:
exchange.check_heads(repo, their_heads, 'preparing changes')
+ cleanup = lambda: None
+ try:
+ payload = proto.getpayload()
+ if repo.ui.configbool('server', 'streamunbundle'):
+ def cleanup():
+ # Ensure that the full payload is consumed, so
+ # that the connection doesn't contain trailing garbage.
+ for p in payload:
+ pass
+ fp = util.chunkbuffer(payload)
+ else:
+ # write bundle data to temporary file as it can be big
+ fp, tempname = None, None
+ def cleanup():
+ if fp:
+ fp.close()
+ if tempname:
+ os.unlink(tempname)
+ fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
+ repo.ui.debug('redirecting incoming bundle to %s\n' %
+ tempname)
+ fp = os.fdopen(fd, pycompat.sysstr('wb+'))
+ r = 0
+ for p in payload:
+ fp.write(p)
+ fp.seek(0)
- # write bundle data to temporary file because it can be big
- fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
- fp = os.fdopen(fd, r'wb+')
- r = 0
- try:
- proto.forwardpayload(fp)
- fp.seek(0)
gen = exchange.readbundle(repo.ui, fp, None)
if (isinstance(gen, changegroupmod.cg1unpacker)
and not bundle1allowed(repo, 'push')):
@@ -1112,8 +1131,7 @@
r, output.getvalue() if output else '')
finally:
- fp.close()
- os.unlink(tempname)
+ cleanup()
except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
# handle non-bundle2 case first
diff --git a/mercurial/help/config.txt b/mercurial/help/config.txt
--- a/mercurial/help/config.txt
+++ b/mercurial/help/config.txt
@@ -1791,6 +1791,11 @@
are highly recommended. Partial clones will still be allowed.
(default: False)
+``streamunbundle``
+ When set, servers will apply data sent from the client directly,
+ otherwise it will be written to a temporary file first. This option
+ effectively prevents concurrent pushes.
+
``concurrent-push-mode``
Level of allowed race condition between two pushing clients.
diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -917,6 +917,9 @@
coreconfigitem('server', 'disablefullbundle',
default=False,
)
+coreconfigitem('server', 'streamunbundle',
+ default=False,
+)
coreconfigitem('server', 'maxhttpheaderlen',
default=1024,
)
diff --git a/hgext/largefiles/proto.py b/hgext/largefiles/proto.py
--- a/hgext/largefiles/proto.py
+++ b/hgext/largefiles/proto.py
@@ -41,7 +41,8 @@
tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
try:
- proto.forwardpayload(tmpfp)
+ for p in proto.getpayload():
+ tmpfp.write(p)
tmpfp._fp.seek(0)
if sha != lfutil.hexsha1(tmpfp._fp):
raise IOError(0, _('largefile contents do not match hash'))
To: joerg.sonnenberger, #hg-reviewers, lothiraldan, indygreg
Cc: indygreg, lothiraldan, mercurial-devel
More information about the Mercurial-devel
mailing list