From: Antoine Pitrou Date: Tue, 6 Jun 2017 08:43:14 +0000 (+0200) Subject: Fix #2069: Fix race condition between accept() handler and TCPServer.stop() X-Git-Tag: v5.0.0~67^2~2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=21680e20c539aa837f9a42b1e3d82026c0e1beff;p=thirdparty%2Ftornado.git Fix #2069: Fix race condition between accept() handler and TCPServer.stop() --- diff --git a/tornado/netutil.py b/tornado/netutil.py index 31bbe8036..04a3a4bf4 100644 --- a/tornado/netutil.py +++ b/tornado/netutil.py @@ -241,9 +241,18 @@ def add_accept_handler(sock, callback): is different from the ``callback(fd, events)`` signature used for `.IOLoop` handlers. + A callable is returned which, when called, will remove the `.IOLoop` + event handler and stop processing further incoming connections. + .. versionchanged:: 5.0 The ``io_loop`` argument (deprecated since version 4.1) has been removed. + + .. versionchanged:: 5.0 + A callable is returned (``None`` was returned before). """ + io_loop = IOLoop.current() + removed = [] + def accept_handler(fd, events): # More connections may come in while we're handling callbacks; # to prevent starvation of other tasks we must limit the number @@ -257,6 +266,9 @@ def add_accept_handler(sock, callback): # heuristic for the number of connections we can reasonably # accept at once. for i in xrange(_DEFAULT_BACKLOG): + if removed: + # The socket was probably closed + return try: connection, address = sock.accept() except socket.error as e: @@ -272,7 +284,13 @@ def add_accept_handler(sock, callback): raise set_close_exec(connection.fileno()) callback(connection, address) - IOLoop.current().add_handler(sock, accept_handler, IOLoop.READ) + + def remove_handler(): + io_loop.remove_handler(sock) + removed.append(True) + + io_loop.add_handler(sock, accept_handler, IOLoop.READ) + return remove_handler def is_valid_ip(ip): diff --git a/tornado/tcpserver.py b/tornado/tcpserver.py index b3cdaa2ca..fea215f53 100644 --- a/tornado/tcpserver.py +++ b/tornado/tcpserver.py @@ -110,7 +110,8 @@ class TCPServer(object): read_chunk_size=None): self.io_loop = IOLoop.current() self.ssl_options = ssl_options - self._sockets = {} # fd -> socket object + self._sockets = {} # fd -> socket object + self._handlers = {} # fd -> remove_handler callable self._pending_sockets = [] self._started = False self._stopped = False @@ -156,7 +157,8 @@ class TCPServer(object): """ for sock in sockets: self._sockets[sock.fileno()] = sock - add_accept_handler(sock, self._handle_connection) + self._handlers[sock.fileno()] = add_accept_handler( + sock, self._handle_connection) def add_socket(self, socket): """Singular version of `add_sockets`. Takes a single socket object.""" @@ -233,7 +235,8 @@ class TCPServer(object): self._stopped = True for fd, sock in self._sockets.items(): assert sock.fileno() == fd - self.io_loop.remove_handler(fd) + # Unregister socket from IOLoop + self._handlers.pop(fd)() sock.close() def handle_stream(self, stream, address): diff --git a/tornado/test/tcpserver_test.py b/tornado/test/tcpserver_test.py index 9afb54202..46841b32e 100644 --- a/tornado/test/tcpserver_test.py +++ b/tornado/test/tcpserver_test.py @@ -1,9 +1,10 @@ from __future__ import absolute_import, division, print_function +#from functools import partial import socket from tornado import gen -from tornado.iostream import IOStream +from tornado.iostream import IOStream, StreamClosedError from tornado.log import app_log from tornado.stack_context import NullContext from tornado.tcpserver import TCPServer @@ -68,3 +69,45 @@ class TCPServerTest(AsyncTestCase): server.add_socket(sock) server.stop() server.stop() + + @gen_test + def test_stop_in_callback(self): + # Issue #2069: calling server.stop() in a loop callback should not + # raise EBADF when the loop handles other server connection + # requests in the same loop iteration + + class TestServer(TCPServer): + @gen.coroutine + def handle_stream(self, stream, address): + server.stop() + yield stream.read_until_close() + + sock, port = bind_unused_port() + server = TestServer() + server.add_socket(sock) + server_addr = ('localhost', port) + N = 40 + clients = [IOStream(socket.socket()) for i in range(N)] + connected_clients = [] + + @gen.coroutine + def connect(c): + try: + yield c.connect(server_addr) + except EnvironmentError: + pass + else: + connected_clients.append(c) + + yield [connect(c) for c in clients] + + self.assertGreater(len(connected_clients), 0, + "all clients failed connecting") + self.assertLess(len(connected_clients), N, + "at least one client should fail connecting for " + "the test to be meaningful") + + for c in connected_clients: + c.close() + # AsyncTestCase.tearDown() would re-raise the EBADF encountered in the IO loop +