From: Ben Darnell Date: Tue, 5 Jul 2011 04:04:53 +0000 (-0700) Subject: Move the waker pipe into tornado.platform. X-Git-Tag: v2.1.0~113 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2963eed30a4ffaa02a96cdccd352cd9bbf6f3f18;p=thirdparty%2Ftornado.git Move the waker pipe into tornado.platform. --- diff --git a/tornado/ioloop.py b/tornado/ioloop.py index 4bf64fc20..261c75d0f 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -36,23 +36,13 @@ import time import traceback from tornado import stack_context -from tornado.escape import utf8 try: import signal except ImportError: signal = None -from tornado.platform.auto import set_close_exec - -try: - import fcntl -except ImportError: - if os.name == 'nt': - from tornado.platform import windows - else: - raise - +from tornado.platform.auto import set_close_exec, Waker class IOLoop(object): @@ -125,18 +115,10 @@ class IOLoop(object): # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle - if os.name != 'nt': - r, w = os.pipe() - self._set_nonblocking(r) - self._set_nonblocking(w) - set_close_exec(r) - set_close_exec(w) - self._waker_reader = os.fdopen(r, "rb", 0) - self._waker_writer = os.fdopen(w, "wb", 0) - else: - self._waker_reader = self._waker_writer = windows.Pipe() - r = self._waker_writer.reader_fd - self.add_handler(r, self._read_waker, self.READ) + self._waker = Waker() + self.add_handler(self._waker.fileno(), + lambda fd, events: self._waker.consume(), + self.READ) @classmethod def instance(cls): @@ -169,20 +151,14 @@ class IOLoop(object): If ``all_fds`` is true, all file descriptors registered on the IOLoop will be closed (not just the ones created by the IOLoop itself. """ + self.remove_handler(self._waker.fileno()) if all_fds: for fd in self._handlers.keys()[:]: - if fd in (self._waker_reader.fileno(), - self._waker_writer.fileno()): - # Close these through the file objects that wrap them, - # or else the destructor will try to close them later - # and log a warning - continue try: os.close(fd) except Exception: logging.debug("error closing fd %d", fd, exc_info=True) - self._waker_reader.close() - self._waker_writer.close() + self._waker.close() self._impl.close() def add_handler(self, fd, handler, events): @@ -347,7 +323,7 @@ class IOLoop(object): """ self._running = False self._stopped = True - self._wake() + self._waker.wake() def running(self): """Returns true if this IOLoop is currently running.""" @@ -384,15 +360,9 @@ class IOLoop(object): control from other threads to the IOLoop's thread. """ if not self._callbacks and thread.get_ident() != self._thread_ident: - self._wake() + self._waker.wake() self._callbacks.append(stack_context.wrap(callback)) - def _wake(self): - try: - self._waker_writer.write(utf8("x")) - except IOError: - pass - def _run_callback(self, callback): try: callback() @@ -411,18 +381,6 @@ class IOLoop(object): """ logging.error("Exception in callback %r", callback, exc_info=True) - def _read_waker(self, fd, events): - try: - while True: - result = self._waker_reader.read() - if not result: break - except IOError: - pass - - def _set_nonblocking(self, fd): - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - class _Timeout(object): """An IOLoop timeout, a UNIX timestamp and a callback""" diff --git a/tornado/platform/auto.py b/tornado/platform/auto.py index 9bc411daa..e76d731b3 100644 --- a/tornado/platform/auto.py +++ b/tornado/platform/auto.py @@ -26,6 +26,6 @@ Most code that needs access to this functionality should do e.g.:: import os if os.name == 'nt': - from .windows import set_close_exec + from tornado.platform.windows import set_close_exec, Waker else: - from .posix import set_close_exec + from tornado.platform.posix import set_close_exec, Waker diff --git a/tornado/platform/interface.py b/tornado/platform/interface.py index fb5e61802..20f0f7161 100644 --- a/tornado/platform/interface.py +++ b/tornado/platform/interface.py @@ -24,3 +24,34 @@ implementation from `tornado.platform.auto`. def set_close_exec(fd): """Sets the close-on-exec bit (``FD_CLOEXEC``)for a file descriptor.""" raise NotImplementedError() + +class Waker(object): + """A socket-like object that can wake another thread from ``select()``. + + The `~tornado.ioloop.IOLoop` will add the Waker's `fileno()` to + its ``select`` (or ``epoll`` or ``kqueue``) calls. When another + thread wants to wake up the loop, it calls `wake`. Once it has woken + up, it will call `consume` to do any necessary per-wake cleanup. When + the ``IOLoop`` is closed, it closes its waker too. + """ + def fileno(self): + """Returns a file descriptor for this waker. + + Must be suitable for use with ``select()`` or equivalent on the + local platform. + """ + raise NotImplementedError() + + def wake(self): + """Triggers activity on the waker's file descriptor.""" + raise NotImplementedError() + + def consume(self): + """Called after the listen has woken up to do any necessary cleanup.""" + raise NotImplementedError() + + def close(self): + """Closes the waker's file descriptor(s).""" + raise NotImplementedError() + + diff --git a/tornado/platform/posix.py b/tornado/platform/posix.py index 673660de4..aa09b31c7 100644 --- a/tornado/platform/posix.py +++ b/tornado/platform/posix.py @@ -17,7 +17,46 @@ """Posix implementations of platform-specific functionality.""" import fcntl +import os + +from tornado.platform import interface +from tornado.util import b def set_close_exec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) + +def _set_nonblocking(fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + +class Waker(interface.Waker): + def __init__(self): + r, w = os.pipe() + _set_nonblocking(r) + _set_nonblocking(w) + set_close_exec(r) + set_close_exec(w) + self.reader = os.fdopen(r, "rb", 0) + self.writer = os.fdopen(w, "wb", 0) + + def fileno(self): + return self.reader.fileno() + + def wake(self): + try: + self.writer.write(b("x")) + except IOError: + pass + + def consume(self): + try: + while True: + result = self.reader.read() + if not result: break; + except IOError: + pass + + def close(self): + self.reader.close() + self.writer.close() diff --git a/tornado/platform/windows.py b/tornado/platform/windows.py index e138dc184..1735f1b38 100644 --- a/tornado/platform/windows.py +++ b/tornado/platform/windows.py @@ -6,6 +6,9 @@ import ctypes.wintypes import socket import errno +from tornado.platform import interface +from tornado.util import b + # See: http://msdn.microsoft.com/en-us/library/ms724935(VS.85).aspx SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation SetHandleInformation.argtypes = (ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD) @@ -20,7 +23,7 @@ def set_close_exec(fd): raise ctypes.GetLastError() -class Pipe(object): +class Waker(interface.Waker): """Create an OS independent asynchronous pipe""" def __init__(self): # Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py @@ -72,15 +75,23 @@ class Pipe(object): a.close() self.reader_fd = self.reader.fileno() - def read(self): - """Emulate a file descriptors read method""" + def fileno(self): + return self.reader.fileno() + + def wake(self): + try: + self.writer.send(b("x")) + except IOError: + pass + + def consume(self): try: - return self.reader.recv(1) - except socket.error, ex: - if ex.args[0] == errno.EWOULDBLOCK: - raise IOError - raise + while True: + result = self.reader.recv(1024) + if not result: break + except IOError: + pass - def write(self, data): - """Emulate a file descriptors write method""" - return self.writer.send(data) + def close(self): + self.reader.close() + self.writer.close()