curl.setopt(pycurl.PROXY, request.proxy_host)
curl.setopt(pycurl.PROXYPORT, request.proxy_port)
if request.proxy_username:
- credentials = '%s:%s' % (request.proxy_username,
- request.proxy_password)
+ credentials = httputil.encode_username_password(request.proxy_username,
+ request.proxy_password)
curl.setopt(pycurl.PROXYUSERPWD, credentials)
if (request.proxy_auth_mode is None or
curl.setopt(pycurl.INFILESIZE, len(request.body or ''))
if request.auth_username is not None:
- userpwd = "%s:%s" % (request.auth_username, request.auth_password or '')
-
if request.auth_mode is None or request.auth_mode == "basic":
curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
elif request.auth_mode == "digest":
else:
raise ValueError("Unsupported auth_mode %s" % request.auth_mode)
- curl.setopt(pycurl.USERPWD, utf8(userpwd))
+ userpwd = httputil.encode_username_password(request.auth_username,
+ request.auth_password)
+ curl.setopt(pycurl.USERPWD, userpwd)
curl_log.debug("%s %s (username: %r)", request.method, request.url,
request.auth_username)
else:
import numbers
import re
import time
+import unicodedata
import warnings
from tornado.escape import native_str, parse_qs_bytes, utf8
from tornado.log import gen_log
-from tornado.util import ObjectDict, PY3
+from tornado.util import ObjectDict, PY3, unicode_type
if PY3:
import http.cookies as Cookie
return '; '.join(out)
+def encode_username_password(username, password):
+ """Encodes a username/password pair in the format used by HTTP auth.
+
+ The return value is a byte string in the form ``username:password``.
+
+ .. versionadded:: 5.1
+ """
+ if isinstance(username, unicode_type):
+ username = unicodedata.normalize('NFC', username)
+ if isinstance(password, unicode_type):
+ password = unicodedata.normalize('NFC', password)
+ return utf8(username) + b":" + utf8(password)
+
+
def doctests():
import doctest
return doctest.DocTestSuite()
from __future__ import absolute_import, division, print_function
-from tornado.escape import utf8, _unicode
+from tornado.escape import _unicode
from tornado import gen
from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main, _RequestProxy
from tornado import httputil
if self.request.auth_mode not in (None, "basic"):
raise ValueError("unsupported auth_mode %s",
self.request.auth_mode)
- auth = utf8(username) + b":" + utf8(password)
- self.request.headers["Authorization"] = (b"Basic " +
- base64.b64encode(auth))
+ self.request.headers["Authorization"] = (
+ b"Basic " + base64.b64encode(
+ httputil.encode_username_password(username, password)))
if self.request.user_agent:
self.request.headers["User-Agent"] = self.request.user_agent
if not self.request.allow_nonstandard_methods:
+# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
import base64
import threading
import datetime
from io import BytesIO
+import unicodedata
from tornado.escape import utf8, native_str
from tornado import gen
self.assertIs(exc_info[0][0], ZeroDivisionError)
def test_basic_auth(self):
+ # This test data appears in section 2 of RFC 7617.
self.assertEqual(self.fetch("/auth", auth_username="Aladdin",
auth_password="open sesame").body,
b"Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==")
auth_mode="basic").body,
b"Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==")
+ def test_basic_auth_unicode(self):
+ # This test data appears in section 2.1 of RFC 7617.
+ self.assertEqual(self.fetch("/auth", auth_username="test",
+ auth_password="123£").body,
+ b"Basic dGVzdDoxMjPCow==")
+
+ # The standard mandates NFC. Give it a decomposed username
+ # and ensure it is normalized to composed form.
+ username = unicodedata.normalize("NFD", u"josé")
+ self.assertEqual(self.fetch("/auth",
+ auth_username=username,
+ auth_password="səcrət").body,
+ b"Basic am9zw6k6c8mZY3LJmXQ=")
+
def test_unsupported_auth_mode(self):
# curl and simple clients handle errors a bit differently; the
# important thing is that they don't fall back to basic auth