]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Move the waker pipe into tornado.platform.
authorBen Darnell <ben@bendarnell.com>
Tue, 5 Jul 2011 04:04:53 +0000 (21:04 -0700)
committerBen Darnell <ben@bendarnell.com>
Tue, 5 Jul 2011 04:04:53 +0000 (21:04 -0700)
tornado/ioloop.py
tornado/platform/auto.py
tornado/platform/interface.py
tornado/platform/posix.py
tornado/platform/windows.py

index 4bf64fc20deb3f82dee4d667b0f8bb29c6502357..261c75d0f1b2af02ea0730ecf7041eaa890407b5 100644 (file)
@@ -36,23 +36,13 @@ import time
 import traceback
 
 from tornado import stack_context
-from tornado.escape import utf8
 
 try:
     import signal
 except ImportError:
     signal = None
 
-from tornado.platform.auto import set_close_exec
-
-try:
-    import fcntl
-except ImportError:
-    if os.name == 'nt':
-        from tornado.platform import windows
-    else:
-        raise
-    
+from tornado.platform.auto import set_close_exec, Waker
 
 
 class IOLoop(object):
@@ -125,18 +115,10 @@ class IOLoop(object):
 
         # Create a pipe that we send bogus data to when we want to wake
         # the I/O loop when it is idle
-        if os.name != 'nt':
-            r, w = os.pipe()
-            self._set_nonblocking(r)
-            self._set_nonblocking(w)
-            set_close_exec(r)
-            set_close_exec(w)
-            self._waker_reader = os.fdopen(r, "rb", 0)
-            self._waker_writer = os.fdopen(w, "wb", 0)
-        else:
-            self._waker_reader = self._waker_writer = windows.Pipe()
-            r = self._waker_writer.reader_fd
-        self.add_handler(r, self._read_waker, self.READ)
+        self._waker = Waker()
+        self.add_handler(self._waker.fileno(),
+                         lambda fd, events: self._waker.consume(),
+                         self.READ)
 
     @classmethod
     def instance(cls):
