These implementations were needed only on Python 2 and are no longer used; with them gone, the asyncio-based AsyncIOLoop is the only IOLoop implementation.
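For reviewers, a rough sketch of what the default resolves to after this change (the expected class names come from the updated test_default below; this assumes a plain CPython 3 process with no custom IOLoop.configure call):

    from tornado.ioloop import IOLoop

    # IOLoop.current() now always wraps the asyncio event loop.
    loop = IOLoop.current()
    print(loop.__class__.__name__)   # expected: AsyncIOMainLoop

    # Constructing an IOLoop directly creates a private asyncio loop.
    child = IOLoop()
    print(child.__class__.__name__)  # expected: AsyncIOLoop
    child.close()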
# tornado.gen
"Runner",
- # tornado.ioloop
- "PollIOLoop",
-
# tornado.web
"ChunkedTransferEncoding",
"GZipContentEncoding",
+++ /dev/null
-[tox]
-envlist = py27
-setupdir = ../../..
-
-[testenv]
-commands =
- python -m tornado.test.runtests --ioloop=tornaduv.UVLoop {posargs:}
-# twisted tests don't work on pyuv IOLoop currently.
-deps =
- pyuv
- tornaduv
- futures
- mock
from __future__ import absolute_import, division, print_function
-import collections
import datetime
-import errno
-import functools
-import heapq
-import itertools
import logging
import numbers
import os
-import select
import sys
import threading
import time
from tornado.concurrent import Future, is_future, chain_future, future_set_exc_info, future_add_done_callback # noqa: E501
from tornado.log import app_log, gen_log
-from tornado.platform.auto import set_close_exec, Waker
from tornado import stack_context
-from tornado.util import (
- PY3, Configurable, errno_from_exception, timedelta_to_seconds,
- TimeoutError, unicode_type, import_object,
-)
+from tornado.util import (
+    Configurable, timedelta_to_seconds, TimeoutError, unicode_type,
+    import_object,
+)
try:
    import signal
except ImportError:
    signal = None
try:
    from concurrent.futures import ThreadPoolExecutor
except ImportError:
    ThreadPoolExecutor = None
-if PY3:
- import _thread as thread
-else:
- import thread
-
try:
import asyncio
except ImportError:
asyncio = None
-_POLL_TIMEOUT = 3600.0
-
-
class IOLoop(Configurable):
"""A level-triggered I/O loop.
@classmethod
def configurable_default(cls):
- if asyncio is not None:
- from tornado.platform.asyncio import AsyncIOLoop
- return AsyncIOLoop
- return PollIOLoop
+ from tornado.platform.asyncio import AsyncIOLoop
+ return AsyncIOLoop
def initialize(self, make_current=None):
if make_current is None:
    if IOLoop.current(instance=False) is None:
        self.make_current()
-class PollIOLoop(IOLoop):
- """Base class for IOLoops built around a select-like function.
-
- For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
- (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
- `tornado.platform.select.SelectIOLoop` (all platforms).
- """
- def initialize(self, impl, time_func=None, **kwargs):
- super(PollIOLoop, self).initialize(**kwargs)
- self._impl = impl
- if hasattr(self._impl, 'fileno'):
- set_close_exec(self._impl.fileno())
- self.time_func = time_func or time.time
- self._handlers = {}
- self._events = {}
- self._callbacks = collections.deque()
- self._timeouts = []
- self._cancellations = 0
- self._running = False
- self._stopped = False
- self._closing = False
- self._thread_ident = None
- self._pid = os.getpid()
- self._blocking_signal_threshold = None
- self._timeout_counter = itertools.count()
-
- # Create a pipe that we send bogus data to when we want to wake
- # the I/O loop when it is idle
- self._waker = Waker()
- self.add_handler(self._waker.fileno(),
- lambda fd, events: self._waker.consume(),
- self.READ)
-
- @classmethod
- def configurable_base(cls):
- return PollIOLoop
-
- @classmethod
- def configurable_default(cls):
- if hasattr(select, "epoll"):
- from tornado.platform.epoll import EPollIOLoop
- return EPollIOLoop
- if hasattr(select, "kqueue"):
- # Python 2.6+ on BSD or Mac
- from tornado.platform.kqueue import KQueueIOLoop
- return KQueueIOLoop
- from tornado.platform.select import SelectIOLoop
- return SelectIOLoop
-
- def close(self, all_fds=False):
- self._closing = True
- self.remove_handler(self._waker.fileno())
- if all_fds:
- for fd, handler in list(self._handlers.values()):
- self.close_fd(fd)
- self._waker.close()
- self._impl.close()
- self._callbacks = None
- self._timeouts = None
- if hasattr(self, '_executor'):
- self._executor.shutdown()
-
- def add_handler(self, fd, handler, events):
- fd, obj = self.split_fd(fd)
- self._handlers[fd] = (obj, stack_context.wrap(handler))
- self._impl.register(fd, events | self.ERROR)
-
- def update_handler(self, fd, events):
- fd, obj = self.split_fd(fd)
- self._impl.modify(fd, events | self.ERROR)
-
- def remove_handler(self, fd):
- fd, obj = self.split_fd(fd)
- self._handlers.pop(fd, None)
- self._events.pop(fd, None)
- try:
- self._impl.unregister(fd)
- except Exception:
- gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
-
- def set_blocking_signal_threshold(self, seconds, action):
- if not hasattr(signal, "setitimer"):
- gen_log.error("set_blocking_signal_threshold requires a signal module "
- "with the setitimer method")
- return
- self._blocking_signal_threshold = seconds
- if seconds is not None:
- signal.signal(signal.SIGALRM,
- action if action is not None else signal.SIG_DFL)
-
- def start(self):
- if self._running:
- raise RuntimeError("IOLoop is already running")
- if os.getpid() != self._pid:
- raise RuntimeError("Cannot share PollIOLoops across processes")
- self._setup_logging()
- if self._stopped:
- self._stopped = False
- return
- old_current = IOLoop.current(instance=False)
- if old_current is not self:
- self.make_current()
- self._thread_ident = thread.get_ident()
- self._running = True
-
- # signal.set_wakeup_fd closes a race condition in event loops:
- # a signal may arrive at the beginning of select/poll/etc
- # before it goes into its interruptible sleep, so the signal
- # will be consumed without waking the select. The solution is
- # for the (C, synchronous) signal handler to write to a pipe,
- # which will then be seen by select.
- #
- # In python's signal handling semantics, this only matters on the
- # main thread (fortunately, set_wakeup_fd only works on the main
- # thread and will raise a ValueError otherwise).
- #
- # If someone has already set a wakeup fd, we don't want to
- # disturb it. This is an issue for twisted, which does its
- # SIGCHLD processing in response to its own wakeup fd being
- # written to. As long as the wakeup fd is registered on the IOLoop,
- # the loop will still wake up and everything should work.
- old_wakeup_fd = None
- if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
- # requires python 2.6+, unix. set_wakeup_fd exists but crashes
- # the python process on windows.
- try:
- old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
- if old_wakeup_fd != -1:
- # Already set, restore previous value. This is a little racy,
- # but there's no clean get_wakeup_fd and in real use the
- # IOLoop is just started once at the beginning.
- signal.set_wakeup_fd(old_wakeup_fd)
- old_wakeup_fd = None
- except ValueError:
- # Non-main thread, or the previous value of wakeup_fd
- # is no longer valid.
- old_wakeup_fd = None
-
- try:
- while True:
- # Prevent IO event starvation by delaying new callbacks
- # to the next iteration of the event loop.
- ncallbacks = len(self._callbacks)
-
- # Add any timeouts that have come due to the callback list.
- # Do not run anything until we have determined which ones
- # are ready, so timeouts that call add_timeout cannot
- # schedule anything in this iteration.
- due_timeouts = []
- if self._timeouts:
- now = self.time()
- while self._timeouts:
- if self._timeouts[0].callback is None:
- # The timeout was cancelled. Note that the
- # cancellation check is repeated below for timeouts
- # that are cancelled by another timeout or callback.
- heapq.heappop(self._timeouts)
- self._cancellations -= 1
- elif self._timeouts[0].deadline <= now:
- due_timeouts.append(heapq.heappop(self._timeouts))
- else:
- break
- if (self._cancellations > 512 and
- self._cancellations > (len(self._timeouts) >> 1)):
- # Clean up the timeout queue when it gets large and it's
- # more than half cancellations.
- self._cancellations = 0
- self._timeouts = [x for x in self._timeouts
- if x.callback is not None]
- heapq.heapify(self._timeouts)
-
- for i in range(ncallbacks):
- self._run_callback(self._callbacks.popleft())
- for timeout in due_timeouts:
- if timeout.callback is not None:
- self._run_callback(timeout.callback)
- # Closures may be holding on to a lot of memory, so allow
- # them to be freed before we go into our poll wait.
- due_timeouts = timeout = None
-
- if self._callbacks:
- # If any callbacks or timeouts called add_callback,
- # we don't want to wait in poll() before we run them.
- poll_timeout = 0.0
- elif self._timeouts:
- # If there are any timeouts, schedule the first one.
- # Use self.time() instead of 'now' to account for time
- # spent running callbacks.
- poll_timeout = self._timeouts[0].deadline - self.time()
- poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
- else:
- # No timeouts and no callbacks, so use the default.
- poll_timeout = _POLL_TIMEOUT
-
- if not self._running:
- break
-
- if self._blocking_signal_threshold is not None:
- # clear alarm so it doesn't fire while poll is waiting for
- # events.
- signal.setitimer(signal.ITIMER_REAL, 0, 0)
-
- try:
- event_pairs = self._impl.poll(poll_timeout)
- except Exception as e:
- # Depending on python version and IOLoop implementation,
- # different exception types may be thrown and there are
- # two ways EINTR might be signaled:
- # * e.errno == errno.EINTR
- # * e.args is like (errno.EINTR, 'Interrupted system call')
- if errno_from_exception(e) == errno.EINTR:
- continue
- else:
- raise
-
- if self._blocking_signal_threshold is not None:
- signal.setitimer(signal.ITIMER_REAL,
- self._blocking_signal_threshold, 0)
-
- # Pop one fd at a time from the set of pending fds and run
- # its handler. Since that handler may perform actions on
- # other file descriptors, there may be reentrant calls to
- # this IOLoop that modify self._events
- self._events.update(event_pairs)
- while self._events:
- fd, events = self._events.popitem()
- try:
- fd_obj, handler_func = self._handlers[fd]
- handler_func(fd_obj, events)
- except (OSError, IOError) as e:
- if errno_from_exception(e) == errno.EPIPE:
- # Happens when the client closes the connection
- pass
- else:
- self.handle_callback_exception(self._handlers.get(fd))
- except Exception:
- self.handle_callback_exception(self._handlers.get(fd))
- fd_obj = handler_func = None
-
- finally:
- # reset the stopped flag so another start/stop pair can be issued
- self._stopped = False
- if self._blocking_signal_threshold is not None:
- signal.setitimer(signal.ITIMER_REAL, 0, 0)
- if old_current is None:
- IOLoop.clear_current()
- elif old_current is not self:
- old_current.make_current()
- if old_wakeup_fd is not None:
- signal.set_wakeup_fd(old_wakeup_fd)
-
- def stop(self):
- self._running = False
- self._stopped = True
- self._waker.wake()
-
- def time(self):
- return self.time_func()
-
- def call_at(self, deadline, callback, *args, **kwargs):
- timeout = _Timeout(
- deadline,
- functools.partial(stack_context.wrap(callback), *args, **kwargs),
- self)
- heapq.heappush(self._timeouts, timeout)
- return timeout
-
- def remove_timeout(self, timeout):
- # Removing from a heap is complicated, so just leave the defunct
- # timeout object in the queue (see discussion in
- # http://docs.python.org/library/heapq.html).
- # If this turns out to be a problem, we could add a garbage
- # collection pass whenever there are too many dead timeouts.
- timeout.callback = None
- self._cancellations += 1
-
- def add_callback(self, callback, *args, **kwargs):
- if self._closing:
- return
- # Blindly insert into self._callbacks. This is safe even
- # from signal handlers because deque.append is atomic.
- self._callbacks.append(functools.partial(
- stack_context.wrap(callback), *args, **kwargs))
- if thread.get_ident() != self._thread_ident:
- # This will write one byte but Waker.consume() reads many
- # at once, so it's ok to write even when not strictly
- # necessary.
- self._waker.wake()
- else:
- # If we're on the IOLoop's thread, we don't need to wake anyone.
- pass
-
- def add_callback_from_signal(self, callback, *args, **kwargs):
- with stack_context.NullContext():
- self.add_callback(callback, *args, **kwargs)
-
-
class _Timeout(object):
"""An IOLoop timeout, a UNIX timestamp and a callback"""
import os
if 'APPENGINE_RUNTIME' in os.environ:
- from tornado.platform.common import Waker
-
def set_close_exec(fd):
pass
elif os.name == 'nt':
- from tornado.platform.common import Waker
from tornado.platform.windows import set_close_exec
else:
- from tornado.platform.posix import set_close_exec, Waker
-
-try:
- # monotime monkey-patches the time module to have a monotonic function
- # in versions of python before 3.3.
- import monotime
- # Silence pyflakes warning about this unused import
- monotime
-except ImportError:
- pass
-try:
- # monotonic can provide a monotonic function in versions of python before
- # 3.3, too.
- from monotonic import monotonic as monotonic_time
-except ImportError:
- try:
- from time import monotonic as monotonic_time
- except ImportError:
- monotonic_time = None
+ from tornado.platform.posix import set_close_exec
-__all__ = ['Waker', 'set_close_exec', 'monotonic_time']
+__all__ = ['set_close_exec']
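For reference, tornado.platform.auto now exports only set_close_exec; the Waker and monotonic_time shims are gone. A minimal sketch of the surviving helper, assuming a POSIX host (the pipe below is only an illustrative pair of descriptors):

    import os

    from tornado.platform.auto import set_close_exec

    r, w = os.pipe()
    set_close_exec(r)  # sets FD_CLOEXEC so the fd is not inherited across exec()
    set_close_exec(w)
    os.close(r)
    os.close(w)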
+++ /dev/null
-"""Lowest-common-denominator implementations of platform functionality."""
-from __future__ import absolute_import, division, print_function
-
-import errno
-import socket
-import time
-
-from tornado.platform import interface
-from tornado.util import errno_from_exception
-
-
-def try_close(f):
- # Avoid issue #875 (race condition when using the file in another
- # thread).
- for i in range(10):
- try:
- f.close()
- except IOError:
- # Yield to another thread
- time.sleep(1e-3)
- else:
- break
- # Try a last time and let raise
- f.close()
-
-
-class Waker(interface.Waker):
- """Create an OS independent asynchronous pipe.
-
- For use on platforms that don't have os.pipe() (or where pipes cannot
- be passed to select()), but do have sockets. This includes Windows
- and Jython.
- """
- def __init__(self):
- from .auto import set_close_exec
- # Based on Zope select_trigger.py:
- # https://github.com/zopefoundation/Zope/blob/master/src/ZServer/medusa/thread/select_trigger.py
-
- self.writer = socket.socket()
- set_close_exec(self.writer.fileno())
- # Disable buffering -- pulling the trigger sends 1 byte,
- # and we want that sent immediately, to wake up ASAP.
- self.writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-
- count = 0
- while 1:
- count += 1
- # Bind to a local port; for efficiency, let the OS pick
- # a free port for us.
- # Unfortunately, stress tests showed that we may not
- # be able to connect to that port ("Address already in
- # use") despite that the OS picked it. This appears
- # to be a race bug in the Windows socket implementation.
- # So we loop until a connect() succeeds (almost always
- # on the first try). See the long thread at
- # http://mail.zope.org/pipermail/zope/2005-July/160433.html
- # for hideous details.
- a = socket.socket()
- set_close_exec(a.fileno())
- a.bind(("127.0.0.1", 0))
- a.listen(1)
- connect_address = a.getsockname() # assigned (host, port) pair
- try:
- self.writer.connect(connect_address)
- break # success
- except socket.error as detail:
- if (not hasattr(errno, 'WSAEADDRINUSE') or
- errno_from_exception(detail) != errno.WSAEADDRINUSE):
- # "Address already in use" is the only error
- # I've seen on two WinXP Pro SP2 boxes, under
- # Pythons 2.3.5 and 2.4.1.
- raise
- # (10048, 'Address already in use')
- # assert count <= 2 # never triggered in Tim's tests
- if count >= 10: # I've never seen it go above 2
- a.close()
- self.writer.close()
- raise socket.error("Cannot bind trigger!")
- # Close `a` and try again. Note: I originally put a short
- # sleep() here, but it didn't appear to help or hurt.
- a.close()
-
- self.reader, addr = a.accept()
- set_close_exec(self.reader.fileno())
- self.reader.setblocking(0)
- self.writer.setblocking(0)
- a.close()
- self.reader_fd = self.reader.fileno()
-
- def fileno(self):
- return self.reader.fileno()
-
- def write_fileno(self):
- return self.writer.fileno()
-
- def wake(self):
- try:
- self.writer.send(b"x")
- except (IOError, socket.error, ValueError):
- pass
-
- def consume(self):
- try:
- while True:
- result = self.reader.recv(1024)
- if not result:
- break
- except (IOError, socket.error):
- pass
-
- def close(self):
- self.reader.close()
- try_close(self.writer)
+++ /dev/null
-#
-# Copyright 2012 Facebook
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""EPoll-based IOLoop implementation for Linux systems."""
-from __future__ import absolute_import, division, print_function
-
-import select
-
-from tornado.ioloop import PollIOLoop
-
-
-class EPollIOLoop(PollIOLoop):
- def initialize(self, **kwargs):
- super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
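The platform dispatch that PollIOLoop.configurable_default used to perform (epoll, kqueue, or plain select, as in the files deleted here) is now asyncio's responsibility; the standard library makes the equivalent choice through the selectors module. A small illustration, not part of this diff:

    import selectors

    # asyncio's SelectorEventLoop uses selectors.DefaultSelector, which
    # typically resolves to EpollSelector on Linux, KqueueSelector on
    # BSD/macOS, and poll/select-based selectors elsewhere.
    print(selectors.DefaultSelector.__name__)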
def set_close_exec(fd):
"""Sets the close-on-exec bit (``FD_CLOEXEC``)for a file descriptor."""
raise NotImplementedError()
-
-
-class Waker(object):
- """A socket-like object that can wake another thread from ``select()``.
-
- The `~tornado.ioloop.IOLoop` will add the Waker's `fileno()` to
- its ``select`` (or ``epoll`` or ``kqueue``) calls. When another
- thread wants to wake up the loop, it calls `wake`. Once it has woken
- up, it will call `consume` to do any necessary per-wake cleanup. When
- the ``IOLoop`` is closed, it closes its waker too.
- """
- def fileno(self):
- """Returns the read file descriptor for this waker.
-
- Must be suitable for use with ``select()`` or equivalent on the
- local platform.
- """
- raise NotImplementedError()
-
- def write_fileno(self):
- """Returns the write file descriptor for this waker."""
- raise NotImplementedError()
-
- def wake(self):
- """Triggers activity on the waker's file descriptor."""
- raise NotImplementedError()
-
- def consume(self):
- """Called after the listen has woken up to do any necessary cleanup."""
- raise NotImplementedError()
-
- def close(self):
- """Closes the waker's file descriptor(s)."""
- raise NotImplementedError()
-
-
-def monotonic_time():
- raise NotImplementedError()
+++ /dev/null
-#
-# Copyright 2012 Facebook
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""KQueue-based IOLoop implementation for BSD/Mac systems."""
-from __future__ import absolute_import, division, print_function
-
-import select
-
-from tornado.ioloop import IOLoop, PollIOLoop
-
-assert hasattr(select, 'kqueue'), 'kqueue not supported'
-
-
-class _KQueue(object):
- """A kqueue-based event loop for BSD/Mac systems."""
- def __init__(self):
- self._kqueue = select.kqueue()
- self._active = {}
-
- def fileno(self):
- return self._kqueue.fileno()
-
- def close(self):
- self._kqueue.close()
-
- def register(self, fd, events):
- if fd in self._active:
- raise IOError("fd %s already registered" % fd)
- self._control(fd, events, select.KQ_EV_ADD)
- self._active[fd] = events
-
- def modify(self, fd, events):
- self.unregister(fd)
- self.register(fd, events)
-
- def unregister(self, fd):
- events = self._active.pop(fd)
- self._control(fd, events, select.KQ_EV_DELETE)
-
- def _control(self, fd, events, flags):
- kevents = []
- if events & IOLoop.WRITE:
- kevents.append(select.kevent(
- fd, filter=select.KQ_FILTER_WRITE, flags=flags))
- if events & IOLoop.READ:
- kevents.append(select.kevent(
- fd, filter=select.KQ_FILTER_READ, flags=flags))
- # Even though control() takes a list, it seems to return EINVAL
- # on Mac OS X (10.6) when there is more than one event in the list.
- for kevent in kevents:
- self._kqueue.control([kevent], 0)
-
- def poll(self, timeout):
- kevents = self._kqueue.control(None, 1000, timeout)
- events = {}
- for kevent in kevents:
- fd = kevent.ident
- if kevent.filter == select.KQ_FILTER_READ:
- events[fd] = events.get(fd, 0) | IOLoop.READ
- if kevent.filter == select.KQ_FILTER_WRITE:
- if kevent.flags & select.KQ_EV_EOF:
- # If an asynchronous connection is refused, kqueue
- # returns a write event with the EOF flag set.
- # Turn this into an error for consistency with the
- # other IOLoop implementations.
- # Note that for read events, EOF may be returned before
- # all data has been consumed from the socket buffer,
- # so we only check for EOF on write events.
- events[fd] = IOLoop.ERROR
- else:
- events[fd] = events.get(fd, 0) | IOLoop.WRITE
- if kevent.flags & select.KQ_EV_ERROR:
- events[fd] = events.get(fd, 0) | IOLoop.ERROR
- return events.items()
-
-
-class KQueueIOLoop(PollIOLoop):
- def initialize(self, **kwargs):
- super(KQueueIOLoop, self).initialize(impl=_KQueue(), **kwargs)
import fcntl
import os
-from tornado.platform import common, interface
-
def set_close_exec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
def _set_nonblocking(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
-
-class Waker(interface.Waker):
- def __init__(self):
- r, w = os.pipe()
- _set_nonblocking(r)
- _set_nonblocking(w)
- set_close_exec(r)
- set_close_exec(w)
- self.reader = os.fdopen(r, "rb", 0)
- self.writer = os.fdopen(w, "wb", 0)
-
- def fileno(self):
- return self.reader.fileno()
-
- def write_fileno(self):
- return self.writer.fileno()
-
- def wake(self):
- try:
- self.writer.write(b"x")
- except (IOError, ValueError):
- pass
-
- def consume(self):
- try:
- while True:
- result = self.reader.read()
- if not result:
- break
- except IOError:
- pass
-
- def close(self):
- self.reader.close()
- common.try_close(self.writer)
+++ /dev/null
-#
-# Copyright 2012 Facebook
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""Select-based IOLoop implementation.
-
-Used as a fallback for systems that don't support epoll or kqueue.
-"""
-from __future__ import absolute_import, division, print_function
-
-import select
-
-from tornado.ioloop import IOLoop, PollIOLoop
-
-
-class _Select(object):
- """A simple, select()-based IOLoop implementation for non-Linux systems"""
- def __init__(self):
- self.read_fds = set()
- self.write_fds = set()
- self.error_fds = set()
- self.fd_sets = (self.read_fds, self.write_fds, self.error_fds)
-
- def close(self):
- pass
-
- def register(self, fd, events):
- if fd in self.read_fds or fd in self.write_fds or fd in self.error_fds:
- raise IOError("fd %s already registered" % fd)
- if events & IOLoop.READ:
- self.read_fds.add(fd)
- if events & IOLoop.WRITE:
- self.write_fds.add(fd)
- if events & IOLoop.ERROR:
- self.error_fds.add(fd)
- # Closed connections are reported as errors by epoll and kqueue,
- # but as zero-byte reads by select, so when errors are requested
- # we need to listen for both read and error.
- # self.read_fds.add(fd)
-
- def modify(self, fd, events):
- self.unregister(fd)
- self.register(fd, events)
-
- def unregister(self, fd):
- self.read_fds.discard(fd)
- self.write_fds.discard(fd)
- self.error_fds.discard(fd)
-
- def poll(self, timeout):
- readable, writeable, errors = select.select(
- self.read_fds, self.write_fds, self.error_fds, timeout)
- events = {}
- for fd in readable:
- events[fd] = events.get(fd, 0) | IOLoop.READ
- for fd in writeable:
- events[fd] = events.get(fd, 0) | IOLoop.WRITE
- for fd in errors:
- events[fd] = events.get(fd, 0) | IOLoop.ERROR
- return events.items()
-
-
-class SelectIOLoop(PollIOLoop):
- def initialize(self, **kwargs):
- super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs)
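The next hunk trims tornado/platform/twisted.py down to TwistedResolver; TwistedIOLoop and its _FD adapter go away. Tornado and Twisted can still share a process, but only through asyncio. A hedged sketch of that arrangement, assuming Twisted's asyncio-based reactor (twisted.internet.asyncioreactor, available in modern Twisted releases):

    # Install Twisted's asyncio reactor before twisted.internet.reactor is
    # imported anywhere else; Tornado's default IOLoop then wraps the same
    # asyncio event loop.
    from twisted.internet import asyncioreactor
    asyncioreactor.install()

    from twisted.internet import reactor
    from tornado.ioloop import IOLoop

    IOLoop.current().add_callback(lambda: print("tornado callback"))
    reactor.callWhenRunning(lambda: print("twisted callback"))
    reactor.callLater(0.1, reactor.stop)
    reactor.run()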
from __future__ import absolute_import, division, print_function
-import datetime
import functools
-import numbers
import socket
import sys
import twisted.internet.abstract # type: ignore
from twisted.internet.defer import Deferred # type: ignore
from twisted.internet.posixbase import PosixReactorBase # type: ignore
-from twisted.internet.interfaces import IReactorFDSet, IDelayedCall, IReactorTime, IReadDescriptor, IWriteDescriptor # type: ignore # noqa: E501
+from twisted.internet.interfaces import IReactorFDSet, IDelayedCall, IReactorTime  # type: ignore # noqa: E501
from twisted.python import failure, log # type: ignore
from twisted.internet import error # type: ignore
import twisted.names.cache # type: ignore
import tornado.ioloop
from tornado.log import app_log
from tornado.netutil import Resolver
-from tornado.stack_context import NullContext, wrap
+from tornado.stack_context import NullContext
from tornado.ioloop import IOLoop
-from tornado.util import timedelta_to_seconds
@implementer(IDelayedCall)
return reactor
-@implementer(IReadDescriptor, IWriteDescriptor)
-class _FD(object):
- def __init__(self, fd, fileobj, handler):
- self.fd = fd
- self.fileobj = fileobj
- self.handler = handler
- self.reading = False
- self.writing = False
- self.lost = False
-
- def fileno(self):
- return self.fd
-
- def doRead(self):
- if not self.lost:
- self.handler(self.fileobj, tornado.ioloop.IOLoop.READ)
-
- def doWrite(self):
- if not self.lost:
- self.handler(self.fileobj, tornado.ioloop.IOLoop.WRITE)
-
- def connectionLost(self, reason):
- if not self.lost:
- self.handler(self.fileobj, tornado.ioloop.IOLoop.ERROR)
- self.lost = True
-
- writeConnectionLost = readConnectionLost = connectionLost
-
- def logPrefix(self):
- return ''
-
-
-class TwistedIOLoop(tornado.ioloop.IOLoop):
- """IOLoop implementation that runs on Twisted.
-
- `TwistedIOLoop` implements the Tornado IOLoop interface on top of
- the Twisted reactor. Recommended usage::
-
- from tornado.platform.twisted import TwistedIOLoop
- from twisted.internet import reactor
- TwistedIOLoop().install()
- # Set up your tornado application as usual using `IOLoop.instance`
- reactor.run()
-
- Uses the global Twisted reactor by default. To create multiple
- ``TwistedIOLoops`` in the same process, you must pass a unique reactor
- when constructing each one.
-
- Not compatible with `tornado.process.Subprocess.set_exit_callback`
- because the ``SIGCHLD`` handlers used by Tornado and Twisted conflict
- with each other.
-
- See also :meth:`tornado.ioloop.IOLoop.install` for general notes on
- installing alternative IOLoops.
-
- .. deprecated:: 5.1
-
- The `asyncio` event loop will be the only available implementation in
- Tornado 6.0.
- """
- def initialize(self, reactor=None, **kwargs):
- super(TwistedIOLoop, self).initialize(**kwargs)
- if reactor is None:
- import twisted.internet.reactor # type: ignore
- reactor = twisted.internet.reactor
- self.reactor = reactor
- self.fds = {}
-
- def close(self, all_fds=False):
- fds = self.fds
- self.reactor.removeAll()
- for c in self.reactor.getDelayedCalls():
- c.cancel()
- if all_fds:
- for fd in fds.values():
- self.close_fd(fd.fileobj)
-
- def add_handler(self, fd, handler, events):
- if fd in self.fds:
- raise ValueError('fd %s added twice' % fd)
- fd, fileobj = self.split_fd(fd)
- self.fds[fd] = _FD(fd, fileobj, wrap(handler))
- if events & tornado.ioloop.IOLoop.READ:
- self.fds[fd].reading = True
- self.reactor.addReader(self.fds[fd])
- if events & tornado.ioloop.IOLoop.WRITE:
- self.fds[fd].writing = True
- self.reactor.addWriter(self.fds[fd])
-
- def update_handler(self, fd, events):
- fd, fileobj = self.split_fd(fd)
- if events & tornado.ioloop.IOLoop.READ:
- if not self.fds[fd].reading:
- self.fds[fd].reading = True
- self.reactor.addReader(self.fds[fd])
- else:
- if self.fds[fd].reading:
- self.fds[fd].reading = False
- self.reactor.removeReader(self.fds[fd])
- if events & tornado.ioloop.IOLoop.WRITE:
- if not self.fds[fd].writing:
- self.fds[fd].writing = True
- self.reactor.addWriter(self.fds[fd])
- else:
- if self.fds[fd].writing:
- self.fds[fd].writing = False
- self.reactor.removeWriter(self.fds[fd])
-
- def remove_handler(self, fd):
- fd, fileobj = self.split_fd(fd)
- if fd not in self.fds:
- return
- self.fds[fd].lost = True
- if self.fds[fd].reading:
- self.reactor.removeReader(self.fds[fd])
- if self.fds[fd].writing:
- self.reactor.removeWriter(self.fds[fd])
- del self.fds[fd]
-
- def start(self):
- old_current = IOLoop.current(instance=False)
- try:
- self._setup_logging()
- self.make_current()
- self.reactor.run()
- finally:
- if old_current is None:
- IOLoop.clear_current()
- else:
- old_current.make_current()
-
- def stop(self):
- self.reactor.crash()
-
- def add_timeout(self, deadline, callback, *args, **kwargs):
- # This method could be simplified (since tornado 4.0) by
- # overriding call_at instead of add_timeout, but we leave it
- # for now as a test of backwards-compatibility.
- if isinstance(deadline, numbers.Real):
- delay = max(deadline - self.time(), 0)
- elif isinstance(deadline, datetime.timedelta):
- delay = timedelta_to_seconds(deadline)
- else:
- raise TypeError("Unsupported deadline %r")
- return self.reactor.callLater(
- delay, self._run_callback,
- functools.partial(wrap(callback), *args, **kwargs))
-
- def remove_timeout(self, timeout):
- if timeout.active():
- timeout.cancel()
-
- def add_callback(self, callback, *args, **kwargs):
- self.reactor.callFromThread(
- self._run_callback,
- functools.partial(wrap(callback), *args, **kwargs))
-
- def add_callback_from_signal(self, callback, *args, **kwargs):
- self.add_callback(callback, *args, **kwargs)
-
-
class TwistedResolver(Resolver):
"""Twisted-based asynchronous resolver.
from tornado.escape import native_str
from tornado import gen
-from tornado.ioloop import IOLoop, TimeoutError, PollIOLoop, PeriodicCallback
+from tornado.ioloop import IOLoop, TimeoutError, PeriodicCallback
from tornado.log import app_log
-from tornado.platform.select import _Select
from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext
from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog, gen_test
from tornado.test.util import (unittest, skipIfNonUnix, skipOnTravis,
twisted = None
-class FakeTimeSelect(_Select):
- def __init__(self):
- self._time = 1000
- super(FakeTimeSelect, self).__init__()
-
- def time(self):
- return self._time
-
- def sleep(self, t):
- self._time += t
-
- def poll(self, timeout):
- events = super(FakeTimeSelect, self).poll(0)
- if events:
- return events
- self._time += timeout
- return []
-
-
-class FakeTimeIOLoop(PollIOLoop):
- """IOLoop implementation with a fake and deterministic clock.
-
- The clock advances as needed to trigger timeouts immediately.
- For use when testing code that involves the passage of time
- and no external dependencies.
- """
- def initialize(self):
- self.fts = FakeTimeSelect()
- super(FakeTimeIOLoop, self).initialize(impl=self.fts,
- time_func=self.fts.time)
-
- def sleep(self, t):
- """Simulate a blocking sleep by advancing the clock."""
- self.fts.sleep(t)
-
-
class TestIOLoop(AsyncTestCase):
def test_add_callback_return_sequence(self):
# A callback returning {} or [] shouldn't spin the CPU, see Issue #1803.
self.io_loop.call_later(0, self.stop)
self.wait()
# The asyncio event loop does not guarantee the order of these
- # callbacks, but PollIOLoop does.
+ # callbacks.
self.assertEqual(sorted(results), [1, 2, 3, 4])
def test_add_timeout_return(self):
self.io_loop.run_sync(namespace['f2'])
-@unittest.skipIf(asyncio is not None,
- 'IOLoop configuration not available')
-class TestPeriodicCallback(unittest.TestCase):
- def setUp(self):
- self.io_loop = FakeTimeIOLoop()
- self.io_loop.make_current()
-
- def tearDown(self):
- self.io_loop.close()
-
- def test_basic(self):
- calls = []
-
- def cb():
- calls.append(self.io_loop.time())
- pc = PeriodicCallback(cb, 10000)
- pc.start()
- self.io_loop.call_later(50, self.io_loop.stop)
- self.io_loop.start()
- self.assertEqual(calls, [1010, 1020, 1030, 1040, 1050])
-
- def test_overrun(self):
- sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0]
- expected = [
- 1010, 1020, 1030, # first 3 calls on schedule
- 1050, 1070, # next 2 delayed one cycle
- 1100, 1130, # next 2 delayed 2 cycles
- 1170, 1210, # next 2 delayed 3 cycles
- 1220, 1230, # then back on schedule.
- ]
- calls = []
-
- def cb():
- calls.append(self.io_loop.time())
- if not sleep_durations:
- self.io_loop.stop()
- return
- self.io_loop.sleep(sleep_durations.pop(0))
- pc = PeriodicCallback(cb, 10000)
- pc.start()
- self.io_loop.start()
- self.assertEqual(calls, expected)
-
- def test_io_loop_set_at_start(self):
- # Check PeriodicCallback uses the current IOLoop at start() time,
- # not at instantiation time.
- calls = []
- io_loop = FakeTimeIOLoop()
-
- def cb():
- calls.append(io_loop.time())
- pc = PeriodicCallback(cb, 10000)
- io_loop.make_current()
- pc.start()
- io_loop.call_later(50, io_loop.stop)
- io_loop.start()
- self.assertEqual(calls, [1010, 1020, 1030, 1040, 1050])
- io_loop.close()
-
-
class TestPeriodicCallbackMath(unittest.TestCase):
def simulate_calls(self, pc, durations):
"""Simulate a series of calls to the PeriodicCallback.
class TestIOLoopConfiguration(unittest.TestCase):
def run_python(self, *statements):
statements = [
- 'from tornado.ioloop import IOLoop, PollIOLoop',
+ 'from tornado.ioloop import IOLoop',
'classname = lambda x: x.__class__.__name__',
] + list(statements)
args = [sys.executable, '-c', '; '.join(statements)]
return native_str(subprocess.check_output(args)).strip()
def test_default(self):
- if asyncio is not None:
- # When asyncio is available, it is used by default.
- cls = self.run_python('print(classname(IOLoop.current()))')
- self.assertEqual(cls, 'AsyncIOMainLoop')
- cls = self.run_python('print(classname(IOLoop()))')
- self.assertEqual(cls, 'AsyncIOLoop')
- else:
- # Otherwise, the default is a subclass of PollIOLoop
- is_poll = self.run_python(
- 'print(isinstance(IOLoop.current(), PollIOLoop))')
- self.assertEqual(is_poll, 'True')
+ # The asyncio event loop is now always used by default.
+ cls = self.run_python('print(classname(IOLoop.current()))')
+ self.assertEqual(cls, 'AsyncIOMainLoop')
+ cls = self.run_python('print(classname(IOLoop()))')
+ self.assertEqual(cls, 'AsyncIOLoop')
@unittest.skipIf(asyncio is not None,
"IOLoop configuration not available")
from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from tornado.httpserver import HTTPServer
-from tornado.ioloop import IOLoop, PollIOLoop
+from tornado.ioloop import IOLoop
from tornado.platform.auto import set_close_exec
from tornado.testing import bind_unused_port
from tornado.test.util import unittest
else:
globalLogBeginner.beginLoggingTo([], redirectStandardIO=False)
-if have_twisted:
- class LayeredTwistedIOLoop(TwistedIOLoop):
- """Layers a TwistedIOLoop on top of a TornadoReactor on a PollIOLoop.
-
- This is of course silly, but is useful for testing purposes to make
- sure we're implementing both sides of the various interfaces
- correctly. In some tests another TornadoReactor is layered on top
- of the whole stack.
- """
- def initialize(self, **kwargs):
- self.real_io_loop = PollIOLoop(make_current=False) # type: ignore
- reactor = self.real_io_loop.run_sync(gen.coroutine(TornadoReactor))
- super(LayeredTwistedIOLoop, self).initialize(reactor=reactor, **kwargs)
- self.add_callback(self.make_current)
-
- def close(self, all_fds=False):
- super(LayeredTwistedIOLoop, self).close(all_fds=all_fds)
- # HACK: This is the same thing that test_class.unbuildReactor does.
- for reader in self.reactor._internalReaders:
- self.reactor.removeReader(reader)
- reader.connectionLost(None)
- self.real_io_loop.close(all_fds=all_fds)
-
- def stop(self):
- # One of twisted's tests fails if I don't delay crash()
- # until the reactor has started, but if I move this to
- # TwistedIOLoop then the tests fail when I'm *not* running
- # tornado-on-twisted-on-tornado. I'm clearly missing something
- # about the startup/crash semantics, but since stop and crash
- # are really only used in tests it doesn't really matter.
- def f():
- self.reactor.crash()
- # Become current again on restart. This is needed to
- # override real_io_loop's claim to being the current loop.
- self.add_callback(self.make_current)
- self.reactor.callWhenRunning(f)
-
if __name__ == "__main__":
unittest.main()