"""
from __future__ import absolute_import, division, print_function
+import asyncio
+from concurrent import futures
import functools
import sys
-import warnings
-
-from tornado.stack_context import ExceptionStackContext, wrap
-from tornado.util import ArgReplacer
-
-try:
- from concurrent import futures
-except ImportError:
- futures = None
-
-try:
- import asyncio
-except ImportError:
- asyncio = None
-
-try:
- import typing
-except ImportError:
- typing = None
class ReturnValueIgnoredError(Exception):
+ # No longer used; was previously used by @return_future
pass
Future = asyncio.Future # noqa
-if futures is None:
- FUTURES = Future # type: typing.Union[type, typing.Tuple[type, ...]]
-else:
- FUTURES = (futures.Future, Future)
+FUTURES = (futures.Future, Future)
def is_future(x):
The ``callback`` argument is deprecated and will be removed in
6.0. The decorator itself is discouraged in new code but will
not be removed in 6.0.
+
+ .. versionchanged:: 6.0
+
+ The ``callback`` argument was removed.
"""
def run_on_executor_decorator(fn):
executor = kwargs.get("executor", "executor")
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
- callback = kwargs.pop("callback", None)
async_future = Future()
conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs)
chain_future(conc_future, async_future)
- if callback:
- warnings.warn("callback arguments are deprecated, use the returned Future instead",
- DeprecationWarning)
- from tornado.ioloop import IOLoop
- IOLoop.current().add_future(
- async_future, lambda future: callback(future.result()))
return async_future
return wrapper
if args and kwargs:
_NO_RESULT = object()
-def return_future(f):
- """Decorator to make a function that returns via callback return a
- `Future`.
-
- This decorator was provided to ease the transition from
- callback-oriented code to coroutines. It is not recommended for
- new code.
-
- The wrapped function should take a ``callback`` keyword argument
- and invoke it with one argument when it has finished. To signal failure,
- the function can simply raise an exception (which will be
- captured by the `.StackContext` and passed along to the ``Future``).
-
- From the caller's perspective, the callback argument is optional.
- If one is given, it will be invoked when the function is complete
- with ``Future.result()`` as an argument. If the function fails, the
- callback will not be run and an exception will be raised into the
- surrounding `.StackContext`.
-
- If no callback is given, the caller should use the ``Future`` to
- wait for the function to complete (perhaps by yielding it in a
- coroutine, or passing it to `.IOLoop.add_future`).
-
- Usage:
-
- .. testcode::
-
- @return_future
- def future_func(arg1, arg2, callback):
- # Do stuff (possibly asynchronous)
- callback(result)
-
- async def caller():
- await future_func(arg1, arg2)
-
- ..
-
- Note that ``@return_future`` and ``@gen.engine`` can be applied to the
- same function, provided ``@return_future`` appears first. However,
- consider using ``@gen.coroutine`` instead of this combination.
-
- .. versionchanged:: 5.1
-
- Now raises a `.DeprecationWarning` if a callback argument is passed to
- the decorated function and deprecation warnings are enabled.
-
- .. deprecated:: 5.1
-
- This decorator will be removed in Tornado 6.0. New code should
- use coroutines directly instead of wrapping callback-based code
- with this decorator. Interactions with non-Tornado
- callback-based code should be managed explicitly to avoid
- relying on the `.ExceptionStackContext` built into this
- decorator.
- """
- warnings.warn("@return_future is deprecated, use coroutines instead",
- DeprecationWarning)
- return _non_deprecated_return_future(f)
-
-
-def _non_deprecated_return_future(f):
- # Allow auth.py to use this decorator without triggering
- # deprecation warnings. This will go away once auth.py has removed
- # its legacy interfaces in 6.0.
- replacer = ArgReplacer(f, 'callback')
-
- @functools.wraps(f)
- def wrapper(*args, **kwargs):
- future = Future()
- callback, args, kwargs = replacer.replace(
- lambda value=_NO_RESULT: future_set_result_unless_cancelled(future, value),
- args, kwargs)
-
- def handle_error(typ, value, tb):
- future_set_exc_info(future, (typ, value, tb))
- return True
- exc_info = None
- with ExceptionStackContext(handle_error, delay_warning=True):
- try:
- result = f(*args, **kwargs)
- if result is not None:
- raise ReturnValueIgnoredError(
- "@return_future should not be used with functions "
- "that return values")
- except:
- exc_info = sys.exc_info()
- raise
- if exc_info is not None:
- # If the initial synchronous part of f() raised an exception,
- # go ahead and raise it to the caller directly without waiting
- # for them to inspect the Future.
- future.result()
-
- # If the caller passed in a callback, schedule it to be called
- # when the future resolves. It is important that this happens
- # just before we return the future, or else we risk confusing
- # stack contexts with multiple exceptions (one here with the
- # immediate exception, and again when the future resolves and
- # the callback triggers its exception by calling future.result()).
- if callback is not None:
- warnings.warn("callback arguments are deprecated, use the returned Future instead",
- DeprecationWarning)
-
- def run_callback(future):
- result = future.result()
- if result is _NO_RESULT:
- callback()
- else:
- callback(future.result())
- future_add_done_callback(future, wrap(run_callback))
- return future
- return wrapper
-
-
def chain_future(a, b):
"""Chain two futures together so that when one completes, so does the other.
# under the License.
from __future__ import absolute_import, division, print_function
-import gc
import logging
import re
import socket
-import sys
-import traceback
import warnings
-from tornado.concurrent import (Future, return_future, ReturnValueIgnoredError,
- run_on_executor, future_set_result_unless_cancelled)
+from tornado.concurrent import Future, run_on_executor, future_set_result_unless_cancelled
from tornado.escape import utf8, to_unicode
from tornado import gen
-from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
-from tornado.log import app_log
from tornado import stack_context
from tornado.tcpserver import TCPServer
-from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
-from tornado.test.util import unittest, skipBefore35, exec_test, ignore_deprecation
+from tornado.testing import AsyncTestCase, bind_unused_port, gen_test
+from tornado.test.util import unittest, skipBefore35, exec_test
try:
self.assertEqual(fut.result(), 42)
-class ReturnFutureTest(AsyncTestCase):
- with ignore_deprecation():
- @return_future
- def sync_future(self, callback):
- callback(42)
-
- @return_future
- def async_future(self, callback):
- self.io_loop.add_callback(callback, 42)
-
- @return_future
- def immediate_failure(self, callback):
- 1 / 0
-
- @return_future
- def delayed_failure(self, callback):
- self.io_loop.add_callback(lambda: 1 / 0)
-
- @return_future
- def return_value(self, callback):
- # Note that the result of both running the callback and returning
- # a value (or raising an exception) is unspecified; with current
- # implementations the last event prior to callback resolution wins.
- return 42
-
- @return_future
- def no_result_future(self, callback):
- callback()
-
- def test_immediate_failure(self):
- with self.assertRaises(ZeroDivisionError):
- # The caller sees the error just like a normal function.
- self.immediate_failure(callback=self.stop)
- # The callback is not run because the function failed synchronously.
- self.io_loop.add_timeout(self.io_loop.time() + 0.05, self.stop)
- result = self.wait()
- self.assertIs(result, None)
-
- def test_return_value(self):
- with self.assertRaises(ReturnValueIgnoredError):
- self.return_value(callback=self.stop)
-
- def test_callback_kw(self):
- with ignore_deprecation():
- future = self.sync_future(callback=self.stop)
- result = self.wait()
- self.assertEqual(result, 42)
- self.assertEqual(future.result(), 42)
-
- def test_callback_positional(self):
- # When the callback is passed in positionally, future_wrap shouldn't
- # add another callback in the kwargs.
- with ignore_deprecation():
- future = self.sync_future(self.stop)
- result = self.wait()
- self.assertEqual(result, 42)
- self.assertEqual(future.result(), 42)
-
- def test_no_callback(self):
- future = self.sync_future()
- self.assertEqual(future.result(), 42)
-
- def test_none_callback_kw(self):
- # explicitly pass None as callback
- future = self.sync_future(callback=None)
- self.assertEqual(future.result(), 42)
-
- def test_none_callback_pos(self):
- future = self.sync_future(None)
- self.assertEqual(future.result(), 42)
-
- def test_async_future(self):
- future = self.async_future()
- self.assertFalse(future.done())
- self.io_loop.add_future(future, self.stop)
- future2 = self.wait()
- self.assertIs(future, future2)
- self.assertEqual(future.result(), 42)
-
- @gen_test
- def test_async_future_gen(self):
- result = yield self.async_future()
- self.assertEqual(result, 42)
-
- def test_delayed_failure(self):
- future = self.delayed_failure()
- with ignore_deprecation():
- self.io_loop.add_future(future, self.stop)
- future2 = self.wait()
- self.assertIs(future, future2)
- with self.assertRaises(ZeroDivisionError):
- future.result()
-
- def test_kw_only_callback(self):
- with ignore_deprecation():
- @return_future
- def f(**kwargs):
- kwargs['callback'](42)
- future = f()
- self.assertEqual(future.result(), 42)
-
- def test_error_in_callback(self):
- with ignore_deprecation():
- self.sync_future(callback=lambda future: 1 / 0)
- # The exception gets caught by our StackContext and will be re-raised
- # when we wait.
- self.assertRaises(ZeroDivisionError, self.wait)
-
- def test_no_result_future(self):
- with ignore_deprecation():
- future = self.no_result_future(self.stop)
- result = self.wait()
- self.assertIs(result, None)
- # result of this future is undefined, but not an error
- future.result()
-
- def test_no_result_future_callback(self):
- with ignore_deprecation():
- future = self.no_result_future(callback=lambda: self.stop())
- result = self.wait()
- self.assertIs(result, None)
- future.result()
-
- @gen_test
- def test_future_traceback(self):
- @gen.coroutine
- def f():
- yield gen.moment
- try:
- 1 / 0
- except ZeroDivisionError:
- self.expected_frame = traceback.extract_tb(
- sys.exc_info()[2], limit=1)[0]
- raise
- try:
- yield f()
- self.fail("didn't get expected exception")
- except ZeroDivisionError:
- tb = traceback.extract_tb(sys.exc_info()[2])
- self.assertIn(self.expected_frame, tb)
-
- @gen_test
- def test_uncaught_exception_log(self):
- if IOLoop.configured_class().__name__.endswith('AsyncIOLoop'):
- # Install an exception handler that mirrors our
- # non-asyncio logging behavior.
- def exc_handler(loop, context):
- app_log.error('%s: %s', context['message'],
- type(context.get('exception')))
- self.io_loop.asyncio_loop.set_exception_handler(exc_handler)
-
- @gen.coroutine
- def f():
- yield gen.moment
- 1 / 0
-
- g = f()
-
- with ExpectLog(app_log,
- "(?s)Future.* exception was never retrieved:"
- ".*ZeroDivisionError"):
- yield gen.moment
- yield gen.moment
- # For some reason, TwistedIOLoop and pypy3 need a third iteration
- # in order to drain references to the future
- yield gen.moment
- del g
- gc.collect() # for PyPy
-
-
# The following series of classes demonstrate and test various styles
# of use, with and without generators and futures.
self.future.set_exception(e)
-class DecoratorCapClient(BaseCapClient):
- with ignore_deprecation():
- @return_future
- def capitalize(self, request_data, callback):
- logging.debug("capitalize")
- self.request_data = request_data
- self.stream = IOStream(socket.socket())
- self.stream.connect(('127.0.0.1', self.port),
- callback=self.handle_connect)
- self.callback = callback
-
- def handle_connect(self):
- logging.debug("handle_connect")
- self.stream.write(utf8(self.request_data + "\n"))
- self.stream.read_until(b'\n', callback=self.handle_read)
-
- def handle_read(self, data):
- logging.debug("handle_read")
- self.stream.close()
- self.callback(self.process_response(data))
-
-
class GeneratorCapClient(BaseCapClient):
@gen.coroutine
def capitalize(self, request_data):
self.server.stop()
super(ClientTestMixin, self).tearDown() # type: ignore
- def test_callback(self):
- with ignore_deprecation():
- self.client.capitalize("hello", callback=self.stop)
- result = self.wait()
- self.assertEqual(result, "HELLO")
-
- def test_callback_error(self):
- with ignore_deprecation():
- self.client.capitalize("HELLO", callback=self.stop)
- self.assertRaisesRegexp(CapError, "already capitalized", self.wait)
-
def test_future(self):
future = self.client.capitalize("hello")
self.io_loop.add_future(future, self.stop)
self.warning_catcher.__exit__(None, None, None)
-class DecoratorClientTest(ClientTestMixin, AsyncTestCase):
- client_class = DecoratorCapClient
-
- def setUp(self):
- self.warning_catcher = warnings.catch_warnings()
- self.warning_catcher.__enter__()
- warnings.simplefilter('ignore', DeprecationWarning)
- super(DecoratorClientTest, self).setUp()
-
- def tearDown(self):
- super(DecoratorClientTest, self).tearDown()
- self.warning_catcher.__exit__(None, None, None)
-
-
class GeneratorClientTest(ClientTestMixin, AsyncTestCase):
client_class = GeneratorCapClient
from tornado.netutil import (
BlockingResolver, OverrideResolver, ThreadedResolver, is_valid_ip, bind_sockets
)
-from tornado.stack_context import ExceptionStackContext
from tornado.testing import AsyncTestCase, gen_test, bind_unused_port
-from tornado.test.util import unittest, skipIfNoNetwork, ignore_deprecation
+from tornado.test.util import unittest, skipIfNoNetwork
try:
from concurrent import futures
class _ResolverTestMixin(object):
- def test_localhost(self):
- with ignore_deprecation():
- self.resolver.resolve('localhost', 80, callback=self.stop)
- result = self.wait()
- self.assertIn((socket.AF_INET, ('127.0.0.1', 80)), result)
-
@gen_test
- def test_future_interface(self):
+ def test_localhost(self):
addrinfo = yield self.resolver.resolve('localhost', 80,
socket.AF_UNSPEC)
self.assertIn((socket.AF_INET, ('127.0.0.1', 80)),
# It is impossible to quickly and consistently generate an error in name
# resolution, so test this case separately, using mocks as needed.
class _ResolverErrorTestMixin(object):
- def test_bad_host(self):
- def handler(exc_typ, exc_val, exc_tb):
- self.stop(exc_val)
- return True # Halt propagation.
-
- with ignore_deprecation():
- with ExceptionStackContext(handler):
- self.resolver.resolve('an invalid domain', 80, callback=self.stop)
-
- result = self.wait()
- self.assertIsInstance(result, Exception)
-
@gen_test
- def test_future_interface_bad_host(self):
+ def test_bad_host(self):
with self.assertRaises(IOError):
yield self.resolver.resolve('an invalid domain', 80,
socket.AF_UNSPEC)