"""
app_log.error("Exception in callback %r", callback, exc_info=True)
+ def split_fd(self, fd):
+ """Returns an (fd, obj) pair from an ``fd`` parameter.
+
+ We accept both raw file descriptors and file-like objects as
+ input to `add_handler` and related methods. When a file-like
+ object is passed, we must retain the object itself so we can
+ close it correctly when the `IOLoop` shuts down, but the
+ poller interfaces favor file descriptors (they will accept
+ file-like objects and call ``fileno()`` for you, but they
+ always return the descriptor itself).
+
+ This method is provided for use by `IOLoop` subclasses and should
+ not generally be used by application code.
+ """
+ try:
+ return fd.fileno(), fd
+ except AttributeError:
+ return fd, fd
+
+ def close_fd(self, fd):
+ """Utility method to close an ``fd``.
+
+ If ``fd`` is a file-like object, we close it directly; otherwise
+ we use `os.close()`.
+
+ This method is provided for use by `IOLoop` subclasses (in
+ implementations of ``IOLoop.close(all_fds=True)``) and should
+ not generally be used by application code.
+ """
+ try:
+ try:
+ fd.close()
+ except AttributeError:
+ os.close(fd)
+ except OSError:
+ pass
+
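
As a rough illustration of the two new helpers (a sketch, not part of the patch; it assumes this patched `IOLoop` and a Unix-like platform), `split_fd` normalizes either kind of input to an ``(int_fd, obj)`` pair, and `close_fd` closes whichever form it is given:

import os
import socket
from tornado.ioloop import IOLoop

loop = IOLoop()                  # any concrete IOLoop inherits split_fd/close_fd
sock = socket.socket()

# A file-like object splits into its numeric descriptor plus the object itself.
fd, obj = loop.split_fd(sock)
assert fd == sock.fileno() and obj is sock

# A raw descriptor passes through unchanged.
raw = os.dup(sock.fileno())
assert loop.split_fd(raw) == (raw, raw)

# close_fd accepts either form: objects are closed via close(), ints via os.close().
loop.close_fd(sock)
loop.close_fd(raw)
loop.close()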
class PollIOLoop(IOLoop):
"""Base class for IOLoops built around a select-like function.
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
- for fd in self._handlers.keys():
- try:
- close_method = getattr(fd, 'close', None)
- if close_method is not None:
- close_method()
- else:
- os.close(fd)
- except Exception:
- gen_log.debug("error closing fd %s", fd, exc_info=True)
+ for fd, handler in self._handlers.values():
+ self.close_fd(fd)
self._waker.close()
self._impl.close()
def add_handler(self, fd, handler, events):
- self._handlers[fd] = stack_context.wrap(handler)
+ fd, obj = self.split_fd(fd)
+ self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
+ fd, obj = self.split_fd(fd)
self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
+ fd, obj = self.split_fd(fd)
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
while self._events:
fd, events = self._events.popitem()
try:
- self._handlers[fd](fd, events)
+ self._handlers[fd][1](fd, events)
except (OSError, IOError) as e:
if e.args[0] == errno.EPIPE:
# Happens when the client closes the connection
def _handle_events(self, fd, events):
if self.closed():
- gen_log.warning("Got events for closed stream %d", fd)
+ gen_log.warning("Got events for closed stream %s", fd)
return
try:
if events & self.io_loop.READ:
# Broken pipe errors are usually caused by connection
# reset, and its better to not log EPIPE errors to
# minimize log spam
- gen_log.warning("Write error on %d: %s",
+ gen_log.warning("Write error on %s: %s",
self.fileno(), e)
self.close(exc_info=True)
return
super(IOStream, self).__init__(*args, **kwargs)
def fileno(self):
- return self.socket.fileno()
+ return self.socket
def close_fd(self):
self.socket.close()
# reported later in _handle_connect.
if (e.args[0] != errno.EINPROGRESS and
e.args[0] not in _ERRNO_WOULDBLOCK):
- gen_log.warning("Connect error on fd %d: %s",
+ gen_log.warning("Connect error on fd %s: %s",
self.socket.fileno(), e)
self.close(exc_info=True)
return
# an error state before the socket becomes writable, so
# in that case a connection failure would be handled by the
# error path in _handle_events instead of here.
- gen_log.warning("Connect error on fd %d: %s",
+ gen_log.warning("Connect error on fd %s: %s",
self.socket.fileno(), errno.errorcode[err])
self.close()
return
peer = self.socket.getpeername()
except Exception:
peer = '(not connected)'
- gen_log.warning("SSL Error on %d %s: %s",
+ gen_log.warning("SSL Error on %s %s: %s",
self.socket.fileno(), peer, err)
return self.close(exc_info=True)
raise
# user callbacks are enqueued asynchronously on the IOLoop,
# but since _handle_events calls _handle_connect immediately
# followed by _handle_write we need this to be synchronous.
+ #
+ # The IOLoop will get confused if we swap out self.socket while the
+ # fd is registered, so remove it now and re-register after
+ # wrap_socket().
+ self.io_loop.remove_handler(self.socket)
+ old_state = self._state
+ self._state = None
self.socket = ssl_wrap_socket(self.socket, self._ssl_options,
server_hostname=self._server_hostname,
do_handshake_on_connect=False)
+ self._add_io_state(old_state)
super(SSLIOStream, self)._handle_connect()
def read_from_fd(self):
for fd in list(self.handlers):
self.remove_handler(fd)
if all_fds:
- try:
- os.close(fd)
- except OSError:
- pass
+ self.close_fd(fd)
if self.close_loop:
self.asyncio_loop.close()
def add_handler(self, fd, handler, events):
if fd in self.handlers:
- raise ValueError("fd %d added twice" % fd)
+ raise ValueError("fd %s added twice" % fd)
self.handlers[fd] = stack_context.wrap(handler)
if events & IOLoop.READ:
self.asyncio_loop.add_reader(
def register(self, fd, events):
if fd in self._active:
- raise IOError("fd %d already registered" % fd)
+ raise IOError("fd %s already registered" % fd)
self._control(fd, events, select.KQ_EV_ADD)
self._active[fd] = events
def register(self, fd, events):
if fd in self.read_fds or fd in self.write_fds or fd in self.error_fds:
- raise IOError("fd %d already registered" % fd)
+ raise IOError("fd %s already registered" % fd)
if events & IOLoop.READ:
self.read_fds.add(fd)
if events & IOLoop.WRITE:
@implementer(IReadDescriptor, IWriteDescriptor)
class _FD(object):
- def __init__(self, fd, handler):
+ def __init__(self, fd, fileobj, handler):
self.fd = fd
+ self.fileobj = fileobj
self.handler = handler
self.reading = False
self.writing = False
self.reactor.callWhenRunning(self.make_current)
def close(self, all_fds=False):
+ fds = self.fds
self.reactor.removeAll()
for c in self.reactor.getDelayedCalls():
c.cancel()
+ if all_fds:
+ for fd in fds.values():
+ self.close_fd(fd.fileobj)
def add_handler(self, fd, handler, events):
if fd in self.fds:
- raise ValueError('fd %d added twice' % fd)
- self.fds[fd] = _FD(fd, wrap(handler))
+ raise ValueError('fd %s added twice' % fd)
+ fd, fileobj = self.split_fd(fd)
+ self.fds[fd] = _FD(fd, fileobj, wrap(handler))
if events & tornado.ioloop.IOLoop.READ:
self.fds[fd].reading = True
self.reactor.addReader(self.fds[fd])
self.reactor.addWriter(self.fds[fd])
def update_handler(self, fd, events):
+ fd, fileobj = self.split_fd(fd)
if events & tornado.ioloop.IOLoop.READ:
if not self.fds[fd].reading:
self.fds[fd].reading = True
self.reactor.removeWriter(self.fds[fd])
def remove_handler(self, fd):
+ fd, fileobj = self.split_fd(fd)
if fd not in self.fds:
return
self.fds[fd].lost = True
import contextlib
import datetime
import functools
-import logging
+import os
import socket
import sys
import threading
import time
from tornado import gen
-from tornado.ioloop import IOLoop, PollIOLoop, TimeoutError
+from tornado.ioloop import IOLoop, TimeoutError
from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext
from tornado.testing import AsyncTestCase, bind_unused_port
from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis
self.io_loop.add_callback(lambda: self.io_loop.add_callback(self.stop))
self.wait()
+ def test_close_file_object(self):
+ """When a file object is used instead of a numeric file descriptor,
+ the object should be closed (by IOLoop.close(all_fds=True)),
+ not just the fd.
+ """
+ # Use a socket, since sockets are supported by IOLoop on all platforms.
+ # Unfortunately, sockets don't support the .closed attribute for
+ # inspecting their close status, so we must use a wrapper.
+ class SocketWrapper(object):
+ def __init__(self, sockobj):
+ self.sockobj = sockobj
+ self.closed = False
+
+ def fileno(self):
+ return self.sockobj.fileno()
+
+ def close(self):
+ self.closed = True
+ self.sockobj.close()
+ sockobj, port = bind_unused_port()
+ socket_wrapper = SocketWrapper(sockobj)
+ io_loop = IOLoop()
+ io_loop.add_handler(socket_wrapper, lambda fd, events: None,
+ IOLoop.READ)
+ io_loop.close(all_fds=True)
+ self.assertTrue(socket_wrapper.closed)
+
# Deliberately not a subclass of AsyncTestCase so the IOLoop isn't
# automatically set as current.
basepython = python3.3
# Pycurl tests currently fail with asyncio.
deps =
+ {[testenv:py33-full]deps}
asyncio
- # pycurl
commands = python -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop {posargs:}
# Trollius is the py2.7 backport of asyncio.
basepython = python2.7
# Pycurl tests currently fail with trollius.
deps =
- futures
- mock
- # pycurl
+ {[testenv:py27-full]deps}
trollius>=0.1.1
- twisted
commands = python -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop {posargs:}
[testenv:py2-monotonic]