version = "2.4.post2"
-if major >= 3:
- import setuptools # setuptools is required for use_2to3
- kwargs["use_2to3"] = True
-
distutils.core.setup(
name="tornado",
version=version,
from tornado import escape
from tornado.httputil import url_concat
from tornado.log import gen_log
-from tornado.util import bytes_type, b, u
+from tornado.util import bytes_type, b, u, unicode_type
try:
import urlparse # py2
except ImportError:
import urllib.parse as urlparse # py3
+try:
+ import urllib.parse as urllib_parse # py3
+except ImportError:
+ import urllib as urllib_parse # py2
+
class OpenIdMixin(object):
"""Abstract implementation of OpenID and Attribute Exchange.
"""
callback_uri = callback_uri or self.request.uri
args = self._openid_args(callback_uri, ax_attrs=ax_attrs)
- self.redirect(self._OPENID_ENDPOINT + "?" + urllib.urlencode(args))
+ self.redirect(self._OPENID_ENDPOINT + "?" + urllib_parse.urlencode(args))
def get_authenticated_user(self, callback, http_client=None):
"""Fetches the authenticated user data upon redirect.
methods.
"""
# Verify the OpenID response via direct request to the OP
- args = dict((k, v[-1]) for k, v in self.request.arguments.iteritems())
+ args = dict((k, v[-1]) for k, v in self.request.arguments.items())
args["openid.mode"] = u("check_authentication")
url = self._OPENID_ENDPOINT
if http_client is None:
http_client = self.get_auth_http_client()
http_client.fetch(url, self.async_callback(
self._on_authentication_verified, callback),
- method="POST", body=urllib.urlencode(args))
+ method="POST", body=urllib_parse.urlencode(args))
def _openid_args(self, callback_uri, ax_attrs=[], oauth_scope=None):
url = urlparse.urljoin(self.request.full_url(), callback_uri)
# Make sure we got back at least an email from attribute exchange
ax_ns = None
- for name in self.request.arguments.iterkeys():
+ for name in self.request.arguments.keys():
if name.startswith("openid.ns.") and \
self.get_argument(name) == u("http://openid.net/srv/ax/1.0"):
ax_ns = name[10:]
return u("")
prefix = "openid." + ax_ns + ".type."
ax_name = None
- for name in self.request.arguments.iterkeys():
+ for name in self.request.arguments.keys():
if self.get_argument(name) == uri and name.startswith(prefix):
part = name[len(prefix):]
ax_name = "openid." + ax_ns + ".value." + part
signature = _oauth_signature(consumer_token, "GET", url, args)
args["oauth_signature"] = signature
- return url + "?" + urllib.urlencode(args)
+ return url + "?" + urllib_parse.urlencode(args)
def _on_request_token(self, authorize_url, callback_uri, response):
if response.error:
self.set_cookie("_oauth_request_token", data)
args = dict(oauth_token=request_token["key"])
if callback_uri == "oob":
- self.finish(authorize_url + "?" + urllib.urlencode(args))
+ self.finish(authorize_url + "?" + urllib_parse.urlencode(args))
return
elif callback_uri:
args["oauth_callback"] = urlparse.urljoin(
self.request.full_url(), callback_uri)
- self.redirect(authorize_url + "?" + urllib.urlencode(args))
+ self.redirect(authorize_url + "?" + urllib_parse.urlencode(args))
def _oauth_access_token_url(self, request_token):
consumer_token = self._oauth_consumer_token()
request_token)
args["oauth_signature"] = signature
- return url + "?" + urllib.urlencode(args)
+ return url + "?" + urllib_parse.urlencode(args)
def _on_access_token(self, callback, response):
if response.error:
url, access_token, all_args, method=method)
args.update(oauth)
if args:
- url += "?" + urllib.urlencode(args)
+ url += "?" + urllib_parse.urlencode(args)
callback = self.async_callback(self._on_twitter_request, callback)
http = self.get_auth_http_client()
if post_args is not None:
- http.fetch(url, method="POST", body=urllib.urlencode(post_args),
+ http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args),
callback=callback)
else:
http.fetch(url, callback=callback)
url, access_token, all_args, method=method)
args.update(oauth)
if args:
- url += "?" + urllib.urlencode(args)
+ url += "?" + urllib_parse.urlencode(args)
callback = self.async_callback(self._on_friendfeed_request, callback)
http = self.get_auth_http_client()
if post_args is not None:
- http.fetch(url, method="POST", body=urllib.urlencode(post_args),
+ http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args),
callback=callback)
else:
http.fetch(url, callback=callback)
callback_uri = callback_uri or self.request.uri
args = self._openid_args(callback_uri, ax_attrs=ax_attrs,
oauth_scope=oauth_scope)
- self.redirect(self._OPENID_ENDPOINT + "?" + urllib.urlencode(args))
+ self.redirect(self._OPENID_ENDPOINT + "?" + urllib_parse.urlencode(args))
def get_authenticated_user(self, callback):
"""Fetches the authenticated user data upon redirect."""
args["cancel_url"] = urlparse.urljoin(
self.request.full_url(), cancel_uri)
if extended_permissions:
- if isinstance(extended_permissions, (unicode, bytes_type)):
+ if isinstance(extended_permissions, (unicode_type, bytes_type)):
extended_permissions = [extended_permissions]
args["req_perms"] = ",".join(extended_permissions)
self.redirect("http://www.facebook.com/login.php?" +
- urllib.urlencode(args))
+ urllib_parse.urlencode(args))
def authorize_redirect(self, extended_permissions, callback_uri=None,
cancel_uri=None):
args["format"] = "json"
args["sig"] = self._signature(args)
url = "http://api.facebook.com/restserver.php?" + \
- urllib.urlencode(args)
+ urllib_parse.urlencode(args)
http = self.get_auth_http_client()
http.fetch(url, callback=self.async_callback(
self._parse_response, callback))
def _signature(self, args):
parts = ["%s=%s" % (n, args[n]) for n in sorted(args.keys())]
body = "".join(parts) + self.settings["facebook_secret"]
- if isinstance(body, unicode):
+ if isinstance(body, unicode_type):
body = body.encode("utf-8")
return hashlib.md5(body).hexdigest()
all_args.update(args)
if all_args:
- url += "?" + urllib.urlencode(all_args)
+ url += "?" + urllib_parse.urlencode(all_args)
callback = self.async_callback(self._on_facebook_request, callback)
http = self.get_auth_http_client()
if post_args is not None:
- http.fetch(url, method="POST", body=urllib.urlencode(post_args),
+ http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args),
callback=callback)
else:
http.fetch(url, callback=callback)
for k, v in sorted(parameters.items())))
base_string = "&".join(_oauth_escape(e) for e in base_elems)
- key_elems = [escape.utf8(urllib.quote(consumer_token["secret"], safe='~'))]
- key_elems.append(escape.utf8(urllib.quote(token["secret"], safe='~') if token else ""))
+ key_elems = [escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~'))]
+ key_elems.append(escape.utf8(urllib_parse.quote(token["secret"], safe='~') if token else ""))
key = b("&").join(key_elems)
hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
def _oauth_escape(val):
- if isinstance(val, unicode):
+ if isinstance(val, unicode_type):
val = val.encode("utf-8")
- return urllib.quote(val, safe="~")
+ return urllib_parse.quote(val, safe="~")
def _oauth_parse_response(body):
import re
import sys
-import urllib
from tornado.util import bytes_type, unicode_type, basestring_type, u
try:
- from urlparse import parse_qs # Python 2.6+
+ from urllib.parse import parse_qs # py3
except ImportError:
- from cgi import parse_qs
+ try:
+ from urlparse import parse_qs # Python 2.6+
+ except ImportError:
+ from cgi import parse_qs
try:
import htmlentitydefs # py2
except ImportError:
import html.entities as htmlentitydefs # py3
+try:
+ import urllib.parse as urllib_parse # py3
+except ImportError:
+ import urllib as urllib_parse # py2
+
# json module is in the standard library as of python 2.6; fall back to
# simplejson if present for older versions.
try:
def url_escape(value):
"""Returns a valid URL-encoded version of the given value."""
- return urllib.quote_plus(utf8(value))
+ return urllib_parse.quote_plus(utf8(value))
# python 3 changed things around enough that we need two separate
# implementations of url_unescape. We also need our own implementation
the result is a unicode string in the specified encoding.
"""
if encoding is None:
- return urllib.unquote_plus(utf8(value))
+ return urllib_parse.unquote_plus(utf8(value))
else:
- return unicode_type(urllib.unquote_plus(utf8(value)), encoding)
+ return unicode_type(urllib_parse.unquote_plus(utf8(value)), encoding)
parse_qs_bytes = parse_qs
else:
the result is a unicode string in the specified encoding.
"""
if encoding is None:
- return urllib.parse.unquote_to_bytes(value)
+ return urllib_parse.unquote_to_bytes(value)
else:
- return urllib.unquote_plus(to_basestring(value), encoding=encoding)
+ return urllib_parse.unquote_plus(to_basestring(value), encoding=encoding)
def parse_qs_bytes(qs, keep_blank_values=False, strict_parsing=False):
"""Parses a query string like urlparse.parse_qs, but returns the
result = parse_qs(qs, keep_blank_values, strict_parsing,
encoding='latin1', errors='strict')
encoded = {}
- for k, v in result.iteritems():
+ for k, v in result.items():
encoded[k] = [i.encode('latin1') for i in v]
return encoded
Supports lists, tuples, and dictionaries.
"""
if isinstance(obj, dict):
- return dict((recursive_unicode(k), recursive_unicode(v)) for (k, v) in obj.iteritems())
+ return dict((recursive_unicode(k), recursive_unicode(v)) for (k, v) in obj.items())
elif isinstance(obj, list):
return list(recursive_unicode(i) for i in obj)
elif isinstance(obj, tuple):
except ImportError:
from http.client import responses # py3
+try:
+ from urllib import urlencode # py2
+except ImportError:
+ from urllib.parse import urlencode # py3
+
class HTTPHeaders(dict):
"""A dictionary that maintains Http-Header-Case for all keys.
value per key, with multiple values joined by a comma.
>>> h = HTTPHeaders({"content-type": "text/html"})
- >>> h.keys()
+ >>> list(h.keys())
['Content-Type']
>>> h["Content-Type"]
'text/html'
If a header has multiple values, multiple pairs will be
returned with the same name.
"""
- for name, list in self._as_list.iteritems():
+ for name, list in self._as_list.items():
for value in list:
yield (name, value)
"""Returns a dictionary from HTTP header text.
>>> h = HTTPHeaders.parse("Content-Type: text/html\\r\\nContent-Length: 42\\r\\n")
- >>> sorted(h.iteritems())
+ >>> sorted(h.items())
[('Content-Length', '42'), ('Content-Type', 'text/html')]
"""
h = cls()
def update(self, *args, **kwargs):
# dict.update bypasses our __setitem__
- for k, v in dict(*args, **kwargs).iteritems():
+ for k, v in dict(*args, **kwargs).items():
self[k] = v
def copy(self):
return url
if url[-1] not in ('?', '&'):
url += '&' if ('?' in url) else '?'
- return url + urllib.urlencode(args)
+ return url + urlencode(args)
class HTTPFile(ObjectDict):
"""
if content_type.startswith("application/x-www-form-urlencoded"):
uri_arguments = parse_qs_bytes(native_str(body))
- for name, values in uri_arguments.iteritems():
+ for name, values in uri_arguments.items():
values = [v for v in values if v]
if values:
arguments.setdefault(name, []).extend(values)
"""
parts = _parseparam(';' + line)
- key = parts.next()
+ key = next(parts)
pdict = {}
for p in parts:
i = p.find('=')
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
- for fd in self._handlers.keys()[:]:
+ for fd in self._handlers.keys():
try:
os.close(fd)
except Exception:
import collections
import errno
+import numbers
import os
import socket
import sys
``callback`` will be empty.
"""
self._set_read_callback(callback)
- assert isinstance(num_bytes, (int, long))
+ assert isinstance(num_bytes, numbers.Integral)
self._read_bytes = num_bytes
self._streaming_callback = stack_context.wrap(streaming_callback)
self._try_inline_read()
global _default_locale
global _supported_locales
_default_locale = code
- _supported_locales = frozenset(_translations.keys() + [_default_locale])
+ _supported_locales = frozenset(list(_translations.keys()) + [_default_locale])
def load_translations(directory):
continue
_translations[locale].setdefault(plural, {})[english] = translation
f.close()
- _supported_locales = frozenset(_translations.keys() + [_default_locale])
+ _supported_locales = frozenset(list(_translations.keys()) + [_default_locale])
gen_log.debug("Supported locales: %s", sorted(_supported_locales))
except Exception as e:
gen_log.error("Cannot load translation for '%s': %s", lang, str(e))
continue
- _supported_locales = frozenset(_translations.keys() + [_default_locale])
+ _supported_locales = frozenset(list(_translations.keys()) + [_default_locale])
_use_gettext = True
gen_log.debug("Supported locales: %s", sorted(_supported_locales))
Requests currently in progress may still continue after the
server is stopped.
"""
- for fd, sock in self._sockets.iteritems():
+ for fd, sock in self._sockets.items():
self.io_loop.remove_handler(fd)
sock.close()
print("Usage: %s [OPTIONS]" % sys.argv[0], file=file)
print("\nOptions:\n", file=file)
by_group = {}
- for option in self._options.itervalues():
+ for option in self._options.values():
by_group.setdefault(option.group_name, []).append(option)
for filename, o in sorted(by_group.items()):
except ImportError:
multiprocessing = None
+try:
+ long # py2
+except NameError:
+ long = int # py3
def cpu_count():
"""Returns the number of processors on this machine."""
@classmethod
def _cleanup(cls):
- for pid in cls._waiting.keys():
+ for pid in list(cls._waiting.keys()): # make a copy
cls._try_cleanup_process(pid)
@classmethod
from tornado import escape
from tornado.log import app_log
-from tornado.util import bytes_type, ObjectDict, exec_in
+from tornado.util import bytes_type, ObjectDict, exec_in, unicode_type
try:
from cStringIO import StringIO # py2
"linkify": escape.linkify,
"datetime": datetime,
"_utf8": escape.utf8, # for internal use
- "_string_types": (unicode, bytes_type),
+ "_string_types": (unicode_type, bytes_type),
# __name__ and __loader__ allow the traceback mechanism to find
# the generated source code.
"__name__": self.name.replace('.', '_'),
'http://www.example.com/api/asdf',
dict(key='uiop', secret='5678'),
parameters=dict(foo='bar'))
- import urllib
- urllib.urlencode(params)
self.write(params)
import tornado.escape
from tornado.escape import utf8, xhtml_escape, xhtml_unescape, url_escape, url_unescape, to_unicode, json_decode, json_encode
-from tornado.util import b, u
+from tornado.util import b, u, unicode_type
from tornado.test.util import unittest
linkify_tests = [
# On python2 the escape methods should generally return the same
# type as their argument
self.assertEqual(type(xhtml_escape("foo")), str)
- self.assertEqual(type(xhtml_escape(u("foo"))), unicode)
+ self.assertEqual(type(xhtml_escape(u("foo"))), unicode_type)
def test_json_decode(self):
# json_decode accepts both bytes and unicode, but strings it returns
@gen.engine
def outer():
- for i in xrange(10):
+ for i in range(10):
yield gen.Task(inner)
stack_increase = len(stack_context._state.contexts) - initial_stack_depth
self.assertTrue(stack_increase <= 2)
@gen.engine
def outer():
- for i in xrange(10):
+ for i in range(10):
try:
yield gen.Task(inner)
except ZeroDivisionError:
for field, expected_type in fields:
self.check_type(field, getattr(self.request, field), expected_type)
- self.check_type('header_key', self.request.headers.keys()[0], str)
- self.check_type('header_value', self.request.headers.values()[0], str)
+ self.check_type('header_key', list(self.request.headers.keys())[0], str)
+ self.check_type('header_value', list(self.request.headers.values())[0], str)
- self.check_type('cookie_key', self.request.cookies.keys()[0], str)
- self.check_type('cookie_value', self.request.cookies.values()[0].value, str)
+ self.check_type('cookie_key', list(self.request.cookies.keys())[0], str)
+ self.check_type('cookie_value', list(self.request.cookies.values())[0].value, str)
# secure cookies
- self.check_type('arg_key', self.request.arguments.keys()[0], str)
- self.check_type('arg_value', self.request.arguments.values()[0][0], bytes_type)
+ self.check_type('arg_key', list(self.request.arguments.keys())[0], str)
+ self.check_type('arg_value', list(self.request.arguments.values())[0][0], bytes_type)
def post(self):
self.check_type('body', self.request.body, bytes_type)
def get(self):
# 512KB should be bigger than the socket buffers so it will
# be written out in chunks.
- self.write(''.join(chr(i % 256) * 1024 for i in xrange(512)))
+ self.write(''.join(chr(i % 256) * 1024 for i in range(512)))
class FinishOnCloseHandler(RequestHandler):
@asynchronous
# but there was no pypy for 2.5
pass
NUM_KB = 4096
- for i in xrange(NUM_KB):
+ for i in range(NUM_KB):
client.write(b("A") * 1024)
client.write(b("\r\n"))
server.read_until(b("\r\n"), self.stop)
import tornado.locale
from tornado.escape import utf8
from tornado.test.util import unittest
-from tornado.util import b, u
+from tornado.util import b, u, unicode_type
class TranslationLoaderTest(unittest.TestCase):
class LocaleDataTest(unittest.TestCase):
def test_non_ascii_name(self):
name = tornado.locale.LOCALE_NAMES['es_LA']['name']
- self.assertTrue(isinstance(name, unicode))
+ self.assertTrue(isinstance(name, unicode_type))
self.assertEqual(name, u('Espa\u00f1ol'))
self.assertEqual(utf8(name), b('Espa\xc3\xb1ol'))
from tornado.log import LogFormatter, define_logging_options, enable_pretty_logging
from tornado.options import OptionParser
from tornado.test.util import unittest
-from tornado.util import b, u, bytes_type
+from tornado.util import b, u, bytes_type, basestring_type
@contextlib.contextmanager
def ignore_bytes_warning():
def test_utf8_logging(self):
self.logger.error(u("\u00e9").encode("utf8"))
- if issubclass(bytes_type, basestring):
+ if issubclass(bytes_type, basestring_type):
# on python 2, utf8 byte strings (and by extension ascii byte
# strings) are passed through as-is.
self.assertEqual(self.get_output(), utf8(u("\u00e9")))
from tornado.template import Template, DictLoader, ParseError, Loader
from tornado.testing import ExpectLog
from tornado.test.util import unittest
-from tornado.util import b, u, bytes_type, ObjectDict
+from tornado.util import b, u, bytes_type, ObjectDict, unicode_type
class TemplateTest(unittest.TestCase):
# test simulates unicode characters appearing directly in the
# template file (with utf8 encoding), i.e. \u escapes would not
# be used in the template file itself.
- if str is unicode:
+ if str is unicode_type:
# python 3 needs a different version of this test since
# 2to3 doesn't run on template internals
template = Template(utf8(u('{{ "\u00e9" }}')))
from tornado.template import DictLoader
from tornado.testing import AsyncHTTPTestCase, ExpectLog
from tornado.test.util import unittest
-from tornado.util import b, u, bytes_type, ObjectDict
+from tornado.util import b, u, bytes_type, ObjectDict, unicode_type
from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature, create_signed_value, ErrorHandler
import binascii
raise Exception("incorrect type for value: %r" %
type(value))
for value in self.get_arguments(key):
- if type(value) != unicode:
+ if type(value) != unicode_type:
raise Exception("incorrect type for value: %r" %
type(value))
for arg in path_args:
- if type(arg) != unicode:
+ if type(arg) != unicode_type:
raise Exception("incorrect type for path arg: %r" % type(arg))
self.write(dict(path=self.request.path,
path_args=path_args,
# get_argument is an exception from the general rule of using
# type str for non-body data mainly for historical reasons.
- self.check_type('argument', self.get_argument('foo'), unicode)
- self.check_type('cookie_key', self.cookies.keys()[0], str)
- self.check_type('cookie_value', self.cookies.values()[0].value, str)
+ self.check_type('argument', self.get_argument('foo'), unicode_type)
+ self.check_type('cookie_key', list(self.cookies.keys())[0], str)
+ self.check_type('cookie_value', list(self.cookies.values())[0].value, str)
# Secure cookies return bytes because they can contain arbitrary
# data, but regular cookies are native strings.
- if self.cookies.keys() != ['asdf']:
+ if list(self.cookies.keys()) != ['asdf']:
raise Exception("unexpected values for cookie keys: %r" %
self.cookies.keys())
self.check_type('get_secure_cookie', self.get_secure_cookie('asdf'), bytes_type)
def get(self, path_component):
# path_component uses type unicode instead of str for consistency
# with get_argument()
- self.check_type('path_component', path_component, unicode)
+ self.check_type('path_component', path_component, unicode_type)
self.write(self.errors)
def post(self, path_component):
- self.check_type('path_component', path_component, unicode)
+ self.check_type('path_component', path_component, unicode_type)
self.write(self.errors)
def check_type(self, name, obj, expected_type):
def describe(s):
if type(s) == bytes_type:
return ["bytes", native_str(binascii.b2a_hex(s))]
- elif type(s) == unicode:
+ elif type(s) == unicode_type:
return ["unicode", s]
raise Exception("unknown type")
self.write({'path': describe(arg),
SimpleAsyncHTTPClient = None
from tornado.log import gen_log
from tornado.stack_context import ExceptionStackContext
-from tornado.util import raise_exc_info
+from tornado.util import raise_exc_info, basestring_type
import logging
import os
import re
    :param required: If true, an exception will be raised if the end of
the ``with`` statement is reached without matching any log entries.
"""
- if isinstance(logger, basestring):
+ if isinstance(logger, basestring_type):
logger = logging.getLogger(logger)
self.logger = logger
self.regex = re.compile(regex)
import hmac
import itertools
import mimetypes
+import numbers
import os.path
import re
import stat
import tornado
import traceback
import types
-import urllib
import uuid
from tornado import escape
from tornado import stack_context
from tornado import template
from tornado.escape import utf8, _unicode
-from tornado.util import b, bytes_type, import_object, ObjectDict, raise_exc_info
+from tornado.util import b, bytes_type, import_object, ObjectDict, raise_exc_info, unicode_type
try:
from io import BytesIO # python 3
except ImportError:
import urllib.parse as urlparse # py3
+try:
+ from urllib import urlencode # py2
+except ImportError:
+ from urllib.parse import urlencode # py3
+
class RequestHandler(object):
"""Subclass this class and define get() or post() to make a handler.
self.path_args = None
self.path_kwargs = None
self.ui = ObjectDict((n, self._ui_method(m)) for n, m in
- application.ui_methods.iteritems())
+ application.ui_methods.items())
# UIModules are available as both `modules` and `_modules` in the
# template namespace. Historically only `modules` was available
# but could be clobbered by user additions to the namespace.
# The template {% module %} directive looks in `_modules` to avoid
# possible conflicts.
self.ui["_modules"] = ObjectDict((n, self._ui_module(n, m)) for n, m in
- application.ui_modules.iteritems())
+ application.ui_modules.items())
self.ui["modules"] = self.ui["_modules"]
self.clear()
# Check since connection is not available in WSGI
def _convert_header_value(self, value):
if isinstance(value, bytes_type):
pass
- elif isinstance(value, unicode):
+ elif isinstance(value, unicode_type):
value = value.encode('utf-8')
- elif isinstance(value, (int, long)):
+ elif isinstance(value, numbers.Integral):
# return immediately since we know the converted value will be safe
return str(value)
elif isinstance(value, datetime.datetime):
values = []
for v in self.request.arguments.get(name, []):
v = self.decode_argument(v, name=name)
- if isinstance(v, unicode):
+ if isinstance(v, unicode_type):
# Get rid of any weird control chars (unless decoding gave
# us bytes, in which case leave it alone)
v = re.sub(r"[\x00-\x08\x0e-\x1f]", " ", v)
timestamp, localtime=False, usegmt=True)
if path:
morsel["path"] = path
- for k, v in kwargs.iteritems():
+ for k, v in kwargs.items():
if k == 'max_age':
k = 'max-age'
morsel[k] = v
css_files = []
html_heads = []
html_bodies = []
- for module in getattr(self, "_active_modules", {}).itervalues():
+ for module in getattr(self, "_active_modules", {}).values():
embed_part = module.embedded_javascript()
if embed_part:
js_embed.append(utf8(embed_part))
file_part = module.javascript_files()
if file_part:
- if isinstance(file_part, (unicode, bytes_type)):
+ if isinstance(file_part, (unicode_type, bytes_type)):
js_files.append(file_part)
else:
js_files.extend(file_part)
css_embed.append(utf8(embed_part))
file_part = module.css_files()
if file_part:
- if isinstance(file_part, (unicode, bytes_type)):
+ if isinstance(file_part, (unicode_type, bytes_type)):
css_files.append(file_part)
else:
css_files.extend(file_part)
raise HTTPError(405)
self.path_args = [self.decode_argument(arg) for arg in args]
self.path_kwargs = dict((k, self.decode_argument(v, name=k))
- for (k, v) in kwargs.iteritems())
+ for (k, v) in kwargs.items())
# If XSRF cookies are turned on, reject form submissions without
# the proper cookie
if self.request.method not in ("GET", "HEAD", "OPTIONS") and \
str(self._status_code) +
" " + reason)]
lines.extend([(utf8(n) + b(": ") + utf8(v)) for n, v in
- itertools.chain(self._headers.iteritems(), self._list_headers)])
+ itertools.chain(self._headers.items(), self._list_headers)])
if hasattr(self, "_new_cookie"):
for cookie in self._new_cookie.values():
lines.append(utf8("Set-Cookie: " + cookie.OutputString(None)))
for m in methods:
self._load_ui_methods(m)
else:
- for name, fn in methods.iteritems():
+ for name, fn in methods.items():
if not name.startswith("_") and hasattr(fn, "__call__") \
and name[0].lower() == name[0]:
self.ui_methods[name] = fn
self._load_ui_modules(m)
else:
assert isinstance(modules, dict)
- for name, cls in modules.iteritems():
+ for name, cls in modules.items():
try:
if issubclass(cls, UIModule):
self.ui_modules[name] = cls
if spec.regex.groupindex:
kwargs = dict(
(str(k), unquote(v))
- for (k, v) in match.groupdict().iteritems())
+ for (k, v) in match.groupdict().items())
else:
args = [unquote(s) for s in match.groups()]
break
next_url = self.request.full_url()
else:
next_url = self.request.uri
- url += "?" + urllib.urlencode(dict(next=next_url))
+ url += "?" + urlencode(dict(next=next_url))
self.redirect(url)
return
raise HTTPError(403)
def javascript_files(self):
result = []
for f in self._get_resources("javascript_files"):
- if isinstance(f, (unicode, bytes_type)):
+ if isinstance(f, (unicode_type, bytes_type)):
result.append(f)
else:
result.extend(f)
def css_files(self):
result = []
for f in self._get_resources("css_files"):
- if isinstance(f, (unicode, bytes_type)):
+ if isinstance(f, (unicode_type, bytes_type)):
result.append(f)
else:
result.extend(f)
return self._path
converted_args = []
for a in args:
- if not isinstance(a, (unicode, bytes_type)):
+ if not isinstance(a, (unicode_type, bytes_type)):
a = str(a)
converted_args.append(escape.url_escape(utf8(a)))
return self._path % tuple(converted_args)
except ImportError:
import http.cookies as Cookie # py3
+try:
+ import urllib.parse as urllib_parse # py3
+except ImportError:
+    import urllib as urllib_parse  # py2
+
# PEP 3333 specifies that WSGI on python 3 generally deals with byte strings
# that are smuggled inside objects of type unicode (via the latin1 encoding).
# These functions are like those in the tornado.escape module, but defined
assert handler._finished
reason = handler._reason
status = str(handler._status_code) + " " + reason
- headers = handler._headers.items() + handler._list_headers
+ headers = list(handler._headers.items()) + handler._list_headers
if hasattr(handler, "_new_cookie"):
for cookie in handler._new_cookie.values():
headers.append(("Set-Cookie", cookie.OutputString(None)))
def __init__(self, environ):
"""Parses the given WSGI environ to construct the request."""
self.method = environ["REQUEST_METHOD"]
- self.path = urllib.quote(from_wsgi_str(environ.get("SCRIPT_NAME", "")))
- self.path += urllib.quote(from_wsgi_str(environ.get("PATH_INFO", "")))
+ self.path = urllib_parse.quote(from_wsgi_str(environ.get("SCRIPT_NAME", "")))
+ self.path += urllib_parse.quote(from_wsgi_str(environ.get("PATH_INFO", "")))
self.uri = self.path
self.arguments = {}
self.query = environ.get("QUERY_STRING", "")
environ["CONTENT_TYPE"] = request.headers.pop("Content-Type")
if "Content-Length" in request.headers:
environ["CONTENT_LENGTH"] = request.headers.pop("Content-Length")
- for key, value in request.headers.iteritems():
+ for key, value in request.headers.items():
environ["HTTP_" + key.replace("-", "_").upper()] = value
return environ