D4491: exchangev2: fetch file revisions

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Fri Sep 14 23:17:50 EDT 2018


This revision was automatically updated to reflect the committed changes.
Closed by commit rHG039bf1eddc2e: exchangev2: fetch file revisions (authored by indygreg).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D4491?vs=10970&id=11069

REVISION DETAIL
  https://phab.mercurial-scm.org/D4491

AFFECTED FILES
  mercurial/exchangev2.py
  tests/test-wireproto-exchangev2.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-exchangev2.t b/tests/test-wireproto-exchangev2.t
--- a/tests/test-wireproto-exchangev2.t
+++ b/tests/test-wireproto-exchangev2.t
@@ -94,6 +94,37 @@
   received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
   received frame(size=922; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 2 commands
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
+      '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc',
+      '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
+    ],
+    'path': 'a'
+  }
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
+      '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
+      '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
+    ],
+    'path': 'b'
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=389; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=389; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets 3390ef850073:caa2a465451d (3 drafts)
 
@@ -189,6 +220,34 @@
   received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
   received frame(size=376; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 2 commands
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
+      '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc'
+    ],
+    'path': 'a'
+  }
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16'
+    ],
+    'path': 'b'
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=249; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=109; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets 3390ef850073:4432d83626e8
 
@@ -268,6 +327,36 @@
   received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
   received frame(size=559; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 2 commands
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
+      '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
+    ],
+    'path': 'a'
+  }
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
+      '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
+      '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
+    ],
+    'path': 'b'
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=249; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=389; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets cd2534766bec:caa2a465451d (3 drafts)
   (run 'hg update' to get a working copy)
@@ -421,6 +510,37 @@
   received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
   received frame(size=922; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 2 commands
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
+      '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc',
+      '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
+    ],
+    'path': 'a'
+  }
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
+      '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
+      '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
+    ],
+    'path': 'b'
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=389; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=389; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets 3390ef850073:caa2a465451d (1 drafts)
 
diff --git a/mercurial/exchangev2.py b/mercurial/exchangev2.py
--- a/mercurial/exchangev2.py
+++ b/mercurial/exchangev2.py
@@ -7,6 +7,7 @@
 
 from __future__ import absolute_import
 
+import collections
 import weakref
 
 from .i18n import _
@@ -58,7 +59,12 @@
                                remote.url(), pullop.gettransaction,
                                explicit=pullop.explicitbookmarks)
 
-    _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
+    manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
+
+    # Find all file nodes referenced by added manifests and fetch those
+    # revisions.
+    fnodes = _derivefilesfrommanifests(repo, manres['added'])
+    _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])
 
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
@@ -291,4 +297,98 @@
 
     return {
         'added': added,
+        'linkrevs': linkrevs,
     }
+
+def _derivefilesfrommanifests(repo, manifestnodes):
+    """Determine what file nodes are relevant given a set of manifest nodes.
+
+    Returns a dict mapping file paths to dicts of file node to first manifest
+    node.
+    """
+    ml = repo.manifestlog
+    fnodes = collections.defaultdict(dict)
+
+    for manifestnode in manifestnodes:
+        m = ml.get(b'', manifestnode)
+
+        # TODO this will pull in unwanted nodes because it takes the storage
+        # delta into consideration. What we really want is something that takes
+        # the delta between the manifest's parents. And ideally we would
+        # ignore file nodes that are known locally. For now, ignore both
+        # these limitations. This will result in incremental fetches requesting
+        # data we already have. So this is far from ideal.
+        md = m.readfast()
+
+        for path, fnode in md.items():
+            fnodes[path].setdefault(fnode, manifestnode)
+
+    return fnodes
+
+def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
+    def iterrevisions(objs, progress):
+        for filerevision in objs:
+            node = filerevision[b'node']
+
+            if b'deltasize' in filerevision:
+                basenode = filerevision[b'deltabasenode']
+                delta = next(objs)
+            elif b'revisionsize' in filerevision:
+                basenode = nullid
+                revision = next(objs)
+                delta = mdiff.trivialdiffheader(len(revision)) + revision
+            else:
+                continue
+
+            yield (
+                node,
+                filerevision[b'parents'][0],
+                filerevision[b'parents'][1],
+                node,
+                basenode,
+                delta,
+                # Flags not yet supported.
+                0,
+            )
+
+            progress.increment()
+
+    progress = repo.ui.makeprogress(
+        _('files'), unit=_('chunks'),
+         total=sum(len(v) for v in fnodes.itervalues()))
+
+    # TODO make batch size configurable
+    batchsize = 10000
+    fnodeslist = [x for x in sorted(fnodes.items())]
+
+    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
+        batch = [x for x in fnodeslist[i:i + batchsize]]
+        if not batch:
+            continue
+
+        with remote.commandexecutor() as e:
+            fs = []
+            locallinkrevs = {}
+
+            for path, nodes in batch:
+                fs.append((path, e.callcommand(b'filedata', {
+                    b'path': path,
+                    b'nodes': sorted(nodes),
+                    b'fields': {b'parents', b'revision'}
+                })))
+
+                locallinkrevs[path] = {
+                    node: linkrevs[manifestnode]
+                    for node, manifestnode in nodes.iteritems()}
+
+            for path, f in fs:
+                objs = f.result()
+
+                # Chomp off header objects.
+                next(objs)
+
+                store = repo.file(path)
+                store.addgroup(
+                    iterrevisions(objs, progress),
+                    locallinkrevs[path].__getitem__,
+                    weakref.proxy(tr))



To: indygreg, #hg-reviewers, durin42
Cc: mercurial-devel


More information about the Mercurial-devel mailing list