From: Ben Darnell Date: Sat, 25 May 2013 19:02:08 +0000 (-0400) Subject: Add a test for digest auth in curl_httpclient. X-Git-Tag: v3.1.0~45 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ec7fd4e5042070cb45613f585c196bd64cb6a793;p=thirdparty%2Ftornado.git Add a test for digest auth in curl_httpclient. I wrote this to test for a file descriptor bug, but can't reproduce it on my system. --- diff --git a/tornado/test/curl_httpclient_test.py b/tornado/test/curl_httpclient_test.py index 10e3e83fb..0ea96a330 100644 --- a/tornado/test/curl_httpclient_test.py +++ b/tornado/test/curl_httpclient_test.py @@ -1,10 +1,13 @@ from __future__ import absolute_import, division, print_function, with_statement + +from hashlib import md5 + from tornado.httpclient import HTTPRequest from tornado.stack_context import ExceptionStackContext from tornado.testing import AsyncHTTPTestCase from tornado.test import httpclient_test from tornado.test.util import unittest -from tornado.web import Application +from tornado.web import Application, RequestHandler try: import pycurl @@ -24,6 +27,44 @@ class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase): return client +class DigestAuthHandler(RequestHandler): + def get(self): + realm = 'test' + opaque = 'asdf' + # Real implementations would use a random nonce. + nonce = "1234" + username = 'foo' + password = 'bar' + + auth_header = self.request.headers.get('Authorization', None) + if auth_header is not None: + auth_mode, params = auth_header.split(' ', 1) + assert auth_mode == 'Digest' + param_dict = {} + for pair in params.split(','): + k, v = pair.strip().split('=', 1) + if v[0] == '"' and v[-1] == '"': + v = v[1:-1] + param_dict[k] = v + assert param_dict['realm'] == realm + assert param_dict['opaque'] == opaque + assert param_dict['nonce'] == nonce + assert param_dict['username'] == username + assert param_dict['uri'] == self.request.path + h1 = md5('%s:%s:%s' % (username, realm, password)).hexdigest() + h2 = md5('%s:%s' % (self.request.method, + self.request.path)).hexdigest() + digest = md5('%s:%s:%s' % (h1, nonce, h2)).hexdigest() + if digest == param_dict['response']: + self.write('ok') + else: + self.write('fail') + else: + self.set_status(401) + self.set_header('WWW-Authenticate', + 'Digest realm="%s", nonce="%s", opaque="%s"' % + (realm, nonce, opaque)) + @unittest.skipIf(pycurl is None, "pycurl module not present") class CurlHTTPClientTestCase(AsyncHTTPTestCase): def setUp(self): @@ -31,7 +72,9 @@ class CurlHTTPClientTestCase(AsyncHTTPTestCase): self.http_client = CurlAsyncHTTPClient(self.io_loop) def get_app(self): - return Application([]) + return Application([ + ('/digest', DigestAuthHandler), + ]) def test_prepare_curl_callback_stack_context(self): exc_info = [] @@ -48,3 +91,8 @@ class CurlHTTPClientTestCase(AsyncHTTPTestCase): self.wait() self.assertEqual(1, len(exc_info)) self.assertIs(exc_info[0][0], ZeroDivisionError) + + def test_digest_auth(self): + response = self.fetch('/digest', auth_mode='digest', + auth_username='foo', auth_password='bar') + self.assertEqual(response.body, b'ok')