[PATCH 1 of 2] Add a progress indicator

Stefano Tortarolo stefano.tortarolo at gmail.com
Thu Dec 11 16:25:34 CST 2008


# HG changeset patch
# User Stefano Tortarolo <stefano.tortarolo at gmail.com>
# Date 1229025213 -3600
# Node ID 5ab5ab5dca0b666901f6c31c56005ae9cfabee89
# Parent  d8cd79fbed3c7b358dd820ff5dee44a7cff362bc
Add a progress indicator

diff --git a/mercurial/progress.py b/mercurial/progress.py
new file mode 100644
--- /dev/null
+++ b/mercurial/progress.py
@@ -0,0 +1,174 @@
+# progress.py - Progress indicator for Mercurial
+#
+# Copyright 2008 by Stefano Tortarolo <stefano.tortarolo at gmail dot com>
+#
+# This software may be used and distributed according to the terms
+# of the GNU General Public License, incorporated herein by reference.
+import time, sys
+
+def createprogress(msg, displayer=None, maxval=None, activable=True,
+                        threshold=5, showmsg=False, fd=sys.stderr):
+    'Factory method, ensures that progresshandler has a suitable displayer'
+    if displayer is None:
+        displayer = defaultdisplayer(fd)
+    if activable and showmsg:
+        displayer.write(msg)
+    return progresshandler(msg, displayer, maxval, activable, threshold)
+
+class progresshandler:
+    '''This class provides a progress indicator
+    '''
+    def __init__(self, msg, displayer, maxval, activable, threshold):
+        self._activable = activable and displayer.isactive() and threshold >= 0
+        self._creationtime = time.time()
+        self._threshold = threshold
+        self._msg = msg
+        self._value = 0
+        self._displayer = displayer
+        if maxval is not None:
+            maxval = float(maxval)
+        self._maxval = maxval
+
+    def update(self, value=1, delta=True):
+        'Update the current status and display it'
+        if not self._activable:
+            return
+        if delta:
+            self._value += value
+        else:
+            self._value = value
+        if self._isactivated():
+           self._display()
+
+    def finish(self):
+        '''The process is done
+        Note that even though we could detect a completion using maxval <= value
+        we rely on finish to ensure that maxval is consistent'''
+        if ((self._activable and self._isactivated()) 
+                            or self._displayer.isdirty()):
+            self._display(finish=True)
+        self._activable = False
+
+    def interrupt(self):
+        'Stop the progress indicator and notify the displayer'
+        self._activable = False
+        if self._displayer.isdirty():
+            self._displayer.clear()
+
+    def _isactivated(self):
+        'Return whether the indicator is allowed to show something or not'
+        return (self._creationtime + self._threshold) < time.time()
+
+    def _display(self, finish=False):
+        'Pass the current status to the displayer'
+        self._displayer.display(self._msg, self._value, self._maxval, finish)
+
+class defaultdisplayer:
+    'A simple displayer to show the current progress status'
+    def __init__(self, fd):
+        self._fd = fd
+        self._progressmsg = '......'
+        self._clean = 0
+
+    def write(self, *args):
+        'Show the given message'
+        if self.isactive():
+            msg = '%s' % (args) 
+            self._fd.write('\r%s' % (msg))
+            clean = len(msg)
+            if clean > self._clean:
+                    self._clean = clean
+
+    def isdirty(self):
+        'True if something has been shown'
+        return self._clean > 0
+
+    def clear(self):
+        'Clear the output'
+        if self.isdirty():
+            status = ' '.ljust(self._clean) + '\r'
+            self.write(status)
+
+    def display(self, msg, value, maxval, finish):
+        'Display the current progress status'
+        if finish:
+            status = ' '.ljust(self._clean) + '\n'
+        else:
+            if maxval is not None:
+                # we have a maxval defined
+                status = " %d/%d" % (value, maxval)
+            else:
+                statuslen = len(self._progressmsg)
+                value %= statuslen + 1
+                status = self._progressmsg[:value]
+                status = status.ljust(statuslen)
+        msg = '\r%s%s' % (msg, status)
+        self.write(msg)
+
+    def isactive(self):
+        'True if the displayer is able to show its status to the user'
+        return self._fd.isatty()
+
+if __name__ == '__main__':
+    print "% Test providing a maximum value"
+    p = createprogress(msg='Test maxval', maxval=10)
+    for i in range(10):
+        time.sleep(1)
+        p.update()
+    p.finish()
+
+    print "% Test with no arguments"
+    p = createprogress(msg='Test no maxval', showmsg=True)
+    for i in range(10):
+        time.sleep(1)
+        p.update()
+    p.finish()
+
+    print "% Doesn't show anything before THRESHOLD seconds" 
+    p = createprogress(msg='Test threshold', maxval=3, threshold=3)
+    p.update()
+    time.sleep(3)
+    p.update()
+    time.sleep(2)
+    p.update()
+    p.finish()
+
+    print "% Test completion before THRESHOLD seconds" 
+    p = createprogress(msg='Test completion before threshold', threshold=5)
+    p.update()
+    time.sleep(1)
+    p.update()
+    p.finish()
+
+    print "% Test not activable progress" 
+    p = createprogress(msg='Test not activable progress', activable=False,
+                            threshold=0)
+    p.update()
+
+    print "% Test interruption with dirty output" 
+    p = createprogress(msg='Test interruption with dirty output', threshold=0)
+    p.update()
+    time.sleep(2)
+    p.interrupt()
+
+    print "% Test threshold -1 with show message (like simple print)" 
+    p = createprogress(msg='Test simple print', threshold=-1, showmsg=True)
+    p.update()
+    time.sleep(2)
+    p.finish()
+
+    print "% Provide a different display method"
+    class alternativedisplayer(defaultdisplayer):
+        def display(self, msg, value, maxval, finish):
+            remaining = maxval - value
+            if finish:
+                self._fd.write('\rOperation completed!\n')
+            else:
+                self._fd.write('\rRemaining: %d' % remaining)
+
+    p = createprogress(msg='Display', maxval=5, threshold=1, 
+                                    displayer=alternativedisplayer(sys.stderr))
+    for i in range(5):
+        time.sleep(1)
+        p.update()
+    p.finish()
diff --git a/mercurial/ui.py b/mercurial/ui.py
--- a/mercurial/ui.py
+++ b/mercurial/ui.py
@@ -7,7 +7,7 @@
 
 from i18n import _
 import errno, getpass, os, re, socket, sys, tempfile
