]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Split IOLoop into a base class and poll-based implementation.
authorBen Darnell <ben@bendarnell.com>
Sun, 7 Oct 2012 18:16:41 +0000 (11:16 -0700)
committerBen Darnell <ben@bendarnell.com>
Sun, 7 Oct 2012 18:16:41 +0000 (11:16 -0700)
The base class has all the methods with docstrings; nearly all the
implementation is in a new subclass PollIOLoop.

tornado/ioloop.py
tornado/platform/epoll.py
tornado/platform/kqueue.py
tornado/platform/select.py

index 2e1c65e84d0fb1197c9386b50cfe6ce9b468f8b6..f3f8a1bf76367ff4e141697144b924afb75b419a 100644 (file)
@@ -98,26 +98,6 @@ class IOLoop(Configurable):
         io_loop.start()
 
     """
-    @classmethod
-    def configurable_base(cls):
-        return IOLoop
-
-    @classmethod
-    def configurable_default(cls):
-        if hasattr(select, "epoll") or sys.platform.startswith('linux'):
-            try:
-                from tornado.platform.epoll import EPollIOLoop
-                return EPollIOLoop
-            except ImportError:
-                gen_log.warning("unable to import EPollIOLoop, falling back to SelectIOLoop")
-                pass
-        if hasattr(select, "kqueue"):
-            # Python 2.6+ on BSD or Mac
-            from tornado.platform.kqueue import KQueueIOLoop
-            return KQueueIOLoop
-        from tornado.platform.select import SelectIOLoop
-        return SelectIOLoop
-
     # Constants from the epoll module
     _EPOLLIN = 0x001
     _EPOLLPRI = 0x002
@@ -139,28 +119,6 @@ class IOLoop(Configurable):
 
     _current = threading.local()
 
-    def initialize(self, impl, time_func=None):
-        self._impl = impl
-        if hasattr(self._impl, 'fileno'):
-            set_close_exec(self._impl.fileno())
-        self.time_func = time_func or time.time
-        self._handlers = {}
-        self._events = {}
-        self._callbacks = []
-        self._callback_lock = threading.Lock()
-        self._timeouts = []
-        self._running = False
-        self._stopped = False
-        self._thread_ident = None
-        self._blocking_signal_threshold = None
-
-        # Create a pipe that we send bogus data to when we want to wake
-        # the I/O loop when it is idle
-        self._waker = Waker()
-        self.add_handler(self._waker.fileno(),
-                         lambda fd, events: self._waker.consume(),
-                         self.READ)
-
     @staticmethod
     def instance():
         """Returns a global IOLoop instance.
@@ -213,6 +171,29 @@ class IOLoop(Configurable):
         assert IOLoop._current.instance is self
         IOLoop._current.instance = None
 
+    @classmethod
+    def configurable_base(cls):
+        return IOLoop
+
+    @classmethod
+    def configurable_default(cls):
+        if hasattr(select, "epoll") or sys.platform.startswith('linux'):
+            try:
+                from tornado.platform.epoll import EPollIOLoop
+                return EPollIOLoop
+            except ImportError:
+                gen_log.warning("unable to import EPollIOLoop, falling back to SelectIOLoop")
+                pass
+        if hasattr(select, "kqueue"):
+            # Python 2.6+ on BSD or Mac
+            from tornado.platform.kqueue import KQueueIOLoop
+            return KQueueIOLoop
+        from tornado.platform.select import SelectIOLoop
+        return SelectIOLoop
+
+    def initialize(self):
+        pass
+
     def close(self, all_fds=False):
         """Closes the IOLoop, freeing any resources used.
 
@@ -232,33 +213,19 @@ class IOLoop(Configurable):
         Therefore the call to `close` will usually appear just after
         the call to `start` rather than near the call to `stop`.
         """
