class BucketHandler(BaseRequestHandler):
def get(self, bucket_name):
- prefix = self.get_argument("prefix", u"")
- marker = self.get_argument("marker", u"")
+ prefix = self.get_argument("prefix", "")
+ marker = self.get_argument("marker", "")
max_keys = int(self.get_argument("max-keys", 50000))
path = os.path.abspath(os.path.join(self.application.directory, bucket_name))
terse = int(self.get_argument("terse", 0))
-appdirs==1.4.4
-black==20.8b1
-click==7.1.2
-distlib==0.3.1
-filelock==3.0.12
-flake8==3.8.4
-mccabe==0.6.1
-mypy==0.941
+# Requirements for tools used in the development of tornado.
+#
+# This mainly contains tools that should be installed for editor integration.
+# Other tools we use are installed only via tox or CI scripts.
+# This is a manual recreation of the lockfile pattern: maint/requirements.txt
+# is the lockfile, and maint/requirements.in is the input file containing only
+# direct dependencies.
+
+black==22.10.0
+flake8==6.0.0
+mypy==0.991
+types-pycurl==7.45.1.4
+tox==3.27.1
+## The following requirements were added by pip freeze:
+click==8.1.3
+distlib==0.3.6
+filelock==3.8.0
+mccabe==0.7.0
mypy-extensions==0.4.3
-packaging==20.4
-pathspec==0.8.0
-pluggy==0.13.1
-py==1.10.0
-pycodestyle==2.6.0
-pyflakes==2.2.0
-pyparsing==2.4.7
-regex==2020.10.28
-six==1.15.0
-toml==0.10.1
+packaging==21.3
+pathspec==0.10.2
+platformdirs==2.5.4
+pluggy==1.0.0
+py==1.11.0
+pycodestyle==2.10.0
+pyflakes==3.0.1
+pyparsing==3.0.9
+six==1.16.0
tomli==2.0.1
-tox==3.20.1
-typed-ast==1.4.1
-types-pycurl==7.44.7
-typing-extensions==4.1.1
-virtualenv==20.1.0
+typing_extensions==4.4.0
+virtualenv==20.16.7
"""Data used by the tornado.locale module."""
LOCALE_NAMES = {
- "af_ZA": {"name_en": u"Afrikaans", "name": u"Afrikaans"},
- "am_ET": {"name_en": u"Amharic", "name": u"አማርኛ"},
- "ar_AR": {"name_en": u"Arabic", "name": u"العربية"},
- "bg_BG": {"name_en": u"Bulgarian", "name": u"Български"},
- "bn_IN": {"name_en": u"Bengali", "name": u"বাংলা"},
- "bs_BA": {"name_en": u"Bosnian", "name": u"Bosanski"},
- "ca_ES": {"name_en": u"Catalan", "name": u"Català"},
- "cs_CZ": {"name_en": u"Czech", "name": u"Čeština"},
- "cy_GB": {"name_en": u"Welsh", "name": u"Cymraeg"},
- "da_DK": {"name_en": u"Danish", "name": u"Dansk"},
- "de_DE": {"name_en": u"German", "name": u"Deutsch"},
- "el_GR": {"name_en": u"Greek", "name": u"Ελληνικά"},
- "en_GB": {"name_en": u"English (UK)", "name": u"English (UK)"},
- "en_US": {"name_en": u"English (US)", "name": u"English (US)"},
- "es_ES": {"name_en": u"Spanish (Spain)", "name": u"Español (España)"},
- "es_LA": {"name_en": u"Spanish", "name": u"Español"},
- "et_EE": {"name_en": u"Estonian", "name": u"Eesti"},
- "eu_ES": {"name_en": u"Basque", "name": u"Euskara"},
- "fa_IR": {"name_en": u"Persian", "name": u"فارسی"},
- "fi_FI": {"name_en": u"Finnish", "name": u"Suomi"},
- "fr_CA": {"name_en": u"French (Canada)", "name": u"Français (Canada)"},
- "fr_FR": {"name_en": u"French", "name": u"Français"},
- "ga_IE": {"name_en": u"Irish", "name": u"Gaeilge"},
- "gl_ES": {"name_en": u"Galician", "name": u"Galego"},
- "he_IL": {"name_en": u"Hebrew", "name": u"עברית"},
- "hi_IN": {"name_en": u"Hindi", "name": u"हिन्दी"},
- "hr_HR": {"name_en": u"Croatian", "name": u"Hrvatski"},
- "hu_HU": {"name_en": u"Hungarian", "name": u"Magyar"},
- "id_ID": {"name_en": u"Indonesian", "name": u"Bahasa Indonesia"},
- "is_IS": {"name_en": u"Icelandic", "name": u"Íslenska"},
- "it_IT": {"name_en": u"Italian", "name": u"Italiano"},
- "ja_JP": {"name_en": u"Japanese", "name": u"日本語"},
- "ko_KR": {"name_en": u"Korean", "name": u"한국어"},
- "lt_LT": {"name_en": u"Lithuanian", "name": u"Lietuvių"},
- "lv_LV": {"name_en": u"Latvian", "name": u"Latviešu"},
- "mk_MK": {"name_en": u"Macedonian", "name": u"Македонски"},
- "ml_IN": {"name_en": u"Malayalam", "name": u"മലയാളം"},
- "ms_MY": {"name_en": u"Malay", "name": u"Bahasa Melayu"},
- "nb_NO": {"name_en": u"Norwegian (bokmal)", "name": u"Norsk (bokmål)"},
- "nl_NL": {"name_en": u"Dutch", "name": u"Nederlands"},
- "nn_NO": {"name_en": u"Norwegian (nynorsk)", "name": u"Norsk (nynorsk)"},
- "pa_IN": {"name_en": u"Punjabi", "name": u"ਪੰਜਾਬੀ"},
- "pl_PL": {"name_en": u"Polish", "name": u"Polski"},
- "pt_BR": {"name_en": u"Portuguese (Brazil)", "name": u"Português (Brasil)"},
- "pt_PT": {"name_en": u"Portuguese (Portugal)", "name": u"Português (Portugal)"},
- "ro_RO": {"name_en": u"Romanian", "name": u"Română"},
- "ru_RU": {"name_en": u"Russian", "name": u"Русский"},
- "sk_SK": {"name_en": u"Slovak", "name": u"Slovenčina"},
- "sl_SI": {"name_en": u"Slovenian", "name": u"Slovenščina"},
- "sq_AL": {"name_en": u"Albanian", "name": u"Shqip"},
- "sr_RS": {"name_en": u"Serbian", "name": u"Српски"},
- "sv_SE": {"name_en": u"Swedish", "name": u"Svenska"},
- "sw_KE": {"name_en": u"Swahili", "name": u"Kiswahili"},
- "ta_IN": {"name_en": u"Tamil", "name": u"தமிழ்"},
- "te_IN": {"name_en": u"Telugu", "name": u"తెలుగు"},
- "th_TH": {"name_en": u"Thai", "name": u"ภาษาไทย"},
- "tl_PH": {"name_en": u"Filipino", "name": u"Filipino"},
- "tr_TR": {"name_en": u"Turkish", "name": u"Türkçe"},
- "uk_UA": {"name_en": u"Ukraini ", "name": u"Українська"},
- "vi_VN": {"name_en": u"Vietnamese", "name": u"Tiếng Việt"},
- "zh_CN": {"name_en": u"Chinese (Simplified)", "name": u"中文(简体)"},
- "zh_TW": {"name_en": u"Chinese (Traditional)", "name": u"中文(繁體)"},
+ "af_ZA": {"name_en": "Afrikaans", "name": "Afrikaans"},
+ "am_ET": {"name_en": "Amharic", "name": "አማርኛ"},
+ "ar_AR": {"name_en": "Arabic", "name": "العربية"},
+ "bg_BG": {"name_en": "Bulgarian", "name": "Български"},
+ "bn_IN": {"name_en": "Bengali", "name": "বাংলা"},
+ "bs_BA": {"name_en": "Bosnian", "name": "Bosanski"},
+ "ca_ES": {"name_en": "Catalan", "name": "Català"},
+ "cs_CZ": {"name_en": "Czech", "name": "Čeština"},
+ "cy_GB": {"name_en": "Welsh", "name": "Cymraeg"},
+ "da_DK": {"name_en": "Danish", "name": "Dansk"},
+ "de_DE": {"name_en": "German", "name": "Deutsch"},
+ "el_GR": {"name_en": "Greek", "name": "Ελληνικά"},
+ "en_GB": {"name_en": "English (UK)", "name": "English (UK)"},
+ "en_US": {"name_en": "English (US)", "name": "English (US)"},
+ "es_ES": {"name_en": "Spanish (Spain)", "name": "Español (España)"},
+ "es_LA": {"name_en": "Spanish", "name": "Español"},
+ "et_EE": {"name_en": "Estonian", "name": "Eesti"},
+ "eu_ES": {"name_en": "Basque", "name": "Euskara"},
+ "fa_IR": {"name_en": "Persian", "name": "فارسی"},
+ "fi_FI": {"name_en": "Finnish", "name": "Suomi"},
+ "fr_CA": {"name_en": "French (Canada)", "name": "Français (Canada)"},
+ "fr_FR": {"name_en": "French", "name": "Français"},
+ "ga_IE": {"name_en": "Irish", "name": "Gaeilge"},
+ "gl_ES": {"name_en": "Galician", "name": "Galego"},
+ "he_IL": {"name_en": "Hebrew", "name": "עברית"},
+ "hi_IN": {"name_en": "Hindi", "name": "हिन्दी"},
+ "hr_HR": {"name_en": "Croatian", "name": "Hrvatski"},
+ "hu_HU": {"name_en": "Hungarian", "name": "Magyar"},
+ "id_ID": {"name_en": "Indonesian", "name": "Bahasa Indonesia"},
+ "is_IS": {"name_en": "Icelandic", "name": "Íslenska"},
+ "it_IT": {"name_en": "Italian", "name": "Italiano"},
+ "ja_JP": {"name_en": "Japanese", "name": "日本語"},
+ "ko_KR": {"name_en": "Korean", "name": "한국어"},
+ "lt_LT": {"name_en": "Lithuanian", "name": "Lietuvių"},
+ "lv_LV": {"name_en": "Latvian", "name": "Latviešu"},
+ "mk_MK": {"name_en": "Macedonian", "name": "Македонски"},
+ "ml_IN": {"name_en": "Malayalam", "name": "മലയാളം"},
+ "ms_MY": {"name_en": "Malay", "name": "Bahasa Melayu"},
+ "nb_NO": {"name_en": "Norwegian (bokmal)", "name": "Norsk (bokmål)"},
+ "nl_NL": {"name_en": "Dutch", "name": "Nederlands"},
+ "nn_NO": {"name_en": "Norwegian (nynorsk)", "name": "Norsk (nynorsk)"},
+ "pa_IN": {"name_en": "Punjabi", "name": "ਪੰਜਾਬੀ"},
+ "pl_PL": {"name_en": "Polish", "name": "Polski"},
+ "pt_BR": {"name_en": "Portuguese (Brazil)", "name": "Português (Brasil)"},
+ "pt_PT": {"name_en": "Portuguese (Portugal)", "name": "Português (Portugal)"},
+ "ro_RO": {"name_en": "Romanian", "name": "Română"},
+ "ru_RU": {"name_en": "Russian", "name": "Русский"},
+ "sk_SK": {"name_en": "Slovak", "name": "Slovenčina"},
+ "sl_SI": {"name_en": "Slovenian", "name": "Slovenščina"},
+ "sq_AL": {"name_en": "Albanian", "name": "Shqip"},
+ "sr_RS": {"name_en": "Serbian", "name": "Српски"},
+ "sv_SE": {"name_en": "Swedish", "name": "Svenska"},
+ "sw_KE": {"name_en": "Swahili", "name": "Kiswahili"},
+ "ta_IN": {"name_en": "Tamil", "name": "தமிழ்"},
+ "te_IN": {"name_en": "Telugu", "name": "తెలుగు"},
+ "th_TH": {"name_en": "Thai", "name": "ภาษาไทย"},
+ "tl_PH": {"name_en": "Filipino", "name": "Filipino"},
+ "tr_TR": {"name_en": "Turkish", "name": "Türkçe"},
+ "uk_UA": {"name_en": "Ukraini ", "name": "Українська"},
+ "vi_VN": {"name_en": "Vietnamese", "name": "Tiếng Việt"},
+ "zh_CN": {"name_en": "Chinese (Simplified)", "name": "中文(简体)"},
+ "zh_TW": {"name_en": "Chinese (Traditional)", "name": "中文(繁體)"},
}
args = dict(
(k, v[-1]) for k, v in handler.request.arguments.items()
) # type: Dict[str, Union[str, bytes]]
- args["openid.mode"] = u"check_authentication"
+ args["openid.mode"] = "check_authentication"
url = self._OPENID_ENDPOINT # type: ignore
if http_client is None:
http_client = self.get_auth_http_client()
for key in handler.request.arguments:
if (
key.startswith("openid.ns.")
- and handler.get_argument(key) == u"http://openid.net/srv/ax/1.0"
+ and handler.get_argument(key) == "http://openid.net/srv/ax/1.0"
):
ax_ns = key[10:]
break
def get_ax_arg(uri: str) -> str:
if not ax_ns:
- return u""
+ return ""
prefix = "openid." + ax_ns + ".type."
ax_name = None
for name in handler.request.arguments.keys():
ax_name = "openid." + ax_ns + ".value." + part
break
if not ax_name:
- return u""
- return handler.get_argument(ax_name, u"")
+ return ""
+ return handler.get_argument(ax_name, "")
email = get_ax_arg("http://axschema.org/contact/email")
name = get_ax_arg("http://axschema.org/namePerson")
if name:
user["name"] = name
elif name_parts:
- user["name"] = u" ".join(name_parts)
+ user["name"] = " ".join(name_parts)
elif email:
user["name"] = email.split("@")[0]
if email:
)
from tornado.log import app_log
-from typing import Dict, Any, Callable, Union, Tuple, Optional
+from typing import Dict, Any, Callable, Union, Optional
import typing
if typing.TYPE_CHECKING:
- from typing import Deque # noqa: F401
+ from typing import Deque, Tuple # noqa: F401
curl_log = logging.getLogger("tornado.curl_httpclient")
# have a status bar, such as Safari by default)
params += ' title="%s"' % href
- return u'<a href="%s"%s>%s</a>' % (href, params, url)
+ return '<a href="%s"%s>%s</a>' % (href, params, url)
# First HTML-escape so that our strings are all safe.
# The regex is modified to avoid character entites other than & so
def __init__(self, code: str) -> None:
self.code = code
- self.name = LOCALE_NAMES.get(code, {}).get("name", u"Unknown")
+ self.name = LOCALE_NAMES.get(code, {}).get("name", "Unknown")
self.rtl = False
for prefix in ["fa", "ar", "he"]:
if self.code.startswith(prefix):
str_time = "%d:%02d" % (local_date.hour, local_date.minute)
elif self.code == "zh_CN":
str_time = "%s%d:%02d" % (
- (u"\u4e0a\u5348", u"\u4e0b\u5348")[local_date.hour >= 12],
+ ("\u4e0a\u5348", "\u4e0b\u5348")[local_date.hour >= 12],
local_date.hour % 12 or 12,
local_date.minute,
)
return ""
if len(parts) == 1:
return parts[0]
- comma = u" \u0648 " if self.code.startswith("fa") else u", "
+ comma = " \u0648 " if self.code.startswith("fa") else ", "
return _("%(commas)s and %(last)s") % {
"commas": comma.join(parts[:-1]),
"last": parts[len(parts) - 1],
# module-import time, the import lock is already held by the main thread,
# leading to deadlock. Avoid it by caching the idna encoder on the main
# thread now.
-u"foo".encode("idna")
+"foo".encode("idna")
# For undiagnosed reasons, 'latin1' codec may also need to be preloaded.
-u"foo".encode("latin1")
+"foo".encode("latin1")
# Default backlog used when calling sock.listen()
_DEFAULT_BACKLOG = 128
sys.platform == "darwin"
and address == "localhost"
and af == socket.AF_INET6
- and sockaddr[3] != 0
+ and sockaddr[3] != 0 # type: ignore
):
# Mac OS X includes a link-local address fe80::1%lo0 in the
# getaddrinfo results for 'localhost'. However, the firewall
from tornado.gen import convert_yielded
from tornado.ioloop import IOLoop, _Selectable
-from typing import Any, TypeVar, Awaitable, Callable, Union, Optional, List, Tuple, Dict
+from typing import Any, TypeVar, Awaitable, Callable, Union, Optional, List, Dict
if typing.TYPE_CHECKING:
- from typing import Set # noqa: F401
+ from typing import Set, Tuple # noqa: F401
from typing_extensions import Protocol
class _HasFileno(Protocol):
return asyncio.get_event_loop_policy().get_event_loop()
-
else:
from asyncio import get_event_loop as _get_event_loop
self._writers[fd] = functools.partial(callback, *args)
self._wake_selector()
- def remove_reader(self, fd: "_FileDescriptorLike") -> None:
- del self._readers[fd]
+ def remove_reader(self, fd: "_FileDescriptorLike") -> bool:
+ try:
+ del self._readers[fd]
+ except KeyError:
+ return False
self._wake_selector()
+ return True
- def remove_writer(self, fd: "_FileDescriptorLike") -> None:
- del self._writers[fd]
+ def remove_writer(self, fd: "_FileDescriptorLike") -> bool:
+ try:
+ del self._writers[fd]
+ except KeyError:
+ return False
self._wake_selector()
+ return True
value: Optional[BaseException],
tb: Optional[TracebackType],
) -> bool:
- if self.final_callback:
+ if self.final_callback is not None:
self._remove_timeout()
if isinstance(value, StreamClosedError):
if value.real_error is None:
import numbers
import datetime
import ssl
+import typing
from tornado.concurrent import Future, future_add_done_callback
from tornado.ioloop import IOLoop
from tornado.netutil import Resolver
from tornado.gen import TimeoutError
-from typing import Any, Union, Dict, Tuple, List, Callable, Iterator, Optional, Set
+from typing import Any, Union, Dict, Tuple, List, Callable, Iterator, Optional
+
+if typing.TYPE_CHECKING:
+    from typing import Set  # noqa: F401
_INITIAL_CONNECT_TIMEOUT = 0.3
self.assertEqual(
parsed,
{
- u"access_token": {
- u"key": u"hjkl",
- u"screen_name": u"foo",
- u"secret": u"vbnm",
+ "access_token": {
+ "key": "hjkl",
+ "screen_name": "foo",
+ "secret": "vbnm",
},
- u"name": u"Foo",
- u"screen_name": u"foo",
- u"username": u"foo",
+ "name": "Foo",
+ "screen_name": "foo",
+ "username": "foo",
},
)
response = self.fetch("/client/login")
self.assertDictEqual(
{
- u"name": u"Foo",
- u"email": u"foo@example.com",
- u"access_token": u"fake-access-token",
+ "name": "Foo",
+ "email": "foo@example.com",
+ "access_token": "fake-access-token",
},
json_decode(response.body),
)
(
"hello http://world.com/!",
{},
- u'hello <a href="http://world.com/">http://world.com/</a>!',
+ 'hello <a href="http://world.com/">http://world.com/</a>!',
),
(
"hello http://world.com/with?param=true&stuff=yes",
{},
-            u'hello <a href="http://world.com/with?param=true&amp;stuff=yes">http://world.com/with?param=true&amp;stuff=yes</a>',  # noqa: E501
+            'hello <a href="http://world.com/with?param=true&amp;stuff=yes">http://world.com/with?param=true&amp;stuff=yes</a>',  # noqa: E501
),
# an opened paren followed by many chars killed Gruber's regex
(
"http://url.com/w(aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
{},
- u'<a href="http://url.com/w">http://url.com/w</a>(aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', # noqa: E501
+ '<a href="http://url.com/w">http://url.com/w</a>(aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', # noqa: E501
),
# as did too many dots at the end
(
"http://url.com/withmany.......................................",
{},
- u'<a href="http://url.com/withmany">http://url.com/withmany</a>.......................................', # noqa: E501
+ '<a href="http://url.com/withmany">http://url.com/withmany</a>.......................................', # noqa: E501
),
(
"http://url.com/withmany((((((((((((((((((((((((((((((((((a)",
{},
- u'<a href="http://url.com/withmany">http://url.com/withmany</a>((((((((((((((((((((((((((((((((((a)', # noqa: E501
+ '<a href="http://url.com/withmany">http://url.com/withmany</a>((((((((((((((((((((((((((((((((((a)', # noqa: E501
),
# some examples from http://daringfireball.net/2009/11/liberal_regex_for_matching_urls
# plus a fex extras (such as multiple parentheses).
(
"http://foo.com/blah_blah",
{},
- u'<a href="http://foo.com/blah_blah">http://foo.com/blah_blah</a>',
+ '<a href="http://foo.com/blah_blah">http://foo.com/blah_blah</a>',
),
(
"http://foo.com/blah_blah/",
{},
- u'<a href="http://foo.com/blah_blah/">http://foo.com/blah_blah/</a>',
+ '<a href="http://foo.com/blah_blah/">http://foo.com/blah_blah/</a>',
),
(
"(Something like http://foo.com/blah_blah)",
{},
- u'(Something like <a href="http://foo.com/blah_blah">http://foo.com/blah_blah</a>)',
+ '(Something like <a href="http://foo.com/blah_blah">http://foo.com/blah_blah</a>)',
),
(
"http://foo.com/blah_blah_(wikipedia)",
{},
- u'<a href="http://foo.com/blah_blah_(wikipedia)">http://foo.com/blah_blah_(wikipedia)</a>',
+ '<a href="http://foo.com/blah_blah_(wikipedia)">http://foo.com/blah_blah_(wikipedia)</a>',
),
(
"http://foo.com/blah_(blah)_(wikipedia)_blah",
{},
- u'<a href="http://foo.com/blah_(blah)_(wikipedia)_blah">http://foo.com/blah_(blah)_(wikipedia)_blah</a>', # noqa: E501
+ '<a href="http://foo.com/blah_(blah)_(wikipedia)_blah">http://foo.com/blah_(blah)_(wikipedia)_blah</a>', # noqa: E501
),
(
"(Something like http://foo.com/blah_blah_(wikipedia))",
{},
- u'(Something like <a href="http://foo.com/blah_blah_(wikipedia)">http://foo.com/blah_blah_(wikipedia)</a>)', # noqa: E501
+ '(Something like <a href="http://foo.com/blah_blah_(wikipedia)">http://foo.com/blah_blah_(wikipedia)</a>)', # noqa: E501
),
(
"http://foo.com/blah_blah.",
{},
- u'<a href="http://foo.com/blah_blah">http://foo.com/blah_blah</a>.',
+ '<a href="http://foo.com/blah_blah">http://foo.com/blah_blah</a>.',
),
(
"http://foo.com/blah_blah/.",
{},
- u'<a href="http://foo.com/blah_blah/">http://foo.com/blah_blah/</a>.',
+ '<a href="http://foo.com/blah_blah/">http://foo.com/blah_blah/</a>.',
),
(
"<http://foo.com/blah_blah>",
{},
-            u'&lt;<a href="http://foo.com/blah_blah">http://foo.com/blah_blah</a>&gt;',
+            '&lt;<a href="http://foo.com/blah_blah">http://foo.com/blah_blah</a>&gt;',
),
(
"<http://foo.com/blah_blah/>",
{},
-            u'&lt;<a href="http://foo.com/blah_blah/">http://foo.com/blah_blah/</a>&gt;',
+            '&lt;<a href="http://foo.com/blah_blah/">http://foo.com/blah_blah/</a>&gt;',
),
(
"http://foo.com/blah_blah,",
{},
- u'<a href="http://foo.com/blah_blah">http://foo.com/blah_blah</a>,',
+ '<a href="http://foo.com/blah_blah">http://foo.com/blah_blah</a>,',
),
(
"http://www.example.com/wpstyle/?p=364.",
{},
- u'<a href="http://www.example.com/wpstyle/?p=364">http://www.example.com/wpstyle/?p=364</a>.', # noqa: E501
+ '<a href="http://www.example.com/wpstyle/?p=364">http://www.example.com/wpstyle/?p=364</a>.', # noqa: E501
),
(
"rdar://1234",
{"permitted_protocols": ["http", "rdar"]},
- u'<a href="rdar://1234">rdar://1234</a>',
+ '<a href="rdar://1234">rdar://1234</a>',
),
(
"rdar:/1234",
{"permitted_protocols": ["rdar"]},
- u'<a href="rdar:/1234">rdar:/1234</a>',
+ '<a href="rdar:/1234">rdar:/1234</a>',
),
(
"http://userid:password@example.com:8080",
{},
- u'<a href="http://userid:password@example.com:8080">http://userid:password@example.com:8080</a>', # noqa: E501
+ '<a href="http://userid:password@example.com:8080">http://userid:password@example.com:8080</a>', # noqa: E501
),
(
"http://userid@example.com",
{},
- u'<a href="http://userid@example.com">http://userid@example.com</a>',
+ '<a href="http://userid@example.com">http://userid@example.com</a>',
),
(
"http://userid@example.com:8080",
{},
- u'<a href="http://userid@example.com:8080">http://userid@example.com:8080</a>',
+ '<a href="http://userid@example.com:8080">http://userid@example.com:8080</a>',
),
(
"http://userid:password@example.com",
{},
- u'<a href="http://userid:password@example.com">http://userid:password@example.com</a>',
+ '<a href="http://userid:password@example.com">http://userid:password@example.com</a>',
),
(
"message://%3c330e7f8409726r6a4ba78dkf1fd71420c1bf6ff@mail.gmail.com%3e",
{"permitted_protocols": ["http", "message"]},
- u'<a href="message://%3c330e7f8409726r6a4ba78dkf1fd71420c1bf6ff@mail.gmail.com%3e">'
- u"message://%3c330e7f8409726r6a4ba78dkf1fd71420c1bf6ff@mail.gmail.com%3e</a>",
+ '<a href="message://%3c330e7f8409726r6a4ba78dkf1fd71420c1bf6ff@mail.gmail.com%3e">'
+ "message://%3c330e7f8409726r6a4ba78dkf1fd71420c1bf6ff@mail.gmail.com%3e</a>",
),
(
- u"http://\u27a1.ws/\u4a39",
+ "http://\u27a1.ws/\u4a39",
{},
- u'<a href="http://\u27a1.ws/\u4a39">http://\u27a1.ws/\u4a39</a>',
+ '<a href="http://\u27a1.ws/\u4a39">http://\u27a1.ws/\u4a39</a>',
),
(
"<tag>http://example.com</tag>",
{},
-            u'&lt;tag&gt;<a href="http://example.com">http://example.com</a>&lt;/tag&gt;',
+            '&lt;tag&gt;<a href="http://example.com">http://example.com</a>&lt;/tag&gt;',
),
(
"Just a www.example.com link.",
{},
- u'Just a <a href="http://www.example.com">www.example.com</a> link.',
+ 'Just a <a href="http://www.example.com">www.example.com</a> link.',
),
(
"Just a www.example.com link.",
{"require_protocol": True},
- u"Just a www.example.com link.",
+ "Just a www.example.com link.",
),
(
"A http://reallylong.com/link/that/exceedsthelenglimit.html",
{"require_protocol": True, "shorten": True},
- u'A <a href="http://reallylong.com/link/that/exceedsthelenglimit.html"'
- u' title="http://reallylong.com/link/that/exceedsthelenglimit.html">http://reallylong.com/link...</a>', # noqa: E501
+ 'A <a href="http://reallylong.com/link/that/exceedsthelenglimit.html"'
+ ' title="http://reallylong.com/link/that/exceedsthelenglimit.html">http://reallylong.com/link...</a>', # noqa: E501
),
(
"A http://reallylongdomainnamethatwillbetoolong.com/hi!",
{"shorten": True},
- u'A <a href="http://reallylongdomainnamethatwillbetoolong.com/hi"'
- u' title="http://reallylongdomainnamethatwillbetoolong.com/hi">http://reallylongdomainnametha...</a>!', # noqa: E501
+ 'A <a href="http://reallylongdomainnamethatwillbetoolong.com/hi"'
+ ' title="http://reallylongdomainnamethatwillbetoolong.com/hi">http://reallylongdomainnametha...</a>!', # noqa: E501
),
(
"A file:///passwords.txt and http://web.com link",
{},
- u'A file:///passwords.txt and <a href="http://web.com">http://web.com</a> link',
+ 'A file:///passwords.txt and <a href="http://web.com">http://web.com</a> link',
),
(
"A file:///passwords.txt and http://web.com link",
{"permitted_protocols": ["file"]},
- u'A <a href="file:///passwords.txt">file:///passwords.txt</a> and http://web.com link',
+ 'A <a href="file:///passwords.txt">file:///passwords.txt</a> and http://web.com link',
),
(
"www.external-link.com",
{"extra_params": 'rel="nofollow" class="external"'},
- u'<a href="http://www.external-link.com" rel="nofollow" class="external">www.external-link.com</a>', # noqa: E501
+ '<a href="http://www.external-link.com" rel="nofollow" class="external">www.external-link.com</a>', # noqa: E501
),
(
"www.external-link.com and www.internal-link.com/blogs extra",
if href.startswith("http://www.internal-link.com")
else 'rel="nofollow" class="external"'
},
- u'<a href="http://www.external-link.com" rel="nofollow" class="external">www.external-link.com</a>' # noqa: E501
- u' and <a href="http://www.internal-link.com/blogs" class="internal">www.internal-link.com/blogs</a> extra', # noqa: E501
+ '<a href="http://www.external-link.com" rel="nofollow" class="external">www.external-link.com</a>' # noqa: E501
+ ' and <a href="http://www.internal-link.com/blogs" class="internal">www.internal-link.com/blogs</a> extra', # noqa: E501
),
(
"www.external-link.com",
{"extra_params": lambda href: ' rel="nofollow" class="external" '},
- u'<a href="http://www.external-link.com" rel="nofollow" class="external">www.external-link.com</a>', # noqa: E501
+ '<a href="http://www.external-link.com" rel="nofollow" class="external">www.external-link.com</a>', # noqa: E501
),
] # type: List[Tuple[Union[str, bytes], Dict[str, Any], str]]
def test_xhtml_escape(self):
tests = [
            ("<foo>", "&lt;foo&gt;"),
-            (u"<foo>", u"&lt;foo&gt;"),
+            ("<foo>", "&lt;foo&gt;"),
            (b"<foo>", b"&lt;foo&gt;"),
            ("<>&\"'", "&lt;&gt;&amp;&quot;&#x27;"),
            ("&amp;", "&amp;amp;"),
-            (u"<\u00e9>", u"&lt;\u00e9&gt;"),
+            ("<\u00e9>", "&lt;\u00e9&gt;"),
            (b"<\xc3\xa9>", b"&lt;\xc3\xa9&gt;"),
] # type: List[Tuple[Union[str, bytes], Union[str, bytes]]]
for unescaped, escaped in tests:
            ("foo&#32;bar", "foo bar"),
            ("foo&#x20;bar", "foo bar"),
            ("foo&#X20;bar", "foo bar"),
