[PATCH 3 of 4 flagprocessor v9] revlog: flag processor

Remi Chaintron remi at fb.com
Tue Jan 10 08:31:02 EST 2017


# HG changeset patch
# User Remi Chaintron <remi at fb.com>
# Date 1484050279 0
#      Tue Jan 10 12:11:19 2017 +0000
# Node ID 36f683ac689bce47243d80b697608d6bc9cc13bb
# Parent  f88e4cbc4f2902ca7e6e73bd6267cc40594e2610
revlog: flag processor

Add the ability for revlog objects to process revision flags and apply
registered transforms on read/write operations.

This patch introduces:
- the 'revlog._processflags()' method that looks at revision flags and applies
  flag processors registered on them. Due to the need to handle non-commutative
  operations, flag transforms are applied in stable order but the order in which
  the transforms are applied is reversed between read and write operations.
- the 'addflagprocessor()' method allowing to register processors on flags.
  Flag processors are defined as a 3-tuple of (read, write, raw) functions to be
  applied depending on the operation being performed.
- an update on 'revlog.addrevision()' behavior. The current flagprocessor design
  relies on extensions to wrap around 'addrevision()' to set flags on revision
  data, and on the flagprocessor to perform the actual transformation of its
  contents. In the lfs case, this means we need to process flags before we meet
  the 2GB size check, leading to performing some operations before it happens:
  - if flags are set on the revision data, we assume some extensions might be
    modifying the contents using the flag processor next, and we compute the
    node for the original revision data (still allowing extension to override
    the node by wrapping around 'addrevision()').
  - we then invoke the flag processor to apply registered transforms (in lfs's
    case, drastically reducing the size of large blobs).
  - finally, we proceed with the 2GB size check.

Note: In the case a cachedelta is passed to 'addrevision()' and we detect the
flag processor modified the revision data, we chose to trust the flag processor
and drop the cachedelta.

diff --git a/mercurial/bundlerepo.py b/mercurial/bundlerepo.py
--- a/mercurial/bundlerepo.py
+++ b/mercurial/bundlerepo.py
@@ -148,7 +148,10 @@
             delta = self._chunk(chain.pop())
             text = mdiff.patches(text, [delta])
 
-        self.checkhash(text, node, rev=rev)
+        text, validatehash = self._processflags(text, self.flags(rev),
+                                                'read', raw=raw)
+        if validatehash:
+            self.checkhash(text, node, rev=rev)
         self._cache = (node, rev, text)
         return text
 
diff --git a/mercurial/revlog.py b/mercurial/revlog.py
--- a/mercurial/revlog.py
+++ b/mercurial/revlog.py
@@ -55,7 +55,11 @@
 # revlog index flags
 REVIDX_ISCENSORED = (1 << 15) # revision has censor metadata, must be verified
 REVIDX_DEFAULT_FLAGS = 0
-REVIDX_KNOWN_FLAGS = REVIDX_ISCENSORED
+# stable order in which flags need to be processed and their processors applied
+REVIDX_FLAGS_ORDER = [
+    REVIDX_ISCENSORED,
+]
+REVIDX_KNOWN_FLAGS = reduce(lambda x, y: x | y, REVIDX_FLAGS_ORDER)
 
 # max size of revlog with inline data
 _maxinline = 131072
@@ -64,6 +68,39 @@
 RevlogError = error.RevlogError
 LookupError = error.LookupError
 CensoredNodeError = error.CensoredNodeError
+ProgrammingError = error.ProgrammingError
+
+# Store flag processors (cf. 'addflagprocessor()' to register)
+_flagprocessors = {}
+
+def addflagprocessor(flag, processor):
+    """Register a flag processor on a revision data flag.
+
+    Invariant:
+    - Flags need to be defined in REVIDX_KNOWN_FLAGS and REVIDX_FLAGS_ORDER.
+    - Only one flag processor can be registered on a specific flag.
+    - flagprocessors must be 3-tuples of functions (read, write, raw) with the
+      following signatures:
+          - (read)  f(self, text) -> newtext, bool
+          - (write) f(self, text) -> newtext, bool
+          - (raw)   f(self, text) -> bool
+      The boolean returned by these transforms is used to determine whether
+      'newtext' can be used for hash integrity checking.
+
+      Note: The 'raw' transform is used for changegroup generation and in some
+      debug commands. In this case the transform only indicates whether the
+      contents can be used for hash integrity checks.
+    """
+    if not flag & REVIDX_KNOWN_FLAGS:
+        msg = _("cannot register processor on unknown flag '%#x'.") % (flag)
+        raise ProgrammingError(msg)
+    if flag not in REVIDX_FLAGS_ORDER:
+        msg = _("flag '%#x' undefined in REVIDX_FLAGS_ORDER.") % (flag)
+        raise ProgrammingError(msg)
+    if flag in _flagprocessors:
+        msg = _("cannot register multiple processors on flag '%#x'.") % (flag)
+        raise error.Abort(msg)
+    _flagprocessors[flag] = processor
 
 def getoffset(q):
     return int(q >> 16)
