state |= self.io_loop.READ
if self.writing():
state |= self.io_loop.WRITE
+ if state == self.io_loop.ERROR:
+ state |= self.io_loop.READ
if state != self._state:
assert self._state is not None, \
"shouldn't happen: _handle_events without self._state"
self._close_callback = None
self._run_callback(cb)
else:
- self._add_io_state(0)
+ self._add_io_state(ioloop.IOLoop.READ)
def _add_io_state(self, state):
"""Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.
from tornado.util import b
from tornado.web import RequestHandler, Application
import socket
+import time
class HelloHandler(RequestHandler):
def get(self):
def get_app(self):
return Application([('/', HelloHandler)])
- def make_iostream_pair(self):
+ def make_iostream_pair(self, **kwargs):
port = get_unused_port()
[listener] = netutil.bind_sockets(port, '127.0.0.1',
family=socket.AF_INET)
streams = [None, None]
def accept_callback(connection, address):
- streams[0] = IOStream(connection, io_loop=self.io_loop)
+ streams[0] = IOStream(connection, io_loop=self.io_loop, **kwargs)
self.stop()
def connect_callback():
streams[1] = client_stream
self.stop()
netutil.add_accept_handler(listener, accept_callback,
io_loop=self.io_loop)
- client_stream = IOStream(socket.socket(), io_loop=self.io_loop)
+ client_stream = IOStream(socket.socket(), io_loop=self.io_loop,
+ **kwargs)
client_stream.connect(('127.0.0.1', port),
callback=connect_callback)
self.wait(condition=lambda: all(streams))
finally:
server.close()
client.close()
+
+ def test_close_buffered_data(self):
+ # Similar to the previous test, but with data stored in the OS's
+ # socket buffers instead of the IOStream's read buffer. Out-of-band
+ # close notifications must be delayed until all data has been
+ # drained into the IOStream buffer. (epoll used to use out-of-band
+ # close events with EPOLLRDHUP, but no longer)
+ #
+ # This depends on the read_chunk_size being smaller than the
+ # OS socket buffer, so make it small.
+ server, client = self.make_iostream_pair(read_chunk_size=256)
+ try:
+ server.write(b("A") * 512)
+ client.read_bytes(256, self.stop)
+ data = self.wait()
+ self.assertEqual(b("A") * 256, data)
+ server.close()
+ # Allow the close to propagate to the client side of the
+ # connection. Using add_callback instead of add_timeout
+ # doesn't seem to work, even with multiple iterations
+ self.io_loop.add_timeout(time.time() + 0.01, self.stop)
+ self.wait()
+ client.read_bytes(256, self.stop)
+ data = self.wait()
+ self.assertEqual(b("A") * 256, data)
+ finally:
+ server.close()
+ client.close()