From: Ben Darnell Date: Sat, 18 Jan 2014 18:09:18 +0000 (-0500) Subject: Allow and encourage the use of file objects instead of integer fds in IOLoop. X-Git-Tag: v4.0.0b1~147 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7552caed9966b01d01786229ccc2cc9049e8068d;p=thirdparty%2Ftornado.git Allow and encourage the use of file objects instead of integer fds in IOLoop. This fixes a problem in tests in which a closing IOLoop would os.close() all of its file descriptors while socket objects for those fds still existed. When those socket objects were garbage collected, they would close the fd a second time (by which time it may have been reassigned to a new socket). Due to subtleties of garbage collection this has only been observed with the asyncio event loop in tests of curl_httpclient. --- diff --git a/tornado/ioloop.py b/tornado/ioloop.py index e7b84dd7c..184866146 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -490,6 +490,43 @@ class IOLoop(Configurable): """ app_log.error("Exception in callback %r", callback, exc_info=True) + def split_fd(self, fd): + """Returns an (fd, obj) pair from an ``fd`` parameter. + + We accept both raw file descriptors and file-like objects as + input to `add_handler` and related methods. When a file-like + object is passed, we must retain the object itself so we can + close it correctly when the `IOLoop` shuts down, but the + poller interfaces favor file descriptors (they will accept + file-like objects and call ``fileno()`` for you, but they + always return the descriptor itself). + + This method is provided for use by `IOLoop` subclasses and should + not generally be used by application code. + """ + try: + return fd.fileno(), fd + except AttributeError: + return fd, fd + + def close_fd(self, fd): + """Utility method to close an ``fd``. + + If ``fd`` is a file-like object, we close it directly; otherwise + we use `os.close()`. 
+ + This method is provided for use by `IOLoop` subclasses (in + implementations of ``IOLoop.close(all_fds=True)``) and should + not generally be used by application code. + """ + try: + try: + fd.close() + except AttributeError: + os.close(fd) + except OSError: + pass + class PollIOLoop(IOLoop): """Base class for IOLoops built around a select-like function. @@ -528,26 +565,22 @@ class PollIOLoop(IOLoop): self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: - for fd in self._handlers.keys(): - try: - close_method = getattr(fd, 'close', None) - if close_method is not None: - close_method() - else: - os.close(fd) - except Exception: - gen_log.debug("error closing fd %s", fd, exc_info=True) + for fd, handler in self._handlers.values(): + self.close_fd(fd) self._waker.close() self._impl.close() def add_handler(self, fd, handler, events): - self._handlers[fd] = stack_context.wrap(handler) + fd, obj = self.split_fd(fd) + self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): + fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): + fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: @@ -685,7 +718,7 @@ class PollIOLoop(IOLoop): while self._events: fd, events = self._events.popitem() try: - self._handlers[fd](fd, events) + self._handlers[fd][1](fd, events) except (OSError, IOError) as e: if e.args[0] == errno.EPIPE: # Happens when the client closes the connection diff --git a/tornado/iostream.py b/tornado/iostream.py index 5d4d08ac4..197230f33 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -309,7 +309,7 @@ class BaseIOStream(object): def _handle_events(self, fd, events): if self.closed(): - gen_log.warning("Got events for closed stream %d", fd) + gen_log.warning("Got events for closed stream %s", fd) return try: if events & self.io_loop.READ: @@ -572,7 +572,7 @@ class 
BaseIOStream(object): # Broken pipe errors are usually caused by connection # reset, and its better to not log EPIPE errors to # minimize log spam - gen_log.warning("Write error on %d: %s", + gen_log.warning("Write error on %s: %s", self.fileno(), e) self.close(exc_info=True) return @@ -680,7 +680,7 @@ class IOStream(BaseIOStream): super(IOStream, self).__init__(*args, **kwargs) def fileno(self): - return self.socket.fileno() + return self.socket def close_fd(self): self.socket.close() @@ -740,7 +740,7 @@ class IOStream(BaseIOStream): # reported later in _handle_connect. if (e.args[0] != errno.EINPROGRESS and e.args[0] not in _ERRNO_WOULDBLOCK): - gen_log.warning("Connect error on fd %d: %s", + gen_log.warning("Connect error on fd %s: %s", self.socket.fileno(), e) self.close(exc_info=True) return @@ -755,7 +755,7 @@ class IOStream(BaseIOStream): # an error state before the socket becomes writable, so # in that case a connection failure would be handled by the # error path in _handle_events instead of here. - gen_log.warning("Connect error on fd %d: %s", + gen_log.warning("Connect error on fd %s: %s", self.socket.fileno(), errno.errorcode[err]) self.close() return @@ -841,7 +841,7 @@ class SSLIOStream(IOStream): peer = self.socket.getpeername() except Exception: peer = '(not connected)' - gen_log.warning("SSL Error on %d %s: %s", + gen_log.warning("SSL Error on %s %s: %s", self.socket.fileno(), peer, err) return self.close(exc_info=True) raise @@ -916,9 +916,17 @@ class SSLIOStream(IOStream): # user callbacks are enqueued asynchronously on the IOLoop, # but since _handle_events calls _handle_connect immediately # followed by _handle_write we need this to be synchronous. + # + # The IOLoop will get confused if we swap out self.socket while the + # fd is registered, so remove it now and re-register after + # wrap_socket(). 
+ self.io_loop.remove_handler(self.socket) + old_state = self._state + self._state = None self.socket = ssl_wrap_socket(self.socket, self._ssl_options, server_hostname=self._server_hostname, do_handshake_on_connect=False) + self._add_io_state(old_state) super(SSLIOStream, self)._handle_connect() def read_from_fd(self): diff --git a/tornado/platform/asyncio.py b/tornado/platform/asyncio.py index 162b36735..b6755e483 100644 --- a/tornado/platform/asyncio.py +++ b/tornado/platform/asyncio.py @@ -36,16 +36,13 @@ class BaseAsyncIOLoop(IOLoop): for fd in list(self.handlers): self.remove_handler(fd) if all_fds: - try: - os.close(fd) - except OSError: - pass + self.close_fd(fd) if self.close_loop: self.asyncio_loop.close() def add_handler(self, fd, handler, events): if fd in self.handlers: - raise ValueError("fd %d added twice" % fd) + raise ValueError("fd %s added twice" % fd) self.handlers[fd] = stack_context.wrap(handler) if events & IOLoop.READ: self.asyncio_loop.add_reader( diff --git a/tornado/platform/kqueue.py b/tornado/platform/kqueue.py index ceff0a43a..de8c046d3 100644 --- a/tornado/platform/kqueue.py +++ b/tornado/platform/kqueue.py @@ -37,7 +37,7 @@ class _KQueue(object): def register(self, fd, events): if fd in self._active: - raise IOError("fd %d already registered" % fd) + raise IOError("fd %s already registered" % fd) self._control(fd, events, select.KQ_EV_ADD) self._active[fd] = events diff --git a/tornado/platform/select.py b/tornado/platform/select.py index 8bbb1f4f9..9a8795626 100644 --- a/tornado/platform/select.py +++ b/tornado/platform/select.py @@ -37,7 +37,7 @@ class _Select(object): def register(self, fd, events): if fd in self.read_fds or fd in self.write_fds or fd in self.error_fds: - raise IOError("fd %d already registered" % fd) + raise IOError("fd %s already registered" % fd) if events & IOLoop.READ: self.read_fds.add(fd) if events & IOLoop.WRITE: diff --git a/tornado/platform/twisted.py b/tornado/platform/twisted.py index 
0c8a3105c..737032d57 100644 --- a/tornado/platform/twisted.py +++ b/tornado/platform/twisted.py @@ -365,8 +365,9 @@ def install(io_loop=None): @implementer(IReadDescriptor, IWriteDescriptor) class _FD(object): - def __init__(self, fd, handler): + def __init__(self, fd, fileobj, handler): self.fd = fd + self.fileobj = fileobj self.handler = handler self.reading = False self.writing = False @@ -412,14 +413,19 @@ class TwistedIOLoop(tornado.ioloop.IOLoop): self.reactor.callWhenRunning(self.make_current) def close(self, all_fds=False): + fds = self.fds self.reactor.removeAll() for c in self.reactor.getDelayedCalls(): c.cancel() + if all_fds: + for fd in fds.values(): + self.close_fd(fd.fileobj) def add_handler(self, fd, handler, events): if fd in self.fds: - raise ValueError('fd %d added twice' % fd) - self.fds[fd] = _FD(fd, wrap(handler)) + raise ValueError('fd %s added twice' % fd) + fd, fileobj = self.split_fd(fd) + self.fds[fd] = _FD(fd, fileobj, wrap(handler)) if events & tornado.ioloop.IOLoop.READ: self.fds[fd].reading = True self.reactor.addReader(self.fds[fd]) @@ -428,6 +434,7 @@ class TwistedIOLoop(tornado.ioloop.IOLoop): self.reactor.addWriter(self.fds[fd]) def update_handler(self, fd, events): + fd, fileobj = self.split_fd(fd) if events & tornado.ioloop.IOLoop.READ: if not self.fds[fd].reading: self.fds[fd].reading = True @@ -446,6 +453,7 @@ class TwistedIOLoop(tornado.ioloop.IOLoop): self.reactor.removeWriter(self.fds[fd]) def remove_handler(self, fd): + fd, fileobj = self.split_fd(fd) if fd not in self.fds: return self.fds[fd].lost = True diff --git a/tornado/test/ioloop_test.py b/tornado/test/ioloop_test.py index fa863e611..382adb41c 100644 --- a/tornado/test/ioloop_test.py +++ b/tornado/test/ioloop_test.py @@ -5,14 +5,14 @@ from __future__ import absolute_import, division, print_function, with_statement import contextlib import datetime import functools -import logging +import os import socket import sys import threading import time from tornado import 
gen -from tornado.ioloop import IOLoop, PollIOLoop, TimeoutError +from tornado.ioloop import IOLoop, TimeoutError from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext from tornado.testing import AsyncTestCase, bind_unused_port from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis @@ -172,6 +172,33 @@ class TestIOLoop(AsyncTestCase): self.io_loop.add_callback(lambda: self.io_loop.add_callback(self.stop)) self.wait() + def test_close_file_object(self): + """When a file object is used instead of a numeric file descriptor, + the object should be closed (by IOLoop.close(all_fds=True)), + not just the fd. + """ + # Use a socket since they are supported by IOLoop on all platforms. + # Unfortunately, sockets don't support the .closed attribute for + # inspecting their close status, so we must use a wrapper. + class SocketWrapper(object): + def __init__(self, sockobj): + self.sockobj = sockobj + self.closed = False + + def fileno(self): + return self.sockobj.fileno() + + def close(self): + self.closed = True + self.sockobj.close() + sockobj, port = bind_unused_port() + socket_wrapper = SocketWrapper(sockobj) + io_loop = IOLoop() + io_loop.add_handler(socket_wrapper, lambda fd, events: None, + IOLoop.READ) + io_loop.close(all_fds=True) + self.assertTrue(socket_wrapper.closed) + # Deliberately not a subclass of AsyncTestCase so the IOLoop isn't # automatically set as current. diff --git a/tox.ini b/tox.ini index f1f6d36a6..1fdedac26 100644 --- a/tox.ini +++ b/tox.ini @@ -160,8 +160,8 @@ commands = python -m tornado.test.runtests --ioloop=tornado.test.twisted_test.La basepython = python3.3 # Pycurl tests currently fail with asyncio. deps = + {[testenv:py33-full]deps} asyncio - # pycurl commands = python -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop {posargs:} # Trollius is the py2.7 backport of asyncio. 
@@ -169,11 +169,8 @@ commands = python -m tornado.test.runtests --ioloop=tornado.platform.asyncio.Asy basepython = python2.7 # Pycurl tests currently fail with trollius. deps = - futures - mock - # pycurl + {[testenv:py27-full]deps} trollius>=0.1.1 - twisted commands = python -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop {posargs:} [testenv:py2-monotonic]