@@ -1231,11 +1268,6 @@
         if rev is None:
             rev = self.rev(node)
 
-        # check rev flags
-        if self.flags(rev) & ~REVIDX_KNOWN_FLAGS:
-            raise RevlogError(_('incompatible revision flag %x') %
-                              (self.flags(rev) & ~REVIDX_KNOWN_FLAGS))
-
         chain, stopped = self._deltachain(rev, stoprev=cachedrev)
         if stopped:
             text = self._cache[2]
@@ -1249,7 +1281,12 @@
             bins = bins[1:]
 
         text = mdiff.patches(text, bins)
-        self.checkhash(text, node, rev=rev)
+
+        text, validatehash = self._processflags(text, self.flags(rev), 'read',
+                                                raw=raw)
+        if validatehash:
+            self.checkhash(text, node, rev=rev)
+
         self._cache = (node, rev, text)
         return text
 
@@ -1261,6 +1298,65 @@
         """
         return hash(text, p1, p2)
 
+    def _processflags(self, text, flags, operation, raw=False):
+        """Inspect revision data flags and applies transforms defined by
+        registered flag processors.
+
+        ``text`` - the revision data to process
+        ``flags`` - the revision flags
+        ``operation`` - the operation being performed (read or write)
+        ``raw`` - an optional argument describing if the raw transform should be
+        applied.
+
+        This method processes the flags in the order (or reverse order if
+        ``operation`` is 'write') defined by REVIDX_FLAGS_ORDER, applying the
+        flag processors registered for present flags. The order of flags defined
+        in REVIDX_FLAGS_ORDER needs to be stable to allow non-commutativity.
+
+        Returns a 2-tuple of ``(text, validatehash)`` where ``text`` is the
+        processed text and ``validatehash`` is a bool indicating whether the
+        returned text should be checked for hash integrity.
+
+        Note: If the ``raw`` argument is set, it has precedence over the
+        operation and will only update the value of ``validatehash``.
+        """
+        if not operation in ('read', 'write'):
+            raise ProgrammingError(_("invalid '%s' operation ") % (operation))
+        # Check all flags are known.
+        if flags & ~REVIDX_KNOWN_FLAGS:
+            raise RevlogError(_("incompatible revision flag '%#x'") %
+                              (flags & ~REVIDX_KNOWN_FLAGS))
+        validatehash = True
+        # Depending on the operation (read or write), the order might be
+        # reversed due to non-commutative transforms.
+        orderedflags = REVIDX_FLAGS_ORDER
+        if operation == 'write':
+            orderedflags = reversed(orderedflags)
+
+        for flag in orderedflags:
+            # If a flagprocessor has been registered for a known flag, apply the
+            # related operation transform and update result tuple.
+            if flag & flags:
+                vhash = True
+
+                if flag not in _flagprocessors:
+                    message = _("missing processor for flag '%#x'") % (flag)
+                    raise RevlogError(message)
+
+                processor = _flagprocessors[flag]
+                if processor is not None:
+                    readtransform, writetransform, rawtransform = processor
+
+                    if raw:
+                        vhash = rawtransform(self, text)
+                    elif operation == 'read':
+                        text, vhash = readtransform(self, text)
+                    else: # write operation
+                        text, vhash = writetransform(self, text)
+                validatehash = validatehash and vhash
+
+        return text, validatehash
+
     def checkhash(self, text, node, p1=None, p2=None, rev=None):
         """Check node hash integrity.
 
@@ -1345,6 +1441,17 @@
             raise RevlogError(_("attempted to add linkrev -1 to %s")
                               % self.indexfile)
 
+        if flags:
+            node = node or self.hash(text, p1, p2)
+
+        newtext, validatehash = self._processflags(text, flags, 'write')
+
+        # If the flag processor modifies the revision data, ignore any provided
+        # cachedelta.
+        if newtext != text:
+            cachedelta = None
+        text = newtext
+
         if len(text) > _maxentrysize:
             raise RevlogError(
                 _("%s: size of %d bytes exceeds maximum revlog storage of 2GiB")
@@ -1354,6 +1461,9 @@
         if node in self.nodemap:
             return node
 
+        if validatehash:
+            self.checkhash(text, node, p1=p1, p2=p2)
+
         dfh = None
         if not self._inline:
             dfh = self.opener(self.datafile, "a+")
@@ -1448,7 +1558,10 @@
                 btext[0] = mdiff.patch(basetext, delta)
 
             try:
-                self.checkhash(btext[0], node, p1=p1, p2=p2)
+                res = self._processflags(btext[0], flags, 'read', raw=raw)
+                btext[0], validatehash = res
+                if validatehash:
+                    self.checkhash(btext[0], node, p1=p1, p2=p2)
                 if flags & REVIDX_ISCENSORED:
                     raise RevlogError(_('node %s is not censored') % node)
             except CensoredNodeError:
diff --git a/tests/flagprocessorext.py b/tests/flagprocessorext.py
new file mode 100644
--- /dev/null
+++ b/tests/flagprocessorext.py
@@ -0,0 +1,136 @@
+# coding=UTF-8
+
+from __future__ import absolute_import
+
+import base64
+import zlib
+
+from mercurial import (
+    changegroup,
+    extensions,
+    filelog,
+    revlog,
+)
+
+# Test only: These flags are defined here only in the context of testing the
+# behavior of the flag processor. The canonical way to add flags is to get in
+# touch with the community and make them known in revlog.
+REVIDX_NOOP = (1 << 3)
+REVIDX_BASE64 = (1 << 2)
+REVIDX_GZIP = (1 << 1)
+REVIDX_FAIL = 1
+
+def validatehash(self, text):
+    return True
+
+def bypass(self, text):
+    return False
+
+def noopdonothing(self, text):
+    return (text, True)
+
+def b64encode(self, text):
+    return (base64.b64encode(text), False)
+
+def b64decode(self, text):
+    return (base64.b64decode(text), True)
+
+def gzipcompress(self, text):
+    return (zlib.compress(text), False)
+
+def gzipdecompress(self, text):
+    return (zlib.decompress(text), True)
+
+def supportedoutgoingversions(orig, repo):
+    versions = orig(repo)
+    versions.discard('01')
+    versions.discard('02')
+    versions.add('03')
+    return versions
+
+def allsupportedversions(orig, ui):
+    versions = orig(ui)
+    versions.add('03')
+    return versions
+
+def noopaddrevision(orig, self, text, transaction, link, p1, p2,
+                    cachedelta=None, node=None,
+                    flags=revlog.REVIDX_DEFAULT_FLAGS):
+    if '[NOOP]' in text:
+        flags |= REVIDX_NOOP
+    return orig(self, text, transaction, link, p1, p2, cachedelta=cachedelta,
+                node=node, flags=flags)
+
+def b64addrevision(orig, self, text, transaction, link, p1, p2,
+                   cachedelta=None, node=None,
+                   flags=revlog.REVIDX_DEFAULT_FLAGS):
+    if '[BASE64]' in text:
+        flags |= REVIDX_BASE64
+    return orig(self, text, transaction, link, p1, p2, cachedelta=cachedelta,
+                node=node, flags=flags)
+
+def gzipaddrevision(orig, self, text, transaction, link, p1, p2,
+                    cachedelta=None, node=None,
+                    flags=revlog.REVIDX_DEFAULT_FLAGS):
+    if '[GZIP]' in text:
+        flags |= REVIDX_GZIP
+    return orig(self, text, transaction, link, p1, p2, cachedelta=cachedelta,
+                node=node, flags=flags)
+
+def failaddrevision(orig, self, text, transaction, link, p1, p2,
+                    cachedelta=None, node=None,
+                    flags=revlog.REVIDX_DEFAULT_FLAGS):
+    # This addrevision wrapper is meant to add a flag we will not have
+    # transforms registered for, ensuring we handle this error case.
+    if '[FAIL]' in text:
+        flags |= REVIDX_FAIL
+    return orig(self, text, transaction, link, p1, p2, cachedelta=cachedelta,
+                node=node, flags=flags)
+
+def extsetup(ui):
+    # Enable changegroup3 for flags to be sent over the wire
+    wrapfunction = extensions.wrapfunction
+    wrapfunction(changegroup,
+                 'supportedoutgoingversions',
+                 supportedoutgoingversions)
+    wrapfunction(changegroup,
+                 'allsupportedversions',
+                 allsupportedversions)
+
+    # Teach revlog about our test flags
+    testflags = [REVIDX_NOOP, REVIDX_BASE64, REVIDX_GZIP, REVIDX_FAIL]
+    revlog.REVIDX_KNOWN_FLAGS |= reduce(lambda x, y: x | y, testflags)
+    revlog.REVIDX_FLAGS_ORDER.extend(testflags)
+
+    # Add wrappers for addrevision, responsible to set flags depending on the
+    # revision data contents.
+    wrapfunction(filelog.filelog, 'addrevision', noopaddrevision)
+    wrapfunction(filelog.filelog, 'addrevision', b64addrevision)
+    wrapfunction(filelog.filelog, 'addrevision', gzipaddrevision)
+    wrapfunction(filelog.filelog, 'addrevision', failaddrevision)
+
+    # Register flag processors for each extension
+    revlog.addflagprocessor(
+        REVIDX_NOOP,
+        (
+            noopdonothing,
+            noopdonothing,
+            validatehash,
+        )
+    )
+    revlog.addflagprocessor(
+        REVIDX_BASE64,
+        (
+            b64decode,
+            b64encode,
+            bypass,
+        ),
+    )
+    revlog.addflagprocessor(
+        REVIDX_GZIP,
+        (
+            gzipdecompress,
+            gzipcompress,
+            bypass
+        )
+    )
diff --git a/tests/test-flagprocessor.t b/tests/test-flagprocessor.t
new file mode 100644
--- /dev/null
+++ b/tests/test-flagprocessor.t
@@ -0,0 +1,167 @@
+# Create server
+  $ hg init server
+  $ cd server
+  $ cat >> .hg/hgrc << EOF
+  > [extensions]
+  > extension=$TESTDIR/flagprocessorext.py
+  > EOF
+  $ cd ../
+
+# Clone server and enable extensions
+  $ hg clone -q server client
+  $ cd client
+  $ cat >> .hg/hgrc << EOF
+  > [extensions]
+  > extension=$TESTDIR/flagprocessorext.py
+  > EOF
+
+# Commit file that will trigger the noop extension
+  $ echo '[NOOP]' > noop
+  $ hg commit -Aqm "noop"
+
+# Commit file that will trigger the base64 extension
+  $ echo '[BASE64]' > base64
+  $ hg commit -Aqm 'base64'
+
+# Commit file that will trigger the gzip extension
+  $ echo '[GZIP]' > gzip
+  $ hg commit -Aqm 'gzip'
+
+# Commit file that will trigger noop and base64
+  $ echo '[NOOP][BASE64]' > noop-base64
+  $ hg commit -Aqm 'noop+base64'
+
+# Commit file that will trigger noop and gzip
+  $ echo '[NOOP][GZIP]' > noop-gzip
+  $ hg commit -Aqm 'noop+gzip'
+
+# Commit file that will trigger base64 and gzip
+  $ echo '[BASE64][GZIP]' > base64-gzip
+  $ hg commit -Aqm 'base64+gzip'
+
+# Commit file that will trigger base64, gzip and noop
+  $ echo '[BASE64][GZIP][NOOP]' > base64-gzip-noop
+  $ hg commit -Aqm 'base64+gzip+noop'
+
+# TEST: ensure the revision data is consistent
+  $ hg cat noop
+  [NOOP]
+  $ hg debugdata noop 0
+  [NOOP]
+
+  $ hg cat -r . base64
+  [BASE64]
+  $ hg debugdata base64 0
+  W0JBU0U2NF0K (no-eol)
+
+  $ hg cat -r . gzip
+  [GZIP]
+  $ hg debugdata gzip 0
+  x\x9c\x8bv\x8f\xf2\x0c\x88\xe5\x02\x00\x08\xc8\x01\xfd (no-eol) (esc)
+
+  $ hg cat -r . noop-base64
+  [NOOP][BASE64]
+  $ hg debugdata noop-base64 0
+  W05PT1BdW0JBU0U2NF0K (no-eol)
+
+  $ hg cat -r . noop-gzip
+  [NOOP][GZIP]
+  $ hg debugdata noop-gzip 0
+  x\x9c\x8b\xf6\xf3\xf7\x0f\x88\x8dv\x8f\xf2\x0c\x88\xe5\x02\x00\x1dH\x03\xf1 (no-eol) (esc)
+
+  $ hg cat -r . base64-gzip
+  [BASE64][GZIP]
+  $ hg debugdata base64-gzip 0
+  eJyLdnIMdjUziY12j/IMiOUCACLBBDo= (no-eol)
+
+  $ hg cat -r . base64-gzip-noop
+  [BASE64][GZIP][NOOP]
+  $ hg debugdata base64-gzip-noop 0
+  eJyLdnIMdjUziY12j/IMiI328/cPiOUCAESjBi4= (no-eol)
+
+# Push to the server
+  $ hg push
+  pushing to $TESTTMP/server
+  searching for changes
+  adding changesets
+  adding manifests
+  adding file changes
+  added 7 changesets with 7 changes to 7 files
+
+# Initialize new client (not cloning) and setup extension
+  $ cd ..
+  $ hg init client2
+  $ cd client2
+  $ cat >> .hg/hgrc << EOF
+  > [paths]
+  > default = $TESTTMP/server
+  > [extensions]
+  > extension=$TESTDIR/flagprocessorext.py
+  > EOF
+
+# Pull from server and update to latest revision
+  $ hg pull default
+  pulling from $TESTTMP/server
+  requesting all changes
+  adding changesets
+  adding manifests
+  adding file changes
+  added 7 changesets with 7 changes to 7 files
+  (run 'hg update' to get a working copy)
+  $ hg update
+  7 files updated, 0 files merged, 0 files removed, 0 files unresolved
+
+# TEST: ensure the revision data is consistent
+  $ hg cat noop
+  [NOOP]
+  $ hg debugdata noop 0
+  [NOOP]
+
+  $ hg cat -r . base64
+  [BASE64]
+  $ hg debugdata base64 0
+  W0JBU0U2NF0K (no-eol)
+
+  $ hg cat -r . gzip
+  [GZIP]
+  $ hg debugdata gzip 0
+  x\x9c\x8bv\x8f\xf2\x0c\x88\xe5\x02\x00\x08\xc8\x01\xfd (no-eol) (esc)
+
+  $ hg cat -r . noop-base64
+  [NOOP][BASE64]
+  $ hg debugdata noop-base64 0
+  W05PT1BdW0JBU0U2NF0K (no-eol)
+
+  $ hg cat -r . noop-gzip
+  [NOOP][GZIP]
+  $ hg debugdata noop-gzip 0
+  x\x9c\x8b\xf6\xf3\xf7\x0f\x88\x8dv\x8f\xf2\x0c\x88\xe5\x02\x00\x1dH\x03\xf1 (no-eol) (esc)
+
+  $ hg cat -r . base64-gzip
+  [BASE64][GZIP]
+  $ hg debugdata base64-gzip 0
+  eJyLdnIMdjUziY12j/IMiOUCACLBBDo= (no-eol)
+
+  $ hg cat -r . base64-gzip-noop
+  [BASE64][GZIP][NOOP]
+  $ hg debugdata base64-gzip-noop 0
+  eJyLdnIMdjUziY12j/IMiI328/cPiOUCAESjBi4= (no-eol)
+
+# TEST: ensure a missing processor is handled
+  $ echo '[FAIL][BASE64][GZIP][NOOP]' > fail-base64-gzip-noop
+  $ hg commit -Aqm 'fail+base64+gzip+noop'
+  abort: missing processor for flag '0x1'!
+  [255]
+
+# TEST: ensure we cannot register several flag processors on the same flag
+  $ cat >> .hg/hgrc << EOF
+  > [extensions]
+  > extension=$TESTDIR/flagprocessorext.py
+  > duplicate=$TESTDIR/flagprocessorext.py
+  > EOF
+  $ echo 'this should fail' > file
+  $ hg commit -Aqm 'add file'
+  abort: cannot register multiple processors on flag '0x8'.
+  [255]
+
+


More information about the Mercurial-devel mailing list