D4491: exchangev2: fetch file revisions

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Wed Sep 5 16:23:32 UTC 2018


indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  Now that the server has an API for fetching file data, we can call into
  it to fetch file revisions.
  
  The implementation is relatively straightforward: we examine the
  manifests that we fetched and find all new file revisions referenced
  by them. We build up a mapping from file path to file nodes to
  manifest node. (The mapping to first manifest node allows us to
  map back to first changelog node/revision, which is used for the
  linkrev.)
  
  Once that map is built up, we iterate over it in a deterministic
  manner and fetch and store file data. The code is very similar
  to manifest fetching. So similar that we could probably extract the
  common bits into a generic function.
  
  With file data retrieval implemented, `hg clone` and `hg pull` are
  effectively feature complete, at least as far as the completeness
  of data transfer for essential repository data (changesets, manifests,
  files, phases, and bookmarks). We're still missing support for
  obsolescence markers, the hgtags fnodes cache, and the branchmap
  cache. But these are non-essential for the moment (and will be
  implemented later).
  
  This is a good point to assess the state of exchangev2 in terms of
  performance. I ran a local `hg clone` for the mozilla-unified
  repository using both version 1 and version 2 of the wire protocols
  and exchange methods. This is effectively comparing the performance
  of the wire protocol overhead and "getbundle" versus domain-specific
  commands. Wire protocol version 2 doesn't have compression implemented
  yet. So I tested version 1 with `server.compressionengines=none` to
  remove compression overhead from the equation.
  
  server
  before: user 220.420+0.000 sys 14.420+0.000
  after:  user 321.980+0.000 sys 18.990+0.000
  
  client
  before: real 561.650 secs (user 497.670+0.000 sys 28.160+0.000)
  after:  real 1226.260 secs (user 944.240+0.000 sys 354.150+0.000)
  
  We have substantial regressions on both client and server. This
  is obviously not desirable. I'm aware of some reasons:
  
  - Lack of hgtagsfnodes transfer (contributes significant CPU to client).
  - Lack of branch cache transfer (contributes significant CPU to client).
  - Little to no profiling / optimization performed on wire protocol version 2 code.
  - There appears to be a memory leak on the client and that is likely causing swapping on my machine.
  - Using multiple threads on the client may be counter-productive because Python.
  - We're not compressing on the server.
  - We're tracking file nodes on the client via manifest diffing rather than using linkrev shortcuts on the server.
  
  I'm pretty confident that most of these issues are addressable.
  
  But even if we can't get wire protocol version 2 on performance parity
  with "getbundle," I still think it is important to have the set of low
  level data-specific retrieval commands that we have implemented so
  far. This is because the existence of such commands allows flexibility
  in how clients access server data.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D4491

AFFECTED FILES
  mercurial/exchangev2.py
  tests/test-wireproto-exchangev2.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-exchangev2.t b/tests/test-wireproto-exchangev2.t
--- a/tests/test-wireproto-exchangev2.t
+++ b/tests/test-wireproto-exchangev2.t
@@ -94,6 +94,37 @@
   received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
   received frame(size=922; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 2 commands
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
+      '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc',
+      '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
+    ],
+    'path': 'a'
+  }
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
+      '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
+      '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
+    ],
+    'path': 'b'
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=389; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=389; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets 3390ef850073:caa2a465451d
 
@@ -189,6 +220,34 @@
   received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
   received frame(size=376; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 2 commands
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
+      '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc'
+    ],
+    'path': 'a'
+  }
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16'
+    ],
+    'path': 'b'
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=249; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=109; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets 3390ef850073:4432d83626e8
 
@@ -268,6 +327,36 @@
   received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
   received frame(size=559; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 2 commands
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
+      '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
+    ],
+    'path': 'a'
+  }
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
+      '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
+      '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
+    ],
+    'path': 'b'
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=249; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=389; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets cd2534766bec:caa2a465451d
   (run 'hg update' to get a working copy)
