From: Ben Darnell Date: Fri, 6 Jul 2018 21:08:44 +0000 (-0400) Subject: ioloop: Delete PollIOLoop X-Git-Tag: v6.0.0b1~48^2~25 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d4baaa3d90fe35c0d47e11edeb61eb03bc8a589e;p=thirdparty%2Ftornado.git ioloop: Delete PollIOLoop This was py2-only and is no longer used. --- diff --git a/docs/conf.py b/docs/conf.py index 39345d282..de2f60cfa 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -43,9 +43,6 @@ coverage_ignore_classes = [ # tornado.gen "Runner", - # tornado.ioloop - "PollIOLoop", - # tornado.web "ChunkedTransferEncoding", "GZipContentEncoding", diff --git a/maint/test/pyuv/tox.ini b/maint/test/pyuv/tox.ini deleted file mode 100644 index 8b6d569a7..000000000 --- a/maint/test/pyuv/tox.ini +++ /dev/null @@ -1,13 +0,0 @@ -[tox] -envlist = py27 -setupdir = ../../.. - -[testenv] -commands = - python -m tornado.test.runtests --ioloop=tornaduv.UVLoop {posargs:} -# twisted tests don't work on pyuv IOLoop currently. -deps = - pyuv - tornaduv - futures - mock diff --git a/tornado/ioloop.py b/tornado/ioloop.py index f7ee6dd60..d12b8312b 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -32,16 +32,10 @@ events. `IOLoop.add_timeout` is a non-blocking alternative to from __future__ import absolute_import, division, print_function -import collections import datetime -import errno -import functools -import heapq -import itertools import logging import numbers import os -import select import sys import threading import time @@ -51,12 +45,8 @@ import random from tornado.concurrent import Future, is_future, chain_future, future_set_exc_info, future_add_done_callback # noqa: E501 from tornado.log import app_log, gen_log -from tornado.platform.auto import set_close_exec, Waker from tornado import stack_context -from tornado.util import ( - PY3, Configurable, errno_from_exception, timedelta_to_seconds, - TimeoutError, unicode_type, import_object, -) +from tornado.util import Configurable, timedelta_to_seconds, TimeoutError, unicode_type, import_object try: import signal @@ -68,20 +58,12 @@ try: except ImportError: ThreadPoolExecutor = None -if PY3: - import _thread as thread -else: - import thread - try: import asyncio except ImportError: asyncio = None -_POLL_TIMEOUT = 3600.0 - - class IOLoop(Configurable): """A level-triggered I/O loop. @@ -345,10 +327,8 @@ class IOLoop(Configurable): @classmethod def configurable_default(cls): - if asyncio is not None: - from tornado.platform.asyncio import AsyncIOLoop - return AsyncIOLoop - return PollIOLoop + from tornado.platform.asyncio import AsyncIOLoop + return AsyncIOLoop def initialize(self, make_current=None): if make_current is None: @@ -842,303 +822,6 @@ class IOLoop(Configurable): pass -class PollIOLoop(IOLoop): - """Base class for IOLoops built around a select-like function. - - For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` - (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or - `tornado.platform.select.SelectIOLoop` (all platforms). - """ - def initialize(self, impl, time_func=None, **kwargs): - super(PollIOLoop, self).initialize(**kwargs) - self._impl = impl - if hasattr(self._impl, 'fileno'): - set_close_exec(self._impl.fileno()) - self.time_func = time_func or time.time - self._handlers = {} - self._events = {} - self._callbacks = collections.deque() - self._timeouts = [] - self._cancellations = 0 - self._running = False - self._stopped = False - self._closing = False - self._thread_ident = None - self._pid = os.getpid() - self._blocking_signal_threshold = None - self._timeout_counter = itertools.count() - - # Create a pipe that we send bogus data to when we want to wake - # the I/O loop when it is idle - self._waker = Waker() - self.add_handler(self._waker.fileno(), - lambda fd, events: self._waker.consume(), - self.READ) - - @classmethod - def configurable_base(cls): - return PollIOLoop - - @classmethod - def configurable_default(cls): - if hasattr(select, "epoll"): - from tornado.platform.epoll import EPollIOLoop - return EPollIOLoop - if hasattr(select, "kqueue"): - # Python 2.6+ on BSD or Mac - from tornado.platform.kqueue import KQueueIOLoop - return KQueueIOLoop - from tornado.platform.select import SelectIOLoop - return SelectIOLoop - - def close(self, all_fds=False): - self._closing = True - self.remove_handler(self._waker.fileno()) - if all_fds: - for fd, handler in list(self._handlers.values()): - self.close_fd(fd) - self._waker.close() - self._impl.close() - self._callbacks = None - self._timeouts = None - if hasattr(self, '_executor'): - self._executor.shutdown() - - def add_handler(self, fd, handler, events): - fd, obj = self.split_fd(fd) - self._handlers[fd] = (obj, stack_context.wrap(handler)) - self._impl.register(fd, events | self.ERROR) - - def update_handler(self, fd, events): - fd, obj = self.split_fd(fd) - self._impl.modify(fd, events | self.ERROR) - - def remove_handler(self, fd): - fd, obj = self.split_fd(fd) - self._handlers.pop(fd, None) - self._events.pop(fd, None) - try: - self._impl.unregister(fd) - except Exception: - gen_log.debug("Error deleting fd from IOLoop", exc_info=True) - - def set_blocking_signal_threshold(self, seconds, action): - if not hasattr(signal, "setitimer"): - gen_log.error("set_blocking_signal_threshold requires a signal module " - "with the setitimer method") - return - self._blocking_signal_threshold = seconds - if seconds is not None: - signal.signal(signal.SIGALRM, - action if action is not None else signal.SIG_DFL) - - def start(self): - if self._running: - raise RuntimeError("IOLoop is already running") - if os.getpid() != self._pid: - raise RuntimeError("Cannot share PollIOLoops across processes") - self._setup_logging() - if self._stopped: - self._stopped = False - return - old_current = IOLoop.current(instance=False) - if old_current is not self: - self.make_current() - self._thread_ident = thread.get_ident() - self._running = True - - # signal.set_wakeup_fd closes a race condition in event loops: - # a signal may arrive at the beginning of select/poll/etc - # before it goes into its interruptible sleep, so the signal - # will be consumed without waking the select. The solution is - # for the (C, synchronous) signal handler to write to a pipe, - # which will then be seen by select. - # - # In python's signal handling semantics, this only matters on the - # main thread (fortunately, set_wakeup_fd only works on the main - # thread and will raise a ValueError otherwise). - # - # If someone has already set a wakeup fd, we don't want to - # disturb it. This is an issue for twisted, which does its - # SIGCHLD processing in response to its own wakeup fd being - # written to. As long as the wakeup fd is registered on the IOLoop, - # the loop will still wake up and everything should work. - old_wakeup_fd = None - if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': - # requires python 2.6+, unix. set_wakeup_fd exists but crashes - # the python process on windows. - try: - old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) - if old_wakeup_fd != -1: - # Already set, restore previous value. This is a little racy, - # but there's no clean get_wakeup_fd and in real use the - # IOLoop is just started once at the beginning. - signal.set_wakeup_fd(old_wakeup_fd) - old_wakeup_fd = None - except ValueError: - # Non-main thread, or the previous value of wakeup_fd - # is no longer valid. - old_wakeup_fd = None - - try: - while True: - # Prevent IO event starvation by delaying new callbacks - # to the next iteration of the event loop. - ncallbacks = len(self._callbacks) - - # Add any timeouts that have come due to the callback list. - # Do not run anything until we have determined which ones - # are ready, so timeouts that call add_timeout cannot - # schedule anything in this iteration. - due_timeouts = [] - if self._timeouts: - now = self.time() - while self._timeouts: - if self._timeouts[0].callback is None: - # The timeout was cancelled. Note that the - # cancellation check is repeated below for timeouts - # that are cancelled by another timeout or callback. - heapq.heappop(self._timeouts) - self._cancellations -= 1 - elif self._timeouts[0].deadline <= now: - due_timeouts.append(heapq.heappop(self._timeouts)) - else: - break - if (self._cancellations > 512 and - self._cancellations > (len(self._timeouts) >> 1)): - # Clean up the timeout queue when it gets large and it's - # more than half cancellations. - self._cancellations = 0 - self._timeouts = [x for x in self._timeouts - if x.callback is not None] - heapq.heapify(self._timeouts) - - for i in range(ncallbacks): - self._run_callback(self._callbacks.popleft()) - for timeout in due_timeouts: - if timeout.callback is not None: - self._run_callback(timeout.callback) - # Closures may be holding on to a lot of memory, so allow - # them to be freed before we go into our poll wait. - due_timeouts = timeout = None - - if self._callbacks: - # If any callbacks or timeouts called add_callback, - # we don't want to wait in poll() before we run them. - poll_timeout = 0.0 - elif self._timeouts: - # If there are any timeouts, schedule the first one. - # Use self.time() instead of 'now' to account for time - # spent running callbacks. - poll_timeout = self._timeouts[0].deadline - self.time() - poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) - else: - # No timeouts and no callbacks, so use the default. - poll_timeout = _POLL_TIMEOUT - - if not self._running: - break - - if self._blocking_signal_threshold is not None: - # clear alarm so it doesn't fire while poll is waiting for - # events. - signal.setitimer(signal.ITIMER_REAL, 0, 0) - - try: - event_pairs = self._impl.poll(poll_timeout) - except Exception as e: - # Depending on python version and IOLoop implementation, - # different exception types may be thrown and there are - # two ways EINTR might be signaled: - # * e.errno == errno.EINTR - # * e.args is like (errno.EINTR, 'Interrupted system call') - if errno_from_exception(e) == errno.EINTR: - continue - else: - raise - - if self._blocking_signal_threshold is not None: - signal.setitimer(signal.ITIMER_REAL, - self._blocking_signal_threshold, 0) - - # Pop one fd at a time from the set of pending fds and run - # its handler. Since that handler may perform actions on - # other file descriptors, there may be reentrant calls to - # this IOLoop that modify self._events - self._events.update(event_pairs) - while self._events: - fd, events = self._events.popitem() - try: - fd_obj, handler_func = self._handlers[fd] - handler_func(fd_obj, events) - except (OSError, IOError) as e: - if errno_from_exception(e) == errno.EPIPE: - # Happens when the client closes the connection - pass - else: - self.handle_callback_exception(self._handlers.get(fd)) - except Exception: - self.handle_callback_exception(self._handlers.get(fd)) - fd_obj = handler_func = None - - finally: - # reset the stopped flag so another start/stop pair can be issued - self._stopped = False - if self._blocking_signal_threshold is not None: - signal.setitimer(signal.ITIMER_REAL, 0, 0) - if old_current is None: - IOLoop.clear_current() - elif old_current is not self: - old_current.make_current() - if old_wakeup_fd is not None: - signal.set_wakeup_fd(old_wakeup_fd) - - def stop(self): - self._running = False - self._stopped = True - self._waker.wake() - - def time(self): - return self.time_func() - - def call_at(self, deadline, callback, *args, **kwargs): - timeout = _Timeout( - deadline, - functools.partial(stack_context.wrap(callback), *args, **kwargs), - self) - heapq.heappush(self._timeouts, timeout) - return timeout - - def remove_timeout(self, timeout): - # Removing from a heap is complicated, so just leave the defunct - # timeout object in the queue (see discussion in - # http://docs.python.org/library/heapq.html). - # If this turns out to be a problem, we could add a garbage - # collection pass whenever there are too many dead timeouts. - timeout.callback = None - self._cancellations += 1 - - def add_callback(self, callback, *args, **kwargs): - if self._closing: - return - # Blindly insert into self._callbacks. This is safe even - # from signal handlers because deque.append is atomic. - self._callbacks.append(functools.partial( - stack_context.wrap(callback), *args, **kwargs)) - if thread.get_ident() != self._thread_ident: - # This will write one byte but Waker.consume() reads many - # at once, so it's ok to write even when not strictly - # necessary. - self._waker.wake() - else: - # If we're on the IOLoop's thread, we don't need to wake anyone. - pass - - def add_callback_from_signal(self, callback, *args, **kwargs): - with stack_context.NullContext(): - self.add_callback(callback, *args, **kwargs) - - class _Timeout(object): """An IOLoop timeout, a UNIX timestamp and a callback""" diff --git a/tornado/platform/auto.py b/tornado/platform/auto.py index 1a9133faf..553b70e36 100644 --- a/tornado/platform/auto.py +++ b/tornado/platform/auto.py @@ -27,32 +27,11 @@ from __future__ import absolute_import, division, print_function import os if 'APPENGINE_RUNTIME' in os.environ: - from tornado.platform.common import Waker - def set_close_exec(fd): pass elif os.name == 'nt': - from tornado.platform.common import Waker from tornado.platform.windows import set_close_exec else: - from tornado.platform.posix import set_close_exec, Waker - -try: - # monotime monkey-patches the time module to have a monotonic function - # in versions of python before 3.3. - import monotime - # Silence pyflakes warning about this unused import - monotime -except ImportError: - pass -try: - # monotonic can provide a monotonic function in versions of python before - # 3.3, too. - from monotonic import monotonic as monotonic_time -except ImportError: - try: - from time import monotonic as monotonic_time - except ImportError: - monotonic_time = None + from tornado.platform.posix import set_close_exec -__all__ = ['Waker', 'set_close_exec', 'monotonic_time'] +__all__ = ['set_close_exec'] diff --git a/tornado/platform/common.py b/tornado/platform/common.py deleted file mode 100644 index b597748d1..000000000 --- a/tornado/platform/common.py +++ /dev/null @@ -1,113 +0,0 @@ -"""Lowest-common-denominator implementations of platform functionality.""" -from __future__ import absolute_import, division, print_function - -import errno -import socket -import time - -from tornado.platform import interface -from tornado.util import errno_from_exception - - -def try_close(f): - # Avoid issue #875 (race condition when using the file in another - # thread). - for i in range(10): - try: - f.close() - except IOError: - # Yield to another thread - time.sleep(1e-3) - else: - break - # Try a last time and let raise - f.close() - - -class Waker(interface.Waker): - """Create an OS independent asynchronous pipe. - - For use on platforms that don't have os.pipe() (or where pipes cannot - be passed to select()), but do have sockets. This includes Windows - and Jython. - """ - def __init__(self): - from .auto import set_close_exec - # Based on Zope select_trigger.py: - # https://github.com/zopefoundation/Zope/blob/master/src/ZServer/medusa/thread/select_trigger.py - - self.writer = socket.socket() - set_close_exec(self.writer.fileno()) - # Disable buffering -- pulling the trigger sends 1 byte, - # and we want that sent immediately, to wake up ASAP. - self.writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - - count = 0 - while 1: - count += 1 - # Bind to a local port; for efficiency, let the OS pick - # a free port for us. - # Unfortunately, stress tests showed that we may not - # be able to connect to that port ("Address already in - # use") despite that the OS picked it. This appears - # to be a race bug in the Windows socket implementation. - # So we loop until a connect() succeeds (almost always - # on the first try). See the long thread at - # http://mail.zope.org/pipermail/zope/2005-July/160433.html - # for hideous details. - a = socket.socket() - set_close_exec(a.fileno()) - a.bind(("127.0.0.1", 0)) - a.listen(1) - connect_address = a.getsockname() # assigned (host, port) pair - try: - self.writer.connect(connect_address) - break # success - except socket.error as detail: - if (not hasattr(errno, 'WSAEADDRINUSE') or - errno_from_exception(detail) != errno.WSAEADDRINUSE): - # "Address already in use" is the only error - # I've seen on two WinXP Pro SP2 boxes, under - # Pythons 2.3.5 and 2.4.1. - raise - # (10048, 'Address already in use') - # assert count <= 2 # never triggered in Tim's tests - if count >= 10: # I've never seen it go above 2 - a.close() - self.writer.close() - raise socket.error("Cannot bind trigger!") - # Close `a` and try again. Note: I originally put a short - # sleep() here, but it didn't appear to help or hurt. - a.close() - - self.reader, addr = a.accept() - set_close_exec(self.reader.fileno()) - self.reader.setblocking(0) - self.writer.setblocking(0) - a.close() - self.reader_fd = self.reader.fileno() - - def fileno(self): - return self.reader.fileno() - - def write_fileno(self): - return self.writer.fileno() - - def wake(self): - try: - self.writer.send(b"x") - except (IOError, socket.error, ValueError): - pass - - def consume(self): - try: - while True: - result = self.reader.recv(1024) - if not result: - break - except (IOError, socket.error): - pass - - def close(self): - self.reader.close() - try_close(self.writer) diff --git a/tornado/platform/epoll.py b/tornado/platform/epoll.py deleted file mode 100644 index 4e3461740..000000000 --- a/tornado/platform/epoll.py +++ /dev/null @@ -1,25 +0,0 @@ -# -# Copyright 2012 Facebook -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""EPoll-based IOLoop implementation for Linux systems.""" -from __future__ import absolute_import, division, print_function - -import select - -from tornado.ioloop import PollIOLoop - - -class EPollIOLoop(PollIOLoop): - def initialize(self, **kwargs): - super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) diff --git a/tornado/platform/interface.py b/tornado/platform/interface.py index cac532646..c2d54e4ce 100644 --- a/tornado/platform/interface.py +++ b/tornado/platform/interface.py @@ -26,41 +26,3 @@ from __future__ import absolute_import, division, print_function def set_close_exec(fd): """Sets the close-on-exec bit (``FD_CLOEXEC``)for a file descriptor.""" raise NotImplementedError() - - -class Waker(object): - """A socket-like object that can wake another thread from ``select()``. - - The `~tornado.ioloop.IOLoop` will add the Waker's `fileno()` to - its ``select`` (or ``epoll`` or ``kqueue``) calls. When another - thread wants to wake up the loop, it calls `wake`. Once it has woken - up, it will call `consume` to do any necessary per-wake cleanup. When - the ``IOLoop`` is closed, it closes its waker too. - """ - def fileno(self): - """Returns the read file descriptor for this waker. - - Must be suitable for use with ``select()`` or equivalent on the - local platform. - """ - raise NotImplementedError() - - def write_fileno(self): - """Returns the write file descriptor for this waker.""" - raise NotImplementedError() - - def wake(self): - """Triggers activity on the waker's file descriptor.""" - raise NotImplementedError() - - def consume(self): - """Called after the listen has woken up to do any necessary cleanup.""" - raise NotImplementedError() - - def close(self): - """Closes the waker's file descriptor(s).""" - raise NotImplementedError() - - -def monotonic_time(): - raise NotImplementedError() diff --git a/tornado/platform/kqueue.py b/tornado/platform/kqueue.py deleted file mode 100644 index 4e0aee02e..000000000 --- a/tornado/platform/kqueue.py +++ /dev/null @@ -1,90 +0,0 @@ -# -# Copyright 2012 Facebook -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""KQueue-based IOLoop implementation for BSD/Mac systems.""" -from __future__ import absolute_import, division, print_function - -import select - -from tornado.ioloop import IOLoop, PollIOLoop - -assert hasattr(select, 'kqueue'), 'kqueue not supported' - - -class _KQueue(object): - """A kqueue-based event loop for BSD/Mac systems.""" - def __init__(self): - self._kqueue = select.kqueue() - self._active = {} - - def fileno(self): - return self._kqueue.fileno() - - def close(self): - self._kqueue.close() - - def register(self, fd, events): - if fd in self._active: - raise IOError("fd %s already registered" % fd) - self._control(fd, events, select.KQ_EV_ADD) - self._active[fd] = events - - def modify(self, fd, events): - self.unregister(fd) - self.register(fd, events) - - def unregister(self, fd): - events = self._active.pop(fd) - self._control(fd, events, select.KQ_EV_DELETE) - - def _control(self, fd, events, flags): - kevents = [] - if events & IOLoop.WRITE: - kevents.append(select.kevent( - fd, filter=select.KQ_FILTER_WRITE, flags=flags)) - if events & IOLoop.READ: - kevents.append(select.kevent( - fd, filter=select.KQ_FILTER_READ, flags=flags)) - # Even though control() takes a list, it seems to return EINVAL - # on Mac OS X (10.6) when there is more than one event in the list. - for kevent in kevents: - self._kqueue.control([kevent], 0) - - def poll(self, timeout): - kevents = self._kqueue.control(None, 1000, timeout) - events = {} - for kevent in kevents: - fd = kevent.ident - if kevent.filter == select.KQ_FILTER_READ: - events[fd] = events.get(fd, 0) | IOLoop.READ - if kevent.filter == select.KQ_FILTER_WRITE: - if kevent.flags & select.KQ_EV_EOF: - # If an asynchronous connection is refused, kqueue - # returns a write event with the EOF flag set. - # Turn this into an error for consistency with the - # other IOLoop implementations. - # Note that for read events, EOF may be returned before - # all data has been consumed from the socket buffer, - # so we only check for EOF on write events. - events[fd] = IOLoop.ERROR - else: - events[fd] = events.get(fd, 0) | IOLoop.WRITE - if kevent.flags & select.KQ_EV_ERROR: - events[fd] = events.get(fd, 0) | IOLoop.ERROR - return events.items() - - -class KQueueIOLoop(PollIOLoop): - def initialize(self, **kwargs): - super(KQueueIOLoop, self).initialize(impl=_KQueue(), **kwargs) diff --git a/tornado/platform/posix.py b/tornado/platform/posix.py index 6fe1fa837..c8571e6b0 100644 --- a/tornado/platform/posix.py +++ b/tornado/platform/posix.py @@ -20,8 +20,6 @@ from __future__ import absolute_import, division, print_function import fcntl import os -from tornado.platform import common, interface - def set_close_exec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) @@ -31,39 +29,3 @@ def set_close_exec(fd): def _set_nonblocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - -class Waker(interface.Waker): - def __init__(self): - r, w = os.pipe() - _set_nonblocking(r) - _set_nonblocking(w) - set_close_exec(r) - set_close_exec(w) - self.reader = os.fdopen(r, "rb", 0) - self.writer = os.fdopen(w, "wb", 0) - - def fileno(self): - return self.reader.fileno() - - def write_fileno(self): - return self.writer.fileno() - - def wake(self): - try: - self.writer.write(b"x") - except (IOError, ValueError): - pass - - def consume(self): - try: - while True: - result = self.reader.read() - if not result: - break - except IOError: - pass - - def close(self): - self.reader.close() - common.try_close(self.writer) diff --git a/tornado/platform/select.py b/tornado/platform/select.py deleted file mode 100644 index 14e8a4745..000000000 --- a/tornado/platform/select.py +++ /dev/null @@ -1,75 +0,0 @@ -# -# Copyright 2012 Facebook -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Select-based IOLoop implementation. - -Used as a fallback for systems that don't support epoll or kqueue. -""" -from __future__ import absolute_import, division, print_function - -import select - -from tornado.ioloop import IOLoop, PollIOLoop - - -class _Select(object): - """A simple, select()-based IOLoop implementation for non-Linux systems""" - def __init__(self): - self.read_fds = set() - self.write_fds = set() - self.error_fds = set() - self.fd_sets = (self.read_fds, self.write_fds, self.error_fds) - - def close(self): - pass - - def register(self, fd, events): - if fd in self.read_fds or fd in self.write_fds or fd in self.error_fds: - raise IOError("fd %s already registered" % fd) - if events & IOLoop.READ: - self.read_fds.add(fd) - if events & IOLoop.WRITE: - self.write_fds.add(fd) - if events & IOLoop.ERROR: - self.error_fds.add(fd) - # Closed connections are reported as errors by epoll and kqueue, - # but as zero-byte reads by select, so when errors are requested - # we need to listen for both read and error. - # self.read_fds.add(fd) - - def modify(self, fd, events): - self.unregister(fd) - self.register(fd, events) - - def unregister(self, fd): - self.read_fds.discard(fd) - self.write_fds.discard(fd) - self.error_fds.discard(fd) - - def poll(self, timeout): - readable, writeable, errors = select.select( - self.read_fds, self.write_fds, self.error_fds, timeout) - events = {} - for fd in readable: - events[fd] = events.get(fd, 0) | IOLoop.READ - for fd in writeable: - events[fd] = events.get(fd, 0) | IOLoop.WRITE - for fd in errors: - events[fd] = events.get(fd, 0) | IOLoop.ERROR - return events.items() - - -class SelectIOLoop(PollIOLoop): - def initialize(self, **kwargs): - super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs) diff --git a/tornado/platform/twisted.py b/tornado/platform/twisted.py index b38a755c8..344068b2c 100644 --- a/tornado/platform/twisted.py +++ b/tornado/platform/twisted.py @@ -23,16 +23,14 @@ This module has been tested with Twisted versions 11.0.0 and newer. from __future__ import absolute_import, division, print_function -import datetime import functools -import numbers import socket import sys import twisted.internet.abstract # type: ignore from twisted.internet.defer import Deferred # type: ignore from twisted.internet.posixbase import PosixReactorBase # type: ignore -from twisted.internet.interfaces import IReactorFDSet, IDelayedCall, IReactorTime, IReadDescriptor, IWriteDescriptor # type: ignore # noqa: E501 +from twisted.internet.interfaces import IReactorFDSet, IDelayedCall, IReactorTime # type: ignore from twisted.python import failure, log # type: ignore from twisted.internet import error # type: ignore import twisted.names.cache # type: ignore @@ -48,9 +46,8 @@ from tornado import gen import tornado.ioloop from tornado.log import app_log from tornado.netutil import Resolver -from tornado.stack_context import NullContext, wrap +from tornado.stack_context import NullContext from tornado.ioloop import IOLoop -from tornado.util import timedelta_to_seconds @implementer(IDelayedCall) @@ -368,167 +365,6 @@ def install(): return reactor -@implementer(IReadDescriptor, IWriteDescriptor) -class _FD(object): - def __init__(self, fd, fileobj, handler): - self.fd = fd - self.fileobj = fileobj - self.handler = handler - self.reading = False - self.writing = False - self.lost = False - - def fileno(self): - return self.fd - - def doRead(self): - if not self.lost: - self.handler(self.fileobj, tornado.ioloop.IOLoop.READ) - - def doWrite(self): - if not self.lost: - self.handler(self.fileobj, tornado.ioloop.IOLoop.WRITE) - - def connectionLost(self, reason): - if not self.lost: - self.handler(self.fileobj, tornado.ioloop.IOLoop.ERROR) - self.lost = True - - writeConnectionLost = readConnectionLost = connectionLost - - def logPrefix(self): - return '' - - -class TwistedIOLoop(tornado.ioloop.IOLoop): - """IOLoop implementation that runs on Twisted. - - `TwistedIOLoop` implements the Tornado IOLoop interface on top of - the Twisted reactor. Recommended usage:: - - from tornado.platform.twisted import TwistedIOLoop - from twisted.internet import reactor - TwistedIOLoop().install() - # Set up your tornado application as usual using `IOLoop.instance` - reactor.run() - - Uses the global Twisted reactor by default. To create multiple - ``TwistedIOLoops`` in the same process, you must pass a unique reactor - when constructing each one. - - Not compatible with `tornado.process.Subprocess.set_exit_callback` - because the ``SIGCHLD`` handlers used by Tornado and Twisted conflict - with each other. - - See also :meth:`tornado.ioloop.IOLoop.install` for general notes on - installing alternative IOLoops. - - .. deprecated:: 5.1 - - The `asyncio` event loop will be the only available implementation in - Tornado 6.0. - """ - def initialize(self, reactor=None, **kwargs): - super(TwistedIOLoop, self).initialize(**kwargs) - if reactor is None: - import twisted.internet.reactor # type: ignore - reactor = twisted.internet.reactor - self.reactor = reactor - self.fds = {} - - def close(self, all_fds=False): - fds = self.fds - self.reactor.removeAll() - for c in self.reactor.getDelayedCalls(): - c.cancel() - if all_fds: - for fd in fds.values(): - self.close_fd(fd.fileobj) - - def add_handler(self, fd, handler, events): - if fd in self.fds: - raise ValueError('fd %s added twice' % fd) - fd, fileobj = self.split_fd(fd) - self.fds[fd] = _FD(fd, fileobj, wrap(handler)) - if events & tornado.ioloop.IOLoop.READ: - self.fds[fd].reading = True - self.reactor.addReader(self.fds[fd]) - if events & tornado.ioloop.IOLoop.WRITE: - self.fds[fd].writing = True - self.reactor.addWriter(self.fds[fd]) - - def update_handler(self, fd, events): - fd, fileobj = self.split_fd(fd) - if events & tornado.ioloop.IOLoop.READ: - if not self.fds[fd].reading: - self.fds[fd].reading = True - self.reactor.addReader(self.fds[fd]) - else: - if self.fds[fd].reading: - self.fds[fd].reading = False - self.reactor.removeReader(self.fds[fd]) - if events & tornado.ioloop.IOLoop.WRITE: - if not self.fds[fd].writing: - self.fds[fd].writing = True - self.reactor.addWriter(self.fds[fd]) - else: - if self.fds[fd].writing: - self.fds[fd].writing = False - self.reactor.removeWriter(self.fds[fd]) - - def remove_handler(self, fd): - fd, fileobj = self.split_fd(fd) - if fd not in self.fds: - return - self.fds[fd].lost = True - if self.fds[fd].reading: - self.reactor.removeReader(self.fds[fd]) - if self.fds[fd].writing: - self.reactor.removeWriter(self.fds[fd]) - del self.fds[fd] - - def start(self): - old_current = IOLoop.current(instance=False) - try: - self._setup_logging() - self.make_current() - self.reactor.run() - finally: - if old_current is None: - IOLoop.clear_current() - else: - old_current.make_current() - - def stop(self): - self.reactor.crash() - - def add_timeout(self, deadline, callback, *args, **kwargs): - # This method could be simplified (since tornado 4.0) by - # overriding call_at instead of add_timeout, but we leave it - # for now as a test of backwards-compatibility. - if isinstance(deadline, numbers.Real): - delay = max(deadline - self.time(), 0) - elif isinstance(deadline, datetime.timedelta): - delay = timedelta_to_seconds(deadline) - else: - raise TypeError("Unsupported deadline %r") - return self.reactor.callLater( - delay, self._run_callback, - functools.partial(wrap(callback), *args, **kwargs)) - - def remove_timeout(self, timeout): - if timeout.active(): - timeout.cancel() - - def add_callback(self, callback, *args, **kwargs): - self.reactor.callFromThread( - self._run_callback, - functools.partial(wrap(callback), *args, **kwargs)) - - def add_callback_from_signal(self, callback, *args, **kwargs): - self.add_callback(callback, *args, **kwargs) - - class TwistedResolver(Resolver): """Twisted-based asynchronous resolver. diff --git a/tornado/test/ioloop_test.py b/tornado/test/ioloop_test.py index b38d0ba32..aec568be2 100644 --- a/tornado/test/ioloop_test.py +++ b/tornado/test/ioloop_test.py @@ -20,9 +20,8 @@ except ImportError: from tornado.escape import native_str from tornado import gen -from tornado.ioloop import IOLoop, TimeoutError, PollIOLoop, PeriodicCallback +from tornado.ioloop import IOLoop, TimeoutError, PeriodicCallback from tornado.log import app_log -from tornado.platform.select import _Select from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog, gen_test from tornado.test.util import (unittest, skipIfNonUnix, skipOnTravis, @@ -44,42 +43,6 @@ except ImportError: twisted = None -class FakeTimeSelect(_Select): - def __init__(self): - self._time = 1000 - super(FakeTimeSelect, self).__init__() - - def time(self): - return self._time - - def sleep(self, t): - self._time += t - - def poll(self, timeout): - events = super(FakeTimeSelect, self).poll(0) - if events: - return events - self._time += timeout - return [] - - -class FakeTimeIOLoop(PollIOLoop): - """IOLoop implementation with a fake and deterministic clock. - - The clock advances as needed to trigger timeouts immediately. - For use when testing code that involves the passage of time - and no external dependencies. - """ - def initialize(self): - self.fts = FakeTimeSelect() - super(FakeTimeIOLoop, self).initialize(impl=self.fts, - time_func=self.fts.time) - - def sleep(self, t): - """Simulate a blocking sleep by advancing the clock.""" - self.fts.sleep(t) - - class TestIOLoop(AsyncTestCase): def test_add_callback_return_sequence(self): # A callback returning {} or [] shouldn't spin the CPU, see Issue #1803. @@ -278,7 +241,7 @@ class TestIOLoop(AsyncTestCase): self.io_loop.call_later(0, self.stop) self.wait() # The asyncio event loop does not guarantee the order of these - # callbacks, but PollIOLoop does. + # callbacks. self.assertEqual(sorted(results), [1, 2, 3, 4]) def test_add_timeout_return(self): @@ -751,66 +714,6 @@ class TestIOLoopRunSync(unittest.TestCase): self.io_loop.run_sync(namespace['f2']) -@unittest.skipIf(asyncio is not None, - 'IOLoop configuration not available') -class TestPeriodicCallback(unittest.TestCase): - def setUp(self): - self.io_loop = FakeTimeIOLoop() - self.io_loop.make_current() - - def tearDown(self): - self.io_loop.close() - - def test_basic(self): - calls = [] - - def cb(): - calls.append(self.io_loop.time()) - pc = PeriodicCallback(cb, 10000) - pc.start() - self.io_loop.call_later(50, self.io_loop.stop) - self.io_loop.start() - self.assertEqual(calls, [1010, 1020, 1030, 1040, 1050]) - - def test_overrun(self): - sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0] - expected = [ - 1010, 1020, 1030, # first 3 calls on schedule - 1050, 1070, # next 2 delayed one cycle - 1100, 1130, # next 2 delayed 2 cycles - 1170, 1210, # next 2 delayed 3 cycles - 1220, 1230, # then back on schedule. - ] - calls = [] - - def cb(): - calls.append(self.io_loop.time()) - if not sleep_durations: - self.io_loop.stop() - return - self.io_loop.sleep(sleep_durations.pop(0)) - pc = PeriodicCallback(cb, 10000) - pc.start() - self.io_loop.start() - self.assertEqual(calls, expected) - - def test_io_loop_set_at_start(self): - # Check PeriodicCallback uses the current IOLoop at start() time, - # not at instantiation time. - calls = [] - io_loop = FakeTimeIOLoop() - - def cb(): - calls.append(io_loop.time()) - pc = PeriodicCallback(cb, 10000) - io_loop.make_current() - pc.start() - io_loop.call_later(50, io_loop.stop) - io_loop.start() - self.assertEqual(calls, [1010, 1020, 1030, 1040, 1050]) - io_loop.close() - - class TestPeriodicCallbackMath(unittest.TestCase): def simulate_calls(self, pc, durations): """Simulate a series of calls to the PeriodicCallback. @@ -884,24 +787,18 @@ class TestPeriodicCallbackMath(unittest.TestCase): class TestIOLoopConfiguration(unittest.TestCase): def run_python(self, *statements): statements = [ - 'from tornado.ioloop import IOLoop, PollIOLoop', + 'from tornado.ioloop import IOLoop', 'classname = lambda x: x.__class__.__name__', ] + list(statements) args = [sys.executable, '-c', '; '.join(statements)] return native_str(subprocess.check_output(args)).strip() def test_default(self): - if asyncio is not None: - # When asyncio is available, it is used by default. - cls = self.run_python('print(classname(IOLoop.current()))') - self.assertEqual(cls, 'AsyncIOMainLoop') - cls = self.run_python('print(classname(IOLoop()))') - self.assertEqual(cls, 'AsyncIOLoop') - else: - # Otherwise, the default is a subclass of PollIOLoop - is_poll = self.run_python( - 'print(isinstance(IOLoop.current(), PollIOLoop))') - self.assertEqual(is_poll, 'True') + # When asyncio is available, it is used by default. + cls = self.run_python('print(classname(IOLoop.current()))') + self.assertEqual(cls, 'AsyncIOMainLoop') + cls = self.run_python('print(classname(IOLoop()))') + self.assertEqual(cls, 'AsyncIOLoop') @unittest.skipIf(asyncio is not None, "IOLoop configuration not available") diff --git a/tornado/test/twisted_test.py b/tornado/test/twisted_test.py index 4cc062971..da688ddfc 100644 --- a/tornado/test/twisted_test.py +++ b/tornado/test/twisted_test.py @@ -32,7 +32,7 @@ from tornado.escape import utf8 from tornado import gen from tornado.httpclient import AsyncHTTPClient from tornado.httpserver import HTTPServer -from tornado.ioloop import IOLoop, PollIOLoop +from tornado.ioloop import IOLoop from tornado.platform.auto import set_close_exec from tornado.testing import bind_unused_port from tornado.test.util import unittest @@ -688,42 +688,5 @@ if have_twisted: else: globalLogBeginner.beginLoggingTo([], redirectStandardIO=False) -if have_twisted: - class LayeredTwistedIOLoop(TwistedIOLoop): - """Layers a TwistedIOLoop on top of a TornadoReactor on a PollIOLoop. - - This is of course silly, but is useful for testing purposes to make - sure we're implementing both sides of the various interfaces - correctly. In some tests another TornadoReactor is layered on top - of the whole stack. - """ - def initialize(self, **kwargs): - self.real_io_loop = PollIOLoop(make_current=False) # type: ignore - reactor = self.real_io_loop.run_sync(gen.coroutine(TornadoReactor)) - super(LayeredTwistedIOLoop, self).initialize(reactor=reactor, **kwargs) - self.add_callback(self.make_current) - - def close(self, all_fds=False): - super(LayeredTwistedIOLoop, self).close(all_fds=all_fds) - # HACK: This is the same thing that test_class.unbuildReactor does. - for reader in self.reactor._internalReaders: - self.reactor.removeReader(reader) - reader.connectionLost(None) - self.real_io_loop.close(all_fds=all_fds) - - def stop(self): - # One of twisted's tests fails if I don't delay crash() - # until the reactor has started, but if I move this to - # TwistedIOLoop then the tests fail when I'm *not* running - # tornado-on-twisted-on-tornado. I'm clearly missing something - # about the startup/crash semantics, but since stop and crash - # are really only used in tests it doesn't really matter. - def f(): - self.reactor.crash() - # Become current again on restart. This is needed to - # override real_io_loop's claim to being the current loop. - self.add_callback(self.make_current) - self.reactor.callWhenRunning(f) - if __name__ == "__main__": unittest.main()