-        self.remove_handler(self._waker.fileno())
-        if all_fds:
-            for fd in self._handlers.keys()[:]:
-                try:
-                    os.close(fd)
-                except Exception:
-                    gen_log.debug("error closing fd %s", fd, exc_info=True)
-        self._waker.close()
-        self._impl.close()
+        raise NotImplementedError()
 
     def add_handler(self, fd, handler, events):
         """Registers the given handler to receive the given events for fd."""
-        self._handlers[fd] = stack_context.wrap(handler)
-        self._impl.register(fd, events | self.ERROR)
+        raise NotImplementedError()
 
     def update_handler(self, fd, events):
         """Changes the events we listen for fd."""
-        self._impl.modify(fd, events | self.ERROR)
+        raise NotImplementedError()
 
     def remove_handler(self, fd):
         """Stop listening for events on fd."""
-        self._handlers.pop(fd, None)
-        self._events.pop(fd, None)
-        try:
-            self._impl.unregister(fd)
-        except (OSError, IOError):
-            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
+        raise NotImplementedError()
 
     def set_blocking_signal_threshold(self, seconds, action):
         """Sends a signal if the ioloop is blocked for more than s seconds.
@@ -271,14 +238,7 @@ class IOLoop(Configurable):
         If action is None, the process will be killed if it is blocked for
         too long.
         """
-        if not hasattr(signal, "setitimer"):
-            gen_log.error("set_blocking_signal_threshold requires a signal module "
-                           "with the setitimer method")
-            return
-        self._blocking_signal_threshold = seconds
-        if seconds is not None:
-            signal.signal(signal.SIGALRM,
-                          action if action is not None else signal.SIG_DFL)
+        raise NotImplementedError()
 
     def set_blocking_log_threshold(self, seconds):
         """Logs a stack trace if the ioloop is blocked for more than s seconds.
@@ -301,6 +261,202 @@ class IOLoop(Configurable):
         The loop will run until one of the I/O handlers calls stop(), which
         will make the loop stop after the current event iteration completes.
         """
