[PATCH 1 of 2] Add a progress indicator
Stefano Tortarolo
stefano.tortarolo at gmail.com
Thu Dec 11 16:25:34 CST 2008
# HG changeset patch
# User Stefano Tortarolo <stefano.tortarolo at gmail.com>
# Date 1229025213 -3600
# Node ID 5ab5ab5dca0b666901f6c31c56005ae9cfabee89
# Parent d8cd79fbed3c7b358dd820ff5dee44a7cff362bc
Add a progress indicator
diff --git a/mercurial/progress.py b/mercurial/progress.py
new file mode 100644
--- /dev/null
+++ b/mercurial/progress.py
@@ -0,0 +1,174 @@
+# progress.py - Progress indicator for Mercurial
+#
+# Copyright 2008 by Stefano Tortarolo <stefano.tortarolo at gmail dot com>
+#
+# This software may be used and distributed according to the terms
+# of the GNU General Public License, incorporated herein by reference.
+import time, sys
+
def createprogress(msg, displayer=None, maxval=None, activable=True,
                   threshold=5, showmsg=False, fd=None):
    '''Factory function, ensures that progresshandler has a suitable displayer

    msg       - text handed to the displayer together with the status
    displayer - object performing the output; when None a defaultdisplayer
                writing to fd is created
    maxval    - expected final value, or None when it is not known
    activable - False to obtain an indicator that never displays anything
    threshold - seconds to wait after creation before the status is
                displayed; a negative value disables the indicator
    showmsg   - True to write msg right away (only when activable)
    fd        - stream used by the default displayer; None selects the
                sys.stderr in effect when this function is called

    Return the new progresshandler.
    '''
    if displayer is None:
        if fd is None:
            # Looked up here instead of in the signature: a default of
            # sys.stderr would be evaluated once, at import time, and
            # would miss any later replacement of sys.stderr.
            fd = sys.stderr
        displayer = defaultdisplayer(fd)
    if activable and showmsg:
        displayer.write(msg)
    return progresshandler(msg, displayer, maxval, activable, threshold)
+
class progresshandler:
    '''This class provides a progress indicator

    It keeps track of the current value and asks its displayer to show
    it, but only once more than threshold seconds have passed since the
    handler was created. The displayer must provide isactive(),
    isdirty(), clear() and display(msg, value, maxval, finish).

    finish() and interrupt() both close the indicator: once either has
    been called, every further update(), finish() or interrupt() is a
    no-op, so nothing is redrawn or cleared twice.
    '''
    def __init__(self, msg, displayer, maxval, activable, threshold):
        '''msg is passed to the displayer on every refresh, maxval is the
        expected final value (None if unknown), activable=False or a
        negative threshold disable the indicator altogether'''
        self._activable = activable and displayer.isactive() and threshold >= 0
        self._creationtime = time.time()
        self._threshold = threshold
        self._msg = msg
        self._value = 0
        self._displayer = displayer
        if maxval is not None:
            maxval = float(maxval)
        self._maxval = maxval
        # set by finish() and interrupt(), makes both of them one-shot
        self._closed = False

    def update(self, value=1, delta=True):
        '''Update the current status and display it

        With delta=True value is added to the current value, otherwise
        it replaces it. The status is displayed only after the threshold
        has elapsed.'''
        if not self._activable:
            return
        if delta:
            self._value += value
        else:
            self._value = value
        if self._isactivated():
            self._display()

    def finish(self):
        '''The process is done
        Note that even though we could detect a completion using maxval <= value
        we rely on finish to ensure that maxval is consistent'''
        if self._closed:
            # already finished or interrupted: do not draw the final
            # line a second time (or on top of a cleared one)
            return
        self._closed = True
        # a dirty displayer must be finalized even when the indicator
        # itself never got activated (e.g. only the message was shown)
        if ((self._activable and self._isactivated())
            or self._displayer.isdirty()):
            self._display(finish=True)
        self._activable = False

    def interrupt(self):
        'Stop the progress indicator and notify the displayer'
        self._activable = False
        if self._closed:
            # interrupt() may be issued many times (e.g. before each
            # regular output): clear the displayer only the first time
            return
        self._closed = True
        if self._displayer.isdirty():
            self._displayer.clear()

    def _isactivated(self):
        'Return whether the indicator is allowed to show something or not'
        return (self._creationtime + self._threshold) < time.time()

    def _display(self, finish=False):
        'Pass the current status to the displayer'
        self._displayer.display(self._msg, self._value, self._maxval, finish)
+
class defaultdisplayer:
    '''A simple displayer to show the current progress status

    The status is drawn on a single line of the given stream, which is
    rewritten in place using carriage returns. Nothing is written unless
    the stream reports itself as a terminal.
    '''
    def __init__(self, fd):
        self._fd = fd
        # animation used when no maximum value is available
        self._progressmsg = '......'
        # length of the longest text written on the current line,
        # 0 when nothing has to be cleaned up
        self._clean = 0

    def write(self, *args):
        '''Show the given message

        The arguments are converted to strings and concatenated, then
        written at the beginning of the current line. Nothing happens
        when the displayer is not active.'''
        if not self.isactive():
            return
        # format each argument on its own: '%s' % args only works when
        # exactly one argument is given
        text = ''.join(['%s' % (a,) for a in args])
        self._fd.write('\r%s' % text)
        self._clean = max(self._clean, len(text))

    def isdirty(self):
        'True if something has been shown and not cleaned up yet'
        return self._clean > 0

    def clear(self):
        'Clean the output'
        if self.isdirty() and self.isactive():
            # overwrite the line with blanks and go back to its start
            self._fd.write('\r%s\r' % (' ' * self._clean))
        # the line is blank again, nothing left to clean
        self._clean = 0

    def display(self, msg, value, maxval, finish):
        '''Display the current progress status

        When finish is True only msg is kept, any status left on the
        line is blanked and the line is terminated. Otherwise msg is
        followed by " value/maxval" if maxval is known, or by a row of
        dots growing with value if it is not.'''
        if finish:
            # pad up to the longest text shown so far to erase it
            self.write(('%s' % (msg,)).ljust(self._clean), '\n')
            # the cursor is on a fresh line now
            self._clean = 0
            return
        if maxval is not None:
            # we have a maxval defined
            status = ' %d/%d' % (value, maxval)
        else:
            statuslen = len(self._progressmsg)
            # int() keeps the slice valid for non-integer values
            dots = int(value) % (statuslen + 1)
            status = self._progressmsg[:dots].ljust(statuslen)
        self.write(msg, status)

    def isactive(self):
        'True if the displayer is able to show user its status'
        # streams without isatty() are treated as non interactive
        isatty = getattr(self._fd, 'isatty', None)
        return isatty is not None and isatty()
