From ea03332e204a357bb43b2f106a9868b3701d0623 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Thu, 9 Feb 2023 22:35:46 +0000 Subject: [PATCH] wsgi: Iterate the response in executor too --- tornado/test/wsgi_test.py | 42 +++++++++++++++++++++++++++++++++++---- tornado/wsgi.py | 26 ++++++++++++++++++------ 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/tornado/test/wsgi_test.py b/tornado/test/wsgi_test.py index 268620921..9fbc744e1 100644 --- a/tornado/test/wsgi_test.py +++ b/tornado/test/wsgi_test.py @@ -27,6 +27,7 @@ class WSGIAppMixin: [ ("/simple", make_container(self.simple_wsgi_app)), ("/barrier", make_container(self.barrier_wsgi_app)), + ("/streaming_barrier", make_container(self.streaming_barrier_wsgi_app)), ] ) @@ -39,10 +40,6 @@ class WSGIAppMixin: self.respond_plain(start_response) return [b"Hello world!"] - def test_simple(self): - response = self.fetch("/simple") - self.assertEqual(response.body, b"Hello world!") - def barrier_wsgi_app(self, environ, start_response): self.respond_plain(start_response) try: @@ -52,11 +49,25 @@ class WSGIAppMixin: else: return [b"ok %d" % n] + def streaming_barrier_wsgi_app(self, environ, start_response): + self.respond_plain(start_response) + yield b"ok " + try: + n = self.barrier.wait() + except threading.BrokenBarrierError: + yield b"broken barrier" + else: + yield b"%d" % n + class WSGIContainerDummyExecutorTest(WSGIAppMixin, AsyncHTTPTestCase): def get_executor(self): return None + def test_simple(self): + response = self.fetch("/simple") + self.assertEqual(response.body, b"Hello world!") + @gen_test async def test_concurrent_barrier(self): self.barrier.reset() @@ -67,11 +78,25 @@ class WSGIContainerDummyExecutorTest(WSGIAppMixin, AsyncHTTPTestCase): for resp in resps: self.assertEqual(resp.body, b"broken barrier") + @gen_test + async def test_concurrent_streaming_barrier(self): + self.barrier.reset() + resps = await asyncio.gather( + self.http_client.fetch(self.get_url("/streaming_barrier")), + self.http_client.fetch(self.get_url("/streaming_barrier")), + ) + for resp in resps: + self.assertEqual(resp.body, b"ok broken barrier") + class WSGIContainerThreadPoolTest(WSGIAppMixin, AsyncHTTPTestCase): def get_executor(self): return concurrent.futures.ThreadPoolExecutor() + def test_simple(self): + response = self.fetch("/simple") + self.assertEqual(response.body, b"Hello world!") + @gen_test async def test_concurrent_barrier(self): self.barrier.reset() @@ -80,3 +105,12 @@ class WSGIContainerThreadPoolTest(WSGIAppMixin, AsyncHTTPTestCase): self.http_client.fetch(self.get_url("/barrier")), ) self.assertEqual([b"ok 0", b"ok 1"], sorted([resp.body for resp in resps])) + + @gen_test + async def test_concurrent_streaming_barrier(self): + self.barrier.reset() + resps = await asyncio.gather( + self.http_client.fetch(self.get_url("/streaming_barrier")), + self.http_client.fetch(self.get_url("/streaming_barrier")), + ) + self.assertEqual([b"ok 0", b"ok 1"], sorted([resp.body for resp in resps])) diff --git a/tornado/wsgi.py b/tornado/wsgi.py index 66d4ca6d5..c37d7dc89 100644 --- a/tornado/wsgi.py +++ b/tornado/wsgi.py @@ -94,8 +94,8 @@ class WSGIContainer(object): https://github.com/bdarnell/django-tornado-demo for a complete example. `WSGIContainer` supports executing the WSGI application in custom executors - using `IOLoop.run_in_executor`. The default executor uses - `tornado.concurrent.dummy_executor` which works synchronously, but other + using `.IOLoop.run_in_executor`. The default executor uses + ``tornado.concurrent.dummy_executor`` which works synchronously, but other executors subclassing `concurrent.futures.Executor` may be used. To execute WSGI application code in separate threads in an event-loop compatible way use:: @@ -109,7 +109,7 @@ class WSGIContainer(object): http_server.listen(8888) await asyncio.Event().wait() - Running the WSGI app with a `ThreadPoolExecutor` remains *less scalable* + Running the WSGI app with a ``ThreadPoolExecutor`` remains *less scalable* than running the same app in a multi-threaded WSGI server like ``gunicorn`` or ``uwsgi``. """ @@ -117,7 +117,7 @@ class WSGIContainer(object): def __init__( self, wsgi_application: "WSGIAppType", - executor: concurrent.futures.Executor = None, + executor: Optional[concurrent.futures.Executor] = None, ) -> None: self.wsgi_application = wsgi_application self.executor = dummy_executor if executor is None else executor @@ -152,11 +152,25 @@ class WSGIContainer(object): start_response, ) try: - response.extend(app_response) - body = b"".join(response) + app_response_iter = iter(app_response) + + def next_chunk() -> Optional[bytes]: + try: + return next(app_response_iter) + except StopIteration: + # StopIteration is special and is not allowed to pass through + # coroutines normally. + return None + + while True: + chunk = await loop.run_in_executor(self.executor, next_chunk) + if chunk is None: + break + response.append(chunk) finally: if hasattr(app_response, "close"): app_response.close() # type: ignore + body = b"".join(response) if not data: raise Exception("WSGI app did not call start_response") -- 2.47.2