-            ("foo&#xabc;bar", u"foo\u0abcbar"),
+            ("foo&#xabc;bar", "foo\u0abcbar"),
("foo&#xyz;bar", "foo&#xyz;bar"), # invalid encoding
("foo&#;bar", "foo&#;bar"), # invalid encoding
("foo&#x;bar", "foo&#x;bar"), # invalid encoding
def test_url_escape_unicode(self):
tests = [
# byte strings are passed through as-is
- (u"\u00e9".encode("utf8"), "%C3%A9"),
- (u"\u00e9".encode("latin1"), "%E9"),
+ ("\u00e9".encode("utf8"), "%C3%A9"),
+ ("\u00e9".encode("latin1"), "%E9"),
# unicode strings become utf8
- (u"\u00e9", "%C3%A9"),
+ ("\u00e9", "%C3%A9"),
] # type: List[Tuple[Union[str, bytes], str]]
for unescaped, escaped in tests:
self.assertEqual(url_escape(unescaped), escaped)
def test_url_unescape_unicode(self):
tests = [
- ("%C3%A9", u"\u00e9", "utf8"),
- ("%C3%A9", u"\u00c3\u00a9", "latin1"),
- ("%C3%A9", utf8(u"\u00e9"), None),
+ ("%C3%A9", "\u00e9", "utf8"),
+ ("%C3%A9", "\u00c3\u00a9", "latin1"),
+ ("%C3%A9", utf8("\u00e9"), None),
]
for escaped, unescaped, encoding in tests:
# input strings to url_unescape should only contain ascii
# On python2 the escape methods should generally return the same
# type as their argument
self.assertEqual(type(xhtml_escape("foo")), str)
- self.assertEqual(type(xhtml_escape(u"foo")), unicode_type)
+ self.assertEqual(type(xhtml_escape("foo")), unicode_type)
def test_json_decode(self):
# json_decode accepts both bytes and unicode, but strings it returns
# are always unicode.
- self.assertEqual(json_decode(b'"foo"'), u"foo")
- self.assertEqual(json_decode(u'"foo"'), u"foo")
+ self.assertEqual(json_decode(b'"foo"'), "foo")
+ self.assertEqual(json_decode('"foo"'), "foo")
# Non-ascii bytes are interpreted as utf8
- self.assertEqual(json_decode(utf8(u'"\u00e9"')), u"\u00e9")
+ self.assertEqual(json_decode(utf8('"\u00e9"')), "\u00e9")
def test_json_encode(self):
# json deals with strings, not bytes. On python 2 byte strings will
# convert automatically if they are utf8; on python 3 byte strings
# are not allowed.
- self.assertEqual(json_decode(json_encode(u"\u00e9")), u"\u00e9")
+ self.assertEqual(json_decode(json_encode("\u00e9")), "\u00e9")
if bytes is str:
- self.assertEqual(json_decode(json_encode(utf8(u"\u00e9"))), u"\u00e9")
+ self.assertEqual(json_decode(json_encode(utf8("\u00e9"))), "\u00e9")
self.assertRaises(UnicodeDecodeError, json_encode, b"\xe9")
def test_squeeze(self):
self.assertEqual(
- squeeze(u"sequences of whitespace chars"),
- u"sequences of whitespace chars",
+ squeeze("sequences of whitespace chars"),
+ "sequences of whitespace chars",
)
def test_recursive_unicode(self):
"tuple": (b"foo", b"bar"),
"bytes": b"foo",
}
- self.assertEqual(recursive_unicode(tests["dict"]), {u"foo": u"bar"})
- self.assertEqual(recursive_unicode(tests["list"]), [u"foo", u"bar"])
- self.assertEqual(recursive_unicode(tests["tuple"]), (u"foo", u"bar"))
- self.assertEqual(recursive_unicode(tests["bytes"]), u"foo")
+ self.assertEqual(recursive_unicode(tests["dict"]), {"foo": "bar"})
+ self.assertEqual(recursive_unicode(tests["list"]), ["foo", "bar"])
+ self.assertEqual(recursive_unicode(tests["tuple"]), ("foo", "bar"))
+ self.assertEqual(recursive_unicode(tests["bytes"]), "foo")
import socket
-import typing
+import typing  # noqa: F401
from tornado.http1connection import HTTP1Connection
from tornado.httputil import HTTPMessageDelegate
# The standard mandates NFC. Give it a decomposed username
# and ensure it is normalized to composed form.
- username = unicodedata.normalize("NFD", u"josé")
+ username = unicodedata.normalize("NFD", "josé")
self.assertEqual(
self.fetch("/auth", auth_username=username, auth_password="səcrət").body,
b"Basic am9zw6k6c8mZY3LJmXQ=",
self.assertEqual(b"Basic " + base64.b64encode(b"me:secret"), response.body)
def test_body_encoding(self):
- unicode_body = u"\xe9"
+ unicode_body = "\xe9"
byte_body = binascii.a2b_hex(b"e9")
# unicode string in body gets converted to utf8
method="POST",
body=byte_body,
headers={"Content-Type": "application/blah"},
- user_agent=u"foo",
+ user_agent="foo",
)
self.assertEqual(response.headers["Content-Length"], "1")
self.assertEqual(response.body, byte_body)
# in a plain dictionary or an HTTPHeaders object.
# Keys must always be the native str type.
# All combinations should have the same results on the wire.
- for value in [u"MyUserAgent", b"MyUserAgent"]:
+ for value in ["MyUserAgent", b"MyUserAgent"]:
for container in [dict, HTTPHeaders]:
headers = container()
headers["User-Agent"] = value
"Foo": "b\xe4r",
},
)
- self.assertEqual(response.body, u"b\xe4r".encode("ISO8859-1"))
+ self.assertEqual(response.body, "b\xe4r".encode("ISO8859-1"))
def test_304_with_content_length(self):
# According to the spec 304 responses SHOULD NOT include
# Non-ascii headers are sent as latin1.
response = self.fetch("/set_header?k=foo&v=%E9")
response.rethrow()
- self.assertEqual(response.headers["Foo"], native_str(u"\u00e9"))
+ self.assertEqual(response.headers["Foo"], native_str("\u00e9"))
def test_response_times(self):
# A few simple sanity checks of the response time fields to
start_time = time.time()
response = self.fetch("/hello")
response.rethrow()
+ assert response.request_time is not None
self.assertGreaterEqual(response.request_time, 0)
self.assertLess(response.request_time, 1.0)
# A very crude check to make sure that start_time is based on
[
b"Content-Disposition: form-data; name=argument",
b"",
- u"\u00e1".encode("utf-8"),
+ "\u00e1".encode("utf-8"),
b"--1234567890",
- u'Content-Disposition: form-data; name="files"; filename="\u00f3"'.encode(
+ 'Content-Disposition: form-data; name="files"; filename="\u00f3"'.encode(
"utf8"
),
b"",
- u"\u00fa".encode("utf-8"),
+ "\u00fa".encode("utf-8"),
b"--1234567890--",
b"",
]
),
)
data = json_decode(response)
- self.assertEqual(u"\u00e9", data["header"])
- self.assertEqual(u"\u00e1", data["argument"])
- self.assertEqual(u"\u00f3", data["filename"])
- self.assertEqual(u"\u00fa", data["filebody"])
+ self.assertEqual("\u00e9", data["header"])
+ self.assertEqual("\u00e1", data["argument"])
+ self.assertEqual("\u00f3", data["filename"])
+ self.assertEqual("\u00fa", data["filebody"])
def test_newlines(self):
# We support both CRLF and bare LF as line separators.
def test_query_string_encoding(self):
response = self.fetch("/echo?foo=%C3%A9")
data = json_decode(response.body)
- self.assertEqual(data, {u"foo": [u"\u00e9"]})
+ self.assertEqual(data, {"foo": ["\u00e9"]})
def test_empty_query_string(self):
response = self.fetch("/echo?foo=&foo=")
data = json_decode(response.body)
- self.assertEqual(data, {u"foo": [u"", u""]})
+ self.assertEqual(data, {"foo": ["", ""]})
def test_empty_post_parameters(self):
response = self.fetch("/echo", method="POST", body="foo=&bar=")
data = json_decode(response.body)
- self.assertEqual(data, {u"foo": [u""], u"bar": [u""]})
+ self.assertEqual(data, {"foo": [""], "bar": [""]})
def test_types(self):
headers = {"Cookie": "foo=bar"}
start_line, headers, response = self.io_loop.run_sync(
lambda: read_stream_body(self.stream)
)
- self.assertEqual(json_decode(response), {u"foo": [u"bar"]})
+ self.assertEqual(json_decode(response), {"foo": ["bar"]})
def test_chunked_request_uppercase(self):
# As per RFC 2616 section 3.6, "Transfer-Encoding" header's value is
start_line, headers, response = self.io_loop.run_sync(
lambda: read_stream_body(self.stream)
)
- self.assertEqual(json_decode(response), {u"foo": [u"bar"]})
+ self.assertEqual(json_decode(response), {"foo": ["bar"]})
@gen_test
def test_invalid_content_length(self):
def test_uncompressed(self):
response = self.fetch("/", method="POST", body="foo=bar")
- self.assertEqual(json_decode(response.body), {u"foo": [u"bar"]})
+ self.assertEqual(json_decode(response.body), {"foo": ["bar"]})
class GzipTest(GzipBaseTest, AsyncHTTPTestCase):
def test_gzip(self):
response = self.post_gzip("foo=bar")
- self.assertEqual(json_decode(response.body), {u"foo": [u"bar"]})
+ self.assertEqual(json_decode(response.body), {"foo": ["bar"]})
def test_gzip_case_insensitive(self):
# https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.2.1
body=compressed_body,
headers={"Content-Encoding": "GZIP"},
)
- self.assertEqual(json_decode(response.body), {u"foo": [u"bar"]})
+ self.assertEqual(json_decode(response.body), {"foo": ["bar"]})
class GzipUnsupportedTest(GzipBaseTest, AsyncHTTPTestCase):
args, files = form_data_args()
parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
- self.assertEqual(file["filename"], u"áb.txt")
+ self.assertEqual(file["filename"], "áb.txt")
self.assertEqual(file["body"], b"Foo")
def test_boundary_starts_and_ends_with_quotes(self):
# and cpython's unicodeobject.c (which defines the implementation
# of unicode_type.splitlines(), and uses a different list than TR13).
newlines = [
- u"\u001b", # VERTICAL TAB
- u"\u001c", # FILE SEPARATOR
- u"\u001d", # GROUP SEPARATOR
- u"\u001e", # RECORD SEPARATOR
- u"\u0085", # NEXT LINE
- u"\u2028", # LINE SEPARATOR
- u"\u2029", # PARAGRAPH SEPARATOR
+ "\u001b", # VERTICAL TAB
+ "\u001c", # FILE SEPARATOR
+ "\u001d", # GROUP SEPARATOR
+ "\u001e", # RECORD SEPARATOR
+ "\u0085", # NEXT LINE
+ "\u2028", # LINE SEPARATOR
+ "\u2029", # PARAGRAPH SEPARATOR
]
for newline in newlines:
# Try the utf8 and latin1 representations of each newline
)
locale = tornado.locale.get("fr_FR")
self.assertTrue(isinstance(locale, tornado.locale.CSVLocale))
- self.assertEqual(locale.translate("school"), u"\u00e9cole")
+ self.assertEqual(locale.translate("school"), "\u00e9cole")
def test_csv_bom(self):
with open(
tornado.locale.load_translations(tmpdir)
locale = tornado.locale.get("fr_FR")
self.assertIsInstance(locale, tornado.locale.CSVLocale)
- self.assertEqual(locale.translate("school"), u"\u00e9cole")
+ self.assertEqual(locale.translate("school"), "\u00e9cole")
finally:
shutil.rmtree(tmpdir)
)
locale = tornado.locale.get("fr_FR")
self.assertTrue(isinstance(locale, tornado.locale.GettextLocale))
- self.assertEqual(locale.translate("school"), u"\u00e9cole")
- self.assertEqual(locale.pgettext("law", "right"), u"le droit")
- self.assertEqual(locale.pgettext("good", "right"), u"le bien")
+ self.assertEqual(locale.translate("school"), "\u00e9cole")
+ self.assertEqual(locale.pgettext("law", "right"), "le droit")
+ self.assertEqual(locale.pgettext("good", "right"), "le bien")
+ self.assertEqual(locale.pgettext("organization", "club", "clubs", 1), "le club")
self.assertEqual(
- locale.pgettext("organization", "club", "clubs", 1), u"le club"
+ locale.pgettext("organization", "club", "clubs", 2), "les clubs"
)
- self.assertEqual(
- locale.pgettext("organization", "club", "clubs", 2), u"les clubs"
- )
- self.assertEqual(locale.pgettext("stick", "club", "clubs", 1), u"le b\xe2ton")
- self.assertEqual(locale.pgettext("stick", "club", "clubs", 2), u"les b\xe2tons")
+ self.assertEqual(locale.pgettext("stick", "club", "clubs", 1), "le b\xe2ton")
+ self.assertEqual(locale.pgettext("stick", "club", "clubs", 2), "les b\xe2tons")
class LocaleDataTest(unittest.TestCase):
def test_non_ascii_name(self):
name = tornado.locale.LOCALE_NAMES["es_LA"]["name"]
self.assertTrue(isinstance(name, unicode_type))
- self.assertEqual(name, u"Espa\u00f1ol")
+ self.assertEqual(name, "Espa\u00f1ol")
self.assertEqual(utf8(name), b"Espa\xc3\xb1ol")
# variable when the tests are run, so just patch in some values
# for testing. (testing with color off fails to expose some potential
# encoding issues from the control characters)
- self.formatter._colors = {logging.ERROR: u"\u0001"}
- self.formatter._normal = u"\u0002"
+ self.formatter._colors = {logging.ERROR: "\u0001"}
+ self.formatter._normal = "\u0002"
# construct a Logger directly to bypass getLogger's caching
self.logger = logging.Logger("LogFormatterTest")
self.logger.propagate = False
def test_utf8_logging(self):
with ignore_bytes_warning():
- self.logger.error(u"\u00e9".encode("utf8"))
+ self.logger.error("\u00e9".encode("utf8"))
if issubclass(bytes, basestring_type):
# on python 2, utf8 byte strings (and by extension ascii byte
# strings) are passed through as-is.
- self.assertEqual(self.get_output(), utf8(u"\u00e9"))
+ self.assertEqual(self.get_output(), utf8("\u00e9"))
else:
# on python 3, byte strings always get repr'd even if
# they're ascii-only, so this degenerates into another
# copy of test_bytes_logging.
- self.assertEqual(self.get_output(), utf8(repr(utf8(u"\u00e9"))))
+ self.assertEqual(self.get_output(), utf8(repr(utf8("\u00e9"))))
def test_bytes_exception_logging(self):
try:
# This will be "Exception: \xe9" on python 2 or
# "Exception: b'\xe9'" on python 3.
output = self.get_output()
- self.assertRegex(output, br"Exception.*\\xe9")
+ self.assertRegex(output, rb"Exception.*\\xe9")
# The traceback contains newlines, which should not have been escaped.
- self.assertNotIn(br"\n", output)
+ self.assertNotIn(rb"\n", output)
class UnicodeLogFormatterTest(LogFormatterTest):
return logging.FileHandler(filename, encoding="utf8")
def test_unicode_logging(self):
- self.logger.error(u"\u00e9")
- self.assertEqual(self.get_output(), utf8(u"\u00e9"))
+ self.logger.error("\u00e9")
+ self.assertEqual(self.get_output(), utf8("\u00e9"))
class EnablePrettyLoggingTest(unittest.TestCase):
# this deadlock.
resolver = ThreadedResolver()
-IOLoop.current().run_sync(lambda: resolver.resolve(u"localhost", 80))
+IOLoop.current().run_sync(lambda: resolver.resolve("localhost", 80))
)
def test_unicode_template(self):
- template = Template(utf8(u"\u00e9"))
- self.assertEqual(template.generate(), utf8(u"\u00e9"))
+ template = Template(utf8("\u00e9"))
+ self.assertEqual(template.generate(), utf8("\u00e9"))
def test_unicode_literal_expression(self):
# Unicode literals should be usable in templates. Note that this
# test simulates unicode characters appearing directly in the
# template file (with utf8 encoding), i.e. \u escapes would not
# be used in the template file itself.
- template = Template(utf8(u'{{ "\u00e9" }}'))
- self.assertEqual(template.generate(), utf8(u"\u00e9"))
+ template = Template(utf8('{{ "\u00e9" }}'))
+ self.assertEqual(template.generate(), utf8("\u00e9"))
def test_custom_namespace(self):
loader = DictLoader(
def upper(s):
return to_unicode(s).upper()
- template = Template(utf8(u"{% apply upper %}foo \u00e9{% end %}"))
- self.assertEqual(template.generate(upper=upper), utf8(u"FOO \u00c9"))
+ template = Template(utf8("{% apply upper %}foo \u00e9{% end %}"))
+ self.assertEqual(template.generate(upper=upper), utf8("FOO \u00c9"))
def test_bytes_apply(self):
def upper(s):
return utf8(to_unicode(s).upper())
- template = Template(utf8(u"{% apply upper %}foo \u00e9{% end %}"))
- self.assertEqual(template.generate(upper=upper), utf8(u"FOO \u00c9"))
+ template = Template(utf8("{% apply upper %}foo \u00e9{% end %}"))
+ self.assertEqual(template.generate(upper=upper), utf8("FOO \u00c9"))
def test_if(self):
template = Template(utf8("{% if x > 4 %}yes{% else %}no{% end %}"))
self.assertEqual(template.generate(), "0")
def test_non_ascii_name(self):
- loader = DictLoader({u"t\u00e9st.html": "hello"})
- self.assertEqual(loader.load(u"t\u00e9st.html").generate(), b"hello")
+ loader = DictLoader({"t\u00e9st.html": "hello"})
+ self.assertEqual(loader.load("t\u00e9st.html").generate(), b"hello")
class StackTraceTest(unittest.TestCase):
def test_utf8_in_file(self):
tmpl = self.loader.load("utf8.html")
result = tmpl.generate()
- self.assertEqual(to_unicode(result).strip(), u"H\u00e9llo")
+ self.assertEqual(to_unicode(result).strip(), "H\u00e9llo")
class UnicodeLiteralTest(unittest.TestCase):
def test_unicode_escapes(self):
- self.assertEqual(utf8(u"\u00e9"), b"\xc3\xa9")
+ self.assertEqual(utf8("\u00e9"), b"\xc3\xa9")
class ExecInTest(unittest.TestCase):
self.assertIs(import_object("tornado.escape.utf8"), utf8)
def test_import_member_unicode(self):
- self.assertIs(import_object(u"tornado.escape.utf8"), utf8)
+ self.assertIs(import_object("tornado.escape.utf8"), utf8)
def test_import_module(self):
self.assertIs(import_object("tornado.escape"), tornado.escape)
# The internal implementation of __import__ differs depending on
# whether the thing being imported is a module or not.
# This variant requires a byte string in python 2.
- self.assertIs(import_object(u"tornado.escape"), tornado.escape)
+ self.assertIs(import_object("tornado.escape"), tornado.escape)
class ReUnescapeTest(unittest.TestCase):
# this string base64-encodes to '12345678'
handler.set_secure_cookie("foo", binascii.a2b_hex(b"d76df8e7aefc"), version=1)
cookie = handler._cookies["foo"]
- match = re.match(br"12345678\|([0-9]+)\|([0-9a-f]+)", cookie)
+ match = re.match(rb"12345678\|([0-9]+)\|([0-9a-f]+)", cookie)
assert match is not None
timestamp = match.group(1)
sig = match.group(2)
# Try setting cookies with different argument types
# to ensure that everything gets encoded correctly
self.set_cookie("str", "asdf")
- self.set_cookie("unicode", u"qwer")
+ self.set_cookie("unicode", "qwer")
self.set_cookie("bytes", b"zxcv")
class GetCookieHandler(RequestHandler):
def get(self):
# unicode domain and path arguments shouldn't break things
# either (see bug #285)
- self.set_cookie("unicode_args", "blah", domain=u"foo.com", path=u"/foo")
+ self.set_cookie("unicode_args", "blah", domain="foo.com", path="/foo")
class SetCookieSpecialCharHandler(RequestHandler):
def get(self):
self.assertEqual(
self.fetch_json("/group/%C3%A9?arg=%C3%A9"),
{
- u"path": u"/group/%C3%A9",
- u"path_args": [u"\u00e9"],
- u"args": {u"arg": [u"\u00e9"]},
+ "path": "/group/%C3%A9",
+ "path_args": ["\u00e9"],
+ "args": {"arg": ["\u00e9"]},
},
)
data = json_decode(response.body)
self.assertEqual(
data,
- {u"path": [u"unicode", u"\u00e9"], u"query": [u"unicode", u"\u00e9"]},
+ {"path": ["unicode", "\u00e9"], "query": ["unicode", "\u00e9"]},
)
response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
response.rethrow()
data = json_decode(response.body)
- self.assertEqual(
- data, {u"path": [u"bytes", u"c3a9"], u"query": [u"bytes", u"c3a9"]}
- )
+ self.assertEqual(data, {"path": ["bytes", "c3a9"], "query": ["bytes", "c3a9"]})
def test_decode_argument_invalid_unicode(self):
# test that invalid unicode in URLs causes 400, not 500
data = json_decode(response.body)
self.assertEqual(
data,
- {u"path": [u"unicode", u"1 + 1"], u"query": [u"unicode", u"1 + 1"]},
+ {"path": ["unicode", "1 + 1"], "query": ["unicode", "1 + 1"]},
)
def test_reverse_url(self):
self.assertEqual(self.app.reverse_url("decode_arg", 42), "/decode_arg/42")
self.assertEqual(self.app.reverse_url("decode_arg", b"\xe9"), "/decode_arg/%E9")
self.assertEqual(
- self.app.reverse_url("decode_arg", u"\u00e9"), "/decode_arg/%C3%A9"
+ self.app.reverse_url("decode_arg", "\u00e9"), "/decode_arg/%C3%A9"
)
self.assertEqual(
self.app.reverse_url("decode_arg", "1 + 1"), "/decode_arg/1%20%2B%201"
)
def test_optional_path(self):
- self.assertEqual(self.fetch_json("/optional_path/foo"), {u"path": u"foo"})
- self.assertEqual(self.fetch_json("/optional_path/"), {u"path": None})
+ self.assertEqual(self.fetch_json("/optional_path/foo"), {"path": "foo"})
+ self.assertEqual(self.fetch_json("/optional_path/"), {"path": None})
def test_multi_header(self):
response = self.fetch("/multi_header")
return [
("/str/(?P<path>.*)", EchoHandler),
- (u"/unicode/(?P<path>.*)", EchoHandler),
+ ("/unicode/(?P<path>.*)", EchoHandler),
]
def test_named_urlspec_groups(self):
@gen_test
def test_unicode_message(self):
ws = yield self.ws_connect("/echo")
- ws.write_message(u"hello \u00e9")
+ ws.write_message("hello \u00e9")
response = yield ws.read_message()
- self.assertEqual(response, u"hello \u00e9")
+ self.assertEqual(response, "hello \u00e9")
@gen_test
def test_error_in_closed_client_write_message(self):
ws = yield self.ws_connect("/echo")
ws.close()
with self.assertRaises(WebSocketClosedError):
- ws.write_message(u"hello \u00e9")
+ ws.write_message("hello \u00e9")
@gen_test
def test_render_message(self):
# Find all weak and strong etag values from If-None-Match header
# because RFC 7232 allows multiple etag values in a single header.
etags = re.findall(
- br'\*|(?:W/)?"[^"]*"', utf8(self.request.headers.get("If-None-Match", ""))
+ rb'\*|(?:W/)?"[^"]*"', utf8(self.request.headers.get("If-None-Match", ""))
)
if not computed_etag or not etags:
return False
)
# If XSRF cookies are turned on, reject form submissions without
# the proper cookie
- if (
- self.request.method
- not in (
- "GET",
- "HEAD",
- "OPTIONS",
- )
- and self.application.settings.get("xsrf_cookies")
- ):
+ if self.request.method not in (
+ "GET",
+ "HEAD",
+ "OPTIONS",
+ ) and self.application.settings.get("xsrf_cookies"):
self.check_xsrf_cookie()
result = self.prepare()
if result is not None:
- result = await result
+ result = await result # type: ignore
if self._prepared_future is not None:
# Tell the Application we've finished with prepare()
# and are ready for the body to arrive.
# A leading version number in decimal
# with no leading zeros, followed by a pipe.
-_signed_value_version_re = re.compile(br"^([1-9][0-9]*)\|(.*)$")
+_signed_value_version_re = re.compile(rb"^([1-9][0-9]*)\|(.*)$")
def _get_version(value: bytes) -> int: