Add tests for the functions that were thereby revealed to have none.
# Make sure we got back at least an email from attribute exchange
ax_ns = None
- for name in self.request.arguments.keys():
+ for name in self.request.arguments:
if name.startswith("openid.ns.") and \
self.get_argument(name) == u("http://openid.net/srv/ax/1.0"):
ax_ns = name[10:]
"""Fetches the authenticated user data upon redirect."""
# Look to see if we are doing combined OpenID/OAuth
oauth_ns = ""
- for name, values in self.request.arguments.iteritems():
+ for name, values in self.request.arguments.items():
if name.startswith("openid.ns.") and \
- values[-1] == u("http://specs.openid.net/extensions/oauth/1.0"):
+ values[-1] == b"http://specs.openid.net/extensions/oauth/1.0":
oauth_ns = name[10:]
break
token = self.get_argument("openid." + oauth_ns + ".request_token", "")
[utf8("%s: %s" % i) for i in request.headers.get_all()])
else:
curl.setopt(pycurl.HTTPHEADER,
- [utf8("%s: %s" % i) for i in request.headers.iteritems()])
+ [utf8("%s: %s" % i) for i in request.headers.items()])
if request.header_callback:
curl.setopt(pycurl.HEADERFUNCTION, request.header_callback)
raise self.error
def __repr__(self):
- args = ",".join("%s=%r" % i for i in self.__dict__.iteritems())
+ args = ",".join("%s=%r" % i for i in sorted(self.__dict__.items()))
return "%s(%s)" % (self.__class__.__name__, args)
from __future__ import absolute_import, division, print_function, with_statement
-from tornado.auth import OpenIdMixin, OAuthMixin, OAuth2Mixin, TwitterMixin
+from tornado.auth import OpenIdMixin, OAuthMixin, OAuth2Mixin, TwitterMixin, GoogleMixin
from tornado.escape import json_decode
from tornado.testing import AsyncHTTPTestCase
from tornado.util import u
self.write(dict(screen_name=screen_name, name=screen_name.capitalize()))
+class GoogleOpenIdClientLoginHandler(RequestHandler, GoogleMixin):
+ def initialize(self, test):
+ self._OPENID_ENDPOINT = test.get_url('/openid/server/authenticate')
+
+ @asynchronous
+ def get(self):
+ if self.get_argument("openid.mode", None):
+ self.get_authenticated_user(self.on_user)
+ return
+ self.authenticate_redirect()
+
+ def on_user(self, user):
+ if user is None:
+ raise Exception("user is None")
+ self.finish(user)
+
+ def get_auth_http_client(self):
+ return self.settings['http_client']
+
+
class AuthTest(AsyncHTTPTestCase):
def get_app(self):
return Application(
('/oauth2/client/login', OAuth2ClientLoginHandler, dict(test=self)),
('/twitter/client/login', TwitterClientLoginHandler, dict(test=self)),
+ ('/google/client/openid_login', GoogleOpenIdClientLoginHandler, dict(test=self)),
# simulated servers
('/openid/server/authenticate', OpenIdServerAuthenticateHandler),
u('name'): u('Foo'),
u('screen_name'): u('foo'),
u('username'): u('foo')})
+
+ def test_google_redirect(self):
+ # same as test_openid_redirect
+ response = self.fetch('/google/client/openid_login', follow_redirects=False)
+ self.assertEqual(response.code, 302)
+ self.assertTrue(
+ '/openid/server/authenticate?' in response.headers['Location'])
+
+ def test_google_get_user(self):
+ response = self.fetch('/google/client/openid_login?openid.mode=blah&openid.ns.ax=http://openid.net/srv/ax/1.0&openid.ax.type.email=http://axschema.org/contact/email&openid.ax.value.email=foo@example.com', follow_redirects=False)
+ response.rethrow()
+ parsed = json_decode(response.body)
+ self.assertEqual(parsed["email"], "foo@example.com")
import sys
from tornado.escape import utf8
-from tornado.httpclient import HTTPRequest, _RequestProxy
+from tornado.httpclient import HTTPRequest, HTTPResponse, _RequestProxy
from tornado.iostream import IOStream
from tornado import netutil
from tornado.stack_context import ExceptionStackContext, NullContext
from tornado.util import u, bytes_type
from tornado.web import Application, RequestHandler, url
+try:
+ from io import BytesIO # python 3
+except ImportError:
+ from cStringIO import StringIO as BytesIO
class HelloWorldHandler(RequestHandler):
def get(self):
def test_defaults_none(self):
proxy = _RequestProxy(HTTPRequest('http://example.com/'), None)
self.assertIs(proxy.auth_username, None)
+
+
+class HTTPResponseTestCase(unittest.TestCase):
+ def test_str(self):
+ response = HTTPResponse(HTTPRequest('http://example.com'),
+ 200, headers={}, buffer=BytesIO())
+ s = str(response)
+ self.assertTrue(s.startswith('HTTPResponse('))
+ self.assertIn('code=200', s)
def restore_signal_handlers(saved):
- for sig, handler in saved.iteritems():
+ for sig, handler in saved.items():
signal.signal(sig, handler)
],
'twisted.internet.test.test_unix.UNIXPortTestsBuilder': [],
}
- for test_name, blacklist in twisted_tests.iteritems():
+ for test_name, blacklist in twisted_tests.items():
try:
test_class = import_object(test_name)
except (ImportError, AttributeError):
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {'args': [], 'kwargs': {'path': 'foo'}})
+
+
+@wsgi_safe
+class ClearAllCookiesTest(SimpleHandlerTestCase):
+ class Handler(RequestHandler):
+ def get(self):
+ self.clear_all_cookies()
+ self.write('ok')
+
+ def test_clear_all_cookies(self):
+ response = self.fetch('/', headers={'Cookie': 'foo=bar; baz=xyzzy'})
+ set_cookies = sorted(response.headers.get_list('Set-Cookie'))
+ self.assertTrue(set_cookies[0].startswith('baz=;'))
+ self.assertTrue(set_cookies[1].startswith('foo=;'))
def clear_all_cookies(self):
"""Deletes all the cookies the user sent with this request."""
- for name in self.request.cookies.iterkeys():
+ for name in self.request.cookies:
self.clear_cookie(name)
def set_secure_cookie(self, name, value, expires_days=30, **kwargs):