]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Add a test for digest auth in curl_httpclient.
authorBen Darnell <ben@bendarnell.com>
Sat, 25 May 2013 19:02:08 +0000 (15:02 -0400)
committerBen Darnell <ben@bendarnell.com>
Sat, 25 May 2013 19:02:08 +0000 (15:02 -0400)
I wrote this to test for a file descriptor bug, but can't reproduce it
on my system.

tornado/test/curl_httpclient_test.py

index 10e3e83fb3a668fdd6b0e466f46fdabe99a1006b..0ea96a330bb3dde1220d75f554c70725612646e0 100644 (file)
@@ -1,10 +1,13 @@
 from __future__ import absolute_import, division, print_function, with_statement
+
+from hashlib import md5
+
 from tornado.httpclient import HTTPRequest
 from tornado.stack_context import ExceptionStackContext
 from tornado.testing import AsyncHTTPTestCase
 from tornado.test import httpclient_test
 from tornado.test.util import unittest
-from tornado.web import Application
+from tornado.web import Application, RequestHandler
 
 try:
     import pycurl
@@ -24,6 +27,44 @@ class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
         return client
 
 
+class DigestAuthHandler(RequestHandler):
+    def get(self):
+        realm = 'test'
+        opaque = 'asdf'
+        # Real implementations would use a random nonce.
+        nonce = "1234"
+        username = 'foo'
+        password = 'bar'
+
+        auth_header = self.request.headers.get('Authorization', None)
+        if auth_header is not None:
+            auth_mode, params = auth_header.split(' ', 1)
+            assert auth_mode == 'Digest'
+            param_dict = {}
+            for pair in params.split(','):
+                k, v = pair.strip().split('=', 1)
+                if v[0] == '"' and v[-1] == '"':
+                    v = v[1:-1]
+                param_dict[k] = v
+            assert param_dict['realm'] == realm
+            assert param_dict['opaque'] == opaque
+            assert param_dict['nonce'] == nonce
+            assert param_dict['username'] == username
+            assert param_dict['uri'] == self.request.path
+            h1 = md5('%s:%s:%s' % (username, realm, password)).hexdigest()
+            h2 = md5('%s:%s' % (self.request.method,
+                                self.request.path)).hexdigest()
+            digest = md5('%s:%s:%s' % (h1, nonce, h2)).hexdigest()
+            if digest == param_dict['response']:
+                self.write('ok')
+            else:
+                self.write('fail')
+        else:
+            self.set_status(401)
+            self.set_header('WWW-Authenticate',
+                            'Digest realm="%s", nonce="%s", opaque="%s"' %
+                            (realm, nonce, opaque))
+
 @unittest.skipIf(pycurl is None, "pycurl module not present")
 class CurlHTTPClientTestCase(AsyncHTTPTestCase):
     def setUp(self):
@@ -31,7 +72,9 @@ class CurlHTTPClientTestCase(AsyncHTTPTestCase):
         self.http_client = CurlAsyncHTTPClient(self.io_loop)
 
     def get_app(self):
-        return Application([])
+        return Application([
+            ('/digest', DigestAuthHandler),
+        ])
 
     def test_prepare_curl_callback_stack_context(self):
         exc_info = []
@@ -48,3 +91,8 @@ class CurlHTTPClientTestCase(AsyncHTTPTestCase):
         self.wait()
         self.assertEqual(1, len(exc_info))
         self.assertIs(exc_info[0][0], ZeroDivisionError)
+
+    def test_digest_auth(self):
+        response = self.fetch('/digest', auth_mode='digest',
+                              auth_username='foo', auth_password='bar')
+        self.assertEqual(response.body, b'ok')