]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Run some (synchronous) web.py tests via wsgi too.
authorBen Darnell <ben@bendarnell.com>
Mon, 18 Jun 2012 05:57:23 +0000 (22:57 -0700)
committerBen Darnell <ben@bendarnell.com>
Mon, 18 Jun 2012 05:57:23 +0000 (22:57 -0700)
There's probably a more principled way to do this, but for now just port
this batch of tests, which would have caught the add_header bug from
the previous commit, and uncovered an encoding problem on python 3.

tornado/test/web_test.py
tornado/test/wsgi_test.py
tornado/wsgi.py

index 37a098b366fd4a137ccdca60e158e23909811d5d..ad417336535696403ef931cbf38fb2afb140d19d 100644 (file)
@@ -447,10 +447,15 @@ class HeaderInjectionHandler(RequestHandler):
                 raise
 
 
-class WebTest(AsyncHTTPTestCase, LogTrapTestCase):
+# This test is shared with wsgi_test.py
+class WSGISafeWebTest(AsyncHTTPTestCase, LogTrapTestCase):
     COOKIE_SECRET = "WebTest.COOKIE_SECRET"
 
     def get_app(self):
+        self.app = Application(self.get_handlers(), **self.get_app_kwargs())
+        return self.app
+
+    def get_app_kwargs(self):
         loader = DictLoader({
                 "linkify.html": "{% module linkify(message) %}",
                 "page.html": """\
@@ -463,6 +468,11 @@ class WebTest(AsyncHTTPTestCase, LogTrapTestCase):
 {{ set_resources(embedded_css=".entry { margin-bottom: 1em; }", embedded_javascript="js_embed()", css_files=["/base.css", "/foo.css"], javascript_files="/common.js", html_head="<meta>", html_body='<script src="/analytics.js"/>') }}
 <div class="entry">...</div>""",
                 })
+        return dict(template_loader=loader,
+                    autoescape="xhtml_escape",
+                    cookie_secret=self.COOKIE_SECRET)
+
+    def get_handlers(self):
         urls = [
             url("/typecheck/(.*)", TypeCheckHandler, name='typecheck'),
             url("/decode_arg/(.*)", DecodeArgHandler, name='decode_arg'),
@@ -470,17 +480,11 @@ class WebTest(AsyncHTTPTestCase, LogTrapTestCase):
             url("/linkify", LinkifyHandler),
             url("/uimodule_resources", UIModuleResourceHandler),
             url("/optional_path/(.+)?", OptionalPathHandler),
-            url("/flow_control", FlowControlHandler),
             url("/multi_header", MultiHeaderHandler),
             url("/redirect", RedirectHandler),
-            url("/empty_flush", EmptyFlushCallbackHandler),
             url("/header_injection", HeaderInjectionHandler),
             ]
-        self.app = Application(urls,
-                               template_loader=loader,
-                               autoescape="xhtml_escape",
-                               cookie_secret=self.COOKIE_SECRET)
-        return self.app
+        return urls
 
     def fetch_json(self, *args, **kwargs):
         response = self.fetch(*args, **kwargs)
@@ -566,9 +570,6 @@ js_embed()
         self.assertEqual(self.fetch_json("/optional_path/"),
                          {u"path": None})
 
-    def test_flow_control(self):
-        self.assertEqual(self.fetch("/flow_control").body, b("123"))
-
     def test_multi_header(self):
         response = self.fetch("/multi_header")
         self.assertEqual(response.headers["x-overwrite"], "2")
@@ -582,15 +583,27 @@ js_embed()
         response = self.fetch("/redirect?status=307", follow_redirects=False)
         self.assertEqual(response.code, 307)
 
-    def test_empty_flush(self):
-        response = self.fetch("/empty_flush")
-        self.assertEqual(response.body, b("ok"))
-
     def test_header_injection(self):
         response = self.fetch("/header_injection")
         self.assertEqual(response.body, b("ok"))
 
 