@@ -169,20 +151,14 @@ class IOLoop(object):
         If ``all_fds`` is true, all file descriptors registered on the
         IOLoop will be closed (not just the ones created by the IOLoop itself.
         """
+        self.remove_handler(self._waker.fileno())
         if all_fds:
             for fd in self._handlers.keys()[:]:
-                if fd in (self._waker_reader.fileno(),
-                          self._waker_writer.fileno()):
-                    # Close these through the file objects that wrap them,
-                    # or else the destructor will try to close them later
-                    # and log a warning
-                    continue
                 try:
                     os.close(fd)
                 except Exception:
                     logging.debug("error closing fd %d", fd, exc_info=True)
-        self._waker_reader.close()
-        self._waker_writer.close()
+        self._waker.close()
         self._impl.close()
 
     def add_handler(self, fd, handler, events):
@@ -347,7 +323,7 @@ class IOLoop(object):
         """
         self._running = False
         self._stopped = True
-        self._wake()
+        self._waker.wake()
 
     def running(self):
         """Returns true if this IOLoop is currently running."""
@@ -384,15 +360,9 @@ class IOLoop(object):
         control from other threads to the IOLoop's thread.
         """
         if not self._callbacks and thread.get_ident() != self._thread_ident:
-            self._wake()
+            self._waker.wake()
         self._callbacks.append(stack_context.wrap(callback))
 
-    def _wake(self):
-        try:
-            self._waker_writer.write(utf8("x"))
-        except IOError:
-            pass
-
     def _run_callback(self, callback):
         try:
             callback()
@@ -411,18 +381,6 @@ class IOLoop(object):
         """
         logging.error("Exception in callback %r", callback, exc_info=True)
 
-    def _read_waker(self, fd, events):
-        try:
-            while True:
-                result = self._waker_reader.read()
-                if not result: break
-        except IOError:
-            pass
-
-    def _set_nonblocking(self, fd):
-        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
-        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
 
 class _Timeout(object):
     """An IOLoop timeout, a UNIX timestamp and a callback"""
index 9bc411daa2e20de82a34c47745e22be0df4fc664..e76d731b3ed15eccd50336ded1565ffc8640749a 100644 (file)
@@ -26,6 +26,6 @@ Most code that needs access to this functionality should do e.g.::
 import os
 
 if os.name == 'nt':
-    from .windows import set_close_exec
+    from tornado.platform.windows import set_close_exec, Waker
 else:
-    from .posix import set_close_exec
+    from tornado.platform.posix import set_close_exec, Waker
index fb5e61802a2c4c0a17c887210476d80b5faa6d57..20f0f7161bf0d4fe2f4317a4c4e4abc4f8d8d6e2 100644 (file)
@@ -24,3 +24,34 @@ implementation from `tornado.platform.auto`.
 def set_close_exec(fd):
     """Sets the close-on-exec bit (``FD_CLOEXEC``)for a file descriptor."""
     raise NotImplementedError()
+
+class Waker(object):
+    """A socket-like object that can wake another thread from ``select()``.
+
+    The `~tornado.ioloop.IOLoop` will add the Waker's `fileno()` to
+    its ``select`` (or ``epoll`` or ``kqueue``) calls.  When another
+    thread wants to wake up the loop, it calls `wake`.  Once it has woken
+    up, it will call `consume` to do any necessary per-wake cleanup.  When
+    the ``IOLoop`` is closed, it closes its waker too.
+    """
+    def fileno(self):
+        """Returns a file descriptor for this waker.
+        
+        Must be suitable for use with ``select()`` or equivalent on the
+        local platform.
+        """
+        raise NotImplementedError()
+
+    def wake(self):
+        """Triggers activity on the waker's file descriptor."""
+        raise NotImplementedError()
+
+    def consume(self):
+        """Called after the listen has woken up to do any necessary cleanup."""
+        raise NotImplementedError()
+
+    def close(self):
+        """Closes the waker's file descriptor(s)."""
+        raise NotImplementedError()
+
+    
index 673660de4645e4d824172f4d46ee2673d80d2d0c..aa09b31c75506d0b11a0f6ad0e01156256a0eed0 100644 (file)
 """Posix implementations of platform-specific functionality."""
 
 import fcntl
+import os
+
+from tornado.platform import interface
+from tornado.util import b
 
 def set_close_exec(fd):
     flags = fcntl.fcntl(fd, fcntl.F_GETFD)
     fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
+def _set_nonblocking(fd):
+    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+    
+class Waker(interface.Waker):
+    def __init__(self):
+        r, w = os.pipe()
+        _set_nonblocking(r)
+        _set_nonblocking(w)
+        set_close_exec(r)
+        set_close_exec(w)
+        self.reader = os.fdopen(r, "rb", 0)
+        self.writer = os.fdopen(w, "wb", 0)
+
+    def fileno(self):
+        return self.reader.fileno()
+
+    def wake(self):
+        try:
+            self.writer.write(b("x"))
+        except IOError:
+            pass
+
+    def consume(self):
+        try:
+            while True:
+                result = self.reader.read()
+                if not result: break;
+        except IOError:
+            pass
+
+    def close(self):
+        self.reader.close()
+        self.writer.close()
index e138dc1842cff76bd23b9028ca9ef595df1099ad..1735f1b38d0f3fd4119f4844209a6d72fc3a4d8b 100644 (file)
@@ -6,6 +6,9 @@ import ctypes.wintypes
 import socket
 import errno
 
+from tornado.platform import interface
+from tornado.util import b
+
 # See: http://msdn.microsoft.com/en-us/library/ms724935(VS.85).aspx
 SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation
 SetHandleInformation.argtypes = (ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD)
@@ -20,7 +23,7 @@ def set_close_exec(fd):
         raise ctypes.GetLastError()
 
 
-class Pipe(object):
+class Waker(interface.Waker):
     """Create an OS independent asynchronous pipe"""
     def __init__(self):
         # Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py
@@ -72,15 +75,23 @@ class Pipe(object):
         a.close()
         self.reader_fd = self.reader.fileno()
 
-    def read(self):
-        """Emulate a file descriptors read method"""
+    def fileno(self):
+        return self.reader.fileno()
+
+    def wake(self):
+        try:
+            self.writer.send(b("x"))
+        except IOError:
+            pass
+
+    def consume(self):
         try:
-            return self.reader.recv(1)
-        except socket.error, ex:
-            if ex.args[0] == errno.EWOULDBLOCK:
-                raise IOError
-            raise
+            while True:
+                result = self.reader.recv(1024)
+                if not result: break
+        except IOError:
+            pass
 
-    def write(self, data):
-        """Emulate a file descriptors write method"""
-        return self.writer.send(data)
+    def close(self):
+        self.reader.close()
+        self.writer.close()