[
("/simple", make_container(self.simple_wsgi_app)),
("/barrier", make_container(self.barrier_wsgi_app)),
+ ("/streaming_barrier", make_container(self.streaming_barrier_wsgi_app)),
]
)
self.respond_plain(start_response)
return [b"Hello world!"]
- def test_simple(self):
- response = self.fetch("/simple")
- self.assertEqual(response.body, b"Hello world!")
-
def barrier_wsgi_app(self, environ, start_response):
self.respond_plain(start_response)
try:
else:
return [b"ok %d" % n]
def streaming_barrier_wsgi_app(self, environ, start_response):
    """A WSGI app that streams its body: an initial chunk, then a second
    chunk produced only after rendezvousing on the shared barrier.

    Yields ``b"ok "`` immediately, then either the arrival index from the
    barrier or ``b"broken barrier"`` if the barrier was broken/timed out.
    """
    self.respond_plain(start_response)
    yield b"ok "
    try:
        arrival = self.barrier.wait()
    except threading.BrokenBarrierError:
        yield b"broken barrier"
        return
    yield b"%d" % arrival
class WSGIContainerDummyExecutorTest(WSGIAppMixin, AsyncHTTPTestCase):
def get_executor(self):
    """Use no executor: the WSGI app runs synchronously on the IOLoop thread."""
    return None
def test_simple(self):
    """A single non-streaming request returns the full body."""
    self.assertEqual(self.fetch("/simple").body, b"Hello world!")
@gen_test
async def test_concurrent_barrier(self):
# Reset the shared barrier before issuing the concurrent requests.
self.barrier.reset()
# NOTE(review): `resps` is never assigned in the visible lines -- the
# `resps = await asyncio.gather(...)` of the two /barrier fetches appears
# to have been lost from this excerpt; confirm against the full file.
for resp in resps:
self.assertEqual(resp.body, b"broken barrier")
@gen_test
async def test_concurrent_streaming_barrier(self):
    """Two concurrent streaming requests; with no executor both run on the
    IOLoop thread, so each is expected to report a broken barrier after
    its initial ``ok `` chunk."""
    self.barrier.reset()
    url = self.get_url("/streaming_barrier")
    responses = await asyncio.gather(
        self.http_client.fetch(url),
        self.http_client.fetch(url),
    )
    for response in responses:
        self.assertEqual(response.body, b"ok broken barrier")
+
class WSGIContainerThreadPoolTest(WSGIAppMixin, AsyncHTTPTestCase):
def get_executor(self):
    """Run the WSGI app on a thread pool so concurrent requests can block."""
    pool = concurrent.futures.ThreadPoolExecutor()
    return pool
def test_simple(self):
    """A single non-streaming request returns the full body."""
    self.assertEqual(self.fetch("/simple").body, b"Hello world!")
@gen_test
async def test_concurrent_barrier(self):
# Reset the shared barrier before issuing the concurrent requests.
self.barrier.reset()
# NOTE(review): the opening of `resps = await asyncio.gather(` and the
# first fetch appear to be missing from this excerpt -- the two lines
# below look like the tail of that call; confirm against the full file.
self.http_client.fetch(self.get_url("/barrier")),
)
self.assertEqual([b"ok 0", b"ok 1"], sorted([resp.body for resp in resps]))
+
@gen_test
async def test_concurrent_streaming_barrier(self):
    """Two concurrent streaming requests on the thread pool: both reach the
    barrier, and each body carries the arrival index it observed."""
    self.barrier.reset()
    url = self.get_url("/streaming_barrier")
    responses = await asyncio.gather(
        self.http_client.fetch(url),
        self.http_client.fetch(url),
    )
    bodies = sorted(resp.body for resp in responses)
    self.assertEqual([b"ok 0", b"ok 1"], bodies)
https://github.com/bdarnell/django-tornado-demo for a complete example.
`WSGIContainer` supports executing the WSGI application in custom executors
using `.IOLoop.run_in_executor`. The default executor uses
``tornado.concurrent.dummy_executor`` which works synchronously, but other
executors subclassing `concurrent.futures.Executor` may be used. To execute
WSGI application code in separate threads in an event-loop compatible way
use::
http_server.listen(8888)
await asyncio.Event().wait()
Running the WSGI app with a ``ThreadPoolExecutor`` remains *less scalable*
than running the same app in a multi-threaded WSGI server like ``gunicorn``
or ``uwsgi``.
"""
def __init__(
    self,
    wsgi_application: "WSGIAppType",
    # Explicit Optional: PEP 484 disallows the implicit-Optional form
    # ``executor: concurrent.futures.Executor = None``.
    executor: Optional[concurrent.futures.Executor] = None,
) -> None:
    """Wrap *wsgi_application* for execution on *executor*.

    :arg wsgi_application: the WSGI callable to run.
    :arg executor: executor used to run the app; when ``None`` the
        synchronous ``tornado.concurrent.dummy_executor`` is used, so the
        app runs directly on the IOLoop thread.
    """
    self.wsgi_application = wsgi_application
    self.executor = dummy_executor if executor is None else executor
start_response,
)
try:
- response.extend(app_response)
- body = b"".join(response)
+ app_response_iter = iter(app_response)
+
+ def next_chunk() -> Optional[bytes]:
+ try:
+ return next(app_response_iter)
+ except StopIteration:
+ # StopIteration is special and is not allowed to pass through
+ # coroutines normally.
+ return None
+
+ while True:
+ chunk = await loop.run_in_executor(self.executor, next_chunk)
+ if chunk is None:
+ break
+ response.append(chunk)
finally:
if hasattr(app_response, "close"):
app_response.close() # type: ignore
+ body = b"".join(response)
if not data:
raise Exception("WSGI app did not call start_response")