from __future__ import absolute_import, division, print_function
from wsgiref.validate import validator
-from tornado.escape import json_decode
-from tornado.test.httpserver_test import TypeCheckHandler
-from tornado.test.util import ignore_deprecation
from tornado.testing import AsyncHTTPTestCase
-from tornado.web import RequestHandler, Application
-from tornado.wsgi import WSGIApplication, WSGIContainer, WSGIAdapter
-
-from tornado.test import httpserver_test
-from tornado.test import web_test
+from tornado.wsgi import WSGIContainer
class WSGIContainerTest(AsyncHTTPTestCase):
+ # TODO: Now that WSGIAdapter is gone, this is a pretty weak test.
def wsgi_app(self, environ, start_response):
status = "200 OK"
response_headers = [("Content-Type", "text/plain")]
def test_simple(self):
response = self.fetch("/")
self.assertEqual(response.body, b"Hello world!")
-
-
-class WSGIAdapterTest(AsyncHTTPTestCase):
- def get_app(self):
- class HelloHandler(RequestHandler):
- def get(self):
- self.write("Hello world!")
-
- class PathQuotingHandler(RequestHandler):
- def get(self, path):
- self.write(path)
-
- # It would be better to run the wsgiref server implementation in
- # another thread instead of using our own WSGIContainer, but this
- # fits better in our async testing framework and the wsgiref
- # validator should keep us honest
- with ignore_deprecation():
- return WSGIContainer(validator(WSGIAdapter(
- Application([
- ("/", HelloHandler),
- ("/path/(.*)", PathQuotingHandler),
- ("/typecheck", TypeCheckHandler),
- ]))))
-
- def test_simple(self):
- response = self.fetch("/")
- self.assertEqual(response.body, b"Hello world!")
-
- def test_path_quoting(self):
- response = self.fetch("/path/foo%20bar%C3%A9")
- self.assertEqual(response.body, u"foo bar\u00e9".encode("utf-8"))
-
- def test_types(self):
- headers = {"Cookie": "foo=bar"}
- response = self.fetch("/typecheck?foo=bar", headers=headers)
- data = json_decode(response.body)
- self.assertEqual(data, {})
-
- response = self.fetch("/typecheck", method="POST", body="foo=bar", headers=headers)
- data = json_decode(response.body)
- self.assertEqual(data, {})
-
-
-# This is kind of hacky, but run some of the HTTPServer and web tests
-# through WSGIContainer and WSGIApplication to make sure everything
-# survives repeated disassembly and reassembly.
-class WSGIConnectionTest(httpserver_test.HTTPConnectionTest):
- def get_app(self):
- with ignore_deprecation():
- return WSGIContainer(validator(WSGIAdapter(Application(self.get_handlers()))))
-
-
-def wrap_web_tests_application():
- result = {}
- for cls in web_test.wsgi_safe_tests:
- def class_factory():
- class WSGIApplicationWrappedTest(cls): # type: ignore
- def setUp(self):
- self.warning_catcher = ignore_deprecation()
- self.warning_catcher.__enter__()
- super(WSGIApplicationWrappedTest, self).setUp()
-
- def tearDown(self):
- super(WSGIApplicationWrappedTest, self).tearDown()
- self.warning_catcher.__exit__(None, None, None)
-
- def get_app(self):
- self.app = WSGIApplication(self.get_handlers(),
- **self.get_app_kwargs())
- return WSGIContainer(validator(self.app))
- result["WSGIApplication_" + cls.__name__] = class_factory()
- return result
-
-
-globals().update(wrap_web_tests_application())
-
-
-def wrap_web_tests_adapter():
- result = {}
- for cls in web_test.wsgi_safe_tests:
- class WSGIAdapterWrappedTest(cls): # type: ignore
- def get_app(self):
- self.app = Application(self.get_handlers(),
- **self.get_app_kwargs())
- with ignore_deprecation():
- return WSGIContainer(validator(WSGIAdapter(self.app)))
- result["WSGIAdapter_" + cls.__name__] = WSGIAdapterWrappedTest
- return result
-
-
-globals().update(wrap_web_tests_adapter())
import sys
from io import BytesIO
import tornado
-import warnings
-from tornado.concurrent import Future
from tornado import escape
from tornado import httputil
from tornado.log import access_log
-from tornado import web
-from tornado.escape import native_str
-from tornado.util import unicode_type, PY3
-if PY3:
- import urllib.parse as urllib_parse # py3
-else:
- import urllib as urllib_parse
-
# PEP 3333 specifies that WSGI on python 3 generally deals with byte strings
# that are smuggled inside objects of type unicode (via the latin1 encoding).
-# These functions are like those in the tornado.escape module, but defined
-# here to minimize the temptation to use them in non-wsgi contexts.
-if str is unicode_type:
- def to_wsgi_str(s):
- assert isinstance(s, bytes)
- return s.decode('latin1')
-
- def from_wsgi_str(s):
- assert isinstance(s, str)
- return s.encode('latin1')
-else:
- def to_wsgi_str(s):
- assert isinstance(s, bytes)
- return s
-
- def from_wsgi_str(s):
- assert isinstance(s, str)
- return s
-
-
-class WSGIApplication(web.Application):
- """A WSGI equivalent of `tornado.web.Application`.
-
- .. deprecated:: 4.0
-
- Use a regular `.Application` and wrap it in `WSGIAdapter` instead.
- This class will be removed in Tornado 6.0.
- """
- def __call__(self, environ, start_response):
- return WSGIAdapter(self)(environ, start_response)
-
-
-# WSGI has no facilities for flow control, so just return an already-done
-# Future when the interface requires it.
-def _dummy_future():
- f = Future()
- f.set_result(None)
- return f
-
-
-class _WSGIConnection(httputil.HTTPConnection):
- def __init__(self, method, start_response, context):
- self.method = method
- self.start_response = start_response
- self.context = context
- self._write_buffer = []
- self._finished = False
- self._expected_content_remaining = None
- self._error = None
-
- def set_close_callback(self, callback):
- # WSGI has no facility for detecting a closed connection mid-request,
- # so we can simply ignore the callback.
- pass
-
- def write_headers(self, start_line, headers, chunk=None, callback=None):
- if self.method == 'HEAD':
- self._expected_content_remaining = 0
- elif 'Content-Length' in headers:
- self._expected_content_remaining = int(headers['Content-Length'])
- else:
- self._expected_content_remaining = None
- self.start_response(
- '%s %s' % (start_line.code, start_line.reason),
- [(native_str(k), native_str(v)) for (k, v) in headers.get_all()])
- if chunk is not None:
- self.write(chunk, callback)
- elif callback is not None:
- callback()
- return _dummy_future()
-
- def write(self, chunk, callback=None):
- if self._expected_content_remaining is not None:
- self._expected_content_remaining -= len(chunk)
- if self._expected_content_remaining < 0:
- self._error = httputil.HTTPOutputError(
- "Tried to write more data than Content-Length")
- raise self._error
- self._write_buffer.append(chunk)
- if callback is not None:
- callback()
- return _dummy_future()
-
- def finish(self):
- if (self._expected_content_remaining is not None and
- self._expected_content_remaining != 0):
- self._error = httputil.HTTPOutputError(
- "Tried to write %d bytes less than Content-Length" %
- self._expected_content_remaining)
- raise self._error
- self._finished = True
-
-
-class _WSGIRequestContext(object):
- def __init__(self, remote_ip, protocol):
- self.remote_ip = remote_ip
- self.protocol = protocol
-
- def __str__(self):
- return self.remote_ip
-
-
-class WSGIAdapter(object):
- """Converts a `tornado.web.Application` instance into a WSGI application.
-
- Example usage::
-
- import tornado.web
- import tornado.wsgi
- import wsgiref.simple_server
-
- class MainHandler(tornado.web.RequestHandler):
- def get(self):
- self.write("Hello, world")
-
- if __name__ == "__main__":
- application = tornado.web.Application([
- (r"/", MainHandler),
- ])
- wsgi_app = tornado.wsgi.WSGIAdapter(application)
- server = wsgiref.simple_server.make_server('', 8888, wsgi_app)
- server.serve_forever()
-
- See the `appengine demo
- <https://github.com/tornadoweb/tornado/tree/stable/demos/appengine>`_
- for an example of using this module to run a Tornado app on Google
- App Engine.
-
- In WSGI mode asynchronous methods are not supported. This means
- that it is not possible to use `.AsyncHTTPClient`, or the
- `tornado.auth` or `tornado.websocket` modules.
-
- In multithreaded WSGI servers on Python 3, it may be necessary to
- permit `asyncio` to create event loops on any thread. Run the
- following at startup (typically import time for WSGI
- applications)::
-
- import asyncio
- from tornado.platform.asyncio import AnyThreadEventLoopPolicy
- asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
-
- .. versionadded:: 4.0
-
- .. deprecated:: 5.1
-
- This class is deprecated and will be removed in Tornado 6.0.
- Use Tornado's `.HTTPServer` instead of a WSGI container.
- """
- def __init__(self, application):
- warnings.warn("WSGIAdapter is deprecated, use Tornado's HTTPServer instead",
- DeprecationWarning)
- if isinstance(application, WSGIApplication):
- self.application = lambda request: web.Application.__call__(
- application, request)
- else:
- self.application = application
-
- def __call__(self, environ, start_response):
- method = environ["REQUEST_METHOD"]
- uri = urllib_parse.quote(from_wsgi_str(environ.get("SCRIPT_NAME", "")))
- uri += urllib_parse.quote(from_wsgi_str(environ.get("PATH_INFO", "")))
- if environ.get("QUERY_STRING"):
- uri += "?" + environ["QUERY_STRING"]
- headers = httputil.HTTPHeaders()
- if environ.get("CONTENT_TYPE"):
- headers["Content-Type"] = environ["CONTENT_TYPE"]
- if environ.get("CONTENT_LENGTH"):
- headers["Content-Length"] = environ["CONTENT_LENGTH"]
- for key in environ:
- if key.startswith("HTTP_"):
- headers[key[5:].replace("_", "-")] = environ[key]
- if headers.get("Content-Length"):
- body = environ["wsgi.input"].read(
- int(headers["Content-Length"]))
- else:
- body = b""
- protocol = environ["wsgi.url_scheme"]
- remote_ip = environ.get("REMOTE_ADDR", "")
- if environ.get("HTTP_HOST"):
- host = environ["HTTP_HOST"]
- else:
- host = environ["SERVER_NAME"]
- connection = _WSGIConnection(method, start_response,
- _WSGIRequestContext(remote_ip, protocol))
- request = httputil.HTTPServerRequest(
- method, uri, "HTTP/1.1", headers=headers, body=body,
- host=host, connection=connection)
- request._parse_body()
- self.application(request)
- if connection._error:
- raise connection._error
- if not connection._finished:
- raise Exception("request did not finish synchronously")
- return connection._write_buffer
+# This function is like those in the tornado.escape module, but defined
+# here to minimize the temptation to use it in non-wsgi contexts.
+def to_wsgi_str(s):
+ assert isinstance(s, bytes)
+ return s.decode('latin1')
class WSGIContainer(object):