D5135: exchangev2: support for calling rawstorefiledata to retrieve raw files

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Wed Oct 17 13:17:39 UTC 2018


indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This is somewhat hacky. For that I apologize.
  
  At the 4.8 Sprint, we decided we wanted to land support in wireprotov2 for doing
  a partial clone with changelog and manifestlog bootstrapped from a "stream clone"
  like primitive.
  
  This commit implements the client-side bits necessary to facilitate that.
  
  If a stream clone was requested into an empty revlogv1 repo and the new
  server-side command for obtaining raw files data is available, we call it to
  get the raw files for the changelog and manifestlog. We then fall through to
  an incremental pull. But when fetching files data, instead of using the list
  of changesets and manifests that we fetched via the "changesetdata" command,
  we do a linear scan of the repo and resolve the changeset and manifest nodes
  along with the manifest linkrevs.
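  A minimal, standalone sketch of that linear scan, assuming a toy repo that
  is just a list of changesets. `ToyChangeset` and `scanrepoforfiles` are
  hypothetical names, not Mercurial APIs, and the list index stands in for the
  revision number:

```python
from typing import Dict, List, NamedTuple, Tuple


class ToyChangeset(NamedTuple):
    """Hypothetical stand-in for a changectx: only the fields the scan reads."""
    node: bytes
    manifestnode: bytes


def scanrepoforfiles(
    changesets: List[ToyChangeset],
) -> Tuple[List[bytes], List[bytes], Dict[bytes, int]]:
    """Walk every revision in order, collecting changeset nodes, manifest
    nodes, and a manifest node -> linkrev mapping."""
    csetsforfiles: List[bytes] = []
    mnodesforfiles: List[bytes] = []
    manifestlinkrevs: Dict[bytes, int] = {}

    # The list index plays the role of the revision number.
    for rev, ctx in enumerate(changesets):
        csetsforfiles.append(ctx.node)
        mnodesforfiles.append(ctx.manifestnode)
        # Plain dict assignment: if two changesets share a manifest node,
        # the later revision wins, as with the assignment in the patch.
        manifestlinkrevs[ctx.manifestnode] = rev

    return csetsforfiles, mnodesforfiles, manifestlinkrevs


csets, mnodes, linkrevs = scanrepoforfiles([
    ToyChangeset(b'c0', b'm0'),
    ToyChangeset(b'c1', b'm1'),
    ToyChangeset(b'c2', b'm1'),  # hypothetical: reuses c1's manifest
])
print(csets, mnodes, linkrevs)
```

  Every revision is visited, so the cost is linear in repository size; the
  patch accepts that because the changelog and manifestlog arrive as raw files
  rather than through "changesetdata".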

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D5135

AFFECTED FILES
  mercurial/exchangev2.py
  tests/test-wireproto-exchangev2.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-exchangev2.t b/tests/test-wireproto-exchangev2.t
--- a/tests/test-wireproto-exchangev2.t
+++ b/tests/test-wireproto-exchangev2.t
@@ -963,3 +963,185 @@
   client-narrow-2/.hg/store/00manifest.i
   client-narrow-2/.hg/store/data/dir0/d.i
 #endif
+
+--stream will use rawstorefiledata to transfer changelog and manifestlog, then
+fall through to get files data
+
+  $ hg --debug clone --stream -U http://localhost:$HGPORT client-stream-0
+  using http://localhost:$HGPORT/
+  sending capabilities command
+  sending 1 commands
+  sending command rawstorefiledata: {
+    'files': [
+      'changelog',
+      'manifestlog'
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=1275; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  updating the branch cache
+  query 1; heads
+  sending 2 commands
+  sending command heads: {}
+  sending command known: {
+    'nodes': [
+      '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=22; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=2; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
+  searching for changes
+  all remote heads known locally
+  sending 1 commands
+  sending command changesetdata: {
+    'fields': set([
+      'bookmarks',
+      'parents',
+      'phase',
+      'revision'
+    ]),
+    'revisions': [
+      {
+        'heads': [
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'roots': [
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'type': 'changesetdagrange'
+      }
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=13; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  checking for updated bookmarks
+  sending 1 commands
+  sending command filesdata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'haveparents': True,
+    'revisions': [
+      {
+        'nodes': [
+          '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:',
+          '\xb7\t8\x08\x92\xb1\x93\xc1\t\x1d:\x81\x7fp`R\xe3F\x82\x1b',
+          'G\xfe\x01*\xb27\xa8\xc7\xfc\x0cx\xf9\xf2mXf\xee\xf3\xf8%',
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'type': 'changesetexplicit'
+      }
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=1133; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  (sent 5 HTTP requests and * bytes; received * bytes in responses) (glob)
+
+--stream + --include/--exclude will only obtain some files
+
+  $ hg --debug --config extensions.pullext=$TESTDIR/pullext.py clone --stream --include dir0/ -U http://localhost:$HGPORT client-stream-2
+  using http://localhost:$HGPORT/
+  sending capabilities command
+  sending 1 commands
+  sending command rawstorefiledata: {
+    'files': [
+      'changelog',
+      'manifestlog'
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=1275; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  updating the branch cache
+  query 1; heads
+  sending 2 commands
+  sending command heads: {}
+  sending command known: {
+    'nodes': [
+      '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=22; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=2; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
+  searching for changes
+  all remote heads known locally
+  sending 1 commands
+  sending command changesetdata: {
+    'fields': set([
+      'bookmarks',
+      'parents',
+      'phase',
+      'revision'
+    ]),
+    'revisions': [
+      {
+        'heads': [
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'roots': [
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'type': 'changesetdagrange'
+      }
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=13; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  checking for updated bookmarks
+  sending 1 commands
+  sending command filesdata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'haveparents': True,
+    'pathfilter': {
+      'include': [
+        'path:dir0'
+      ]
+    },
+    'revisions': [
+      {
+        'nodes': [
+          '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:',
+          '\xb7\t8\x08\x92\xb1\x93\xc1\t\x1d:\x81\x7fp`R\xe3F\x82\x1b',
+          'G\xfe\x01*\xb27\xa8\xc7\xfc\x0cx\xf9\xf2mXf\xee\xf3\xf8%',
+          '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.'
+        ],
+        'type': 'changesetexplicit'
+      }
+    ]
+  }
+  received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
+  received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=449; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  (sent 5 HTTP requests and * bytes; received * bytes in responses) (glob)
+
+#if reporevlogstore
+  $ find client-stream-2/.hg/store -type f -name '*.i' | sort
+  client-stream-2/.hg/store/00changelog.i
+  client-stream-2/.hg/store/00manifest.i
+  client-stream-2/.hg/store/data/dir0/c.i
+  client-stream-2/.hg/store/data/dir0/d.i
+#endif
diff --git a/mercurial/exchangev2.py b/mercurial/exchangev2.py
--- a/mercurial/exchangev2.py
+++ b/mercurial/exchangev2.py
@@ -29,6 +29,18 @@
     """Pull using wire protocol version 2."""
     repo = pullop.repo
     remote = pullop.remote
+
+    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)
+
+    # If this is a clone and it was requested to perform a "stream clone",
+    # we obtain the raw files data from the remote then fall back to an
+    # incremental pull. This is somewhat hacky and is not nearly robust enough
+    # for long-term usage.
+    if usingrawchangelogandmanifest:
+        with repo.transaction('clone'):
+            _fetchrawstorefiles(repo, remote)
+            repo.invalidate(clearfilecache=True)
+
     tr = pullop.trmanager.transaction()
 
     # We don't use the repo's narrow matcher here because the patterns passed
@@ -79,11 +91,122 @@
 
     manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
 
+    # If obtaining the raw store files, we need to scan the full repo to
+    # derive all the changesets, manifests, and linkrevs.
+    if usingrawchangelogandmanifest:
+        csetsforfiles = []
+        mnodesforfiles = []
+        manifestlinkrevs = {}
+
+        for rev in repo:
+            ctx = repo[rev]
+            mnode = ctx.manifestnode()
+
+            csetsforfiles.append(ctx.node())
+            mnodesforfiles.append(mnode)
+            manifestlinkrevs[mnode] = rev
+
+    else:
+        csetsforfiles = csetres['added']
+        mnodesforfiles = manres['added']
+        manifestlinkrevs = manres['linkrevs']
+
     # Find all file nodes referenced by added manifests and fetch those
     # revisions.
-    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, manres['added'])
-    _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetres['added'],
-                         manres['linkrevs'])
+    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
+    _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetsforfiles,
+                         manifestlinkrevs)
+
+def _checkuserawstorefiledata(pullop):
+    """Check whether we should use rawstorefiledata command to retrieve data."""
+
+    repo = pullop.repo
+    remote = pullop.remote
+
+    # Command to obtain raw store data isn't available.
+    if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
+        return False
+
+    # Only honor if user requested stream clone operation.
+    if not pullop.streamclonerequested:
+        return False
+
+    # Only works on empty repos.
+    if len(repo):
+        return False
+
+    # TODO This is super hacky. There needs to be a storage API for this. We
+    # also need to check for compatibility with the remote.
+    if b'revlogv1' not in repo.requirements:
+        return False
+
+    return True
+
+def _fetchrawstorefiles(repo, remote):
+    with remote.commandexecutor() as e:
+        objs = e.callcommand(b'rawstorefiledata', {
+            b'files': [b'changelog', b'manifestlog'],
+        }).result()
+
+        # First object is a summary of files data that follows.
+        overall = next(objs)
+
+        progress = repo.ui.makeprogress(_('clone'), total=overall[b'totalsize'],
+                                        unit=_('bytes'))
+        with progress:
+            progress.update(0)
+
+            # Next are pairs of file metadata, data.
+            while True:
+                try:
+                    filemeta = next(objs)
+                except StopIteration:
+                    break
+
+                for k in (b'location', b'path', b'size'):
+                    if k not in filemeta:
+                        raise error.Abort(_(b'remote file data missing key: %s')
+                                          % k)
+
+                if filemeta[b'location'] == b'store':
+                    vfs = repo.svfs
+                else:
+                    raise error.Abort(_(b'invalid location for raw file data: '
+                                        b'%s') % filemeta[b'location'])
+
+                bytesremaining = filemeta[b'size']
+
+                with vfs.open(filemeta[b'path'], b'wb') as fh:
+                    while True:
+                        try:
+                            chunk = next(objs)
+                        except StopIteration:
+                            break
+
+                        bytesremaining -= len(chunk)
+
+                        if bytesremaining < 0:
+                            raise error.Abort(_(
+                                b'received invalid number of bytes for file '
+                                b'data; expected %d, got extra') %
+                                filemeta[b'size'])
+
+                        progress.increment(step=len(chunk))
+                        fh.write(chunk)
+
+                        try:
+                            if chunk.islast:
+                                break
+                        except AttributeError:
+                            raise error.Abort(_(
+                                b'did not receive indefinite length bytestring '
+                                b'for file data'))
+
+                if bytesremaining:
+                    raise error.Abort(_(b'received invalid number of bytes for '
+                                        b'file data; expected %d, got %d') %
+                                      (filemeta[b'size'],
+                                       filemeta[b'size'] - bytesremaining))
 
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
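As a rough, standalone model of how `_fetchrawstorefiles` consumes the
`rawstorefiledata` response: a summary object comes first, then for each file
a metadata map followed by data chunks until one is flagged as the last
segment. `Chunk`, `StreamError`, and `consumerawfiles` are hypothetical names;
the real objects come from the wire protocol v2 decoder and the real code
writes through `repo.svfs`, which this sketch replaces with an in-memory dict:

```python
from typing import Any, Dict, Iterable


class Chunk(bytes):
    """Hypothetical bytes chunk carrying an ``islast`` flag, modeling one
    segment of an indefinite-length bytestring."""

    def __new__(cls, data: bytes, islast: bool = False) -> 'Chunk':
        obj = super().__new__(cls, data)
        obj.islast = islast
        return obj


class StreamError(Exception):
    """Hypothetical stand-in for error.Abort."""


def consumerawfiles(objs: Iterable[Any]) -> Dict[bytes, bytes]:
    """Collect file contents keyed by store path from a decoded response."""
    objs = iter(objs)  # outer and inner loops must share one iterator
    next(objs)  # first object: summary (b'totalsize' drives progress)
    files: Dict[bytes, bytes] = {}

    for filemeta in objs:
        for k in (b'location', b'path', b'size'):
            if k not in filemeta:
                raise StreamError('remote file data missing key: %r' % k)
        if filemeta[b'location'] != b'store':
            raise StreamError('invalid location: %r' % filemeta[b'location'])

        remaining = filemeta[b'size']
        parts = []
        # Consume data chunks until one is flagged as the last segment.
        for chunk in objs:
            if not hasattr(chunk, 'islast'):
                raise StreamError('expected indefinite-length bytestring')
            remaining -= len(chunk)
            if remaining < 0:
                raise StreamError('got more bytes than expected')
            parts.append(bytes(chunk))
            if chunk.islast:
                break

        if remaining:
            raise StreamError('expected %d bytes, got %d'
                              % (filemeta[b'size'],
                                 filemeta[b'size'] - remaining))
        files[filemeta[b'path']] = b''.join(parts)

    return files


stream = [
    {b'totalsize': 5},
    {b'location': b'store', b'path': b'00changelog.i', b'size': 3},
    Chunk(b'ab'),
    Chunk(b'c', islast=True),
    {b'location': b'store', b'path': b'00manifest.i', b'size': 2},
    Chunk(b'xy', islast=True),
]
print(consumerawfiles(stream))
# {b'00changelog.i': b'abc', b'00manifest.i': b'xy'}
```

Unlike the patch, which writes each chunk to the store file as it arrives,
this sketch buffers file data in memory, which is only reasonable for small
test inputs.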



To: indygreg, #hg-reviewers
Cc: mercurial-devel