+class NonWSGIWebTests(AsyncHTTPTestCase, LogTrapTestCase):
+    def get_app(self):
+        urls = [
+            ("/flow_control", FlowControlHandler),
+            ("/empty_flush", EmptyFlushCallbackHandler),
+            ]
+        return Application(urls)
+
+    def test_flow_control(self):
+        self.assertEqual(self.fetch("/flow_control").body, b("123"))
+
+    def test_empty_flush(self):
+        response = self.fetch("/empty_flush")
+        self.assertEqual(response.body, b("ok"))
+
+
 class ErrorResponseTest(AsyncHTTPTestCase, LogTrapTestCase):
     def get_app(self):
         class DefaultHandler(RequestHandler):
index 54c374065d256924a5455c9e30f922806a744d37..c3a8cfefc7bbc93f01b58ac8696c640a09f6f740 100644 (file)
@@ -66,10 +66,17 @@ class WSGIApplicationTest(AsyncHTTPTestCase, LogTrapTestCase):
 # WSGIContainer and WSGIApplication to make sure everything survives
 # repeated disassembly and reassembly.
 from tornado.test.httpserver_test import HTTPConnectionTest
+from tornado.test.web_test import WSGISafeWebTest
 
 
 class WSGIConnectionTest(HTTPConnectionTest):
     def get_app(self):
         return WSGIContainer(validator(WSGIApplication(self.get_handlers())))
 
+class WSGIWebTest(WSGISafeWebTest):
+    def get_app(self):
+        self.app = WSGIApplication(self.get_handlers(), **self.get_app_kwargs())
+        return WSGIContainer(validator(self.app))
+
 del HTTPConnectionTest
+del WSGISafeWebTest
index e714ec1dd28521557ad08029864ea2480c2d1749..d507cb6e1f589361c90be57f60d87e9b8e03aa3d 100644 (file)
@@ -43,7 +43,7 @@ from tornado import escape
 from tornado import httputil
 from tornado import web
 from tornado.escape import native_str, utf8, parse_qs_bytes
-from tornado.util import b
+from tornado.util import b, bytes_type
 
 try:
     from io import BytesIO  # python 3
@@ -51,6 +51,28 @@ except ImportError:
     from cStringIO import StringIO as BytesIO  # python 2
 
 
+# PEP 3333 specifies that WSGI on python 3 generally deals with byte strings
+# that are smuggled inside objects of type unicode (via the latin1 encoding).
+# These functions are like those in the tornado.escape module, but defined
+# here to minimize the temptation to use them in non-wsgi contexts.
+if str is unicode:
+    def to_wsgi_str(s):
+        assert isinstance(s, bytes_type)
+        return s.decode('latin1')
+
+    def from_wsgi_str(s):
+        assert isinstance(s, str)
+        return s.encode('latin1')
+else:
+    def to_wsgi_str(s):
+        assert isinstance(s, bytes_type)
+        return s
+
+    def from_wsgi_str(s):
+        assert isinstance(s, str)
+        return s
+
+
 class WSGIApplication(web.Application):
     """A WSGI equivalent of `tornado.web.Application`.
 
@@ -108,8 +130,8 @@ class HTTPRequest(object):
     def __init__(self, environ):
         """Parses the given WSGI environ to construct the request."""
         self.method = environ["REQUEST_METHOD"]
-        self.path = urllib.quote(environ.get("SCRIPT_NAME", ""))
-        self.path += urllib.quote(environ.get("PATH_INFO", ""))
+        self.path = urllib.quote(from_wsgi_str(environ.get("SCRIPT_NAME", "")))
+        self.path += urllib.quote(from_wsgi_str(environ.get("PATH_INFO", "")))
         self.uri = self.path
         self.arguments = {}
         self.query = environ.get("QUERY_STRING", "")
@@ -266,7 +288,7 @@ class WSGIContainer(object):
         environ = {
             "REQUEST_METHOD": request.method,
             "SCRIPT_NAME": "",
-            "PATH_INFO": urllib.unquote(request.path),
+            "PATH_INFO": to_wsgi_str(escape.url_unescape(request.path, encoding=None)),
             "QUERY_STRING": request.query,
             "REMOTE_ADDR": request.remote_ip,
             "SERVER_NAME": host,