+        raise NotImplementedError()
+
+    def stop(self):
+        """Stop the loop after the current event loop iteration is complete.
+        If the event loop is not currently running, the next call to start()
+        will return immediately.
+
+        To use asynchronous methods from otherwise-synchronous code (such as
+        unit tests), you can start and stop the event loop like this::
+
+          ioloop = IOLoop()
+          async_method(ioloop=ioloop, callback=ioloop.stop)
+          ioloop.start()
+
+        ioloop.start() will return after async_method has run its callback,
+        whether that callback was invoked before or after ioloop.start.
+
+        Note that even after `stop` has been called, the IOLoop is not
+        completely stopped until `IOLoop.start` has also returned.
+        """
+        raise NotImplementedError()
+
+    def running(self):
+        """Returns true if this IOLoop is currently running."""
+        raise NotImplementedError()
+
+    def time(self):
+        """Returns the current time according to the IOLoop's clock.
+
+        The return value is a floating-point number relative to an
+        unspecified time in the past.
+
+        By default, the IOLoop's time function is `time.time`.  However,
+        it may be configured to use e.g. `time.monotonic` instead.
+        Calls to `add_timeout` that pass a number instead of a
+        `datetime.timedelta` should use this function to compute the
+        appropriate time, so they can work no matter what time function
+        is chosen.
+        """
+        return time.time()
+
+    def add_timeout(self, deadline, callback):
+        """Calls the given callback at the time deadline from the I/O loop.
+
+        Returns a handle that may be passed to remove_timeout to cancel.
+
+        ``deadline`` may be a number denoting a time relative to
+        `IOLoop.time`, or a ``datetime.timedelta`` object for a
+        deadline relative to the current time.
+
+        Note that it is not safe to call `add_timeout` from other threads.
+        Instead, you must use `add_callback` to transfer control to the
+        IOLoop's thread, and then call `add_timeout` from there.
+        """
+        raise NotImplementedError()
+
+    def remove_timeout(self, timeout):
+        """Cancels a pending timeout.
+
+        The argument is a handle as returned by add_timeout.
+        """
+        raise NotImplementedError()
+
+    def add_callback(self, callback):
+        """Calls the given callback on the next I/O loop iteration.
+
+        It is safe to call this method from any thread at any time,
+        except from a signal handler.  Note that this is the *only*
+        method in IOLoop that makes this thread-safety guarantee; all
+        other interaction with the IOLoop must be done from that
+        IOLoop's thread.  add_callback() may be used to transfer
+        control from other threads to the IOLoop's thread.
+
+        To add a callback from a signal handler, see
+        `add_callback_from_signal`.
+        """
+        raise NotImplementedError()
+
+    def add_callback_from_signal(self, callback):
+        """Calls the given callback on the next I/O loop iteration.
+
+        Safe for use from a Python signal handler; should not be used
+        otherwise.
+
+        Callbacks added with this method will be run without any
+        stack_context, to avoid picking up the context of the function
+        that was interrupted by the signal.
+        """
+        raise NotImplementedError()
+
+    if futures is not None:
+        _FUTURE_TYPES = (futures.Future, DummyFuture)
+    else:
+        _FUTURE_TYPES = DummyFuture
+    def add_future(self, future, callback):
+        """Schedules a callback on the IOLoop when the given future is finished.
+
+        The callback is invoked with one argument, the future.
+        """
+        assert isinstance(future, IOLoop._FUTURE_TYPES)
+        callback = stack_context.wrap(callback)
+        future.add_done_callback(
+            lambda future: self.add_callback(
+                functools.partial(callback, future)))
+
+    def _run_callback(self, callback):
+        """Runs a callback with error handling.
+
+        For use in subclasses.
+        """
+        try:
+            callback()
+        except Exception:
+            self.handle_callback_exception(callback)
+
+    def handle_callback_exception(self, callback):
+        """This method is called whenever a callback run by the IOLoop
+        throws an exception.
+
+        By default simply logs the exception as an error.  Subclasses
+        may override this method to customize reporting of exceptions.
+
+        The exception itself is not passed explicitly, but is available
+        in sys.exc_info.
+        """
+        app_log.error("Exception in callback %r", callback, exc_info=True)
+
+
+
+class PollIOLoop(IOLoop):
+    """Base class for IOLoops built around a select-like function.
+
+    For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
+    (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
+    `tornado.platform.select.SelectIOLoop` (all platforms).
+    """
+    def initialize(self, impl, time_func=None):
+        super(PollIOLoop, self).initialize()
+        self._impl = impl
+        if hasattr(self._impl, 'fileno'):
+            set_close_exec(self._impl.fileno())
+        self.time_func = time_func or time.time
+        self._handlers = {}
+        self._events = {}
+        self._callbacks = []
+        self._callback_lock = threading.Lock()
+        self._timeouts = []
+        self._running = False
+        self._stopped = False
+        self._thread_ident = None
+        self._blocking_signal_threshold = None
+
+        # Create a pipe that we send bogus data to when we want to wake
+        # the I/O loop when it is idle
+        self._waker = Waker()
+        self.add_handler(self._waker.fileno(),
+                         lambda fd, events: self._waker.consume(),
+                         self.READ)
+
+    def close(self, all_fds=False):
+        self.remove_handler(self._waker.fileno())
+        if all_fds:
+            for fd in self._handlers.keys()[:]:
+                try:
+                    os.close(fd)
+                except Exception:
+                    gen_log.debug("error closing fd %s", fd, exc_info=True)
+        self._waker.close()
+        self._impl.close()
+
+    def add_handler(self, fd, handler, events):
+        self._handlers[fd] = stack_context.wrap(handler)
+        self._impl.register(fd, events | self.ERROR)
+
+    def update_handler(self, fd, events):
+        self._impl.modify(fd, events | self.ERROR)
+
+    def remove_handler(self, fd):
+        self._handlers.pop(fd, None)
+        self._events.pop(fd, None)
+        try:
+            self._impl.unregister(fd)
+        except (OSError, IOError):
+            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
+
+    def set_blocking_signal_threshold(self, seconds, action):
+        if not hasattr(signal, "setitimer"):
+            gen_log.error("set_blocking_signal_threshold requires a signal module "
+                           "with the setitimer method")
+            return
+        self._blocking_signal_threshold = seconds
+        if seconds is not None:
+            signal.signal(signal.SIGALRM,
+                          action if action is not None else signal.SIG_DFL)
+
+    def start(self):
         if not logging.getLogger().handlers:
             # The IOLoop catches and logs exceptions, so it's
             # important that log output be visible.  However, python's
