]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Fix IOStream flow control under kqueue.
authorBen Darnell <ben@bendarnell.com>
Sun, 16 Nov 2014 00:08:21 +0000 (19:08 -0500)
committerBen Darnell <ben@bendarnell.com>
Sun, 16 Nov 2014 00:08:21 +0000 (19:08 -0500)
An early attempt to detect closed connections in kqueue led
us to always read from the socket when data was available,
defeating any higher-level flow control.

Add a test to verify this behavior.

tornado/platform/kqueue.py
tornado/test/iostream_test.py

index de8c046d3ed4be2109d8eae413d5a096687b02a6..f8f3e4a6113ee0901df45b15ae67a16b3cd2a7e2 100644 (file)
@@ -54,8 +54,7 @@ class _KQueue(object):
         if events & IOLoop.WRITE:
             kevents.append(select.kevent(
                 fd, filter=select.KQ_FILTER_WRITE, flags=flags))
-        if events & IOLoop.READ or not kevents:
-            # Always read when there is not a write
+        if events & IOLoop.READ:
             kevents.append(select.kevent(
                 fd, filter=select.KQ_FILTER_READ, flags=flags))
         # Even though control() takes a list, it seems to return EINVAL
index fa8590c8370bd4e1dc663f5ff6d7b2f082c3b41e..f54eed6bb8113c4eb320174005513c2779b2c95a 100644 (file)
@@ -725,6 +725,26 @@ class TestIOStreamMixin(object):
             server.close()
             client.close()
 
+    def test_flow_control(self):
+        MB = 1024 * 1024
+        server, client = self.make_iostream_pair(max_buffer_size=5 * MB)
+        try:
+            # Client writes more than the server will accept.
+            client.write(b"a" * 10 * MB)
+            # The server pauses while reading.
+            server.read_bytes(MB, self.stop)
+            self.wait()
+            self.io_loop.call_later(0.1, self.stop)
+            self.wait()
+            # The client's writes have been blocked; the server can
+            # continue to read gradually.
+            for i in range(9):
+                server.read_bytes(MB, self.stop)
+                self.wait()
+        finally:
+            server.close()
+            client.close()
+
 
 class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
     def _make_client_iostream(self):