[PATCH 1 of 2] Add a progress indicator

Stefano Tortarolo stefano.tortarolo at gmail.com
Sat Dec 13 11:49:45 CST 2008


# HG changeset patch
# User Stefano Tortarolo <stefano.tortarolo at gmail.com>
# Date 1229025213 -3600
# Node ID 36b7a09078cc38c0ffc779289912ca23655a0743
# Parent  d8cd79fbed3c7b358dd820ff5dee44a7cff362bc
Add a progress indicator

diff --git a/mercurial/progress.py b/mercurial/progress.py
new file mode 100644
--- /dev/null
+++ b/mercurial/progress.py
@@ -0,0 +1,182 @@
+# progress.py - Progress indicator for Mercurial
+#
+# Copyright 2008 by Stefano Tortarolo <stefano.tortarolo at gmail dot com>
+#
+# This software may be used and distributed according to the terms
+# of the GNU General Public License, incorporated herein by reference.
+import time, sys
+
+def createprogress(msg, displayer, quiet=False, maxval=None, threshold=5):
+    'Factory: call displayer(msg) unless quiet/None, and wrap it in a handler'
+    if quiet or displayer is None:
+        active = None
+    else:
+        active = displayer(msg)
+    return progresshandler(maxval, active, threshold)
+
+class progresshandler:
+    'Track the progress of an operation and forward it to a displayer'
+    def __init__(self, maxval, displayer, threshold):
+        self._threshold = threshold
+        self._creationtime = time.time()
+        self._value = 0
+        if maxval is not None:
+            maxval = float(maxval)
+        self._maxval = maxval
+        self._displayer = displayer
+        if displayer is not None:
+            self._activable = self._displayer.isactive() and threshold >= 0
+            self._displayer.setprogresshandler(self)
+        else:
+            self._activable = False
+
+    def update(self, value=1, delta=True):
+        'Add value to the status (or set it if not delta) and display it'
+        if not self._activable:
+            return
+        if delta:
+            self._value += value
+        else:
+            self._value = value
+        if self._isactivated():
+            self._displayer.update()
+
+    def finish(self):
+        '''The process is done: stop updating and notify the displayer
+        Note that even though we could detect a completion using maxval <= value
+        we rely on finish to ensure that maxval is consistent'''
+        self._activable = False
+        if self._displayer is not None:
+            self._displayer.finish()
+
+    def interrupt(self):
+        'Stop the progress indicator and notify the displayer'
+        self._activable = False
+        if self._displayer is not None:
+            self._displayer.interrupt()
+
+    def _isactivated(self):
+        'Return True once more than threshold seconds passed since creation'
+        return (self._creationtime + self._threshold) < time.time()
+
+class defaultdisplayer:
+    'A simple displayer showing the progress status on a terminal'
+    def __init__(self, msg, fd=sys.stderr):
+        self._fd = fd
+        self._progressmsg = '......'
+        self._clean = 0
+        self._progress = None
+        self._msg = msg
+        self._write(self._msg)
+
+    def setprogresshandler(self, progress):
+        'Set the progress handler whose value and maxval are displayed'
+        self._progress = progress
+
+    def _write(self, *args):
+        'Write the concatenation of args after a carriage return, if active'
+        if self.isactive():
+            msg = ''.join(['%s' % (a,) for a in args])
+            self._fd.write('\r%s' % (msg))
+            # remember the widest output so far so that it can be blanked out
+            self._clean = max(self._clean, len(msg))
+
+    def getvalue(self):
+        'Return the current value of the progress handler'
+        return self._progress._value
+
+    def getmaxvalue(self):
+        'Return the maximum value of the progress handler (None if unknown)'
+        return self._progress._maxval
+
+    def isdirty(self):
+        'True if something has been shown and not cleaned yet'
+        return self._clean > 0
+
+    def interrupt(self):
+        'Blank out what has been shown and mark the output as clean'
+        if self.isdirty():
+            status = ' '.ljust(self._clean) + '\r'
+            self._write(status)
+            self._clean = 0
+
+    def finish(self):
+        'The process is done: rewrite the message and terminate the line'
+        if self.isdirty():
+            status = self._msg + ' '.ljust(self._clean) + '\n'
+            self._write(status)
+
+    def update(self):
+        'Display "msg value/maxval", or cycling dots if maxval is unknown'
+        maxval = self.getmaxvalue()
+        value = self.getvalue()
+        if maxval is not None:
+            status = " %d/%d" % (value, maxval)
+        else:
+            statuslen = len(self._progressmsg)
+            value %= statuslen + 1
+            status = self._progressmsg[:value].ljust(statuslen)
+        msg = '\r%s%s' % (self._msg, status)
+        self._write(msg)
+
+    def isactive(self):
+        'True if fd is a tty, i.e. the status can be shown to the user'
+        return self._fd.isatty()
+
+if __name__ == '__main__':
+    print "% Test providing a maximum value"
+    p = createprogress(msg='Test maxval', displayer=defaultdisplayer,
+                       threshold=2, maxval=5)
+    for i in range(5):
+        time.sleep(1)
+        p.update()
+    p.finish()
+
+    print "% Test with no arguments"
+    p = createprogress(msg='Test no maxval', displayer=defaultdisplayer,
+                       threshold=2)
+    for i in range(5):
+        time.sleep(1)
+        p.update()
+    p.finish()
+
+    print "% Test completion before THRESHOLD seconds"
+    p = createprogress(msg='This must not appear',
+                       displayer=defaultdisplayer, threshold=5)
+    p.update()  # below the threshold: displayer.update() is not called
+    time.sleep(1)
+    p.update()
+    p.finish()
+
+    print "% Test without displayer"
+    p = createprogress('Test without displayer', displayer=None, threshold=0)
+    p.update()  # no displayer: the handler is never activable
+
+    print "% Test interruption with dirty output"
+    p = createprogress(msg='Test interruption with dirty output',
+                       displayer=defaultdisplayer, threshold=0)
+    p.update()
+    time.sleep(2)
+    p.interrupt()
+
+    print "% Test threshold -1 with show message (like simple print)"
+    p = createprogress(msg='Test simple print', displayer=defaultdisplayer,
+                       threshold=-1)
+    p.update()  # negative threshold: updates are ignored
+    time.sleep(2)
+    p.finish()
+
+    print "% Provide a different display method"
+    class alternativedisplayer(defaultdisplayer):
+        def update(self):
+            remaining = self.getmaxvalue() - self.getvalue()
+            self._write('\rRemaining: %d' % remaining)
+        def finish(self):
+            self._write('\rOperation completed!\n')
+
+    p = createprogress(msg='Display', maxval=5, threshold=1,
+                       displayer=alternativedisplayer)
+    for i in range(5):
+        time.sleep(1)
+        p.update()
+    p.finish()
diff --git a/mercurial/ui.py b/mercurial/ui.py
--- a/mercurial/ui.py
+++ b/mercurial/ui.py
@@ -7,7 +7,7 @@
 
 from i18n import _
 import errno, getpass, os, re, socket, sys, tempfile