@@ -434,68 +590,22 @@ class IOLoop(Configurable):
             signal.set_wakeup_fd(old_wakeup_fd)
 
     def stop(self):
-        """Stop the loop after the current event loop iteration is complete.
-        If the event loop is not currently running, the next call to start()
-        will return immediately.
-
-        To use asynchronous methods from otherwise-synchronous code (such as
-        unit tests), you can start and stop the event loop like this::
-
-          ioloop = IOLoop()
-          async_method(ioloop=ioloop, callback=ioloop.stop)
-          ioloop.start()
-
-        ioloop.start() will return after async_method has run its callback,
-        whether that callback was invoked before or after ioloop.start.
-
-        Note that even after `stop` has been called, the IOLoop is not
-        completely stopped until `IOLoop.start` has also returned.
-        """
         self._running = False
         self._stopped = True
         self._waker.wake()
 
     def running(self):
-        """Returns true if this IOLoop is currently running."""
         return self._running
 
     def time(self):
-        """Returns the current time according to the IOLoop's clock.
-
-        The return value is a floating-point number relative to an
-        unspecified time in the past.
-
-        By default, the IOLoop's time function is `time.time`.  However,
-        it may be configured to use e.g. `time.monotonic` instead.
-        Calls to `add_timeout` that pass a number instead of a
-        `datetime.timedelta` should use this function to compute the
-        appropriate time, so they can work no matter what time function
-        is chosen.
-        """
         return self.time_func()
 
     def add_timeout(self, deadline, callback):
-        """Calls the given callback at the time deadline from the I/O loop.
-
-        Returns a handle that may be passed to remove_timeout to cancel.
-
-        ``deadline`` may be a number denoting a time relative to
-        `IOLoop.time`, or a ``datetime.timedelta`` object for a
-        deadline relative to the current time.
-
-        Note that it is not safe to call `add_timeout` from other threads.
-        Instead, you must use `add_callback` to transfer control to the
-        IOLoop's thread, and then call `add_timeout` from there.
-        """
         timeout = _Timeout(deadline, stack_context.wrap(callback), self)
         heapq.heappush(self._timeouts, timeout)
         return timeout
 
     def remove_timeout(self, timeout):
-        """Cancels a pending timeout.
-
-        The argument is a handle as returned by add_timeout.
-        """
         # Removing from a heap is complicated, so just leave the defunct
         # timeout object in the queue (see discussion in
         # http://docs.python.org/library/heapq.html).
@@ -504,18 +614,6 @@ class IOLoop(Configurable):
         timeout.callback = None
 
     def add_callback(self, callback):
-        """Calls the given callback on the next I/O loop iteration.
-
-        It is safe to call this method from any thread at any time,
-        except from a signal handler.  Note that this is the *only*
-        method in IOLoop that makes this thread-safety guarantee; all
-        other interaction with the IOLoop must be done from that
-        IOLoop's thread.  add_callback() may be used to transfer
-        control from other threads to the IOLoop's thread.
-
-        To add a callback from a signal handler, see
-        `add_callback_from_signal`.
-        """
         with self._callback_lock:
             list_empty = not self._callbacks
             self._callbacks.append(stack_context.wrap(callback))
@@ -529,15 +627,6 @@ class IOLoop(Configurable):
             self._waker.wake()
 
     def add_callback_from_signal(self, callback):
