[PATCH 1 of 2] Add a progress indicator
Stefano Tortarolo
stefano.tortarolo at gmail.com
Sat Dec 13 11:49:45 CST 2008
# HG changeset patch
# User Stefano Tortarolo <stefano.tortarolo at gmail.com>
# Date 1229025213 -3600
# Node ID 36b7a09078cc38c0ffc779289912ca23655a0743
# Parent d8cd79fbed3c7b358dd820ff5dee44a7cff362bc
Add a progress indicator
diff --git a/mercurial/progress.py b/mercurial/progress.py
new file mode 100644
--- /dev/null
+++ b/mercurial/progress.py
@@ -0,0 +1,182 @@
+# progress.py - Progress indicator for Mercurial
+#
+# Copyright 2008 by Stefano Tortarolo <stefano.tortarolo at gmail dot com>
+#
+# This software may be used and distributed according to the terms
+# of the GNU General Public License, incorporated herein by reference.
+import time, sys
+
+def createprogress(msg, displayer, quiet=False, maxval=None, threshold=5):
+ 'Factory method, ensures that progresshandler has a suitable displayer'
+ if quiet:
+ displayer = None
+ if displayer is not None:
+ displayer = displayer(msg)
+ return progresshandler(maxval, displayer, threshold)
+
+class progresshandler:
+ 'This class provides a progress indicator'
+ def __init__(self, maxval, displayer, threshold):
+ self._threshold = threshold
+ self._creationtime = time.time()
+ self._value = 0
+ if maxval is not None:
+ maxval = float(maxval)
+ self._maxval = maxval
+ self._displayer = displayer
+ if displayer is not None:
+ self._activable = self._displayer.isactive() and threshold >= 0
+ self._displayer.setprogresshandler(self)
+ else:
+ self._activable = False
+
+ def update(self, value=1, delta=True):
+ 'Update the current status and display it'
+ if not self._activable:
+ return
+ if delta:
+ self._value += value
+ else:
+ self._value = value
+ if self._isactivated():
+ self._displayer.update()
+
+ def finish(self):
+ '''The process is done
+ Note that even though we could detect a completion using maxval <= value
+ we rely on finish to ensure that maxval is consistent'''
+ self._activable = False
+ if self._displayer is not None:
+ self._displayer.finish()
+
+ def interrupt(self):
+ 'Stop the progress indicator and notify the displayer'
+ self._activable = False
+ if self._displayer is not None:
+ self._displayer.interrupt()
+
+ def _isactivated(self):
+ 'Return whether the indicator is allowed to show something or not'
+ return (self._creationtime + self._threshold) < time.time()
+
+class defaultdisplayer:
+ 'A simple displayer to show the current progress status'
+ def __init__(self, msg, fd=sys.stderr):
+ self._fd = fd
+ self._progressmsg = '......'
+ self._clean = 0
+ self._progress = None
+ self._msg = msg
+ self._write(self._msg)
+
+ def setprogresshandler(self, progress):
+ 'Set the progress handler'
+ self._progress = progress
+
+ def _write(self, *args):
+ 'Show the given message'
+ if self.isactive():
+ msg = '%s' % (args)
+ self._fd.write('\r%s' % (msg))
+ clean = len(msg)
+ if clean > self._clean:
+ self._clean = clean
+
+ def getvalue(self):
+ return self._progress._value
+
+ def getmaxvalue(self):
+ return self._progress._maxval
+
+ def isdirty(self):
+ 'True if something has been showed'
+ return self._clean > 0
+
+ def interrupt(self):
+ 'Clean the output'
+ if self.isdirty():
+ status = ' '.ljust(self._clean) + '\r'
+ self._write(status)
+
+ def finish(self):
+ 'The process is done'
+ if self.isdirty():
+ status = self._msg + ' '.ljust(self._clean) + '\n'
+ self._write(status)
+
+ def update(self):
+ 'Display the current progress status'
+ maxval = self.getmaxvalue()
+ value = self.getvalue()
+ if maxval is not None:
+ # we have a maxval defined
+ status = " %d/%d" % (value, maxval)
+ else:
+ statuslen = len(self._progressmsg)
+ value %= statuslen + 1
+ status = self._progressmsg[:value]
+ status = status.ljust(statuslen)
+ msg = '\r%s%s' % (self._msg, status)
+ self._write(msg)
+
+ def isactive(self):
+ 'True if the displayer is able to show user its status'
+ return self._fd.isatty()
+
+if __name__ == '__main__':
+ print "% Test providing a maximum value"
+ p = createprogress(msg='Test maxval', displayer=defaultdisplayer,
+ threshold=2, maxval=5)
+ for i in range(5):
+ time.sleep(1)
+ p.update()
+ p.finish()
+
+ print "% Test with no arguments"
+ p = createprogress(msg='Test no maxval', displayer=defaultdisplayer,
+ threshold=2)
+ for i in range(5):
+ time.sleep(1)
+ p.update()
+ p.finish()
+
+ print "% Test completion before THRESHOLD seconds"
+ p = createprogress(msg='This must not appear',
+ displayer=defaultdisplayer, threshold=5)
+ p.update()
+ time.sleep(1)
+ p.update()
+ p.finish()
+
+ print "% Test without displayer"
+ p = createprogress(msg='Test without displayer', displayer=None, threshold=0)
+ p.update()
+
+ print "% Test interruption with dirty output"
+ p = createprogress(msg='Test interruption with dirty output',
+ displayer=defaultdisplayer, threshold=0)
+ p.update()
+ time.sleep(2)
+ p.interrupt()
+
+ print "% Test threshold -1 with show message (like simple print)"
+ p = createprogress(msg='Test simple print', displayer=defaultdisplayer,
+ threshold=-1)
+ p.update()
+ time.sleep(2)
+ p.finish()
+
+ print "% Provide a different display method"
+ class alternativedisplayer(defaultdisplayer):
+ def update(self):
+ remaining = self.getmaxvalue() - self.getvalue()
+ self._write('\rRemaining: %d' % remaining)
+ def finish(self):
+ self._write('\rOperation completed!\n')
+
+ p = createprogress(msg='Display', maxval=5, threshold=1,
+ displayer=alternativedisplayer)
+ for i in range(5):
+ time.sleep(1)
+ p.update()
+ p.finish()
diff --git a/mercurial/ui.py b/mercurial/ui.py
--- a/mercurial/ui.py
+++ b/mercurial/ui.py
@@ -7,7 +7,7 @@
from i18n import _
import errno, getpass, os, re, socket, sys, tempfile
-import ConfigParser, traceback, util
+import ConfigParser, traceback, util, progress
def dupconfig(orig):
new = util.configparser(orig.defaults())
@@ -65,6 +65,10 @@
self.overlay = util.configparser()
updateconfig(parentui.overlay, self.overlay)
self.buffers = parentui.buffers
+ self._displayer = None
+ self._progress = self.progress()
+ self.setprogressdisplayer(progress.defaultdisplayer)
+
def __getattr__(self, key):
return getattr(self.parentui, key)
@@ -375,6 +379,7 @@
return "".join(self.buffers.pop())
def write(self, *args):
+ self._progress.interrupt()
if self.buffers:
self.buffers[-1].extend([str(a) for a in args])
else:
@@ -383,6 +388,7 @@
def write_err(self, *args):
try:
+ self._progress.interrupt()
if not sys.stdout.closed: sys.stdout.flush()
for a in args:
sys.stderr.write(str(a))
@@ -425,6 +431,7 @@
if not self.interactive:
self.note(msg, ' ', default, "\n")
return default
+ self._progress.interrupt()
while True:
try:
r = self._readline(msg + ' ')
@@ -439,6 +446,7 @@
def getpass(self, prompt=None, default=None):
if not self.interactive: return default
+ self._progress.interrupt()
return getpass.getpass(prompt or _('password: '))
def status(self, *msg):
if not self.quiet: self.write(*msg)
@@ -449,6 +457,7 @@
def debug(self, *msg):
if self.debugflag: self.write(*msg)
def edit(self, text, user):
+ self._progress.interrupt()
(fd, name) = tempfile.mkstemp(prefix="hg-editor-", suffix=".txt",
text=True)
try:
@@ -479,6 +488,16 @@
traceback.print_exc()
return self.traceback
+ def progress(self, msg='', maxval=None, fd=sys.stderr):
+ 'Return a progress indicator object'
+ threshold = int(self.config('progress', 'threshold') or 5)
+ self._progress = progress.createprogress(msg, self._displayer,
+ self.quiet, maxval, threshold)
+ return self._progress
+
+ def setprogressdisplayer(self, displayer):
+ self._displayer = displayer
+
def geteditor(self):
'''return editor to use'''
return (os.environ.get("HGEDITOR") or
More information about the Mercurial-devel
mailing list