import hmac
import time
import uuid
+import warnings
from tornado.concurrent import (Future, return_future, chain_future,
future_set_exc_info,
Note that when using this decorator the ``callback`` parameter
inside the function will actually be a future.
+
+ .. deprecated:: 5.1
+ Will be removed in 6.0.
"""
replacer = ArgReplacer(f, 'callback')
future = Future()
callback, args, kwargs = replacer.replace(future, args, kwargs)
if callback is not None:
+ warnings.warn("callback arguments are deprecated, use the returned Future instead",
+ DeprecationWarning)
future.add_done_callback(
functools.partial(_auth_future_to_callback, callback))
is present and `authenticate_redirect` if it is not).
The result of this method will generally be used to set a cookie.
+
+ .. deprecated:: 5.1
+
+ The ``callback`` argument is deprecated and will be removed in 6.0.
+ Use the returned awaitable object instead.
"""
# Verify the OpenID response via direct request to the OP
args = dict((k, v[-1]) for k, v in self.request.arguments.items())
requests to this service on behalf of the user. The dictionary will
also contain other fields such as ``name``, depending on the service
used.
+
+ .. deprecated:: 5.1
+
+ The ``callback`` argument is deprecated and will be removed in 6.0.
+ Use the returned awaitable object instead.
"""
future = callback
request_key = escape.utf8(self.get_argument("oauth_token"))
:hide:
.. versionadded:: 4.3
+
+ .. deprecated:: 5.1
+
+ The ``callback`` argument is deprecated and will be removed in 6.0.
+ Use the returned awaitable object instead.
"""
all_args = {}
if access_token:
.. testoutput::
:hide:
+ .. deprecated:: 5.1
+
+ The ``callback`` argument is deprecated and will be removed in 6.0.
+ Use the returned awaitable object instead.
"""
if path.startswith('http:') or path.startswith('https:'):
# Raw urls are useful for e.g. search which doesn't follow the
.. testoutput::
:hide:
+ .. deprecated:: 5.1
+
+ The ``callback`` argument is deprecated and will be removed in 6.0.
+ Use the returned awaitable object instead.
""" # noqa: E501
http = self.get_auth_http_client()
body = urllib_parse.urlencode({
.. versionchanged:: 4.5
The ``session_expires`` field was updated to support changes made to the
Facebook API in March 2017.
+
+ .. deprecated:: 5.1
+
+ The ``callback`` argument is deprecated and will be removed in 6.0.
+ Use the returned awaitable object instead.
"""
http = self.get_auth_http_client()
args = {
functools.partial(self._on_access_token, redirect_uri, client_id,
client_secret, callback, fields))
+ @gen.coroutine
def _on_access_token(self, redirect_uri, client_id, client_secret,
future, fields, response):
if response.error:
"expires_in": args.get("expires_in")
}
- self.facebook_request(
+ user = yield self.facebook_request(
path="/me",
- callback=functools.partial(
- self._on_get_user_info, future, session, fields),
access_token=session["access_token"],
appsecret_proof=hmac.new(key=client_secret.encode('utf8'),
msg=session["access_token"].encode('utf8'),
fields=",".join(fields)
)
- def _on_get_user_info(self, future, session, fields, user):
if user is None:
future_set_result_unless_cancelled(future, None)
return
.. versionchanged:: 3.1
Added the ability to override ``self._FACEBOOK_BASE_URL``.
+
+ .. deprecated:: 5.1
+
+ The ``callback`` argument is deprecated and will be removed in 6.0.
+ Use the returned awaitable object instead.
"""
url = self._FACEBOOK_BASE_URL + path
# Thanks to the _auth_return_future decorator, our "callback"
from __future__ import absolute_import, division, print_function
+import warnings
+
from tornado.auth import (
AuthError, OpenIdMixin, OAuthMixin, OAuth2Mixin,
GoogleOAuth2Mixin, FacebookGraphMixin, TwitterMixin,
from tornado.web import RequestHandler, Application, asynchronous, HTTPError
-class OpenIdClientLoginHandler(RequestHandler, OpenIdMixin):
+class OpenIdClientLoginHandlerLegacy(RequestHandler, OpenIdMixin):
def initialize(self, test):
self._OPENID_ENDPOINT = test.get_url('/openid/server/authenticate')
@asynchronous
def get(self):
if self.get_argument('openid.mode', None):
- self.get_authenticated_user(
- self.on_user, http_client=self.settings['http_client'])
- return
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ self.get_authenticated_user(
+ self.on_user, http_client=self.settings['http_client'])
+ return
res = self.authenticate_redirect()
assert isinstance(res, Future)
assert res.done()
self.finish(user)
+class OpenIdClientLoginHandler(RequestHandler, OpenIdMixin):
+ def initialize(self, test):
+ self._OPENID_ENDPOINT = test.get_url('/openid/server/authenticate')
+
+ @gen.coroutine
+ def get(self):
+ if self.get_argument('openid.mode', None):
+ user = yield self.get_authenticated_user(http_client=self.settings['http_client'])
+ if user is None:
+ raise Exception("user is None")
+ self.finish(user)
+ return
+ res = self.authenticate_redirect()
+ assert isinstance(res, Future)
+ assert res.done()
+
+
class OpenIdServerAuthenticateHandler(RequestHandler):
def post(self):
if self.get_argument('openid.mode') != 'check_authentication':
self.write('is_valid:true')
-class OAuth1ClientLoginHandler(RequestHandler, OAuthMixin):
+class OAuth1ClientLoginHandlerLegacy(RequestHandler, OAuthMixin):
def initialize(self, test, version):
self._OAUTH_VERSION = version
self._OAUTH_REQUEST_TOKEN_URL = test.get_url('/oauth1/server/request_token')
@asynchronous
def get(self):
if self.get_argument('oauth_token', None):
- self.get_authenticated_user(
- self.on_user, http_client=self.settings['http_client'])
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ self.get_authenticated_user(
+ self.on_user, http_client=self.settings['http_client'])
return
res = self.authorize_redirect(http_client=self.settings['http_client'])
assert isinstance(res, Future)
callback(dict(email='foo@example.com'))
+class OAuth1ClientLoginHandler(RequestHandler, OAuthMixin):
+ def initialize(self, test, version):
+ self._OAUTH_VERSION = version
+ self._OAUTH_REQUEST_TOKEN_URL = test.get_url('/oauth1/server/request_token')
+ self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth1/server/authorize')
+ self._OAUTH_ACCESS_TOKEN_URL = test.get_url('/oauth1/server/access_token')
+
+ def _oauth_consumer_token(self):
+ return dict(key='asdf', secret='qwer')
+
+ @gen.coroutine
+ def get(self):
+ if self.get_argument('oauth_token', None):
+ user = yield self.get_authenticated_user(http_client=self.settings['http_client'])
+ if user is None:
+ raise Exception("user is None")
+ self.finish(user)
+ return
+ yield self.authorize_redirect(http_client=self.settings['http_client'])
+
+ def _oauth_get_user(self, access_token, callback):
+ if self.get_argument('fail_in_get_user', None):
+ raise Exception("failing in get_user")
+ if access_token != dict(key='uiop', secret='5678'):
+ raise Exception("incorrect access token %r" % access_token)
+ callback(dict(email='foo@example.com'))
+
+
class OAuth1ClientLoginCoroutineHandler(OAuth1ClientLoginHandler):
"""Replaces OAuth1ClientLoginCoroutineHandler's get() with a coroutine."""
@gen.coroutine
return self.settings['http_client']
-class TwitterClientLoginHandler(TwitterClientHandler):
+class TwitterClientLoginHandlerLegacy(TwitterClientHandler):
@asynchronous
def get(self):
if self.get_argument("oauth_token", None):
self.finish(user)
+class TwitterClientLoginHandler(TwitterClientHandler):
+ @gen.coroutine
+ def get(self):
+ if self.get_argument("oauth_token", None):
+ user = yield self.get_authenticated_user()
+ if user is None:
+ raise Exception("user is None")
+ self.finish(user)
+ return
+ yield self.authorize_redirect()
+
+
class TwitterClientLoginGenEngineHandler(TwitterClientHandler):
@asynchronous
@gen.engine
yield self.authorize_redirect()
-class TwitterClientShowUserHandler(TwitterClientHandler):
+class TwitterClientShowUserHandlerLegacy(TwitterClientHandler):
@asynchronous
@gen.engine
def get(self):
# TODO: would be nice to go through the login flow instead of
# cheating with a hard-coded access token.
- response = yield gen.Task(self.twitter_request,
- '/users/show/%s' % self.get_argument('name'),
- access_token=dict(key='hjkl', secret='vbnm'))
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ response = yield gen.Task(self.twitter_request,
+ '/users/show/%s' % self.get_argument('name'),
+ access_token=dict(key='hjkl', secret='vbnm'))
if response is None:
self.set_status(500)
self.finish('error from twitter request')
self.finish(response)
+class TwitterClientShowUserHandler(TwitterClientHandler):
+ @gen.coroutine
+ def get(self):
+ # TODO: would be nice to go through the login flow instead of
+ # cheating with a hard-coded access token.
+ try:
+ response = yield self.twitter_request(
+ '/users/show/%s' % self.get_argument('name'),
+ access_token=dict(key='hjkl', secret='vbnm'))
+ except AuthError:
+ self.set_status(500)
+ self.finish('error from twitter request')
+ else:
+ self.finish(response)
+
+
class TwitterClientShowUserFutureHandler(TwitterClientHandler):
@asynchronous
@gen.engine
return Application(
[
# test endpoints
+ ('/legacy/openid/client/login', OpenIdClientLoginHandlerLegacy, dict(test=self)),
('/openid/client/login', OpenIdClientLoginHandler, dict(test=self)),
+ ('/legacy/oauth10/client/login', OAuth1ClientLoginHandlerLegacy,
+ dict(test=self, version='1.0')),
('/oauth10/client/login', OAuth1ClientLoginHandler,
dict(test=self, version='1.0')),
('/oauth10/client/request_params',
OAuth1ClientRequestParametersHandler,
dict(version='1.0')),
+ ('/legacy/oauth10a/client/login', OAuth1ClientLoginHandlerLegacy,
+ dict(test=self, version='1.0a')),
('/oauth10a/client/login', OAuth1ClientLoginHandler,
dict(test=self, version='1.0a')),
('/oauth10a/client/login_coroutine',
('/facebook/client/login', FacebookClientLoginHandler, dict(test=self)),
+ ('/legacy/twitter/client/login', TwitterClientLoginHandlerLegacy, dict(test=self)),
('/twitter/client/login', TwitterClientLoginHandler, dict(test=self)),
('/twitter/client/login_gen_engine',
TwitterClientLoginGenEngineHandler, dict(test=self)),
('/twitter/client/login_gen_coroutine',
TwitterClientLoginGenCoroutineHandler, dict(test=self)),
+ ('/legacy/twitter/client/show_user',
+ TwitterClientShowUserHandlerLegacy, dict(test=self)),
('/twitter/client/show_user',
TwitterClientShowUserHandler, dict(test=self)),
('/twitter/client/show_user_future',
facebook_api_key='test_facebook_api_key',
facebook_secret='test_facebook_secret')
+ def test_openid_redirect_legacy(self):
+ response = self.fetch('/legacy/openid/client/login', follow_redirects=False)
+ self.assertEqual(response.code, 302)
+ self.assertTrue(
+ '/openid/server/authenticate?' in response.headers['Location'])
+
+ def test_openid_get_user_legacy(self):
+ response = self.fetch('/legacy/openid/client/login?openid.mode=blah'
+ '&openid.ns.ax=http://openid.net/srv/ax/1.0'
+ '&openid.ax.type.email=http://axschema.org/contact/email'
+ '&openid.ax.value.email=foo@example.com')
+ response.rethrow()
+ parsed = json_decode(response.body)
+ self.assertEqual(parsed["email"], "foo@example.com")
+
def test_openid_redirect(self):
response = self.fetch('/openid/client/login', follow_redirects=False)
self.assertEqual(response.code, 302)
parsed = json_decode(response.body)
self.assertEqual(parsed["email"], "foo@example.com")
+ def test_oauth10_redirect_legacy(self):
+ response = self.fetch('/legacy/oauth10/client/login', follow_redirects=False)
+ self.assertEqual(response.code, 302)
+ self.assertTrue(response.headers['Location'].endswith(
+ '/oauth1/server/authorize?oauth_token=zxcv'))
+ # the cookie is base64('zxcv')|base64('1234')
+ self.assertTrue(
+ '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
+ response.headers['Set-Cookie'])
+
def test_oauth10_redirect(self):
response = self.fetch('/oauth10/client/login', follow_redirects=False)
self.assertEqual(response.code, 302)
'_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
response.headers['Set-Cookie'])
+ def test_oauth10_get_user_legacy(self):
+ response = self.fetch(
+ '/legacy/oauth10/client/login?oauth_token=zxcv',
+ headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
+ response.rethrow()
+ parsed = json_decode(response.body)
+ self.assertEqual(parsed['email'], 'foo@example.com')
+ self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))
+
def test_oauth10_get_user(self):
response = self.fetch(
'/oauth10/client/login?oauth_token=zxcv',
self.assertTrue('oauth_nonce' in parsed)
self.assertTrue('oauth_signature' in parsed)
+ def test_oauth10a_redirect_legacy(self):
+ response = self.fetch('/legacy/oauth10a/client/login', follow_redirects=False)
+ self.assertEqual(response.code, 302)
+ self.assertTrue(response.headers['Location'].endswith(
+ '/oauth1/server/authorize?oauth_token=zxcv'))
+ # the cookie is base64('zxcv')|base64('1234')
+ self.assertTrue(
+ '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
+ response.headers['Set-Cookie'])
+
+ def test_oauth10a_get_user_legacy(self):
+ response = self.fetch(
+ '/legacy/oauth10a/client/login?oauth_token=zxcv',
+ headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
+ response.rethrow()
+ parsed = json_decode(response.body)
+ self.assertEqual(parsed['email'], 'foo@example.com')
+ self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))
+
def test_oauth10a_redirect(self):
response = self.fetch('/oauth10a/client/login', follow_redirects=False)
self.assertEqual(response.code, 302)
'_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
response.headers['Set-Cookie'])
+ def test_twitter_redirect_legacy(self):
+ self.base_twitter_redirect('/legacy/twitter/client/login')
+
def test_twitter_redirect(self):
self.base_twitter_redirect('/twitter/client/login')
u'screen_name': u'foo',
u'username': u'foo'})
+ def test_twitter_show_user_legacy(self):
+ response = self.fetch('/legacy/twitter/client/show_user?name=somebody')
+ response.rethrow()
+ self.assertEqual(json_decode(response.body),
+ {'name': 'Somebody', 'screen_name': 'somebody'})
+
+ def test_twitter_show_user_error_legacy(self):
+ with ExpectLog(gen_log, 'Error response HTTP 500'):
+ response = self.fetch('/legacy/twitter/client/show_user?name=error')
+ self.assertEqual(response.code, 500)
+ self.assertEqual(response.body, b'error from twitter request')
+
def test_twitter_show_user(self):
response = self.fetch('/twitter/client/show_user?name=somebody')
response.rethrow()
{'name': 'Somebody', 'screen_name': 'somebody'})
def test_twitter_show_user_error(self):
- with ExpectLog(gen_log, 'Error response HTTP 500'):
- response = self.fetch('/twitter/client/show_user?name=error')
+ response = self.fetch('/twitter/client/show_user?name=error')
self.assertEqual(response.code, 500)
self.assertEqual(response.body, b'error from twitter request')