-        """Calls the given callback on the next I/O loop iteration.
-
-        Safe for use from a Python signal handler; should not be used
-        otherwise.
-
-        Callbacks added with this method will be run without any
-        stack_context, to avoid picking up the context of the function
-        that was interrupted by the signal.
-        """
         with stack_context.NullContext():
             if thread.get_ident() != self._thread_ident:
                 # if the signal is handled on another thread, we can add
@@ -554,39 +643,6 @@ class IOLoop(Configurable):
                 # but either way will work.
                 self._callbacks.append(stack_context.wrap(callback))
 
-    if futures is not None:
-        _FUTURE_TYPES = (futures.Future, DummyFuture)
-    else:
-        _FUTURE_TYPES = DummyFuture
-    def add_future(self, future, callback):
-        """Schedules a callback on the IOLoop when the given future is finished.
-
-        The callback is invoked with one argument, the future.
-        """
-        assert isinstance(future, IOLoop._FUTURE_TYPES)
-        callback = stack_context.wrap(callback)
-        future.add_done_callback(
-            lambda future: self.add_callback(
-                functools.partial(callback, future)))
-
-    def _run_callback(self, callback):
-        try:
-            callback()
-        except Exception:
-            self.handle_callback_exception(callback)
-
-    def handle_callback_exception(self, callback):
-        """This method is called whenever a callback run by the IOLoop
-        throws an exception.
-
-        By default simply logs the exception as an error.  Subclasses
-        may override this method to customize reporting of exceptions.
-
-        The exception itself is not passed explicitly, but is available
-        in sys.exc_info.
-        """
-        app_log.error("Exception in callback %r", callback, exc_info=True)
-
 
 class _Timeout(object):
     """An IOLoop timeout, a UNIX timestamp and a callback"""
index 5a6af222418573b7240e55e05af928fa2d604c63..fa5b68eda95e73a07b662e53b0582aeea32c2095 100644 (file)
@@ -23,11 +23,11 @@ from __future__ import absolute_import, division, with_statement
 import os
 import select
 
-from tornado.ioloop import IOLoop
+from tornado.ioloop import PollIOLoop
 
 if hasattr(select, 'epoll'):
     # Python 2.6+
-    class EPollIOLoop(IOLoop):
+    class EPollIOLoop(PollIOLoop):
         def initialize(self, **kwargs):
             super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
 else:
@@ -62,7 +62,7 @@ else:
             return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
 
 
-    class EPollIOLoop(IOLoop):
+    class EPollIOLoop(PollIOLoop):
         def initialize(self, **kwargs):
             super(EPollIOLoop, self).initialize(impl=_EPoll(), **kwargs)
 
index 45f4cc9d5487e888d73053b71442932bc1aec045..2f14c15c3742c6a68741aafa4b3c2826d73aa243 100644 (file)
@@ -18,7 +18,7 @@ from __future__ import absolute_import, division, with_statement
 
 import select
 
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, PollIOLoop
 
 assert hasattr(select, 'kqueue'), 'kqueue not supported'
 
@@ -86,6 +86,6 @@ class _KQueue(object):
         return events.items()
 
 
-class KQueueIOLoop(IOLoop):
+class KQueueIOLoop(PollIOLoop):
     def initialize(self, **kwargs):
         super(KQueueIOLoop, self).initialize(impl=_KQueue(), **kwargs)
index a8788aca65d01c65123efb93b2da58a326ef21f9..51dc964e02365942e97e6756e50998c1e16ba157 100644 (file)
@@ -21,7 +21,7 @@ from __future__ import absolute_import, division, with_statement
 
 import select
 
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, PollIOLoop
 
 class _Select(object):
     """A simple, select()-based IOLoop implementation for non-Linux systems"""
@@ -69,7 +69,7 @@ class _Select(object):
             events[fd] = events.get(fd, 0) | IOLoop.ERROR
         return events.items()
 
-class SelectIOLoop(IOLoop):
+class SelectIOLoop(PollIOLoop):
     def initialize(self, **kwargs):
         super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs)