[PATCH 3 of 4 flagprocessor v8] revlog: flag processor

Remi Chaintron remi at fb.com
Thu Jan 5 12:42:03 EST 2017


# HG changeset patch
# User Remi Chaintron <remi at fb.com>
# Date 1483636648 0
#      Thu Jan 05 17:17:28 2017 +0000
# Node ID 8701df1c04340e9951481dc4c366ba550b4e790f
# Parent  a93b1ff78332f4f78e77ec9aaa2c63f7f975399c
revlog: flag processor

Add the ability for revlog objects to process revision flags and apply
registered transforms on read/write operations.

This patch introduces:
- the 'revlog._processflags()' method that looks at revision flags and applies
  flag processors registered on them. Because transforms may be non-commutative,
  they are applied in a stable order on read and in the reverse of that order
  on write.
- the 'addflagprocessor()' function, which registers a processor on a flag.
  A flag processor is a 3-tuple of (read, write, raw) functions; which one is
  applied depends on the operation being performed.
- an update on 'revlog.addrevision()' behavior. The current flagprocessor design
  relies on extensions to wrap around 'addrevision()' to set flags on revision
  data, and on the flagprocessor to perform the actual transformation of its
  contents. In the lfs case, this means we need to process flags before we meet
  the 2GB size check, leading to performing some operations before it happens:
  - if flags are set on the revision data, we assume some extensions might be
    modifying the contents using the flag processor next, and we compute the
    node for the original revision data (still allowing extension to override
    the node by wrapping around 'addrevision()').
  - we then invoke the flag processor to apply registered transforms (in lfs's
    case, drastically reducing the size of large blobs).
  - finally, we proceed with the 2GB size check.

Note: If a cachedelta is passed to 'addrevision()' and we detect that the flag
processor modified the revision data, we choose to trust the flag processor
and drop the cachedelta.

diff --git a/mercurial/bundlerepo.py b/mercurial/bundlerepo.py
--- a/mercurial/bundlerepo.py
+++ b/mercurial/bundlerepo.py
@@ -148,7 +148,10 @@
             delta = self._chunk(chain.pop())
             text = mdiff.patches(text, [delta])
 
-        self.checkhash(text, node, rev=rev)
+        text, validatehash = self._processflags(text, self.flags(rev),
+                                                'read', raw=raw)
+        if validatehash:
+            self.checkhash(text, node, rev=rev)
         self._cache = (node, rev, text)
         return text
 
diff --git a/mercurial/revlog.py b/mercurial/revlog.py
--- a/mercurial/revlog.py
+++ b/mercurial/revlog.py
@@ -56,6 +56,10 @@
 REVIDX_ISCENSORED = (1 << 15) # revision has censor metadata, must be verified
 REVIDX_DEFAULT_FLAGS = 0
 REVIDX_KNOWN_FLAGS = REVIDX_ISCENSORED
+# stable order in which flags need to be processed and their processors applied
+REVIDX_FLAGS_ORDER = [
+    REVIDX_ISCENSORED,
+]
 
 # max size of revlog with inline data
 _maxinline = 131072
@@ -64,6 +68,39 @@
 RevlogError = error.RevlogError
 LookupError = error.LookupError
 CensoredNodeError = error.CensoredNodeError
+ProgrammingError = error.ProgrammingError
+
+# Store flag processors (cf. 'addflagprocessor()' to register)
+_flagprocessors = { }
+
+def addflagprocessor(flag, processor):
+    """Register a flag processor on a revision data flag.
+
+    Invariant:
+    - Flags need to be defined in REVIDX_KNOWN_FLAGS and REVIDX_FLAGS_ORDER.
+    - Only one flag processor can be registered on a specific flag.
+    - flagprocessors must be 3-tuples of functions (read, write, raw) with the
+      following signatures:
+          - (read)  f(self, text) -> newtext, bool
+          - (write) f(self, text) -> newtext, bool
+          - (raw)   f(self, text) -> bool
+      The boolean returned by these transforms tells whether the resulting
+      text can be used for hash integrity checking. The 'raw' transform (used
+      for changegroup generation and some debug commands) only returns it.
+
+    Raises ProgrammingError for an unknown or unordered flag, and error.Abort
+    when a processor is already registered on the flag.
+    """
+    if not flag & REVIDX_KNOWN_FLAGS:
+        raise ProgrammingError(
+            _("cannot register processor on unknown flag '%x'.") % (flag))
+    if flag not in REVIDX_FLAGS_ORDER:
+        raise ProgrammingError(
+            _("flag '%x' undefined in REVIDX_FLAGS_ORDER.") % (flag))
+    if flag in _flagprocessors:
+        raise error.Abort(
+            _("cannot register multiple processors on flag '%x'.") % (flag))
+    _flagprocessors[flag] = processor
 
 def getoffset(q):
     return int(q >> 16)