-import ConfigParser, traceback, util
+import ConfigParser, traceback, util, progress
 
 def dupconfig(orig):
     new = util.configparser(orig.defaults())
@@ -65,6 +65,10 @@
                     self.overlay = util.configparser()
                 updateconfig(parentui.overlay, self.overlay)
             self.buffers = parentui.buffers
+        self._displayer = None
+        self._progress = self.progress()
+        self.setprogressdisplayer(progress.defaultdisplayer)
+
 
     def __getattr__(self, key):
         return getattr(self.parentui, key)
@@ -375,6 +379,7 @@
         return "".join(self.buffers.pop())
 
     def write(self, *args):
+        self._progress.interrupt()
         if self.buffers:
             self.buffers[-1].extend([str(a) for a in args])
         else:
@@ -383,6 +388,7 @@
 
     def write_err(self, *args):
         try:
+            self._progress.interrupt()
             if not sys.stdout.closed: sys.stdout.flush()
             for a in args:
                 sys.stderr.write(str(a))
@@ -425,6 +431,7 @@
         if not self.interactive:
             self.note(msg, ' ', default, "\n")
             return default
+        self._progress.interrupt()
         while True:
             try:
                 r = self._readline(msg + ' ')
@@ -439,6 +446,7 @@
 
     def getpass(self, prompt=None, default=None):
         if not self.interactive: return default
+        self._progress.interrupt()
         return getpass.getpass(prompt or _('password: '))
     def status(self, *msg):
         if not self.quiet: self.write(*msg)
@@ -449,6 +457,7 @@
     def debug(self, *msg):
         if self.debugflag: self.write(*msg)
     def edit(self, text, user):
+        self._progress.interrupt()
         (fd, name) = tempfile.mkstemp(prefix="hg-editor-", suffix=".txt",
                                       text=True)
         try:
@@ -479,6 +488,16 @@
             traceback.print_exc()
         return self.traceback
 
+    def progress(self, msg='', maxval=None, fd=sys.stderr):
+        'Create a progress indicator, keep it as the current one, return it'
+        delay = int(self.config('progress', 'threshold') or 5)
+        self._progress = handler = progress.createprogress(
+            msg, self._displayer, self.quiet, maxval, delay)
+        return handler
+
+    def setprogressdisplayer(self, displayer):
+        self._displayer = displayer  # used by later progress() calls
+
     def geteditor(self):
         '''return editor to use'''
         return (os.environ.get("HGEDITOR") or


More information about the Mercurial-devel mailing list