is different from the ``callback(fd, events)`` signature used for
`.IOLoop` handlers.
+ A callable is returned which, when called, will remove the `.IOLoop`
+ event handler and stop processing further incoming connections.
+
.. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1) has been removed.
+
+ .. versionchanged:: 5.0
+ A callable is returned (``None`` was returned before).
"""
+ io_loop = IOLoop.current()
+ removed = []
+
def accept_handler(fd, events):
# More connections may come in while we're handling callbacks;
# to prevent starvation of other tasks we must limit the number
# heuristic for the number of connections we can reasonably
# accept at once.
for i in xrange(_DEFAULT_BACKLOG):
+ if removed:
+ # The socket was probably closed
+ return
try:
connection, address = sock.accept()
except socket.error as e:
raise
set_close_exec(connection.fileno())
callback(connection, address)
- IOLoop.current().add_handler(sock, accept_handler, IOLoop.READ)
+
+ def remove_handler():
+ io_loop.remove_handler(sock)
+ removed.append(True)
+
+ io_loop.add_handler(sock, accept_handler, IOLoop.READ)
+ return remove_handler
def is_valid_ip(ip):
read_chunk_size=None):
self.io_loop = IOLoop.current()
self.ssl_options = ssl_options
- self._sockets = {} # fd -> socket object
+ self._sockets = {} # fd -> socket object
+ self._handlers = {} # fd -> remove_handler callable
self._pending_sockets = []
self._started = False
self._stopped = False
"""
for sock in sockets:
self._sockets[sock.fileno()] = sock
- add_accept_handler(sock, self._handle_connection)
+ self._handlers[sock.fileno()] = add_accept_handler(
+ sock, self._handle_connection)
def add_socket(self, socket):
"""Singular version of `add_sockets`. Takes a single socket object."""
self._stopped = True
for fd, sock in self._sockets.items():
assert sock.fileno() == fd
- self.io_loop.remove_handler(fd)
+ # Unregister socket from IOLoop
+ self._handlers.pop(fd)()
sock.close()
def handle_stream(self, stream, address):
from __future__ import absolute_import, division, print_function
+#from functools import partial
import socket
from tornado import gen
-from tornado.iostream import IOStream
+from tornado.iostream import IOStream, StreamClosedError
from tornado.log import app_log
from tornado.stack_context import NullContext
from tornado.tcpserver import TCPServer
server.add_socket(sock)
server.stop()
server.stop()
+
+ @gen_test
+ def test_stop_in_callback(self):
+ # Issue #2069: calling server.stop() in a loop callback should not
+ # raise EBADF when the loop handles other server connection
+ # requests in the same loop iteration
+
+ class TestServer(TCPServer):
+ @gen.coroutine
+ def handle_stream(self, stream, address):
+ server.stop()
+ yield stream.read_until_close()
+
+ sock, port = bind_unused_port()
+ server = TestServer()
+ server.add_socket(sock)
+ server_addr = ('localhost', port)
+ N = 40
+ clients = [IOStream(socket.socket()) for i in range(N)]
+ connected_clients = []
+
+ @gen.coroutine
+ def connect(c):
+ try:
+ yield c.connect(server_addr)
+ except EnvironmentError:
+ pass
+ else:
+ connected_clients.append(c)
+
+ yield [connect(c) for c in clients]
+
+ self.assertGreater(len(connected_clients), 0,
+ "all clients failed connecting")
+ self.assertLess(len(connected_clients), N,
+ "at least one client should fail connecting for "
+ "the test to be meaningful")
+
+ for c in connected_clients:
+ c.close()
+ # AsyncTestCase.tearDown() would re-raise the EBADF encountered in the IO loop
+