@@ -1231,11 +1268,6 @@
         if rev is None:
             rev = self.rev(node)
 
-        # check rev flags
-        if self.flags(rev) & ~REVIDX_KNOWN_FLAGS:
-            raise RevlogError(_('incompatible revision flag %x') %
-                              (self.flags(rev) & ~REVIDX_KNOWN_FLAGS))
-
         chain, stopped = self._deltachain(rev, stoprev=cachedrev)
         if stopped:
             text = self._cache[2]
@@ -1249,7 +1281,12 @@
             bins = bins[1:]
 
         text = mdiff.patches(text, bins)
-        self.checkhash(text, node, rev=rev)
+
+        text, validatehash = self._processflags(text, self.flags(rev), 'read',
+                                                raw=raw)
+        if validatehash:
+            self.checkhash(text, node, rev=rev)
+
         self._cache = (node, rev, text)
         return text
 
@@ -1261,6 +1298,58 @@
         """
         return hash(text, p1, p2)
 
+    def _processflags(self, text, flags, operation, raw=False):
+        """Inspect revision data flags and apply transforms defined by
+        registered flag processors.
+
+        ``text`` - the revision data to process
+        ``flags`` - the revision flags
+        ``operation`` - the operation being performed ('read' or 'write')
+        ``raw`` - an optional argument describing if the raw transform should be
+        applied.
+
+        This method processes the flags in the order (or reverse order if
+        ``operation`` is 'write') defined by REVIDX_FLAGS_ORDER, applying the
+        flag processors registered for present flags. The order of flags defined
+        in REVIDX_FLAGS_ORDER needs to be stable to allow non-commutativity.
+
+        Returns a 2-tuple of ``(text, validatehash)`` where ``text`` is the
+        processed text and ``validatehash`` is a bool indicating whether the
+        returned text should be checked for hash integrity.
+
+        Note: If the ``raw`` argument is set, it has precedence over the
+        operation and will only update the value of ``validatehash``.
+        """
+        if operation not in ('read', 'write'):
+            raise ProgrammingError(_("invalid operation '%s'") % (operation))
+        # Check all flags are known.
+        if flags & ~REVIDX_KNOWN_FLAGS:
+            raise RevlogError(_("incompatible revision flag '%x'") %
+                              (flags & ~REVIDX_KNOWN_FLAGS))
+        validatehash = True
+        # Depending on the operation (read or write), the order might be
+        # reversed due to non-commutative transforms.
+        orderedflags = REVIDX_FLAGS_ORDER
+        if operation == 'write':
+            orderedflags = reversed(orderedflags)
+
+        for flag in orderedflags:
+            # Skip flags that are not set, and known flags that have no
+            # processor registered: their text is left untouched.
+            processor = _flagprocessors.get(flag)
+            if not (flag & flags) or processor is None:
+                continue
+
+            if raw:
+                vhash = processor[2](self, text)
+            elif operation == 'read':
+                text, vhash = processor[0](self, text)
+            else: # write operation
+                text, vhash = processor[1](self, text)
+            validatehash = validatehash and vhash
+
+        return text, validatehash
+
     def checkhash(self, text, node, p1=None, p2=None, rev=None):
         """Check node hash integrity.
 
@@ -1345,6 +1434,15 @@
             raise RevlogError(_("attempted to add linkrev -1 to %s")
                               % self.indexfile)
 
