]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Fix connection-close detection for epoll.
authorBen Darnell <ben@bendarnell.com>
Thu, 22 Sep 2011 07:10:43 +0000 (00:10 -0700)
committerBen Darnell <ben@bendarnell.com>
Thu, 22 Sep 2011 07:32:21 +0000 (00:32 -0700)
Previously, closed connections with epoll sent an IOLoop.ERROR event
while there was still data in the OS's socket buffers.  Event handlers
that did not drain the entire socket buffer before processing the
close event would lose data. (this was primarily an issue for
SimpleAsyncHTTPClient)

IOLoop.ERROR no longer includes EPOLLRDHUP (which is only supposed to be
used in edge-triggered mode: https://lkml.org/lkml/2003/7/12/132), so closed
connections while reading are signaled as zero-byte reads once the buffer
is drained (this was already the behavior of the select-based IOLoop).
Closed connections while writing are still signaled with EPOLLHUP.

Backwards-compatibility note:  Listening for IOLoop.ERROR alone is no longer
sufficient for detecting closed connections on an otherwise unused socket.
IOLoop.ERROR must always be used in combination with READ or WRITE.

tornado/ioloop.py
tornado/iostream.py
tornado/test/iostream_test.py

index 0a97304bcd4d197c16f2d3491da07d0be9f68b2c..f9a9f372eea5457f4437cbbbfbfe9e419c02f933 100644 (file)
@@ -102,7 +102,7 @@ class IOLoop(object):
     NONE = 0
     READ = _EPOLLIN
     WRITE = _EPOLLOUT
-    ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
+    ERROR = _EPOLLERR | _EPOLLHUP
 
     def __init__(self, impl=None):
         self._impl = impl or _poll()
index 122614a691b595c5bf29faa90c0f8265f1e62066..0027a82bf24dec38b13f9d3df7ac996d6c37c073 100644 (file)
@@ -272,6 +272,8 @@ class IOStream(object):
                 state |= self.io_loop.READ
             if self.writing():
                 state |= self.io_loop.WRITE
+            if state == self.io_loop.ERROR:
+                state |= self.io_loop.READ
             if state != self._state:
                 assert self._state is not None, \
                     "shouldn't happen: _handle_events without self._state"
@@ -500,7 +502,7 @@ class IOStream(object):
                     self._close_callback = None
                     self._run_callback(cb)
             else:
-                self._add_io_state(0)
+                self._add_io_state(ioloop.IOLoop.READ)
 
     def _add_io_state(self, state):
         """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.
index dd3b8f6bb60f2761549d8a704c4f8f41015a3e52..f35614da97596d6ee79d778572c84697f1de734a 100644 (file)
@@ -4,6 +4,7 @@ from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase, get_unused_port
 from tornado.util import b
 from tornado.web import RequestHandler, Application
 import socket
+import time
 
 class HelloHandler(RequestHandler):
     def get(self):
@@ -13,20 +14,21 @@ class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase):
     def get_app(self):
         return Application([('/', HelloHandler)])
 
-    def make_iostream_pair(self):
+    def make_iostream_pair(self, **kwargs):
         port = get_unused_port()
         [listener] = netutil.bind_sockets(port, '127.0.0.1',
                                           family=socket.AF_INET)
         streams = [None, None]
         def accept_callback(connection, address):
-            streams[0] = IOStream(connection, io_loop=self.io_loop)
+            streams[0] = IOStream(connection, io_loop=self.io_loop, **kwargs)
             self.stop()
         def connect_callback():
             streams[1] = client_stream
             self.stop()
         netutil.add_accept_handler(listener, accept_callback,
                                    io_loop=self.io_loop)
-        client_stream = IOStream(socket.socket(), io_loop=self.io_loop)
+        client_stream = IOStream(socket.socket(), io_loop=self.io_loop,
+                                 **kwargs)
         client_stream.connect(('127.0.0.1', port),
                               callback=connect_callback)
         self.wait(condition=lambda: all(streams))
@@ -160,3 +162,31 @@ class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase):
         finally:
             server.close()
             client.close()
+
+    def test_close_buffered_data(self):
+        # Similar to the previous test, but with data stored in the OS's
+        # socket buffers instead of the IOStream's read buffer.  Out-of-band
+        # close notifications must be delayed until all data has been
+        # drained into the IOStream buffer. (epoll used to use out-of-band
+        # close events with EPOLLRDHUP, but no longer)
+        #
+        # This depends on the read_chunk_size being smaller than the
+        # OS socket buffer, so make it small.
+        server, client = self.make_iostream_pair(read_chunk_size=256)
+        try:
+            server.write(b("A") * 512)
+            client.read_bytes(256, self.stop)
+            data = self.wait()
+            self.assertEqual(b("A") * 256, data)
+            server.close()
+            # Allow the close to propagate to the client side of the
+            # connection.  Using add_callback instead of add_timeout
+            # doesn't seem to work, even with multiple iterations
+            self.io_loop.add_timeout(time.time() + 0.01, self.stop)
+            self.wait()
+            client.read_bytes(256, self.stop)
+            data = self.wait()
+            self.assertEqual(b("A") * 256, data)
+        finally:
+            server.close()
+            client.close()