From: Ben Darnell Date: Thu, 22 Sep 2011 07:10:43 +0000 (-0700) Subject: Fix connection-close detection for epoll. X-Git-Tag: v2.1.1~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8572cc40a1c514ac828f04ce672ca3a2499b0132;p=thirdparty%2Ftornado.git Fix connection-close detection for epoll. Previously, closed connections with epoll sent an IOLoop.ERROR event while there was still data in the OS's socket buffers. Event handlers that did not drain the entire socket buffer before processing the close event would lose data. (this was primarily an issue for SimpleAsyncHTTPClient) IOLoop.ERROR no longer includes EPOLLRDHUP (which is only supposed to be used in edge-triggered mode: https://lkml.org/lkml/2003/7/12/132), so closed connections while reading are signaled as zero-byte reads once the buffer is drained (this was already the behavior of the select-based IOLoop). Closed connections while writing are still signaled with EPOLLHUP. Backwards-compatibility note: Listening for IOLoop.ERROR alone is no longer sufficient for detecting closed connections on an otherwise unused socket. IOLoop.ERROR must always be used in combination with READ or WRITE. --- diff --git a/tornado/ioloop.py b/tornado/ioloop.py index 0a97304bc..f9a9f372e 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -102,7 +102,7 @@ class IOLoop(object): NONE = 0 READ = _EPOLLIN WRITE = _EPOLLOUT - ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP + ERROR = _EPOLLERR | _EPOLLHUP def __init__(self, impl=None): self._impl = impl or _poll() diff --git a/tornado/iostream.py b/tornado/iostream.py index 122614a69..0027a82bf 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -272,6 +272,8 @@ class IOStream(object): state |= self.io_loop.READ if self.writing(): state |= self.io_loop.WRITE + if state == self.io_loop.ERROR: + state |= self.io_loop.READ if state != self._state: assert self._state is not None, \ "shouldn't happen: _handle_events without self._state" @@ -500,7 +502,7 @@ class IOStream(object): self._close_callback = None self._run_callback(cb) else: - self._add_io_state(0) + self._add_io_state(ioloop.IOLoop.READ) def _add_io_state(self, state): """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler. diff --git a/tornado/test/iostream_test.py b/tornado/test/iostream_test.py index dd3b8f6bb..f35614da9 100644 --- a/tornado/test/iostream_test.py +++ b/tornado/test/iostream_test.py @@ -4,6 +4,7 @@ from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase, get_unused_port from tornado.util import b from tornado.web import RequestHandler, Application import socket +import time class HelloHandler(RequestHandler): def get(self): @@ -13,20 +14,21 @@ class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase): def get_app(self): return Application([('/', HelloHandler)]) - def make_iostream_pair(self): + def make_iostream_pair(self, **kwargs): port = get_unused_port() [listener] = netutil.bind_sockets(port, '127.0.0.1', family=socket.AF_INET) streams = [None, None] def accept_callback(connection, address): - streams[0] = IOStream(connection, io_loop=self.io_loop) + streams[0] = IOStream(connection, io_loop=self.io_loop, **kwargs) self.stop() def connect_callback(): streams[1] = client_stream self.stop() netutil.add_accept_handler(listener, accept_callback, io_loop=self.io_loop) - client_stream = IOStream(socket.socket(), io_loop=self.io_loop) + client_stream = IOStream(socket.socket(), io_loop=self.io_loop, + **kwargs) client_stream.connect(('127.0.0.1', port), callback=connect_callback) self.wait(condition=lambda: all(streams)) @@ -160,3 +162,31 @@ class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase): finally: server.close() client.close() + + def test_close_buffered_data(self): + # Similar to the previous test, but with data stored in the OS's + # socket buffers instead of the IOStream's read buffer. Out-of-band + # close notifications must be delayed until all data has been + # drained into the IOStream buffer. (epoll used to use out-of-band + # close events with EPOLLRDHUP, but no longer) + # + # This depends on the read_chunk_size being smaller than the + # OS socket buffer, so make it small. + server, client = self.make_iostream_pair(read_chunk_size=256) + try: + server.write(b("A") * 512) + client.read_bytes(256, self.stop) + data = self.wait() + self.assertEqual(b("A") * 256, data) + server.close() + # Allow the close to propagate to the client side of the + # connection. Using add_callback instead of add_timeout + # doesn't seem to work, even with multiple iterations + self.io_loop.add_timeout(time.time() + 0.01, self.stop) + self.wait() + client.read_bytes(256, self.stop) + data = self.wait() + self.assertEqual(b("A") * 256, data) + finally: + server.close() + client.close()