[PATCH 3 of 6] tests: add code to handle HTTP digests on the server side

Matt Harbison mharbison72 at gmail.com
Tue Feb 5 21:35:46 EST 2019


# HG changeset patch
# User Matt Harbison <matt_harbison at yahoo.com>
# Date 1549391559 18000
#      Tue Feb 05 13:32:39 2019 -0500
# Node ID 576aec428b05a88584505b9c4af541e273e1014a
# Parent  4bad819762856a2a6b5c9faeb52a15fa63b1ca9e
tests: add code to handle HTTP digests on the server side

It's not hooked up yet.  Mostly this was cargoculted and simplified from some
python.org code[1].  It's not trying to test the security as much as it is
trying to make sure that clients are sending out the right data when challenged.
(And they aren't on py3.)

[1] http://svn.python.org/projects/sandbox/trunk/digestauth/digestauth.py

diff --git a/tests/httpserverauth.py b/tests/httpserverauth.py
--- a/tests/httpserverauth.py
+++ b/tests/httpserverauth.py
@@ -1,8 +1,89 @@
 from __future__ import absolute_import
 
 import base64
+import hashlib
 
 from mercurial.hgweb import common
+from mercurial import (
+    node,
+)
+
+def parse_keqv_list(req, l):
+    """Parse list of key=value strings where keys are not duplicated.
+
+    ``l`` is an iterable of bytes items of the form ``key=value``.  Each item
+    is split on its first ``=``, and a value wrapped in double quotes has the
+    quotes removed.  Returns a dict mapping each key to its value.
+
+    ``req`` is accepted but not used by this function.
+
+    NOTE(review): duplicates are not detected; a repeated key silently keeps
+    the last value.  An item without ``=`` makes the tuple unpacking below
+    raise ValueError.
+    """
+    parsed = {}
+    for elt in l:
+        k, v = elt.split(b'=', 1)
+        # Slicing keeps these bytes-vs-bytes comparisons on Python 3 (where
+        # indexing bytes yields an int) and is safe on an empty value.
+        if v[0:1] == b'"' and v[-1:] == b'"':
+            v = v[1:-1]
+        parsed[k] = v
+    return parsed
+
+class digestauthserver(object):
+    """Server-side checker for HTTP Digest authorization data in tests.
+
+    Users are registered with adduser(); checkauth() then recomputes the
+    expected response digest from the stored hash and the fields the client
+    sent.  Only the MD5 algorithm and ``qop=auth`` are accepted.
+    """
+
+    def __init__(self):
+        # Maps (user, realm) -> h(b'user:realm:password'), as stored by
+        # adduser().
+        self._user_hashes = {}
+
+    def gethashers(self):
+        """Return the ``(h, kd)`` pair of hash helpers used for digests.
+
+        ``h(x)`` is the MD5 digest of ``x`` passed through ``node.hex()``;
+        ``kd(s, d)`` is ``h(b"%s:%s" % (s, d))``.
+        """
+        def _md5sum(x):
+            m = hashlib.md5()
+            m.update(x)
+            return node.hex(m.digest())
+
+        h = _md5sum
+
+        # h=h binds the hasher as a default argument of the lambda.
+        kd = lambda s, d, h=h: h(b"%s:%s" % (s, d))
+        return h, kd
+
+    def adduser(self, user, password, realm):
+        """Register ``user`` in ``realm`` with ``password``.
+
+        Only ``h(b'user:realm:password')`` is stored, keyed by
+        ``(user, realm)``; the password itself is not kept.  The arguments
+        are interpolated into a bytes format string, so they are expected to
+        be bytes.
+        """
+        # kd is unpacked but not used here.
+        h, kd = self.gethashers()
+        a1 = h(b'%s:%s:%s' % (user, realm, password))
+        self._user_hashes[(user, realm)] = a1
+
+    def makechallenge(self, realm):
+        """Return the challenge parameters for ``realm`` as bytes.
+
+        The result carries the realm, a fixed nonce, ``algorithm=MD5`` and
+        ``qop="auth"``.  No auth-scheme prefix is added here.
+        """
+        # We aren't testing the protocol here, just that the bytes make the
+        # proper round trip.  So hardcoded seems fine.
+        nonce = b'064af982c5b571cea6450d8eda91c20d'
+        return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (realm,
+                       nonce)
+
+    def checkauth(self, req, header):
+        """Check the digest fields in ``header`` against the stored hash.
+
+        ``header`` is bytes holding ``key=value`` items separated by
+        ``b', '`` (presumably the Authorization value with the scheme name
+        already removed -- confirm against the caller).  ``req`` supplies
+        ``req.method`` and the ``wsgi.errors`` stream used for diagnostics.
+
+        Returns True when the client's ``response`` equals the digest
+        computed here, False when it does not.
+
+        Raises common.ErrorResponse(common.HTTP_FORBIDDEN) when the algorithm
+        is not MD5, no hash is stored for the user/realm, qop is not
+        ``auth``, or cnonce/nc is missing.
+
+        NOTE(review): ``username``, ``realm``, ``nonce``, ``uri`` and
+        ``response`` are read with ``resp[...]``, so a header lacking any of
+        them raises KeyError rather than ErrorResponse.  The nonce is only
+        fed into the digest; it is not compared with the value that
+        makechallenge() hands out.  The log messages are written without a
+        trailing newline.
+        """
+        log = req.rawenv[b'wsgi.errors']
+
+        h, kd = self.gethashers()
+        resp = parse_keqv_list(req, header.split(b', '))
+
+        # A missing algorithm field is treated as MD5.
+        if resp.get(b'algorithm', b'MD5').upper() != b'MD5':
+            log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm'))
+            raise common.ErrorResponse(common.HTTP_FORBIDDEN,
+                                       b"unknown algorithm")
+        user = resp[b'username']
+        realm = resp[b'realm']
+        nonce = resp[b'nonce']
+
+        ha1 = self._user_hashes.get((user, realm))
+        if not ha1:
+            log.write(b'No hash found for user/realm "%s/%s"' % (user, realm))
+            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user")
+
+        # A missing qop field is treated as "auth".
+        qop = resp.get(b'qop', b'auth')
+        if qop != b'auth':
+            log.write(b"Unsupported qop: %s" % qop)
+            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop")
+
+        cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc')
+        if not cnonce or not ncvalue:
+            # NOTE(review): when a field is absent its value is None, and on
+            # Python 3 interpolating None with %s into bytes raises
+            # TypeError, so the ErrorResponse below would not be reached.
+            log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue))
+            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce")
+
+        # Expected response for qop=auth (RFC 2617, section 3.2.2.1):
+        # kd(ha1, nonce:nc:cnonce:qop:h(method:uri)).
+        a2 = b'%s:%s' % (req.method, resp[b'uri'])
+        noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2))
+
+        respdig = kd(ha1, noncebit)
+        if respdig != resp[b'response']:
+            log.write(b'User/realm "%s/%s" gave %s, but expected %s'
+                      % (user, realm, resp[b'response'], respdig))
+            return False
+
+        return True
 
 def perform_authentication(hgweb, req, op):
     auth = req.headers.get(b'Authorization')


More information about the Mercurial-devel mailing list