]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Allow and encourage the use of file objects instead of integer fds in IOLoop.
authorBen Darnell <ben@bendarnell.com>
Sat, 18 Jan 2014 18:09:18 +0000 (13:09 -0500)
committerBen Darnell <ben@bendarnell.com>
Sat, 18 Jan 2014 18:09:18 +0000 (13:09 -0500)
This fixes a problem in tests in which a closing IOLoop would os.close()
all of its file descriptors while socket objects for those fds still
existed.  When those socket objects were garbage collected, they would
close the fd a second time (by which time it may have been reassigned
to a new socket).

Due to subtleties of garbage collection this has only been observed
with the asyncio event loop in tests of curl_httpclient.

tornado/ioloop.py
tornado/iostream.py
tornado/platform/asyncio.py
tornado/platform/kqueue.py
tornado/platform/select.py
tornado/platform/twisted.py
tornado/test/ioloop_test.py
tox.ini

index e7b84dd7c702b29b7d2f16847ea7264c9fd2d008..184866146a2a289f149fb3c2d507b3e99f245661 100644 (file)
@@ -490,6 +490,43 @@ class IOLoop(Configurable):
         """
         app_log.error("Exception in callback %r", callback, exc_info=True)
 
+    def split_fd(self, fd):
+        """Returns an (fd, obj) pair from an ``fd`` parameter.
+
+        We accept both raw file descriptors and file-like objects as
+        input to `add_handler` and related methods.  When a file-like
+        object is passed, we must retain the object itself so we can
+        close it correctly when the `IOLoop` shuts down, but the
+        poller interfaces favor file descriptors (they will accept
+        file-like objects and call ``fileno()`` for you, but they
+        always return the descriptor itself).
+
+        This method is provided for use by `IOLoop` subclasses and should
+        not generally be used by application code.
+        """
+        try:
+            return fd.fileno(), fd
+        except AttributeError:
+            return fd, fd
+
+    def close_fd(self, fd):
+        """Utility method to close an ``fd``.
+
+        If ``fd`` is a file-like object, we close it directly; otherwise
+        we use `os.close()`.
+
+        This method is provided for use by `IOLoop` subclasses (in
+        implementations of ``IOLoop.close(all_fds=True)`` and should
+        not generally be used by application code.
+        """
+        try:
+            try:
+                fd.close()
+            except AttributeError:
+                os.close(fd)
+        except OSError:
+            pass
+
 
 class PollIOLoop(IOLoop):
     """Base class for IOLoops built around a select-like function.
@@ -528,26 +565,22 @@ class PollIOLoop(IOLoop):
             self._closing = True
         self.remove_handler(self._waker.fileno())
         if all_fds:
-            for fd in self._handlers.keys():
-                try:
-                    close_method = getattr(fd, 'close', None)
-                    if close_method is not None:
-                        close_method()
-                    else:
-                        os.close(fd)
-                except Exception:
-                    gen_log.debug("error closing fd %s", fd, exc_info=True)
+            for fd, handler in self._handlers.values():
+                self.close_fd(fd)
         self._waker.close()
         self._impl.close()
 
     def add_handler(self, fd, handler, events):
-        self._handlers[fd] = stack_context.wrap(handler)
+        fd, obj = self.split_fd(fd)
+        self._handlers[fd] = (obj, stack_context.wrap(handler))
         self._impl.register(fd, events | self.ERROR)
 
     def update_handler(self, fd, events):
+        fd, obj = self.split_fd(fd)
         self._impl.modify(fd, events | self.ERROR)
 
     def remove_handler(self, fd):
+        fd, obj = self.split_fd(fd)
         self._handlers.pop(fd, None)
         self._events.pop(fd, None)
         try:
@@ -685,7 +718,7 @@ class PollIOLoop(IOLoop):
                 while self._events:
                     fd, events = self._events.popitem()
                     try:
-                        self._handlers[fd](fd, events)
+                        self._handlers[fd][1](fd, events)
                     except (OSError, IOError) as e:
                         if e.args[0] == errno.EPIPE:
                             # Happens when the client closes the connection
index 5d4d08ac4eb0ead8894182be0faf0757e4f66e19..197230f33f714c97debec208386aa18a378109d9 100644 (file)
@@ -309,7 +309,7 @@ class BaseIOStream(object):
 
     def _handle_events(self, fd, events):
         if self.closed():
-            gen_log.warning("Got events for closed stream %d", fd)
+            gen_log.warning("Got events for closed stream %s", fd)
             return
         try:
             if events & self.io_loop.READ:
@@ -572,7 +572,7 @@ class BaseIOStream(object):
                         # Broken pipe errors are usually caused by connection
                         # reset, and its better to not log EPIPE errors to
                         # minimize log spam
-                        gen_log.warning("Write error on %d: %s",
+                        gen_log.warning("Write error on %s: %s",
                                         self.fileno(), e)
                     self.close(exc_info=True)
                     return
@@ -680,7 +680,7 @@ class IOStream(BaseIOStream):
         super(IOStream, self).__init__(*args, **kwargs)
 
     def fileno(self):
-        return self.socket.fileno()
+        return self.socket
 
     def close_fd(self):
         self.socket.close()
@@ -740,7 +740,7 @@ class IOStream(BaseIOStream):
             # reported later in _handle_connect.
             if (e.args[0] != errno.EINPROGRESS and
                     e.args[0] not in _ERRNO_WOULDBLOCK):
-                gen_log.warning("Connect error on fd %d: %s",
+                gen_log.warning("Connect error on fd %s: %s",
                                 self.socket.fileno(), e)
                 self.close(exc_info=True)
                 return
@@ -755,7 +755,7 @@ class IOStream(BaseIOStream):
             # an error state before the socket becomes writable, so
             # in that case a connection failure would be handled by the
             # error path in _handle_events instead of here.
-            gen_log.warning("Connect error on fd %d: %s",
+            gen_log.warning("Connect error on fd %s: %s",
                             self.socket.fileno(), errno.errorcode[err])
             self.close()
             return
@@ -841,7 +841,7 @@ class SSLIOStream(IOStream):
                     peer = self.socket.getpeername()
                 except Exception:
                     peer = '(not connected)'
-                gen_log.warning("SSL Error on %d %s: %s",
+                gen_log.warning("SSL Error on %s %s: %s",
                                 self.socket.fileno(), peer, err)
                 return self.close(exc_info=True)
             raise
@@ -916,9 +916,17 @@ class SSLIOStream(IOStream):
         # user callbacks are enqueued asynchronously on the IOLoop,
         # but since _handle_events calls _handle_connect immediately
         # followed by _handle_write we need this to be synchronous.
+        #
+        # The IOLoop will get confused if we swap out self.socket while the
+        # fd is registered, so remove it now and re-register after
+        # wrap_socket().
+        self.io_loop.remove_handler(self.socket)
+        old_state = self._state
+        self._state = None
         self.socket = ssl_wrap_socket(self.socket, self._ssl_options,
                                       server_hostname=self._server_hostname,
                                       do_handshake_on_connect=False)
+        self._add_io_state(old_state)
         super(SSLIOStream, self)._handle_connect()
 
     def read_from_fd(self):
index 162b36735d564ac829c04bb552f400ce12dff190..b6755e483516d1092ac89fc452b00211e3bb400a 100644 (file)
@@ -36,16 +36,13 @@ class BaseAsyncIOLoop(IOLoop):
         for fd in list(self.handlers):
             self.remove_handler(fd)
             if all_fds:
-                try:
-                    os.close(fd)
-                except OSError:
-                    pass
+                self.close_fd(fd)
         if self.close_loop:
             self.asyncio_loop.close()
 
     def add_handler(self, fd, handler, events):
         if fd in self.handlers:
-            raise ValueError("fd %d added twice" % fd)
+            raise ValueError("fd %s added twice" % fd)
         self.handlers[fd] = stack_context.wrap(handler)
         if events & IOLoop.READ:
             self.asyncio_loop.add_reader(
index ceff0a43a31392dbb7255c24d3be9ccfcf6293ad..de8c046d3ed4be2109d8eae413d5a096687b02a6 100644 (file)
@@ -37,7 +37,7 @@ class _KQueue(object):
 
     def register(self, fd, events):
         if fd in self._active:
-            raise IOError("fd %d already registered" % fd)
+            raise IOError("fd %s already registered" % fd)
         self._control(fd, events, select.KQ_EV_ADD)
         self._active[fd] = events
 
index 8bbb1f4f995cf1c7a7de4f9aaea56f5612a0ea62..9a879562651aff2f3fa60618173075627da582e7 100644 (file)
@@ -37,7 +37,7 @@ class _Select(object):
 
     def register(self, fd, events):
         if fd in self.read_fds or fd in self.write_fds or fd in self.error_fds:
-            raise IOError("fd %d already registered" % fd)
+            raise IOError("fd %s already registered" % fd)
         if events & IOLoop.READ:
             self.read_fds.add(fd)
         if events & IOLoop.WRITE:
index 0c8a3105c732ef2ea902d8df673258e2b2d7c78c..737032d573e7f822f21c6a84e0510a595f5197f6 100644 (file)
@@ -365,8 +365,9 @@ def install(io_loop=None):
 
 @implementer(IReadDescriptor, IWriteDescriptor)
 class _FD(object):
-    def __init__(self, fd, handler):
+    def __init__(self, fd, fileobj, handler):
         self.fd = fd
+        self.fileobj = fileobj
         self.handler = handler
         self.reading = False
         self.writing = False
@@ -412,14 +413,19 @@ class TwistedIOLoop(tornado.ioloop.IOLoop):
         self.reactor.callWhenRunning(self.make_current)
 
     def close(self, all_fds=False):
+        fds = self.fds
         self.reactor.removeAll()
         for c in self.reactor.getDelayedCalls():
             c.cancel()
+        if all_fds:
+            for fd in fds.values():
+                self.close_fd(fd.fileobj)
 
     def add_handler(self, fd, handler, events):
         if fd in self.fds:
-            raise ValueError('fd %d added twice' % fd)
-        self.fds[fd] = _FD(fd, wrap(handler))
+            raise ValueError('fd %s added twice' % fd)
+        fd, fileobj = self.split_fd(fd)
+        self.fds[fd] = _FD(fd, fileobj, wrap(handler))
         if events & tornado.ioloop.IOLoop.READ:
             self.fds[fd].reading = True
             self.reactor.addReader(self.fds[fd])
@@ -428,6 +434,7 @@ class TwistedIOLoop(tornado.ioloop.IOLoop):
             self.reactor.addWriter(self.fds[fd])
 
     def update_handler(self, fd, events):
+        fd, fileobj = self.split_fd(fd)
         if events & tornado.ioloop.IOLoop.READ:
             if not self.fds[fd].reading:
                 self.fds[fd].reading = True
@@ -446,6 +453,7 @@ class TwistedIOLoop(tornado.ioloop.IOLoop):
                 self.reactor.removeWriter(self.fds[fd])
 
     def remove_handler(self, fd):
+        fd, fileobj = self.split_fd(fd)
         if fd not in self.fds:
             return
         self.fds[fd].lost = True
index fa863e611cfa13f19d280ec98fa5de76761953cd..382adb41c49592a652b53f1f0aaca53f3c7622dc 100644 (file)
@@ -5,14 +5,14 @@ from __future__ import absolute_import, division, print_function, with_statement
 import contextlib
 import datetime
 import functools
-import logging
+import os
 import socket
 import sys
 import threading
 import time
 
 from tornado import gen
-from tornado.ioloop import IOLoop, PollIOLoop, TimeoutError
+from tornado.ioloop import IOLoop, TimeoutError
 from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext
 from tornado.testing import AsyncTestCase, bind_unused_port
 from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis
@@ -172,6 +172,33 @@ class TestIOLoop(AsyncTestCase):
         self.io_loop.add_callback(lambda: self.io_loop.add_callback(self.stop))
         self.wait()
 
+    def test_close_file_object(self):
+        """When a file object is used instead of a numeric file descriptor,
+        the object should be closed (by IOLoop.close(all_fds=True),
+        not just the fd.
+        """
+        # Use a socket since they are supported by IOLoop on all platforms.
+        # Unfortunately, sockets don't support the .closed attribute for
+        # inspecting their close status, so we must use a wrapper.
+        class SocketWrapper(object):
+            def __init__(self, sockobj):
+                self.sockobj = sockobj
+                self.closed = False
+
+            def fileno(self):
+                return self.sockobj.fileno()
+
+            def close(self):
+                self.closed = True
+                self.sockobj.close()
+        sockobj, port = bind_unused_port()
+        socket_wrapper = SocketWrapper(sockobj)
+        io_loop = IOLoop()
+        io_loop.add_handler(socket_wrapper, lambda fd, events: None,
+                            IOLoop.READ)
+        io_loop.close(all_fds=True)
+        self.assertTrue(socket_wrapper.closed)
+
 
 # Deliberately not a subclass of AsyncTestCase so the IOLoop isn't
 # automatically set as current.
diff --git a/tox.ini b/tox.ini
index f1f6d36a68d3d58985a64c477d14bcd89a2eb965..1fdedac26392034d37917b8632ddbad4e9d78c47 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -160,8 +160,8 @@ commands = python -m tornado.test.runtests --ioloop=tornado.test.twisted_test.La
 basepython = python3.3
 # Pycurl tests currently fail with asyncio.
 deps =
+     {[testenv:py33-full]deps}
      asyncio
-     # pycurl
 commands = python -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop {posargs:}
 
 # Trollius is the py2.7 backport of asyncio.
@@ -169,11 +169,8 @@ commands = python -m tornado.test.runtests --ioloop=tornado.platform.asyncio.Asy
 basepython = python2.7
 # Pycurl tests currently fail with trollius.
 deps =
-     futures
-     mock
-     # pycurl
+     {[testenv:py27-full]deps}
      trollius>=0.1.1
-     twisted
 commands = python -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop {posargs:}
 
 [testenv:py2-monotonic]