From 1a66183f19ccc61d344771c46b591c239513a254 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 7 Oct 2012 11:16:41 -0700 Subject: [PATCH] Split IOLoop into a base class and poll-based implementation. The base class has all the methods with docstrings; nearly all the implementation is in a new subclass PollIOLoop. --- tornado/ioloop.py | 392 +++++++++++++++++++++---------------- tornado/platform/epoll.py | 6 +- tornado/platform/kqueue.py | 4 +- tornado/platform/select.py | 4 +- 4 files changed, 231 insertions(+), 175 deletions(-) diff --git a/tornado/ioloop.py b/tornado/ioloop.py index 2e1c65e84..f3f8a1bf7 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -98,26 +98,6 @@ class IOLoop(Configurable): io_loop.start() """ - @classmethod - def configurable_base(cls): - return IOLoop - - @classmethod - def configurable_default(cls): - if hasattr(select, "epoll") or sys.platform.startswith('linux'): - try: - from tornado.platform.epoll import EPollIOLoop - return EPollIOLoop - except ImportError: - gen_log.warning("unable to import EPollIOLoop, falling back to SelectIOLoop") - pass - if hasattr(select, "kqueue"): - # Python 2.6+ on BSD or Mac - from tornado.platform.kqueue import KQueueIOLoop - return KQueueIOLoop - from tornado.platform.select import SelectIOLoop - return SelectIOLoop - # Constants from the epoll module _EPOLLIN = 0x001 _EPOLLPRI = 0x002 @@ -139,28 +119,6 @@ class IOLoop(Configurable): _current = threading.local() - def initialize(self, impl, time_func=None): - self._impl = impl - if hasattr(self._impl, 'fileno'): - set_close_exec(self._impl.fileno()) - self.time_func = time_func or time.time - self._handlers = {} - self._events = {} - self._callbacks = [] - self._callback_lock = threading.Lock() - self._timeouts = [] - self._running = False - self._stopped = False - self._thread_ident = None - self._blocking_signal_threshold = None - - # Create a pipe that we send bogus data to when we want to wake - # the I/O loop when it is idle - self._waker = Waker() - self.add_handler(self._waker.fileno(), - lambda fd, events: self._waker.consume(), - self.READ) - @staticmethod def instance(): """Returns a global IOLoop instance. @@ -213,6 +171,29 @@ class IOLoop(Configurable): assert IOLoop._current.instance is self IOLoop._current.instance = None + @classmethod + def configurable_base(cls): + return IOLoop + + @classmethod + def configurable_default(cls): + if hasattr(select, "epoll") or sys.platform.startswith('linux'): + try: + from tornado.platform.epoll import EPollIOLoop + return EPollIOLoop + except ImportError: + gen_log.warning("unable to import EPollIOLoop, falling back to SelectIOLoop") + pass + if hasattr(select, "kqueue"): + # Python 2.6+ on BSD or Mac + from tornado.platform.kqueue import KQueueIOLoop + return KQueueIOLoop + from tornado.platform.select import SelectIOLoop + return SelectIOLoop + + def initialize(self): + pass + def close(self, all_fds=False): """Closes the IOLoop, freeing any resources used. @@ -232,33 +213,19 @@ class IOLoop(Configurable): Therefore the call to `close` will usually appear just after the call to `start` rather than near the call to `stop`. """ - self.remove_handler(self._waker.fileno()) - if all_fds: - for fd in self._handlers.keys()[:]: - try: - os.close(fd) - except Exception: - gen_log.debug("error closing fd %s", fd, exc_info=True) - self._waker.close() - self._impl.close() + raise NotImplementedError() def add_handler(self, fd, handler, events): """Registers the given handler to receive the given events for fd.""" - self._handlers[fd] = stack_context.wrap(handler) - self._impl.register(fd, events | self.ERROR) + raise NotImplementedError() def update_handler(self, fd, events): """Changes the events we listen for fd.""" - self._impl.modify(fd, events | self.ERROR) + raise NotImplementedError() def remove_handler(self, fd): """Stop listening for events on fd.""" - self._handlers.pop(fd, None) - self._events.pop(fd, None) - try: - self._impl.unregister(fd) - except (OSError, IOError): - gen_log.debug("Error deleting fd from IOLoop", exc_info=True) + raise NotImplementedError() def set_blocking_signal_threshold(self, seconds, action): """Sends a signal if the ioloop is blocked for more than s seconds. @@ -271,14 +238,7 @@ class IOLoop(Configurable): If action is None, the process will be killed if it is blocked for too long. """ - if not hasattr(signal, "setitimer"): - gen_log.error("set_blocking_signal_threshold requires a signal module " - "with the setitimer method") - return - self._blocking_signal_threshold = seconds - if seconds is not None: - signal.signal(signal.SIGALRM, - action if action is not None else signal.SIG_DFL) + raise NotImplementedError() def set_blocking_log_threshold(self, seconds): """Logs a stack trace if the ioloop is blocked for more than s seconds. @@ -301,6 +261,202 @@ class IOLoop(Configurable): The loop will run until one of the I/O handlers calls stop(), which will make the loop stop after the current event iteration completes. """ + raise NotImplementedError() + + def stop(self): + """Stop the loop after the current event loop iteration is complete. + If the event loop is not currently running, the next call to start() + will return immediately. + + To use asynchronous methods from otherwise-synchronous code (such as + unit tests), you can start and stop the event loop like this:: + + ioloop = IOLoop() + async_method(ioloop=ioloop, callback=ioloop.stop) + ioloop.start() + + ioloop.start() will return after async_method has run its callback, + whether that callback was invoked before or after ioloop.start. + + Note that even after `stop` has been called, the IOLoop is not + completely stopped until `IOLoop.start` has also returned. + """ + raise NotImplementedError() + + def running(self): + """Returns true if this IOLoop is currently running.""" + raise NotImplementedError() + + def time(self): + """Returns the current time according to the IOLoop's clock. + + The return value is a floating-point number relative to an + unspecified time in the past. + + By default, the IOLoop's time function is `time.time`. However, + it may be configured to use e.g. `time.monotonic` instead. + Calls to `add_timeout` that pass a number instead of a + `datetime.timedelta` should use this function to compute the + appropriate time, so they can work no matter what time function + is chosen. + """ + return time.time() + + def add_timeout(self, deadline, callback): + """Calls the given callback at the time deadline from the I/O loop. + + Returns a handle that may be passed to remove_timeout to cancel. + + ``deadline`` may be a number denoting a time relative to + `IOLoop.time`, or a ``datetime.timedelta`` object for a + deadline relative to the current time. + + Note that it is not safe to call `add_timeout` from other threads. + Instead, you must use `add_callback` to transfer control to the + IOLoop's thread, and then call `add_timeout` from there. + """ + raise NotImplementedError() + + def remove_timeout(self, timeout): + """Cancels a pending timeout. + + The argument is a handle as returned by add_timeout. + """ + raise NotImplementedError() + + def add_callback(self, callback): + """Calls the given callback on the next I/O loop iteration. + + It is safe to call this method from any thread at any time, + except from a signal handler. Note that this is the *only* + method in IOLoop that makes this thread-safety guarantee; all + other interaction with the IOLoop must be done from that + IOLoop's thread. add_callback() may be used to transfer + control from other threads to the IOLoop's thread. + + To add a callback from a signal handler, see + `add_callback_from_signal`. + """ + raise NotImplementedError() + + def add_callback_from_signal(self, callback): + """Calls the given callback on the next I/O loop iteration. + + Safe for use from a Python signal handler; should not be used + otherwise. + + Callbacks added with this method will be run without any + stack_context, to avoid picking up the context of the function + that was interrupted by the signal. + """ + raise NotImplementedError() + + if futures is not None: + _FUTURE_TYPES = (futures.Future, DummyFuture) + else: + _FUTURE_TYPES = DummyFuture + def add_future(self, future, callback): + """Schedules a callback on the IOLoop when the given future is finished. + + The callback is invoked with one argument, the future. + """ + assert isinstance(future, IOLoop._FUTURE_TYPES) + callback = stack_context.wrap(callback) + future.add_done_callback( + lambda future: self.add_callback( + functools.partial(callback, future))) + + def _run_callback(self, callback): + """Runs a callback with error handling. + + For use in subclasses. + """ + try: + callback() + except Exception: + self.handle_callback_exception(callback) + + def handle_callback_exception(self, callback): + """This method is called whenever a callback run by the IOLoop + throws an exception. + + By default simply logs the exception as an error. Subclasses + may override this method to customize reporting of exceptions. + + The exception itself is not passed explicitly, but is available + in sys.exc_info. + """ + app_log.error("Exception in callback %r", callback, exc_info=True) + + + +class PollIOLoop(IOLoop): + """Base class for IOLoops built around a select-like function. + + For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` + (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or + `tornado.platform.select.SelectIOLoop` (all platforms). + """ + def initialize(self, impl, time_func=None): + super(PollIOLoop, self).initialize() + self._impl = impl + if hasattr(self._impl, 'fileno'): + set_close_exec(self._impl.fileno()) + self.time_func = time_func or time.time + self._handlers = {} + self._events = {} + self._callbacks = [] + self._callback_lock = threading.Lock() + self._timeouts = [] + self._running = False + self._stopped = False + self._thread_ident = None + self._blocking_signal_threshold = None + + # Create a pipe that we send bogus data to when we want to wake + # the I/O loop when it is idle + self._waker = Waker() + self.add_handler(self._waker.fileno(), + lambda fd, events: self._waker.consume(), + self.READ) + + def close(self, all_fds=False): + self.remove_handler(self._waker.fileno()) + if all_fds: + for fd in self._handlers.keys()[:]: + try: + os.close(fd) + except Exception: + gen_log.debug("error closing fd %s", fd, exc_info=True) + self._waker.close() + self._impl.close() + + def add_handler(self, fd, handler, events): + self._handlers[fd] = stack_context.wrap(handler) + self._impl.register(fd, events | self.ERROR) + + def update_handler(self, fd, events): + self._impl.modify(fd, events | self.ERROR) + + def remove_handler(self, fd): + self._handlers.pop(fd, None) + self._events.pop(fd, None) + try: + self._impl.unregister(fd) + except (OSError, IOError): + gen_log.debug("Error deleting fd from IOLoop", exc_info=True) + + def set_blocking_signal_threshold(self, seconds, action): + if not hasattr(signal, "setitimer"): + gen_log.error("set_blocking_signal_threshold requires a signal module " + "with the setitimer method") + return + self._blocking_signal_threshold = seconds + if seconds is not None: + signal.signal(signal.SIGALRM, + action if action is not None else signal.SIG_DFL) + + def start(self): if not logging.getLogger().handlers: # The IOLoop catches and logs exceptions, so it's # important that log output be visible. However, python's @@ -434,68 +590,22 @@ class IOLoop(Configurable): signal.set_wakeup_fd(old_wakeup_fd) def stop(self): - """Stop the loop after the current event loop iteration is complete. - If the event loop is not currently running, the next call to start() - will return immediately. - - To use asynchronous methods from otherwise-synchronous code (such as - unit tests), you can start and stop the event loop like this:: - - ioloop = IOLoop() - async_method(ioloop=ioloop, callback=ioloop.stop) - ioloop.start() - - ioloop.start() will return after async_method has run its callback, - whether that callback was invoked before or after ioloop.start. - - Note that even after `stop` has been called, the IOLoop is not - completely stopped until `IOLoop.start` has also returned. - """ self._running = False self._stopped = True self._waker.wake() def running(self): - """Returns true if this IOLoop is currently running.""" return self._running def time(self): - """Returns the current time according to the IOLoop's clock. - - The return value is a floating-point number relative to an - unspecified time in the past. - - By default, the IOLoop's time function is `time.time`. However, - it may be configured to use e.g. `time.monotonic` instead. - Calls to `add_timeout` that pass a number instead of a - `datetime.timedelta` should use this function to compute the - appropriate time, so they can work no matter what time function - is chosen. - """ return self.time_func() def add_timeout(self, deadline, callback): - """Calls the given callback at the time deadline from the I/O loop. - - Returns a handle that may be passed to remove_timeout to cancel. - - ``deadline`` may be a number denoting a time relative to - `IOLoop.time`, or a ``datetime.timedelta`` object for a - deadline relative to the current time. - - Note that it is not safe to call `add_timeout` from other threads. - Instead, you must use `add_callback` to transfer control to the - IOLoop's thread, and then call `add_timeout` from there. - """ timeout = _Timeout(deadline, stack_context.wrap(callback), self) heapq.heappush(self._timeouts, timeout) return timeout def remove_timeout(self, timeout): - """Cancels a pending timeout. - - The argument is a handle as returned by add_timeout. - """ # Removing from a heap is complicated, so just leave the defunct # timeout object in the queue (see discussion in # http://docs.python.org/library/heapq.html). @@ -504,18 +614,6 @@ class IOLoop(Configurable): timeout.callback = None def add_callback(self, callback): - """Calls the given callback on the next I/O loop iteration. - - It is safe to call this method from any thread at any time, - except from a signal handler. Note that this is the *only* - method in IOLoop that makes this thread-safety guarantee; all - other interaction with the IOLoop must be done from that - IOLoop's thread. add_callback() may be used to transfer - control from other threads to the IOLoop's thread. - - To add a callback from a signal handler, see - `add_callback_from_signal`. - """ with self._callback_lock: list_empty = not self._callbacks self._callbacks.append(stack_context.wrap(callback)) @@ -529,15 +627,6 @@ class IOLoop(Configurable): self._waker.wake() def add_callback_from_signal(self, callback): - """Calls the given callback on the next I/O loop iteration. - - Safe for use from a Python signal handler; should not be used - otherwise. - - Callbacks added with this method will be run without any - stack_context, to avoid picking up the context of the function - that was interrupted by the signal. - """ with stack_context.NullContext(): if thread.get_ident() != self._thread_ident: # if the signal is handled on another thread, we can add @@ -554,39 +643,6 @@ class IOLoop(Configurable): # but either way will work. self._callbacks.append(stack_context.wrap(callback)) - if futures is not None: - _FUTURE_TYPES = (futures.Future, DummyFuture) - else: - _FUTURE_TYPES = DummyFuture - def add_future(self, future, callback): - """Schedules a callback on the IOLoop when the given future is finished. - - The callback is invoked with one argument, the future. - """ - assert isinstance(future, IOLoop._FUTURE_TYPES) - callback = stack_context.wrap(callback) - future.add_done_callback( - lambda future: self.add_callback( - functools.partial(callback, future))) - - def _run_callback(self, callback): - try: - callback() - except Exception: - self.handle_callback_exception(callback) - - def handle_callback_exception(self, callback): - """This method is called whenever a callback run by the IOLoop - throws an exception. - - By default simply logs the exception as an error. Subclasses - may override this method to customize reporting of exceptions. - - The exception itself is not passed explicitly, but is available - in sys.exc_info. - """ - app_log.error("Exception in callback %r", callback, exc_info=True) - class _Timeout(object): """An IOLoop timeout, a UNIX timestamp and a callback""" diff --git a/tornado/platform/epoll.py b/tornado/platform/epoll.py index 5a6af2224..fa5b68eda 100644 --- a/tornado/platform/epoll.py +++ b/tornado/platform/epoll.py @@ -23,11 +23,11 @@ from __future__ import absolute_import, division, with_statement import os import select -from tornado.ioloop import IOLoop +from tornado.ioloop import PollIOLoop if hasattr(select, 'epoll'): # Python 2.6+ - class EPollIOLoop(IOLoop): + class EPollIOLoop(PollIOLoop): def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) else: @@ -62,7 +62,7 @@ else: return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000)) - class EPollIOLoop(IOLoop): + class EPollIOLoop(PollIOLoop): def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=_EPoll(), **kwargs) diff --git a/tornado/platform/kqueue.py b/tornado/platform/kqueue.py index 45f4cc9d5..2f14c15c3 100644 --- a/tornado/platform/kqueue.py +++ b/tornado/platform/kqueue.py @@ -18,7 +18,7 @@ from __future__ import absolute_import, division, with_statement import select -from tornado.ioloop import IOLoop +from tornado.ioloop import IOLoop, PollIOLoop assert hasattr(select, 'kqueue'), 'kqueue not supported' @@ -86,6 +86,6 @@ class _KQueue(object): return events.items() -class KQueueIOLoop(IOLoop): +class KQueueIOLoop(PollIOLoop): def initialize(self, **kwargs): super(KQueueIOLoop, self).initialize(impl=_KQueue(), **kwargs) diff --git a/tornado/platform/select.py b/tornado/platform/select.py index a8788aca6..51dc964e0 100644 --- a/tornado/platform/select.py +++ b/tornado/platform/select.py @@ -21,7 +21,7 @@ from __future__ import absolute_import, division, with_statement import select -from tornado.ioloop import IOLoop +from tornado.ioloop import IOLoop, PollIOLoop class _Select(object): """A simple, select()-based IOLoop implementation for non-Linux systems""" @@ -69,7 +69,7 @@ class _Select(object): events[fd] = events.get(fd, 0) | IOLoop.ERROR return events.items() -class SelectIOLoop(IOLoop): +class SelectIOLoop(PollIOLoop): def initialize(self, **kwargs): super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs) -- 2.47.2