import traceback
from tornado import stack_context
-from tornado.escape import utf8
try:
import signal
except ImportError:
signal = None
-from tornado.platform.auto import set_close_exec
-
-try:
- import fcntl
-except ImportError:
- if os.name == 'nt':
- from tornado.platform import windows
- else:
- raise
-
+from tornado.platform.auto import set_close_exec, Waker
class IOLoop(object):
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
- if os.name != 'nt':
- r, w = os.pipe()
- self._set_nonblocking(r)
- self._set_nonblocking(w)
- set_close_exec(r)
- set_close_exec(w)
- self._waker_reader = os.fdopen(r, "rb", 0)
- self._waker_writer = os.fdopen(w, "wb", 0)
- else:
- self._waker_reader = self._waker_writer = windows.Pipe()
- r = self._waker_writer.reader_fd
- self.add_handler(r, self._read_waker, self.READ)
+ self._waker = Waker()
+ self.add_handler(self._waker.fileno(),
+ lambda fd, events: self._waker.consume(),
+ self.READ)
@classmethod
def instance(cls):
If ``all_fds`` is true, all file descriptors registered on the
IOLoop will be closed (not just the ones created by the IOLoop itself.
"""
+ self.remove_handler(self._waker.fileno())
if all_fds:
for fd in self._handlers.keys()[:]:
- if fd in (self._waker_reader.fileno(),
- self._waker_writer.fileno()):
- # Close these through the file objects that wrap them,
- # or else the destructor will try to close them later
- # and log a warning
- continue
try:
os.close(fd)
except Exception:
logging.debug("error closing fd %d", fd, exc_info=True)
- self._waker_reader.close()
- self._waker_writer.close()
+ self._waker.close()
self._impl.close()
def add_handler(self, fd, handler, events):
"""
self._running = False
self._stopped = True
- self._wake()
+ self._waker.wake()
def running(self):
"""Returns true if this IOLoop is currently running."""
control from other threads to the IOLoop's thread.
"""
if not self._callbacks and thread.get_ident() != self._thread_ident:
- self._wake()
+ self._waker.wake()
self._callbacks.append(stack_context.wrap(callback))
- def _wake(self):
- try:
- self._waker_writer.write(utf8("x"))
- except IOError:
- pass
-
def _run_callback(self, callback):
try:
callback()
"""
logging.error("Exception in callback %r", callback, exc_info=True)
- def _read_waker(self, fd, events):
- try:
- while True:
- result = self._waker_reader.read()
- if not result: break
- except IOError:
- pass
-
- def _set_nonblocking(self, fd):
- flags = fcntl.fcntl(fd, fcntl.F_GETFL)
- fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
class _Timeout(object):
"""An IOLoop timeout, a UNIX timestamp and a callback"""
import os
if os.name == 'nt':
- from .windows import set_close_exec
+ from tornado.platform.windows import set_close_exec, Waker
else:
- from .posix import set_close_exec
+ from tornado.platform.posix import set_close_exec, Waker
def set_close_exec(fd):
"""Sets the close-on-exec bit (``FD_CLOEXEC``)for a file descriptor."""
raise NotImplementedError()
+
+class Waker(object):
+ """A socket-like object that can wake another thread from ``select()``.
+
+ The `~tornado.ioloop.IOLoop` will add the Waker's `fileno()` to
+ its ``select`` (or ``epoll`` or ``kqueue``) calls. When another
+ thread wants to wake up the loop, it calls `wake`. Once it has woken
+ up, it will call `consume` to do any necessary per-wake cleanup. When
+ the ``IOLoop`` is closed, it closes its waker too.
+ """
+ def fileno(self):
+ """Returns a file descriptor for this waker.
+
+ Must be suitable for use with ``select()`` or equivalent on the
+ local platform.
+ """
+ raise NotImplementedError()
+
+ def wake(self):
+ """Triggers activity on the waker's file descriptor."""
+ raise NotImplementedError()
+
+ def consume(self):
+ """Called after the listen has woken up to do any necessary cleanup."""
+ raise NotImplementedError()
+
+ def close(self):
+ """Closes the waker's file descriptor(s)."""
+ raise NotImplementedError()
+
+
"""Posix implementations of platform-specific functionality."""
import fcntl
+import os
+
+from tornado.platform import interface
+from tornado.util import b
def set_close_exec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
+def _set_nonblocking(fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+class Waker(interface.Waker):
+ def __init__(self):
+ r, w = os.pipe()
+ _set_nonblocking(r)
+ _set_nonblocking(w)
+ set_close_exec(r)
+ set_close_exec(w)
+ self.reader = os.fdopen(r, "rb", 0)
+ self.writer = os.fdopen(w, "wb", 0)
+
+ def fileno(self):
+ return self.reader.fileno()
+
+ def wake(self):
+ try:
+ self.writer.write(b("x"))
+ except IOError:
+ pass
+
+ def consume(self):
+ try:
+ while True:
+ result = self.reader.read()
+ if not result: break;
+ except IOError:
+ pass
+
+ def close(self):
+ self.reader.close()
+ self.writer.close()
import socket
import errno
+from tornado.platform import interface
+from tornado.util import b
+
# See: http://msdn.microsoft.com/en-us/library/ms724935(VS.85).aspx
SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation
SetHandleInformation.argtypes = (ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD)
raise ctypes.GetLastError()
-class Pipe(object):
+class Waker(interface.Waker):
"""Create an OS independent asynchronous pipe"""
def __init__(self):
# Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py
a.close()
self.reader_fd = self.reader.fileno()
- def read(self):
- """Emulate a file descriptors read method"""
+ def fileno(self):
+ return self.reader.fileno()
+
+ def wake(self):
+ try:
+ self.writer.send(b("x"))
+ except IOError:
+ pass
+
+ def consume(self):
try:
- return self.reader.recv(1)
- except socket.error, ex:
- if ex.args[0] == errno.EWOULDBLOCK:
- raise IOError
- raise
+ while True:
+ result = self.reader.recv(1024)
+ if not result: break
+ except IOError:
+ pass
- def write(self, data):
- """Emulate a file descriptors write method"""
- return self.writer.send(data)
+ def close(self):
+ self.reader.close()
+ self.writer.close()