raise
-class WebTest(AsyncHTTPTestCase, LogTrapTestCase):
+# This test is shared with wsgi_test.py
+class WSGISafeWebTest(AsyncHTTPTestCase, LogTrapTestCase):
COOKIE_SECRET = "WebTest.COOKIE_SECRET"
def get_app(self):
+ self.app = Application(self.get_handlers(), **self.get_app_kwargs())
+ return self.app
+
+ def get_app_kwargs(self):
loader = DictLoader({
"linkify.html": "{% module linkify(message) %}",
"page.html": """\
{{ set_resources(embedded_css=".entry { margin-bottom: 1em; }", embedded_javascript="js_embed()", css_files=["/base.css", "/foo.css"], javascript_files="/common.js", html_head="<meta>", html_body='<script src="/analytics.js"/>') }}
<div class="entry">...</div>""",
})
+ return dict(template_loader=loader,
+ autoescape="xhtml_escape",
+ cookie_secret=self.COOKIE_SECRET)
+
+ def get_handlers(self):
urls = [
url("/typecheck/(.*)", TypeCheckHandler, name='typecheck'),
url("/decode_arg/(.*)", DecodeArgHandler, name='decode_arg'),
url("/linkify", LinkifyHandler),
url("/uimodule_resources", UIModuleResourceHandler),
url("/optional_path/(.+)?", OptionalPathHandler),
- url("/flow_control", FlowControlHandler),
url("/multi_header", MultiHeaderHandler),
url("/redirect", RedirectHandler),
- url("/empty_flush", EmptyFlushCallbackHandler),
url("/header_injection", HeaderInjectionHandler),
]
- self.app = Application(urls,
- template_loader=loader,
- autoescape="xhtml_escape",
- cookie_secret=self.COOKIE_SECRET)
- return self.app
+ return urls
def fetch_json(self, *args, **kwargs):
response = self.fetch(*args, **kwargs)
self.assertEqual(self.fetch_json("/optional_path/"),
{u"path": None})
- def test_flow_control(self):
- self.assertEqual(self.fetch("/flow_control").body, b("123"))
-
def test_multi_header(self):
response = self.fetch("/multi_header")
self.assertEqual(response.headers["x-overwrite"], "2")
response = self.fetch("/redirect?status=307", follow_redirects=False)
self.assertEqual(response.code, 307)
- def test_empty_flush(self):
- response = self.fetch("/empty_flush")
- self.assertEqual(response.body, b("ok"))
-
def test_header_injection(self):
response = self.fetch("/header_injection")
self.assertEqual(response.body, b("ok"))
+class NonWSGIWebTests(AsyncHTTPTestCase, LogTrapTestCase):
+ def get_app(self):
+ urls = [
+ ("/flow_control", FlowControlHandler),
+ ("/empty_flush", EmptyFlushCallbackHandler),
+ ]
+ return Application(urls)
+
+ def test_flow_control(self):
+ self.assertEqual(self.fetch("/flow_control").body, b("123"))
+
+ def test_empty_flush(self):
+ response = self.fetch("/empty_flush")
+ self.assertEqual(response.body, b("ok"))
+
+
class ErrorResponseTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
class DefaultHandler(RequestHandler):
from tornado import httputil
from tornado import web
from tornado.escape import native_str, utf8, parse_qs_bytes
-from tornado.util import b
+from tornado.util import b, bytes_type
try:
from io import BytesIO # python 3
from cStringIO import StringIO as BytesIO # python 2
+# PEP 3333 specifies that WSGI on python 3 generally deals with byte strings
+# that are smuggled inside objects of type unicode (via the latin1 encoding).
+# These functions are like those in the tornado.escape module, but defined
+# here to minimize the temptation to use them in non-wsgi contexts.
+if str is unicode:
+ def to_wsgi_str(s):
+ assert isinstance(s, bytes_type)
+ return s.decode('latin1')
+
+ def from_wsgi_str(s):
+ assert isinstance(s, str)
+ return s.encode('latin1')
+else:
+ def to_wsgi_str(s):
+ assert isinstance(s, bytes_type)
+ return s
+
+ def from_wsgi_str(s):
+ assert isinstance(s, str)
+ return s
+
+
class WSGIApplication(web.Application):
"""A WSGI equivalent of `tornado.web.Application`.
def __init__(self, environ):
"""Parses the given WSGI environ to construct the request."""
self.method = environ["REQUEST_METHOD"]
- self.path = urllib.quote(environ.get("SCRIPT_NAME", ""))
- self.path += urllib.quote(environ.get("PATH_INFO", ""))
+ self.path = urllib.quote(from_wsgi_str(environ.get("SCRIPT_NAME", "")))
+ self.path += urllib.quote(from_wsgi_str(environ.get("PATH_INFO", "")))
self.uri = self.path
self.arguments = {}
self.query = environ.get("QUERY_STRING", "")
environ = {
"REQUEST_METHOD": request.method,
"SCRIPT_NAME": "",
- "PATH_INFO": urllib.unquote(request.path),
+ "PATH_INFO": to_wsgi_str(escape.url_unescape(request.path, encoding=None)),
"QUERY_STRING": request.query,
"REMOTE_ADDR": request.remote_ip,
"SERVER_NAME": host,