]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
wsgi: Iterate the response in executor too
authorBen Darnell <ben@bendarnell.com>
Thu, 9 Feb 2023 22:35:46 +0000 (22:35 +0000)
committerBen Darnell <ben@bendarnell.com>
Thu, 16 Feb 2023 20:37:56 +0000 (20:37 +0000)
tornado/test/wsgi_test.py
tornado/wsgi.py

index 2686209213e41aee0ee4ecd650f39092c89a87d7..9fbc744e11f012e7622e07fcff53ddb25e577c00 100644 (file)
@@ -27,6 +27,7 @@ class WSGIAppMixin:
             [
                 ("/simple", make_container(self.simple_wsgi_app)),
                 ("/barrier", make_container(self.barrier_wsgi_app)),
+                ("/streaming_barrier", make_container(self.streaming_barrier_wsgi_app)),
             ]
         )
 
@@ -39,10 +40,6 @@ class WSGIAppMixin:
         self.respond_plain(start_response)
         return [b"Hello world!"]
 
-    def test_simple(self):
-        response = self.fetch("/simple")
-        self.assertEqual(response.body, b"Hello world!")
-
     def barrier_wsgi_app(self, environ, start_response):
         self.respond_plain(start_response)
         try:
@@ -52,11 +49,25 @@ class WSGIAppMixin:
         else:
             return [b"ok %d" % n]
 
+    def streaming_barrier_wsgi_app(self, environ, start_response):
+        self.respond_plain(start_response)
+        yield b"ok "
+        try:
+            n = self.barrier.wait()
+        except threading.BrokenBarrierError:
+            yield b"broken barrier"
+        else:
+            yield b"%d" % n
+
 
 class WSGIContainerDummyExecutorTest(WSGIAppMixin, AsyncHTTPTestCase):
     def get_executor(self):
         return None
 
+    def test_simple(self):
+        response = self.fetch("/simple")
+        self.assertEqual(response.body, b"Hello world!")
+
     @gen_test
     async def test_concurrent_barrier(self):
         self.barrier.reset()
@@ -67,11 +78,25 @@ class WSGIContainerDummyExecutorTest(WSGIAppMixin, AsyncHTTPTestCase):
         for resp in resps:
             self.assertEqual(resp.body, b"broken barrier")
 
+    @gen_test
+    async def test_concurrent_streaming_barrier(self):
+        self.barrier.reset()
+        resps = await asyncio.gather(
+            self.http_client.fetch(self.get_url("/streaming_barrier")),
+            self.http_client.fetch(self.get_url("/streaming_barrier")),
+        )
+        for resp in resps:
+            self.assertEqual(resp.body, b"ok broken barrier")
+
 
 class WSGIContainerThreadPoolTest(WSGIAppMixin, AsyncHTTPTestCase):
     def get_executor(self):
         return concurrent.futures.ThreadPoolExecutor()
 
+    def test_simple(self):
+        response = self.fetch("/simple")
+        self.assertEqual(response.body, b"Hello world!")
+
     @gen_test
     async def test_concurrent_barrier(self):
         self.barrier.reset()
@@ -80,3 +105,12 @@ class WSGIContainerThreadPoolTest(WSGIAppMixin, AsyncHTTPTestCase):
             self.http_client.fetch(self.get_url("/barrier")),
         )
         self.assertEqual([b"ok 0", b"ok 1"], sorted([resp.body for resp in resps]))
+
+    @gen_test
+    async def test_concurrent_streaming_barrier(self):
+        self.barrier.reset()
+        resps = await asyncio.gather(
+            self.http_client.fetch(self.get_url("/streaming_barrier")),
+            self.http_client.fetch(self.get_url("/streaming_barrier")),
+        )
+        self.assertEqual([b"ok 0", b"ok 1"], sorted([resp.body for resp in resps]))
index 66d4ca6d5d1c6e0448d1be81275b2d0708134e81..c37d7dc89b8ca3c5a16cdf84f6f50534e3919440 100644 (file)
@@ -94,8 +94,8 @@ class WSGIContainer(object):
     https://github.com/bdarnell/django-tornado-demo for a complete example.
 
     `WSGIContainer` supports executing the WSGI application in custom executors
-    using `IOLoop.run_in_executor`. The default executor uses
-    `tornado.concurrent.dummy_executor` which works synchronously, but other
+    using `.IOLoop.run_in_executor`. The default executor uses
+    ``tornado.concurrent.dummy_executor`` which works synchronously, but other
     executors subclassing `concurrent.futures.Executor` may be used. To execute
     WSGI application code in separate threads in an event-loop compatible way
     use::
@@ -109,7 +109,7 @@ class WSGIContainer(object):
             http_server.listen(8888)
             await asyncio.Event().wait()
 
-    Running the WSGI app with a `ThreadPoolExecutor` remains *less scalable*
+    Running the WSGI app with a ``ThreadPoolExecutor`` remains *less scalable*
     than running the same app in a multi-threaded WSGI server like ``gunicorn``
     or ``uwsgi``.
     """
@@ -117,7 +117,7 @@ class WSGIContainer(object):
     def __init__(
         self,
         wsgi_application: "WSGIAppType",
-        executor: concurrent.futures.Executor = None,
+        executor: Optional[concurrent.futures.Executor] = None,
     ) -> None:
         self.wsgi_application = wsgi_application
         self.executor = dummy_executor if executor is None else executor
@@ -152,11 +152,25 @@ class WSGIContainer(object):
             start_response,
         )
         try:
-            response.extend(app_response)
-            body = b"".join(response)
+            app_response_iter = iter(app_response)
+
+            def next_chunk() -> Optional[bytes]:
+                try:
+                    return next(app_response_iter)
+                except StopIteration:
+                    # StopIteration is special and is not allowed to pass through
+                    # coroutines normally.
+                    return None
+
+            while True:
+                chunk = await loop.run_in_executor(self.executor, next_chunk)
+                if chunk is None:
+                    break
+                response.append(chunk)
         finally:
             if hasattr(app_response, "close"):
                 app_response.close()  # type: ignore
+        body = b"".join(response)
         if not data:
             raise Exception("WSGI app did not call start_response")