@@ -421,6 +510,37 @@
   received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
   received frame(size=922; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 2 commands
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
+      '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc',
+      '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
+    ],
+    'path': 'a'
+  }
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
+      '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
+      '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
+    ],
+    'path': 'b'
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=389; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=389; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets 3390ef850073:caa2a465451d
 
diff --git a/mercurial/exchangev2.py b/mercurial/exchangev2.py
--- a/mercurial/exchangev2.py
+++ b/mercurial/exchangev2.py
@@ -7,6 +7,7 @@
 
 from __future__ import absolute_import
 
+import collections
 import weakref
 
 from .i18n import _
@@ -58,7 +59,12 @@
                                remote.url(), pullop.gettransaction,
                                explicit=pullop.explicitbookmarks)
 
-    _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
+    manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
+
+    # Find all file nodes referenced by added manifests and fetch those
+    # revisions.
+    fnodes = _derivefilesfrommanifests(repo, manres['added'])
+    _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])
 
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
@@ -291,4 +297,98 @@
 
     return {
         'added': added,
+        'linkrevs': linkrevs,
     }
+
+def _derivefilesfrommanifests(repo, manifestnodes):
+    """Determine what file nodes are relevant given a set of manifest nodes.
+
+    Returns a dict mapping file paths to dicts of file node to first manifest
+    node.
+    """
+    ml = repo.manifestlog
+    fnodes = collections.defaultdict(dict)
+
+    for manifestnode in manifestnodes:
+        m = ml.get(b'', manifestnode)
+
+        # TODO this will pull in unwanted nodes because it takes the storage
+        # delta into consideration. What we really want is something that takes
+        # the delta between the manifest's parents. And ideally we would
+        # ignore file nodes that are known locally. For now, ignore both
+        # these limitations. This will result in incremental fetches requesting
+        # data we already have. So this is far from ideal.
+        md = m.readfast()
+
+        for path, fnode in md.items():
+            fnodes[path].setdefault(fnode, manifestnode)
+
+    return fnodes
+
+def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
+    def iterrevisions(objs, progress):
+        for filerevision in objs:
+            node = filerevision[b'node']
+
+            if b'deltasize' in filerevision:
+                basenode = filerevision[b'deltabasenode']
+                delta = next(objs)
+            elif b'revisionsize' in filerevision:
+                basenode = nullid
+                revision = next(objs)
+                delta = mdiff.trivialdiffheader(len(revision)) + revision
+            else:
+                continue
+
+            yield (
+                node,
+                filerevision[b'parents'][0],
+                filerevision[b'parents'][1],
+                node,
+                basenode,
+                delta,
+                # Flags not yet supported.
+                0,
+            )
+
+            progress.increment()
+
+    progress = repo.ui.makeprogress(
+        _('files'), unit=_('chunks'),
+         total=sum(len(v) for v in fnodes.itervalues()))
+
+    # TODO make batch size configurable
+    batchsize = 10000
+    fnodeslist = [x for x in sorted(fnodes.items())]
+
+    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
+        batch = [x for x in fnodeslist[i:i + batchsize]]
+        if not batch:
+            continue
+
+        with remote.commandexecutor() as e:
+            fs = []
+            locallinkrevs = {}
+
+            for path, nodes in batch:
+                fs.append((path, e.callcommand(b'filedata', {
+                    b'path': path,
+                    b'nodes': sorted(nodes),
+                    b'fields': {b'parents', b'revision'}
+                })))
+
+                locallinkrevs[path] = {
+                    node: linkrevs[manifestnode]
+                    for node, manifestnode in nodes.iteritems()}
+
+            for path, f in fs:
+                objs = f.result()
+
+                # Chomp off header objects.
+                next(objs)
+
+                store = repo.file(path)
+                store.addgroup(
+                    iterrevisions(objs, progress),
+                    locallinkrevs[path].__getitem__,
+                    weakref.proxy(tr))



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list