+        if flags:
+            node = node or self.hash(text, p1, p2)
+
+        newtext, validatehash = self._processflags(text, flags, 'write')
+
+        if newtext != text and cachedelta is not None:
+            cachedelta = None
+        text = newtext
+
         if len(text) > _maxentrysize:
             raise RevlogError(
                 _("%s: size of %d bytes exceeds maximum revlog storage of 2GiB")
@@ -1354,10 +1452,14 @@
         if node in self.nodemap:
             return node
 
+        if validatehash:
+            self.checkhash(text, node, p1=p1, p2=p2)
+
         dfh = None
         if not self._inline:
             dfh = self.opener(self.datafile, "a+")
         ifh = self.opener(self.indexfile, "a+", checkambig=self._checkambig)
+
         try:
             return self._addrevision(node, text, transaction, link, p1, p2,
                                      flags, cachedelta, ifh, dfh)
@@ -1423,7 +1525,7 @@
         - text is optional (can be None); if not set, cachedelta must be set.
           if both are set, they must correspond to each other.
         - raw is optional; if set to True, it indicates the revision data is to
-          be treated by processflags() as raw. It is usually set by changegroup
+          be treated by _processflags() as raw. It is usually set by changegroup
           generation and debug commands.
         """
         btext = [text]
@@ -1448,7 +1550,11 @@
                 btext[0] = mdiff.patch(basetext, delta)
 
             try:
-                self.checkhash(btext[0], node, p1=p1, p2=p2)
+                btext[0], validatehash = self._processflags(btext[0],
+                                                            flags, 'read',
+                                                            raw=raw)
+                if validatehash:
+                    self.checkhash(btext[0], node, p1=p1, p2=p2)
                 if flags & REVIDX_ISCENSORED:
                     raise RevlogError(_('node %s is not censored') % node)
             except CensoredNodeError:
diff --git a/tests/base64ext.py b/tests/base64ext.py
new file mode 100644
--- /dev/null
+++ b/tests/base64ext.py
@@ -0,0 +1,50 @@
+# coding=UTF-8
+"""Test extension storing revisions that contain '[BASE64]' base64-encoded.
+
+A (read, write, raw) flag processor is registered on REVIDX_USR1.
+"""
+
+from __future__ import absolute_import
+
+import base64
+
+from mercurial import (
+    extensions,
+    filelog,
+    revlog,
+)
+
+import flagprocessorsetup
+
+def bypass(self, text):
+    """Raw transform: the text as stored cannot be used for hash checks."""
+    return False
+
+def encode(self, text):
+    """Write transform: base64-encode; the result is not hash-checked."""
+    return (base64.b64encode(text), False)
+
+def decode(self, text):
+    """Read transform: base64-decode; the result can be hash-checked."""
+    return (base64.b64decode(text), True)
+
+def addrevision(orig, self, text, transaction, link, p1, p2, cachedelta=None,
+                node=None, flags=revlog.REVIDX_DEFAULT_FLAGS):
+    """Set REVIDX_USR1 on revisions whose text contains '[BASE64]'."""
+    if '[BASE64]' in text:
+        flags |= flagprocessorsetup.REVIDX_USR1
+    return orig(self, text, transaction, link, p1, p2, cachedelta=cachedelta,
+                node=node, flags=flags)
+
+def extsetup(ui):
+    """Wrap filelog.addrevision and register the base64 flag processor."""
+    extensions.wrapfunction(filelog.filelog, 'addrevision', addrevision)
+
+    revlog.addflagprocessor(
+        flagprocessorsetup.REVIDX_USR1,
+        (
+            decode,
+            encode,
+            bypass,
+        ),
+    )
diff --git a/tests/flagprocessorsetup.py b/tests/flagprocessorsetup.py
new file mode 100644
--- /dev/null
+++ b/tests/flagprocessorsetup.py
@@ -0,0 +1,52 @@
+# coding=UTF-8
+"""Shared setup for the flag processor test extensions.
+
+Declares four test-only revlog flags and enables changegroup version '03' so
+that revision flags are sent over the wire.
+"""
+
+from __future__ import absolute_import
+
+from mercurial import (
+    changegroup,
+    extensions,
+    revlog,
+)
+
+REVIDX_USR0 = (1 << 3)
+REVIDX_USR1 = (1 << 2)
+REVIDX_USR2 = (1 << 1)
+REVIDX_USR3 = 1
+
+def supportedoutgoingversions(orig, repo):
+    """Drop changegroup versions '01' and '02' and add '03'."""
+    versions = orig(repo)
+    versions.discard('01')
+    versions.discard('02')
+    versions.add('03')
+    return versions
+
+def allsupportedversions(orig, ui):
+    """Add changegroup version '03' to the versions returned by ``orig``."""
+    versions = orig(ui)
+    versions.add('03')
+    return versions
+
+def extsetup(ui):
+    """Enable changegroup3 and declare the test-only revlog flags."""
+    # Enable changegroup3 for flags to be sent over the wire
+    wrapfunction = extensions.wrapfunction
+    wrapfunction(changegroup,
+                 'supportedoutgoingversions',
+                 supportedoutgoingversions)
+    wrapfunction(changegroup,
+                 'allsupportedversions',
+                 allsupportedversions)
+
+    # Test only: add new flags for test extensions to use. A flag already
+    # present in REVIDX_FLAGS_ORDER is not appended again, otherwise its
+    # processor would run once per occurrence.
+    for flag in (REVIDX_USR0, REVIDX_USR1, REVIDX_USR2, REVIDX_USR3):
+        revlog.REVIDX_KNOWN_FLAGS |= flag
+        if flag not in revlog.REVIDX_FLAGS_ORDER:
+            revlog.REVIDX_FLAGS_ORDER.append(flag)
diff --git a/tests/gzipext.py b/tests/gzipext.py
new file mode 100644
--- /dev/null
+++ b/tests/gzipext.py
@@ -0,0 +1,50 @@
+# coding=UTF-8
+"""Test extension storing revisions that contain '[GZIP]' zlib-compressed.
+
+A (read, write, raw) flag processor is registered on REVIDX_USR2.
+"""
+
+from __future__ import absolute_import
+
+import zlib
+
+from mercurial import (
+    extensions,
+    filelog,
+    revlog,
+)
+
+import flagprocessorsetup
+
+def bypass(self, text):
+    """Raw transform: the text as stored cannot be used for hash checks."""
+    return False
+
+def compress(self, text):
+    """Write transform: zlib-compress; the result is not hash-checked."""
+    return (zlib.compress(text), False)
+
+def decompress(self, text):
+    """Read transform: zlib-decompress; the result can be hash-checked."""
+    return (zlib.decompress(text), True)
+
+def addrevision(orig, self, text, transaction, link, p1, p2, cachedelta=None,
+                node=None, flags=revlog.REVIDX_DEFAULT_FLAGS):
+    """Set REVIDX_USR2 on revisions whose text contains '[GZIP]'."""
+    if '[GZIP]' in text:
+        flags |= flagprocessorsetup.REVIDX_USR2
+    return orig(self, text, transaction, link, p1, p2, cachedelta=cachedelta,
+                node=node, flags=flags)
+
+def extsetup(ui):
+    """Wrap filelog.addrevision and register the gzip flag processor."""
+    extensions.wrapfunction(filelog.filelog, 'addrevision', addrevision)
+
+    revlog.addflagprocessor(
+        flagprocessorsetup.REVIDX_USR2,
+        (
+            decompress,
+            compress,
+            bypass,
+        ),
+    )
diff --git a/tests/nopext.py b/tests/nopext.py
new file mode 100644
--- /dev/null
+++ b/tests/nopext.py
@@ -0,0 +1,44 @@
+# coding=UTF-8
+"""Test extension flagging revisions that contain '[NOP]' without changing them.
+
+A (read, write, raw) flag processor is registered on REVIDX_USR0.
+"""
+
+from __future__ import absolute_import
+
+from mercurial import (
+    extensions,
+    filelog,
+    revlog,
+)
+
+import flagprocessorsetup
+
+def validatehash(self, text):
+    """Raw transform: the text as stored can be used for hash checks."""
+    return True
+
+def donothing(self, text):
+    """Read/write transform: return the text unchanged, hash-checkable."""
+    return (text, True)
+
+def addrevision(orig, self, text, transaction, link, p1, p2, cachedelta=None,
+                node=None, flags=revlog.REVIDX_DEFAULT_FLAGS):
+    """Set REVIDX_USR0 on revisions whose text contains '[NOP]'."""
+    if '[NOP]' in text:
+        flags |= flagprocessorsetup.REVIDX_USR0
+    return orig(self, text, transaction, link, p1, p2, cachedelta=cachedelta,
+                node=node, flags=flags)
+
+def extsetup(ui):
+    """Wrap filelog.addrevision and register the no-op flag processor."""
+    extensions.wrapfunction(filelog.filelog, 'addrevision', addrevision)
+
+    revlog.addflagprocessor(
+        flagprocessorsetup.REVIDX_USR0,
+        (
+            donothing,
+            donothing,
+            validatehash,
+        )
+    )
diff --git a/tests/test-flagprocessor.t b/tests/test-flagprocessor.t
new file mode 100644
--- /dev/null
+++ b/tests/test-flagprocessor.t
@@ -0,0 +1,176 @@
+  $ hg init server
+  $ cd server
+  $ cat >> .hg/hgrc << EOF
+  > [extensions]
+  > flagprocessorsetup=$TESTDIR/flagprocessorsetup.py
+  > nop=$TESTDIR/nopext.py
+  > base64=$TESTDIR/base64ext.py
+  > gzip=$TESTDIR/gzipext.py
+  > EOF
+  $ cd ../
+
+# Clone server and enable extensions
+  $ hg clone -q server client
+  $ cd client
+  $ cat >> .hg/hgrc << EOF
+  > [extensions]
+  > flagprocessorsetup=$TESTDIR/flagprocessorsetup.py
+  > nop=$TESTDIR/nopext.py
+  > base64=$TESTDIR/base64ext.py
+  > gzip=$TESTDIR/gzipext.py
+  > EOF
+
+# Commit file that will trigger the nop extension
+  $ echo '[NOP]' > nop
+  $ hg commit -Aqm "nop"
+
+# Commit file that will trigger the base64 extension
+  $ echo '[BASE64]' > base64
+  $ hg commit -Aqm 'base64'
+
+# Commit file that will trigger the gzip extension
+  $ echo '[GZIP]' > gzip
+  $ hg commit -Aqm 'gzip'
+
+# Commit file that will trigger nop and base64
+  $ echo '[NOP][BASE64]' > nop-base64
+  $ hg commit -Aqm 'nop+base64'
+
+# Commit file that will trigger nop and gzip
+  $ echo '[NOP][GZIP]' > nop-gzip
+  $ hg commit -Aqm 'nop+gzip'
+
+# Commit file that will trigger base64 and gzip
+  $ echo '[BASE64][GZIP]' > base64-gzip
+  $ hg commit -Aqm 'base64+gzip'
+
+# Commit file that will trigger base64, gzip and nop
+  $ echo '[BASE64][GZIP][NOP]' > base64-gzip-nop
+  $ hg commit -Aqm 'base64+gzip+nop'
+
+# TEST: ensure the revision data is consistent
+  $ hg cat nop
+  [NOP]
+  $ hg debugdata nop 0
+  [NOP]
+
+  $ hg cat -r . base64
+  [BASE64]
+  $ hg debugdata base64 0
+  W0JBU0U2NF0K (no-eol)
+
+  $ hg cat -r . gzip
+  [GZIP]
+  $ hg debugdata gzip 0
+  x\x9c\x8bv\x8f\xf2\x0c\x88\xe5\x02\x00\x08\xc8\x01\xfd (no-eol) (esc)
+
+  $ hg cat -r . nop-base64
+  [NOP][BASE64]
+  $ hg debugdata nop-base64 0
+  W05PUF1bQkFTRTY0XQo= (no-eol)
+
+  $ hg cat -r . nop-gzip
+  [NOP][GZIP]
+  $ hg debugdata nop-gzip 0
+  x\x9c\x8b\xf6\xf3\x0f\x88\x8dv\x8f\xf2\x0c\x88\xe5\x02\x00\x199\x03\xa2 (no-eol) (esc)
+
+  $ hg cat -r . base64-gzip
+  [BASE64][GZIP]
+  $ hg debugdata base64-gzip 0
+  eJyLdnIMdjUziY12j/IMiOUCACLBBDo= (no-eol)
+
+  $ hg cat -r . base64-gzip-nop
+  [BASE64][GZIP][NOP]
+  $ hg debugdata base64-gzip-nop 0
+  eJyLdnIMdjUziY12j/IMiI328w+I5QIAPj8F3w== (no-eol)
+
+# Push to the server
+  $ hg push
+  pushing to $TESTTMP/server
+  searching for changes
+  adding changesets
+  adding manifests
+  adding file changes
+  added 7 changesets with 7 changes to 7 files
+
+# Initialize new client (not cloning) and setup extension
+  $ cd ..
+  $ hg init client2
+  $ cd client2
+  $ cat >> .hg/hgrc << EOF
+  > [paths]
+  > default = $TESTTMP/server
+  > [extensions]
+  > flagprocessorsetup=$TESTDIR/flagprocessorsetup.py
+  > nop=$TESTDIR/nopext.py
+  > base64=$TESTDIR/base64ext.py
+  > gzip=$TESTDIR/gzipext.py
+  > EOF
+
+# Pull from server and update to latest revision
+  $ hg pull default
+  pulling from $TESTTMP/server
+  requesting all changes
+  adding changesets
+  adding manifests
+  adding file changes
+  added 7 changesets with 7 changes to 7 files
+  (run 'hg update' to get a working copy)
+  $ hg update
+  7 files updated, 0 files merged, 0 files removed, 0 files unresolved
+
+# TEST: ensure the revision data is consistent
+  $ hg cat nop
+  [NOP]
+  $ hg debugdata nop 0
+  [NOP]
+
+  $ hg cat -r . base64
+  [BASE64]
+  $ hg debugdata base64 0
+  W0JBU0U2NF0K (no-eol)
+
+  $ hg cat -r . gzip
+  [GZIP]
+  $ hg debugdata gzip 0
+  x\x9c\x8bv\x8f\xf2\x0c\x88\xe5\x02\x00\x08\xc8\x01\xfd (no-eol) (esc)
+
+  $ hg cat -r . nop-base64
+  [NOP][BASE64]
+  $ hg debugdata nop-base64 0
+  W05PUF1bQkFTRTY0XQo= (no-eol)
+
+  $ hg cat -r . nop-gzip
+  [NOP][GZIP]
+  $ hg debugdata nop-gzip 0
+  x\x9c\x8b\xf6\xf3\x0f\x88\x8dv\x8f\xf2\x0c\x88\xe5\x02\x00\x199\x03\xa2 (no-eol) (esc)
+
+  $ hg cat -r . base64-gzip
+  [BASE64][GZIP]
+  $ hg debugdata base64-gzip 0
+  eJyLdnIMdjUziY12j/IMiOUCACLBBDo= (no-eol)
+
+  $ hg cat -r . base64-gzip-nop
+  [BASE64][GZIP][NOP]
+  $ hg debugdata base64-gzip-nop 0
+  eJyLdnIMdjUziY12j/IMiI328w+I5QIAPj8F3w== (no-eol)
+
+# Create a new repository and enable two extensions registering processors on the same flag
+  $ cd ../
+  $ hg init server2
+  $ cd server2
+  $ cat >> .hg/hgrc << EOF
+  > [extensions]
+  > flagprocessorsetup=$TESTDIR/flagprocessorsetup.py
+  > nop=$TESTDIR/nopext.py
+  > nop2=$TESTDIR/nopext.py
+  > EOF
+
+# TEST: ensure we cannot register several flag processors on the same flag
+  $ echo 'test' > file
+  $ hg commit -Aqm 'add file'
+  abort: cannot register multiple processors on flag '8'.
+  [255]
+
+
+


More information about the Mercurial-devel mailing list