from tornado import escape
from tornado.httputil import url_concat
from tornado.util import unicode_type
+from tornado.web import RequestHandler
+
+from typing import List, Any, Dict, cast, Iterable, Union, Optional
class AuthError(Exception):
* ``_OPENID_ENDPOINT``: the identity provider's URI.
"""
- def authenticate_redirect(self, callback_uri=None,
- ax_attrs=["name", "email", "language", "username"]):
+ def authenticate_redirect(
+ self, callback_uri: str=None,
+ ax_attrs: List[str]=["name", "email", "language", "username"]) -> None:
"""Redirects to the authentication URL for this service.
After authentication, the service will redirect back to the given
longer returns an awaitable object. It is now an ordinary
synchronous function.
"""
- callback_uri = callback_uri or self.request.uri
+ handler = cast(RequestHandler, self)
+ callback_uri = callback_uri or handler.request.uri
+ assert callback_uri is not None
args = self._openid_args(callback_uri, ax_attrs=ax_attrs)
- self.redirect(self._OPENID_ENDPOINT + "?" + urllib.parse.urlencode(args))
+ endpoint = self._OPENID_ENDPOINT # type: ignore
+ handler.redirect(endpoint + "?" + urllib.parse.urlencode(args))
- async def get_authenticated_user(self, http_client=None):
+ async def get_authenticated_user(
+ self, http_client: httpclient.AsyncHTTPClient=None
+ ) -> Dict[str, Any]:
"""Fetches the authenticated user data upon redirect.
This method should be called by the handler that receives the
The ``callback`` argument was removed. Use the returned
awaitable object instead.
"""
+ handler = cast(RequestHandler, self)
# Verify the OpenID response via direct request to the OP
- args = dict((k, v[-1]) for k, v in self.request.arguments.items())
+ args = dict((k, v[-1]) for k, v in handler.request.arguments.items()) \
+ # type: Dict[str, Union[str, bytes]]
args["openid.mode"] = u"check_authentication"
- url = self._OPENID_ENDPOINT
+ url = self._OPENID_ENDPOINT # type: ignore
if http_client is None:
http_client = self.get_auth_http_client()
resp = await http_client.fetch(url, method="POST", body=urllib.parse.urlencode(args))
return self._on_authentication_verified(resp)
- def _openid_args(self, callback_uri, ax_attrs=[], oauth_scope=None):
- url = urllib.parse.urljoin(self.request.full_url(), callback_uri)
+ def _openid_args(self, callback_uri: str, ax_attrs: Iterable[str]=[],
+ oauth_scope: str=None) -> Dict[str, str]:
+ handler = cast(RequestHandler, self)
+ url = urllib.parse.urljoin(handler.request.full_url(), callback_uri)
args = {
"openid.ns": "http://specs.openid.net/auth/2.0",
"openid.claimed_id":
"openid.ax.mode": "fetch_request",
})
ax_attrs = set(ax_attrs)
- required = []
+ required = [] # type: List[str]
if "name" in ax_attrs:
ax_attrs -= set(["name", "firstname", "fullname", "lastname"])
required += ["firstname", "fullname", "lastname"]
args.update({
"openid.ns.oauth":
"http://specs.openid.net/extensions/oauth/1.0",
- "openid.oauth.consumer": self.request.host.split(":")[0],
+ "openid.oauth.consumer": handler.request.host.split(":")[0],
"openid.oauth.scope": oauth_scope,
})
return args
- def _on_authentication_verified(self, response):
+ def _on_authentication_verified(self, response: httpclient.HTTPResponse) -> Dict[str, Any]:
+ handler = cast(RequestHandler, self)
if b"is_valid:true" not in response.body:
raise AuthError("Invalid OpenID response: %s" % response.body)
# Make sure we got back at least an email from attribute exchange
ax_ns = None
- for name in self.request.arguments:
- if name.startswith("openid.ns.") and \
- self.get_argument(name) == u"http://openid.net/srv/ax/1.0":
- ax_ns = name[10:]
+ for key in handler.request.arguments:
+ if key.startswith("openid.ns.") and \
+ handler.get_argument(key) == u"http://openid.net/srv/ax/1.0":
+ ax_ns = key[10:]
break
- def get_ax_arg(uri):
+ def get_ax_arg(uri: str) -> str:
if not ax_ns:
return u""
prefix = "openid." + ax_ns + ".type."
ax_name = None
- for name in self.request.arguments.keys():
- if self.get_argument(name) == uri and name.startswith(prefix):
+ for name in handler.request.arguments.keys():
+ if handler.get_argument(name) == uri and name.startswith(prefix):
part = name[len(prefix):]
ax_name = "openid." + ax_ns + ".value." + part
break
if not ax_name:
return u""
- return self.get_argument(ax_name, u"")
+ return handler.get_argument(ax_name, u"")
email = get_ax_arg("http://axschema.org/contact/email")
name = get_ax_arg("http://axschema.org/namePerson")
user["locale"] = locale
if username:
user["username"] = username
- claimed_id = self.get_argument("openid.claimed_id", None)
+ claimed_id = handler.get_argument("openid.claimed_id", None)
if claimed_id:
user["claimed_id"] = claimed_id
return user
- def get_auth_http_client(self):
+ def get_auth_http_client(self) -> httpclient.AsyncHTTPClient:
"""Returns the `.AsyncHTTPClient` instance to be used for auth requests.
May be overridden by subclasses to use an HTTP client other than
Subclasses must also override the `_oauth_get_user_future` and
`_oauth_consumer_token` methods.
"""
- async def authorize_redirect(self, callback_uri=None, extra_params=None,
- http_client=None):
+ async def authorize_redirect(self, callback_uri: str=None, extra_params: Dict[str, Any]=None,
+ http_client: httpclient.AsyncHTTPClient=None) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
The ``callback_uri`` may be omitted if you have previously
raise Exception("This service does not support oauth_callback")
if http_client is None:
http_client = self.get_auth_http_client()
+ assert http_client is not None
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
response = await http_client.fetch(
self._oauth_request_token_url(callback_uri=callback_uri,
extra_params=extra_params))
else:
response = await http_client.fetch(self._oauth_request_token_url())
- self._on_request_token(self._OAUTH_AUTHORIZE_URL, callback_uri, response)
+ url = self._OAUTH_AUTHORIZE_URL # type: ignore
+ self._on_request_token(url, callback_uri, response)
- async def get_authenticated_user(self, http_client=None):
+ async def get_authenticated_user(
+ self, http_client: httpclient.AsyncHTTPClient=None
+ ) -> Dict[str, Any]:
"""Gets the OAuth authorized user and access token.
This method should be called from the handler for your
The ``callback`` argument was removed. Use the returned
awaitable object instead.
"""
- request_key = escape.utf8(self.get_argument("oauth_token"))
- oauth_verifier = self.get_argument("oauth_verifier", None)
- request_cookie = self.get_cookie("_oauth_request_token")
+ handler = cast(RequestHandler, self)
+ request_key = escape.utf8(handler.get_argument("oauth_token"))
+ oauth_verifier = handler.get_argument("oauth_verifier", None)
+ request_cookie = handler.get_cookie("_oauth_request_token")
if not request_cookie:
raise AuthError("Missing OAuth request token cookie")
- self.clear_cookie("_oauth_request_token")
+ handler.clear_cookie("_oauth_request_token")
cookie_key, cookie_secret = [
base64.b64decode(escape.utf8(i)) for i in request_cookie.split("|")]
if cookie_key != request_key:
raise AuthError("Request token does not match cookie")
- token = dict(key=cookie_key, secret=cookie_secret)
+ token = dict(key=cookie_key, secret=cookie_secret) # type: Dict[str, Union[str, bytes]]
if oauth_verifier:
token["verifier"] = oauth_verifier
if http_client is None:
http_client = self.get_auth_http_client()
+ assert http_client is not None
response = await http_client.fetch(self._oauth_access_token_url(token))
access_token = _oauth_parse_response(response.body)
user = await self._oauth_get_user_future(access_token)
user["access_token"] = access_token
return user
- def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
+ def _oauth_request_token_url(self, callback_uri: str=None,
+ extra_params: Dict[str, Any]=None) -> str:
+ handler = cast(RequestHandler, self)
consumer_token = self._oauth_consumer_token()
- url = self._OAUTH_REQUEST_TOKEN_URL
+ url = self._OAUTH_REQUEST_TOKEN_URL # type: ignore
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_signature_method="HMAC-SHA1",
args["oauth_callback"] = "oob"
elif callback_uri:
args["oauth_callback"] = urllib.parse.urljoin(
- self.request.full_url(), callback_uri)
+ handler.request.full_url(), callback_uri)
if extra_params:
args.update(extra_params)
signature = _oauth10a_signature(consumer_token, "GET", url, args)
args["oauth_signature"] = signature
return url + "?" + urllib.parse.urlencode(args)
- def _on_request_token(self, authorize_url, callback_uri, response):
+ def _on_request_token(self, authorize_url: str, callback_uri: Optional[str],
+ response: httpclient.HTTPResponse) -> None:
+ handler = cast(RequestHandler, self)
request_token = _oauth_parse_response(response.body)
data = (base64.b64encode(escape.utf8(request_token["key"])) + b"|" +
base64.b64encode(escape.utf8(request_token["secret"])))
- self.set_cookie("_oauth_request_token", data)
+ handler.set_cookie("_oauth_request_token", data)
args = dict(oauth_token=request_token["key"])
if callback_uri == "oob":
- self.finish(authorize_url + "?" + urllib.parse.urlencode(args))
+ handler.finish(authorize_url + "?" + urllib.parse.urlencode(args))
return
elif callback_uri:
args["oauth_callback"] = urllib.parse.urljoin(
- self.request.full_url(), callback_uri)
- self.redirect(authorize_url + "?" + urllib.parse.urlencode(args))
+ handler.request.full_url(), callback_uri)
+ handler.redirect(authorize_url + "?" + urllib.parse.urlencode(args))
- def _oauth_access_token_url(self, request_token):
+ def _oauth_access_token_url(self, request_token: Dict[str, Any]) -> str:
consumer_token = self._oauth_consumer_token()
- url = self._OAUTH_ACCESS_TOKEN_URL
+ url = self._OAUTH_ACCESS_TOKEN_URL # type: ignore
args = dict(
oauth_consumer_key=escape.to_basestring(consumer_token["key"]),
oauth_token=escape.to_basestring(request_token["key"]),
args["oauth_signature"] = signature
return url + "?" + urllib.parse.urlencode(args)
- def _oauth_consumer_token(self):
+ def _oauth_consumer_token(self) -> Dict[str, Any]:
"""Subclasses must override this to return their OAuth consumer keys.
The return value should be a `dict` with keys ``key`` and ``secret``.
"""
raise NotImplementedError()
- async def _oauth_get_user_future(self, access_token):
+ async def _oauth_get_user_future(self, access_token: Dict[str, Any]) -> Dict[str, Any]:
"""Subclasses must override this to get basic information about the
user.
"""
raise NotImplementedError()
- def _oauth_request_parameters(self, url, access_token, parameters={},
- method="GET"):
+ def _oauth_request_parameters(self, url: str, access_token: Dict[str, Any],
+ parameters: Dict[str, Any]={},
+ method: str="GET") -> Dict[str, Any]:
"""Returns the OAuth parameters as a dict for the given request.
parameters should include all POST arguments and query string arguments
base_args["oauth_signature"] = escape.to_basestring(signature)
return base_args
- def get_auth_http_client(self):
+ def get_auth_http_client(self) -> httpclient.AsyncHTTPClient:
"""Returns the `.AsyncHTTPClient` instance to be used for auth requests.
May be overridden by subclasses to use an HTTP client other than
* ``_OAUTH_AUTHORIZE_URL``: The service's authorization url.
* ``_OAUTH_ACCESS_TOKEN_URL``: The service's access token url.
"""
- def authorize_redirect(self, redirect_uri=None, client_id=None,
- client_secret=None, extra_params=None,
- scope=None, response_type="code"):
+ def authorize_redirect(self, redirect_uri: str=None, client_id: str=None,
+ client_secret: str=None, extra_params: Dict[str, Any]=None,
+ scope: str=None, response_type: str="code") -> None:
"""Redirects the user to obtain OAuth authorization for this service.
Some providers require that you register a redirect URL with
The ``callback`` argument and returned awaitable were removed;
this is now an ordinary synchronous function.
"""
+ handler = cast(RequestHandler, self)
args = {
- "redirect_uri": redirect_uri,
- "client_id": client_id,
"response_type": response_type
}
+ if redirect_uri is not None:
+ args["redirect_uri"] = redirect_uri
+ if client_id is not None:
+ args["client_id"] = client_id
if extra_params:
args.update(extra_params)
if scope:
args['scope'] = ' '.join(scope)
- self.redirect(
- url_concat(self._OAUTH_AUTHORIZE_URL, args))
-
- def _oauth_request_token_url(self, redirect_uri=None, client_id=None,
- client_secret=None, code=None,
- extra_params=None):
- url = self._OAUTH_ACCESS_TOKEN_URL
- args = dict(
- redirect_uri=redirect_uri,
- code=code,
- client_id=client_id,
- client_secret=client_secret,
- )
+ url = self._OAUTH_AUTHORIZE_URL # type: ignore
+ handler.redirect(url_concat(url, args))
+
+ def _oauth_request_token_url(self, redirect_uri: str=None, client_id: str=None,
+ client_secret: str=None, code: str=None,
+ extra_params: Dict[str, Any]=None) -> str:
+ url = self._OAUTH_ACCESS_TOKEN_URL # type: ignore
+ args = {} # type: Dict[str, str]
+ if redirect_uri is not None:
+ args["redirect_uri"] = redirect_uri
+ if code is not None:
+ args["code"] = code
+ if client_id is not None:
+ args["client_id"] = client_id
+ if client_secret is not None:
+ args["client_secret"] = client_secret
if extra_params:
args.update(extra_params)
return url_concat(url, args)
- async def oauth2_request(self, url, access_token=None,
- post_args=None, **args):
+ async def oauth2_request(self, url: str, access_token: str=None,
+ post_args: Dict[str, Any]=None, **args: Any) -> Any:
"""Fetches the given URL auth an OAuth2 access token.
If the request is a POST, ``post_args`` should be provided. Query
response = await http.fetch(url)
return escape.json_decode(response.body)
- def get_auth_http_client(self):
+ def get_auth_http_client(self) -> httpclient.AsyncHTTPClient:
"""Returns the `.AsyncHTTPClient` instance to be used for auth requests.
May be overridden by subclasses to use an HTTP client other than
_OAUTH_NO_CALLBACKS = False
_TWITTER_BASE_URL = "https://api.twitter.com/1.1"
- async def authenticate_redirect(self, callback_uri=None):
+ async def authenticate_redirect(self, callback_uri: str=None) -> None:
"""Just like `~OAuthMixin.authorize_redirect`, but
auto-redirects if authorized.
response = await http.fetch(self._oauth_request_token_url(callback_uri=callback_uri))
self._on_request_token(self._OAUTH_AUTHENTICATE_URL, None, response)
- async def twitter_request(self, path, access_token=None, post_args=None, **args):
+ async def twitter_request(self, path: str, access_token: Dict[str, Any],
+ post_args: Dict[str, Any]=None, **args: Any) -> Any:
"""Fetches the given API path, e.g., ``statuses/user_timeline/btaylor``
The path should not include the format or API version number.
response = await http.fetch(url)
return escape.json_decode(response.body)
- def _oauth_consumer_token(self):
- self.require_setting("twitter_consumer_key", "Twitter OAuth")
- self.require_setting("twitter_consumer_secret", "Twitter OAuth")
+ def _oauth_consumer_token(self) -> Dict[str, Any]:
+ handler = cast(RequestHandler, self)
+ handler.require_setting("twitter_consumer_key", "Twitter OAuth")
+ handler.require_setting("twitter_consumer_secret", "Twitter OAuth")
return dict(
- key=self.settings["twitter_consumer_key"],
- secret=self.settings["twitter_consumer_secret"])
+ key=handler.settings["twitter_consumer_key"],
+ secret=handler.settings["twitter_consumer_secret"])
- async def _oauth_get_user_future(self, access_token):
+ async def _oauth_get_user_future(self, access_token: Dict[str, Any]) -> Dict[str, Any]:
user = await self.twitter_request(
"/account/verify_credentials",
access_token=access_token)
_OAUTH_NO_CALLBACKS = False
_OAUTH_SETTINGS_KEY = 'google_oauth'
- async def get_authenticated_user(self, redirect_uri, code):
+ async def get_authenticated_user(self, redirect_uri: str, code: str) -> Dict[str, Any]:
"""Handles the login for the Google user, returning an access token.
The result is a dictionary containing an ``access_token`` field
The ``callback`` argument was removed. Use the returned awaitable object instead.
""" # noqa: E501
+ handler = cast(RequestHandler, self)
http = self.get_auth_http_client()
body = urllib.parse.urlencode({
"redirect_uri": redirect_uri,
"code": code,
- "client_id": self.settings[self._OAUTH_SETTINGS_KEY]['key'],
- "client_secret": self.settings[self._OAUTH_SETTINGS_KEY]['secret'],
+ "client_id": handler.settings[self._OAUTH_SETTINGS_KEY]['key'],
+ "client_secret": handler.settings[self._OAUTH_SETTINGS_KEY]['secret'],
"grant_type": "authorization_code",
})
_OAUTH_NO_CALLBACKS = False
_FACEBOOK_BASE_URL = "https://graph.facebook.com"
- async def get_authenticated_user(self, redirect_uri, client_id, client_secret,
- code, extra_fields=None):
+ async def get_authenticated_user(
+ self, redirect_uri: str, client_id: str, client_secret: str,
+ code: str, extra_fields: Dict[str, Any]=None
+ ) -> Optional[Dict[str, Any]]:
"""Handles the login for the Facebook user, returning a user object.
Example usage:
if extra_fields:
fields.update(extra_fields)
- response = await http.fetch(self._oauth_request_token_url(**args))
+ response = await http.fetch(self._oauth_request_token_url(**args)) # type: ignore
args = escape.json_decode(response.body)
session = {
"access_token": args.get("access_token"),
"expires_in": args.get("expires_in")
}
+ assert session["access_token"] is not None
user = await self.facebook_request(
path="/me",
"session_expires": str(session.get("expires_in"))})
return fieldmap
- async def facebook_request(self, path, access_token=None, post_args=None, **args):
+ async def facebook_request(self, path: str, access_token: str=None,
+ post_args: Dict[str, Any]=None, **args: Any) -> Any:
"""Fetches the given relative API path, e.g., "/btaylor/picture"
If the request is a POST, ``post_args`` should be provided. Query
post_args=post_args, **args)
-def _oauth_signature(consumer_token, method, url, parameters={}, token=None):
+def _oauth_signature(consumer_token: Dict[str, Any], method: str, url: str,
+ parameters: Dict[str, Any]={}, token: Dict[str, Any]=None) -> bytes:
"""Calculates the HMAC-SHA1 OAuth signature for the given request.
See http://oauth.net/core/1.0/#signing_process
return binascii.b2a_base64(hash.digest())[:-1]
-def _oauth10a_signature(consumer_token, method, url, parameters={}, token=None):
+def _oauth10a_signature(consumer_token: Dict[str, Any], method: str, url: str,
+ parameters: Dict[str, Any]={}, token: Dict[str, Any]=None) -> bytes:
"""Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.
See http://oauth.net/core/1.0a/#signing_process
return binascii.b2a_base64(hash.digest())[:-1]
-def _oauth_escape(val):
+def _oauth_escape(val: Union[str, bytes]) -> str:
if isinstance(val, unicode_type):
val = val.encode("utf-8")
return urllib.parse.quote(val, safe="~")
-def _oauth_parse_response(body):
+def _oauth_parse_response(body: bytes) -> Dict[str, Any]:
# I can't find an officially-defined encoding for oauth responses and
# have never seen anyone use non-ascii. Leave the response in a byte
# string for python 2, and use utf8 on python 3.
- body = escape.native_str(body)
- p = urllib.parse.parse_qs(body, keep_blank_values=False)
+ body_str = escape.native_str(body)
+ p = urllib.parse.parse_qs(body_str, keep_blank_values=False)
token = dict(key=p["oauth_token"][0], secret=p["oauth_token_secret"][0])
# Add the extra parameters the Provider included to the token