+
if __name__ == '__main__':
    # Manual self-test: run this module from a terminal and watch stderr.
    # The headers use the single-argument call form of print, which gives
    # the same output with the print statement and the print function.
    print("% Test providing a maximum value")
    p = createprogress(msg='Test maxval', maxval=10)
    for i in range(10):
        time.sleep(1)
        p.update()
    p.finish()

    print("% Test with no arguments")
    p = createprogress(msg='Test no maxval', showmsg=True)
    for i in range(10):
        time.sleep(1)
        p.update()
    p.finish()

    print("% Doesn't show anything before THRESHOLD seconds")
    p = createprogress(msg='Test threshold', maxval=3, threshold=3)
    p.update()
    time.sleep(3)
    p.update()
    time.sleep(2)
    p.update()
    p.finish()

    print("% Test completion before THRESHOLD seconds")
    p = createprogress(msg='Test completion before threshold', threshold=5)
    p.update()
    time.sleep(1)
    p.update()
    p.finish()

    print("% Test not activable progress")
    p = createprogress(msg='Test not activable progress', activable=False,
                       threshold=0)
    p.update()

    print("% Test interruption with dirty output")
    p = createprogress(msg='Test interruption with dirty output', threshold=0)
    p.update()
    time.sleep(2)
    p.interrupt()

    print("% Test threshold -1 with show message (like simple print)")
    p = createprogress(msg='Test simple print', threshold=-1, showmsg=True)
    p.update()
    time.sleep(2)
    p.finish()

    print("% Provide a different display method")
    class alternativedisplayer(defaultdisplayer):
        'Displayer showing how much is left instead of the current value'
        def display(self, msg, value, maxval, finish):
            if finish:
                self._fd.write('\rOperation completed!\n')
            else:
                # only needed (and only computed) while in progress
                remaining = maxval - value
                self._fd.write('\rRemaining: %d' % remaining)

    p = createprogress(msg='Display', maxval=5, threshold=1,
                       displayer=alternativedisplayer(sys.stderr))
    for i in range(5):
        time.sleep(1)
        p.update()
    p.finish()
diff --git a/mercurial/ui.py b/mercurial/ui.py
--- a/mercurial/ui.py
+++ b/mercurial/ui.py
@@ -7,7 +7,7 @@
from i18n import _
import errno, getpass, os, re, socket, sys, tempfile
-import ConfigParser, traceback, util
+import ConfigParser, traceback, util, progress
def dupconfig(orig):
new = util.configparser(orig.defaults())
@@ -65,6 +65,7 @@
self.overlay = util.configparser()
updateconfig(parentui.overlay, self.overlay)
self.buffers = parentui.buffers
+ self._progress = self.progress('Fake progress', activable=False)
def __getattr__(self, key):
return getattr(self.parentui, key)
@@ -375,6 +376,7 @@
return "".join(self.buffers.pop())
def write(self, *args):
+ self._progress.interrupt()
if self.buffers:
self.buffers[-1].extend([str(a) for a in args])
else:
@@ -383,6 +385,7 @@
def write_err(self, *args):
try:
+ self._progress.interrupt()
if not sys.stdout.closed: sys.stdout.flush()
for a in args:
sys.stderr.write(str(a))
@@ -425,6 +428,7 @@
if not self.interactive:
self.note(msg, ' ', default, "\n")
return default
+ self._progress.interrupt()
while True:
try:
r = self._readline(msg + ' ')
@@ -439,6 +443,7 @@
def getpass(self, prompt=None, default=None):
if not self.interactive: return default
+ self._progress.interrupt()
return getpass.getpass(prompt or _('password: '))
def status(self, *msg):
if not self.quiet: self.write(*msg)
@@ -449,6 +454,7 @@
def debug(self, *msg):
if self.debugflag: self.write(*msg)
def edit(self, text, user):
+ self._progress.interrupt()
(fd, name) = tempfile.mkstemp(prefix="hg-editor-", suffix=".txt",
text=True)
try:
@@ -479,6 +485,14 @@
traceback.print_exc()
return self.traceback
+ def progress(self, msg, displayer=None, maxval=None, activable=True,
+ showmsg=False, fd=sys.stderr):
+ 'Return a progress indicator object'
+ threshold = int(self.config('ui', 'progressthreshold') or 5)
+ activable = activable and not self.quiet
+ return progress.createprogress(msg, displayer, maxval, activable,
+ threshold, showmsg, fd)
+
def geteditor(self):
'''return editor to use'''
return (os.environ.get("HGEDITOR") or
More information about the Mercurial-devel
mailing list