-import ConfigParser, traceback, util
+import ConfigParser, traceback, util, progress
 
 def dupconfig(orig):
     new = util.configparser(orig.defaults())
@@ -65,6 +65,7 @@
                     self.overlay = util.configparser()
                 updateconfig(parentui.overlay, self.overlay)
             self.buffers = parentui.buffers
+        self._progress = self.progress('Fake progress', activable=False)
 
     def __getattr__(self, key):
         return getattr(self.parentui, key)
@@ -375,6 +376,7 @@
         return "".join(self.buffers.pop())
 
     def write(self, *args):
+        self._progress.interrupt()
         if self.buffers:
             self.buffers[-1].extend([str(a) for a in args])
         else:
@@ -383,6 +385,7 @@
 
     def write_err(self, *args):
         try:
+            self._progress.interrupt()
             if not sys.stdout.closed: sys.stdout.flush()
             for a in args:
                 sys.stderr.write(str(a))
@@ -425,6 +428,7 @@
         if not self.interactive:
             self.note(msg, ' ', default, "\n")
             return default
+        self._progress.interrupt()
         while True:
             try:
                 r = self._readline(msg + ' ')
@@ -439,6 +443,7 @@
 
     def getpass(self, prompt=None, default=None):
         if not self.interactive: return default
+        self._progress.interrupt()
         return getpass.getpass(prompt or _('password: '))
     def status(self, *msg):
         if not self.quiet: self.write(*msg)
@@ -449,6 +454,7 @@
     def debug(self, *msg):
         if self.debugflag: self.write(*msg)
     def edit(self, text, user):
+        self._progress.interrupt()
         (fd, name) = tempfile.mkstemp(prefix="hg-editor-", suffix=".txt",
                                       text=True)
         try:
@@ -479,6 +485,14 @@
             traceback.print_exc()
         return self.traceback
 
+    def progress(self, msg, displayer=None, maxval=None, activable=True,
+                        showmsg=False, fd=sys.stderr):
+        'Return a progress indicator object'
+        threshold = int(self.config('ui', 'progressthreshold') or 5)
+        activable = activable and not self.quiet
+        return progress.createprogress(msg, displayer, maxval, activable,
+                                        threshold, showmsg, fd)
+
     def geteditor(self):
         '''return editor to use'''
         return (os.environ.get("HGEDITOR") or


More information about the Mercurial-devel mailing list