"""
from __future__ import absolute_import, division, print_function
+import asyncio
import collections
import functools
-import itertools
import os
import sys
import types
from tornado.ioloop import IOLoop
from tornado.log import app_log
from tornado import stack_context
-from tornado.util import PY3, raise_exc_info, TimeoutError
+from tornado.util import PY3, TimeoutError
try:
try:
return future
-def engine(func):
- """Callback-oriented decorator for asynchronous generators.
-
- This is an older interface; for new code that does not need to be
- compatible with versions of Tornado older than 3.0 the
- `coroutine` decorator is recommended instead.
-
- This decorator is similar to `coroutine`, except it does not
- return a `.Future` and the ``callback`` argument is not treated
- specially.
-
- In most cases, functions decorated with `engine` should take
- a ``callback`` argument and invoke it with their result when
- they are finished. One notable exception is the
- `~tornado.web.RequestHandler` :ref:`HTTP verb methods <verbs>`,
- which use ``self.finish()`` in place of a callback argument.
-
- .. deprecated:: 5.1
-
- This decorator will be removed in 6.0. Use `coroutine` or
- ``async def`` instead.
- """
- warnings.warn("gen.engine is deprecated, use gen.coroutine or async def instead",
- DeprecationWarning)
- func = _make_coroutine_wrapper(func, replace_callback=False)
-
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- future = func(*args, **kwargs)
-
- def final_callback(future):
- if future.result() is not None:
- raise ReturnValueIgnoredError(
- "@gen.engine functions cannot return values: %r" %
- (future.result(),))
- # The engine interface doesn't give us any way to return
- # errors but to raise them into the stack context.
- # Save the stack context here to use when the Future has resolved.
- future_add_done_callback(future, stack_context.wrap(final_callback))
- return wrapper
-
-
def coroutine(func):
"""Decorator for asynchronous generators.
return self.next()
-class YieldPoint(object):
- """Base class for objects that may be yielded from the generator.
-
- .. deprecated:: 4.0
- Use `Futures <.Future>` instead. This class and all its subclasses
- will be removed in 6.0
- """
- def __init__(self):
- warnings.warn("YieldPoint is deprecated, use Futures instead",
- DeprecationWarning)
-
- def start(self, runner):
- """Called by the runner after the generator has yielded.
-
- No other methods will be called on this object before ``start``.
- """
- raise NotImplementedError()
-
- def is_ready(self):
- """Called by the runner to determine whether to resume the generator.
-
- Returns a boolean; may be called more than once.
- """
- raise NotImplementedError()
-
- def get_result(self):
- """Returns the value to use as the result of the yield expression.
-
- This method will only be called once, and only after `is_ready`
- has returned true.
- """
- raise NotImplementedError()
-
-
-class Callback(YieldPoint):
- """Returns a callable object that will allow a matching `Wait` to proceed.
-
- The key may be any value suitable for use as a dictionary key, and is
- used to match ``Callbacks`` to their corresponding ``Waits``. The key
- must be unique among outstanding callbacks within a single run of the
- generator function, but may be reused across different runs of the same
- function (so constants generally work fine).
-
- The callback may be called with zero or one arguments; if an argument
- is given it will be returned by `Wait`.
-
- .. deprecated:: 4.0
- Use `Futures <.Future>` instead. This class will be removed in 6.0.
- """
- def __init__(self, key):
- warnings.warn("gen.Callback is deprecated, use Futures instead",
- DeprecationWarning)
- self.key = key
-
- def start(self, runner):
- self.runner = runner
- runner.register_callback(self.key)
-
- def is_ready(self):
- return True
-
- def get_result(self):
- return self.runner.result_callback(self.key)
-
-
-class Wait(YieldPoint):
- """Returns the argument passed to the result of a previous `Callback`.
-
- .. deprecated:: 4.0
- Use `Futures <.Future>` instead. This class will be removed in 6.0.
- """
- def __init__(self, key):
- warnings.warn("gen.Wait is deprecated, use Futures instead",
- DeprecationWarning)
- self.key = key
-
- def start(self, runner):
- self.runner = runner
-
- def is_ready(self):
- return self.runner.is_ready(self.key)
-
- def get_result(self):
- return self.runner.pop_result(self.key)
-
-
-class WaitAll(YieldPoint):
- """Returns the results of multiple previous `Callbacks <Callback>`.
-
- The argument is a sequence of `Callback` keys, and the result is
- a list of results in the same order.
-
- `WaitAll` is equivalent to yielding a list of `Wait` objects.
-
- .. deprecated:: 4.0
- Use `Futures <.Future>` instead. This class will be removed in 6.0.
- """
- def __init__(self, keys):
- warnings.warn("gen.WaitAll is deprecated, use gen.multi instead",
- DeprecationWarning)
- self.keys = keys
-
- def start(self, runner):
- self.runner = runner
-
- def is_ready(self):
- return all(self.runner.is_ready(key) for key in self.keys)
-
- def get_result(self):
- return [self.runner.pop_result(key) for key in self.keys]
-
-
-def Task(func, *args, **kwargs):
- """Adapts a callback-based asynchronous function for use in coroutines.
-
- Takes a function (and optional additional arguments) and runs it with
- those arguments plus a ``callback`` keyword argument. The argument passed
- to the callback is returned as the result of the yield expression.
-
- .. versionchanged:: 4.0
- ``gen.Task`` is now a function that returns a `.Future`, instead of
- a subclass of `YieldPoint`. It still behaves the same way when
- yielded.
-
- .. deprecated:: 5.1
- This function is deprecated and will be removed in 6.0.
- """
- warnings.warn("gen.Task is deprecated, use Futures instead",
- DeprecationWarning)
- future = _create_future()
-
- def handle_exception(typ, value, tb):
- if future.done():
- return False
- future_set_exc_info(future, (typ, value, tb))
- return True
-
- def set_result(result):
- if future.done():
- return
- future_set_result_unless_cancelled(future, result)
- with stack_context.ExceptionStackContext(handle_exception):
- func(*args, callback=_argument_adapter(set_result), **kwargs)
- return future
-
-
-class YieldFuture(YieldPoint):
- def __init__(self, future):
- """Adapts a `.Future` to the `YieldPoint` interface.
-
- .. versionchanged:: 5.0
- The ``io_loop`` argument (deprecated since version 4.1) has been removed.
-
- .. deprecated:: 5.1
- This class will be removed in 6.0.
- """
- warnings.warn("YieldFuture is deprecated, use Futures instead",
- DeprecationWarning)
- self.future = future
- self.io_loop = IOLoop.current()
-
- def start(self, runner):
- if not self.future.done():
- self.runner = runner
- self.key = object()
- runner.register_callback(self.key)
- self.io_loop.add_future(self.future, runner.result_callback(self.key))
- else:
- self.runner = None
- self.result_fn = self.future.result
-
- def is_ready(self):
- if self.runner is not None:
- return self.runner.is_ready(self.key)
- else:
- return True
-
- def get_result(self):
- if self.runner is not None:
- return self.runner.pop_result(self.key).result()
- else:
- return self.result_fn()
-
-
-def _contains_yieldpoint(children):
- """Returns True if ``children`` contains any YieldPoints.
-
- ``children`` may be a dict or a list, as used by `MultiYieldPoint`
- and `multi_future`.
- """
- if isinstance(children, dict):
- return any(isinstance(i, YieldPoint) for i in children.values())
- if isinstance(children, list):
- return any(isinstance(i, YieldPoint) for i in children)
- return False
-
-
def multi(children, quiet_exceptions=()):
"""Runs multiple asynchronous operations in parallel.
other than `YieldPoint` and `.Future`.
"""
- if _contains_yieldpoint(children):
- return MultiYieldPoint(children, quiet_exceptions=quiet_exceptions)
- else:
- return multi_future(children, quiet_exceptions=quiet_exceptions)
+ return multi_future(children, quiet_exceptions=quiet_exceptions)
Multi = multi
-class MultiYieldPoint(YieldPoint):
- """Runs multiple asynchronous operations in parallel.
-
- This class is similar to `multi`, but it always creates a stack
- context even when no children require it. It is not compatible with
- native coroutines.
-
- .. versionchanged:: 4.2
- If multiple ``YieldPoints`` fail, any exceptions after the first
- (which is raised) will be logged. Added the ``quiet_exceptions``
- argument to suppress this logging for selected exception types.
-
- .. versionchanged:: 4.3
- Renamed from ``Multi`` to ``MultiYieldPoint``. The name ``Multi``
- remains as an alias for the equivalent `multi` function.
-
- .. deprecated:: 4.3
- Use `multi` instead. This class will be removed in 6.0.
- """
- def __init__(self, children, quiet_exceptions=()):
- warnings.warn("MultiYieldPoint is deprecated, use Futures instead",
- DeprecationWarning)
- self.keys = None
- if isinstance(children, dict):
- self.keys = list(children.keys())
- children = children.values()
- self.children = []
- for i in children:
- if not isinstance(i, YieldPoint):
- i = convert_yielded(i)
- if is_future(i):
- i = YieldFuture(i)
- self.children.append(i)
- assert all(isinstance(i, YieldPoint) for i in self.children)
- self.unfinished_children = set(self.children)
- self.quiet_exceptions = quiet_exceptions
-
- def start(self, runner):
- for i in self.children:
- i.start(runner)
-
- def is_ready(self):
- finished = list(itertools.takewhile(
- lambda i: i.is_ready(), self.unfinished_children))
- self.unfinished_children.difference_update(finished)
- return not self.unfinished_children
-
- def get_result(self):
- result_list = []
- exc_info = None
- for f in self.children:
- try:
- result_list.append(f.get_result())
- except Exception as e:
- if exc_info is None:
- exc_info = sys.exc_info()
- else:
- if not isinstance(e, self.quiet_exceptions):
- app_log.error("Multiple exceptions in yield list",
- exc_info=True)
- if exc_info is not None:
- raise_exc_info(exc_info)
- if self.keys is not None:
- return dict(zip(self.keys, result_list))
- else:
- return list(result_list)
-
-
def multi_future(children, quiet_exceptions=()):
"""Wait for multiple asynchronous futures in parallel.
self.running = False
def handle_yield(self, yielded):
- # Lists containing YieldPoints require stack contexts;
- # other lists are handled in convert_yielded.
- if _contains_yieldpoint(yielded):
- yielded = multi(yielded)
-
- if isinstance(yielded, YieldPoint):
- # YieldPoints are too closely coupled to the Runner to go
- # through the generic convert_yielded mechanism.
+ try:
+ self.future = convert_yielded(yielded)
+ except BadYieldError:
self.future = Future()
-
- def start_yield_point():
- try:
- yielded.start(self)
- if yielded.is_ready():
- future_set_result_unless_cancelled(self.future, yielded.get_result())
- else:
- self.yield_point = yielded
- except Exception:
- self.future = Future()
- future_set_exc_info(self.future, sys.exc_info())
-
- if self.stack_context_deactivate is None:
- # Start a stack context if this is the first
- # YieldPoint we've seen.
- with stack_context.ExceptionStackContext(
- self.handle_exception) as deactivate:
- self.stack_context_deactivate = deactivate
-
- def cb():
- start_yield_point()
- self.run()
- self.io_loop.add_callback(cb)
- return False
- else:
- start_yield_point()
- else:
- try:
- self.future = convert_yielded(yielded)
- except BadYieldError:
- self.future = Future()
- future_set_exc_info(self.future, sys.exc_info())
+ future_set_exc_info(self.future, sys.exc_info())
if self.future is moment:
self.io_loop.add_callback(self.run)
# Convert Awaitables into Futures.
try:
- import asyncio
-except ImportError:
- # Py2-compatible version for use with Cython.
- # Copied from PEP 380.
- @coroutine
- def _wrap_awaitable(x):
- if hasattr(x, '__await__'):
- _i = x.__await__()
- else:
- _i = iter(x)
- try:
- _y = next(_i)
- except StopIteration as _e:
- _r = _value_from_stopiteration(_e)
- else:
- while 1:
- try:
- _s = yield _y
- except GeneratorExit as _e:
- try:
- _m = _i.close
- except AttributeError:
- pass
- else:
- _m()
- raise _e
- except BaseException as _e:
- _x = sys.exc_info()
- try:
- _m = _i.throw
- except AttributeError:
- raise _e
- else:
- try:
- _y = _m(*_x)
- except StopIteration as _e:
- _r = _value_from_stopiteration(_e)
- break
- else:
- try:
- if _s is None:
- _y = next(_i)
- else:
- _y = _i.send(_s)
- except StopIteration as _e:
- _r = _value_from_stopiteration(_e)
- break
- raise Return(_r)
-else:
- try:
- _wrap_awaitable = asyncio.ensure_future
- except AttributeError:
- # asyncio.ensure_future was introduced in Python 3.4.4, but
- # Debian jessie still ships with 3.4.2 so try the old name.
- _wrap_awaitable = getattr(asyncio, 'async')
+ _wrap_awaitable = asyncio.ensure_future
+except AttributeError:
+ # asyncio.ensure_future was introduced in Python 3.4.4, but
+ # Debian jessie still ships with 3.4.2 so try the old name.
+ _wrap_awaitable = getattr(asyncio, 'async')
def convert_yielded(yielded):
yield self.authorize_redirect()
-class TwitterClientLoginGenEngineHandler(TwitterClientHandler):
- with ignore_deprecation():
- @asynchronous
- @gen.engine
- def get(self):
- if self.get_argument("oauth_token", None):
- user = yield self.get_authenticated_user()
- self.finish(user)
- else:
- # Old style: with @gen.engine we can ignore the Future from
- # authorize_redirect.
- self.authorize_redirect()
-
-
class TwitterClientLoginGenCoroutineHandler(TwitterClientHandler):
@gen.coroutine
def get(self):
yield self.authorize_redirect()
-class TwitterClientShowUserHandlerLegacy(TwitterClientHandler):
- with ignore_deprecation():
- @asynchronous
- @gen.engine
- def get(self):
- # TODO: would be nice to go through the login flow instead of
- # cheating with a hard-coded access token.
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- response = yield gen.Task(self.twitter_request,
- '/users/show/%s' % self.get_argument('name'),
- access_token=dict(key='hjkl', secret='vbnm'))
- if response is None:
- self.set_status(500)
- self.finish('error from twitter request')
- else:
- self.finish(response)
-
-
class TwitterClientShowUserHandler(TwitterClientHandler):
@gen.coroutine
def get(self):
('/legacy/twitter/client/login', TwitterClientLoginHandlerLegacy, dict(test=self)),
('/twitter/client/login', TwitterClientLoginHandler, dict(test=self)),
- ('/twitter/client/login_gen_engine',
- TwitterClientLoginGenEngineHandler, dict(test=self)),
('/twitter/client/login_gen_coroutine',
TwitterClientLoginGenCoroutineHandler, dict(test=self)),
- ('/legacy/twitter/client/show_user',
- TwitterClientShowUserHandlerLegacy, dict(test=self)),
('/twitter/client/show_user',
TwitterClientShowUserHandler, dict(test=self)),
def test_twitter_redirect(self):
self.base_twitter_redirect('/twitter/client/login')
- def test_twitter_redirect_gen_engine(self):
- self.base_twitter_redirect('/twitter/client/login_gen_engine')
-
def test_twitter_redirect_gen_coroutine(self):
self.base_twitter_redirect('/twitter/client/login_gen_coroutine')
u'screen_name': u'foo',
u'username': u'foo'})
- def test_twitter_show_user_legacy(self):
- response = self.fetch('/legacy/twitter/client/show_user?name=somebody')
- response.rethrow()
- self.assertEqual(json_decode(response.body),
- {'name': 'Somebody', 'screen_name': 'somebody'})
-
- def test_twitter_show_user_error_legacy(self):
- with ExpectLog(gen_log, 'Error response HTTP 500'):
- response = self.fetch('/legacy/twitter/client/show_user?name=error')
- self.assertEqual(response.code, 500)
- self.assertEqual(response.body, b'error from twitter request')
-
def test_twitter_show_user(self):
response = self.fetch('/twitter/client/show_user?name=somebody')
response.rethrow()
self.assertIs(result, None)
future.result()
- @gen_test
- def test_future_traceback_legacy(self):
- with ignore_deprecation():
- @return_future
- @gen.engine
- def f(callback):
- yield gen.Task(self.io_loop.add_callback)
- try:
- 1 / 0
- except ZeroDivisionError:
- self.expected_frame = traceback.extract_tb(
- sys.exc_info()[2], limit=1)[0]
- raise
- try:
- yield f()
- self.fail("didn't get expected exception")
- except ZeroDivisionError:
- tb = traceback.extract_tb(sys.exc_info()[2])
- self.assertIn(self.expected_frame, tb)
-
@gen_test
def test_future_traceback(self):
@gen.coroutine
asyncio = None
-class GenEngineTest(AsyncTestCase):
- def setUp(self):
- self.warning_catcher = warnings.catch_warnings()
- self.warning_catcher.__enter__()
- warnings.simplefilter('ignore', DeprecationWarning)
- super(GenEngineTest, self).setUp()
- self.named_contexts = []
-
- def tearDown(self):
- super(GenEngineTest, self).tearDown()
- self.warning_catcher.__exit__(None, None, None)
-
- def named_context(self, name):
- @contextlib.contextmanager
- def context():
- self.named_contexts.append(name)
- try:
- yield
- finally:
- self.assertEqual(self.named_contexts.pop(), name)
- return context
-
- def run_gen(self, f):
- f()
- return self.wait()
-
- def delay_callback(self, iterations, callback, arg):
- """Runs callback(arg) after a number of IOLoop iterations."""
- if iterations == 0:
- callback(arg)
- else:
- self.io_loop.add_callback(functools.partial(
- self.delay_callback, iterations - 1, callback, arg))
-
- with ignore_deprecation():
- @return_future
- def async_future(self, result, callback):
- self.io_loop.add_callback(callback, result)
-
- @gen.coroutine
- def async_exception(self, e):
- yield gen.moment
- raise e
-
- def test_no_yield(self):
- @gen.engine
- def f():
- self.stop()
- self.run_gen(f)
-
- def test_inline_cb(self):
- @gen.engine
- def f():
- (yield gen.Callback("k1"))()
- res = yield gen.Wait("k1")
- self.assertTrue(res is None)
- self.stop()
- self.run_gen(f)
-
- def test_ioloop_cb(self):
- @gen.engine
- def f():
- self.io_loop.add_callback((yield gen.Callback("k1")))
- yield gen.Wait("k1")
- self.stop()
- self.run_gen(f)
-
- def test_exception_phase1(self):
- @gen.engine
- def f():
- 1 / 0
- self.assertRaises(ZeroDivisionError, self.run_gen, f)
-
- def test_exception_phase2(self):
- @gen.engine
- def f():
- self.io_loop.add_callback((yield gen.Callback("k1")))
- yield gen.Wait("k1")
- 1 / 0
- self.assertRaises(ZeroDivisionError, self.run_gen, f)
-
- def test_exception_in_task_phase1(self):
- def fail_task(callback):
- 1 / 0
-
- @gen.engine
- def f():
- try:
- yield gen.Task(fail_task)
- raise Exception("did not get expected exception")
- except ZeroDivisionError:
- self.stop()
- self.run_gen(f)
-
- def test_exception_in_task_phase2(self):
- # This is the case that requires the use of stack_context in gen.engine
- def fail_task(callback):
- self.io_loop.add_callback(lambda: 1 / 0)
-
- @gen.engine
- def f():
- try:
- yield gen.Task(fail_task)
- raise Exception("did not get expected exception")
- except ZeroDivisionError:
- self.stop()
- self.run_gen(f)
-
- def test_with_arg(self):
- @gen.engine
- def f():
- (yield gen.Callback("k1"))(42)
- res = yield gen.Wait("k1")
- self.assertEqual(42, res)
- self.stop()
- self.run_gen(f)
-
- def test_with_arg_tuple(self):
- @gen.engine
- def f():
- (yield gen.Callback((1, 2)))((3, 4))
- res = yield gen.Wait((1, 2))
- self.assertEqual((3, 4), res)
- self.stop()
- self.run_gen(f)
-
- def test_key_reuse(self):
- @gen.engine
- def f():
- yield gen.Callback("k1")
- yield gen.Callback("k1")
- self.stop()
- self.assertRaises(gen.KeyReuseError, self.run_gen, f)
-
- def test_key_reuse_tuple(self):
- @gen.engine
- def f():
- yield gen.Callback((1, 2))
- yield gen.Callback((1, 2))
- self.stop()
- self.assertRaises(gen.KeyReuseError, self.run_gen, f)
-
- def test_key_mismatch(self):
- @gen.engine
- def f():
- yield gen.Callback("k1")
- yield gen.Wait("k2")
- self.stop()
- self.assertRaises(gen.UnknownKeyError, self.run_gen, f)
-
- def test_key_mismatch_tuple(self):
- @gen.engine
- def f():
- yield gen.Callback((1, 2))
- yield gen.Wait((2, 3))
- self.stop()
- self.assertRaises(gen.UnknownKeyError, self.run_gen, f)
-
- def test_leaked_callback(self):
- @gen.engine
- def f():
- yield gen.Callback("k1")
- self.stop()
- self.assertRaises(gen.LeakedCallbackError, self.run_gen, f)
-
- def test_leaked_callback_tuple(self):
- @gen.engine
- def f():
- yield gen.Callback((1, 2))
- self.stop()
- self.assertRaises(gen.LeakedCallbackError, self.run_gen, f)
-
- def test_parallel_callback(self):
- @gen.engine
- def f():
- for k in range(3):
- self.io_loop.add_callback((yield gen.Callback(k)))
- yield gen.Wait(1)
- self.io_loop.add_callback((yield gen.Callback(3)))
- yield gen.Wait(0)
- yield gen.Wait(3)
- yield gen.Wait(2)
- self.stop()
- self.run_gen(f)
-
- def test_bogus_yield(self):
- @gen.engine
- def f():
- yield 42
- self.assertRaises(gen.BadYieldError, self.run_gen, f)
-
- def test_bogus_yield_tuple(self):
- @gen.engine
- def f():
- yield (1, 2)
- self.assertRaises(gen.BadYieldError, self.run_gen, f)
-
- def test_reuse(self):
- @gen.engine
- def f():
- self.io_loop.add_callback((yield gen.Callback(0)))
- yield gen.Wait(0)
- self.stop()
- self.run_gen(f)
- self.run_gen(f)
-
- def test_task(self):
- @gen.engine
- def f():
- yield gen.Task(self.io_loop.add_callback)
- self.stop()
- self.run_gen(f)
-
- def test_wait_all(self):
- @gen.engine
- def f():
- (yield gen.Callback("k1"))("v1")
- (yield gen.Callback("k2"))("v2")
- results = yield gen.WaitAll(["k1", "k2"])
- self.assertEqual(results, ["v1", "v2"])
- self.stop()
- self.run_gen(f)
-
- def test_exception_in_yield(self):
- @gen.engine
- def f():
- try:
- yield gen.Wait("k1")
- raise Exception("did not get expected exception")
- except gen.UnknownKeyError:
- pass
- self.stop()
- self.run_gen(f)
-
- def test_resume_after_exception_in_yield(self):
- @gen.engine
- def f():
- try:
- yield gen.Wait("k1")
- raise Exception("did not get expected exception")
- except gen.UnknownKeyError:
- pass
- (yield gen.Callback("k2"))("v2")
- self.assertEqual((yield gen.Wait("k2")), "v2")
- self.stop()
- self.run_gen(f)
-
- def test_orphaned_callback(self):
- @gen.engine
- def f():
- self.orphaned_callback = yield gen.Callback(1)
- try:
- self.run_gen(f)
- raise Exception("did not get expected exception")
- except gen.LeakedCallbackError:
- pass
- self.orphaned_callback()
-
- def test_none(self):
- @gen.engine
- def f():
- yield None
- self.stop()
- self.run_gen(f)
-
- def test_multi(self):
- @gen.engine
- def f():
- (yield gen.Callback("k1"))("v1")
- (yield gen.Callback("k2"))("v2")
- results = yield [gen.Wait("k1"), gen.Wait("k2")]
- self.assertEqual(results, ["v1", "v2"])
- self.stop()
- self.run_gen(f)
-
- def test_multi_dict(self):
- @gen.engine
- def f():
- (yield gen.Callback("k1"))("v1")
- (yield gen.Callback("k2"))("v2")
- results = yield dict(foo=gen.Wait("k1"), bar=gen.Wait("k2"))
- self.assertEqual(results, dict(foo="v1", bar="v2"))
- self.stop()
- self.run_gen(f)
-
- # The following tests explicitly run with both gen.Multi
- # and gen.multi_future (Task returns a Future, so it can be used
- # with either).
- def test_multi_yieldpoint_delayed(self):
- @gen.engine
- def f():
- # callbacks run at different times
- responses = yield gen.Multi([
- gen.Task(self.delay_callback, 3, arg="v1"),
- gen.Task(self.delay_callback, 1, arg="v2"),
- ])
- self.assertEqual(responses, ["v1", "v2"])
- self.stop()
- self.run_gen(f)
-
- def test_multi_yieldpoint_dict_delayed(self):
- @gen.engine
- def f():
- # callbacks run at different times
- responses = yield gen.Multi(dict(
- foo=gen.Task(self.delay_callback, 3, arg="v1"),
- bar=gen.Task(self.delay_callback, 1, arg="v2"),
- ))
- self.assertEqual(responses, dict(foo="v1", bar="v2"))
- self.stop()
- self.run_gen(f)
-
- def test_multi_future_delayed(self):
- @gen.engine
- def f():
- # callbacks run at different times
- responses = yield gen.multi_future([
- gen.Task(self.delay_callback, 3, arg="v1"),
- gen.Task(self.delay_callback, 1, arg="v2"),
- ])
- self.assertEqual(responses, ["v1", "v2"])
- self.stop()
- self.run_gen(f)
-
- def test_multi_future_dict_delayed(self):
- @gen.engine
- def f():
- # callbacks run at different times
- responses = yield gen.multi_future(dict(
- foo=gen.Task(self.delay_callback, 3, arg="v1"),
- bar=gen.Task(self.delay_callback, 1, arg="v2"),
- ))
- self.assertEqual(responses, dict(foo="v1", bar="v2"))
- self.stop()
- self.run_gen(f)
-
- @skipOnTravis
- @gen_test
- def test_multi_performance(self):
- # Yielding a list used to have quadratic performance; make
- # sure a large list stays reasonable. On my laptop a list of
- # 2000 used to take 1.8s, now it takes 0.12.
- start = time.time()
- yield [gen.Task(self.io_loop.add_callback) for i in range(2000)]
- end = time.time()
- self.assertLess(end - start, 1.0)
-
- @gen_test
- def test_multi_empty(self):
- # Empty lists or dicts should return the same type.
- x = yield []
- self.assertTrue(isinstance(x, list))
- y = yield {}
- self.assertTrue(isinstance(y, dict))
-
- @gen_test
- def test_multi_mixed_types(self):
- # A YieldPoint (Wait) and Future (Task) can be combined
- # (and use the YieldPoint codepath)
- (yield gen.Callback("k1"))("v1")
- responses = yield [gen.Wait("k1"),
- gen.Task(self.delay_callback, 3, arg="v2")]
- self.assertEqual(responses, ["v1", "v2"])
-
- @gen_test
- def test_future(self):
- result = yield self.async_future(1)
- self.assertEqual(result, 1)
-
- @gen_test
- def test_multi_future(self):
- results = yield [self.async_future(1), self.async_future(2)]
- self.assertEqual(results, [1, 2])
-
- @gen_test
- def test_multi_future_duplicate(self):
- f = self.async_future(2)
- results = yield [self.async_future(1), f, self.async_future(3), f]
- self.assertEqual(results, [1, 2, 3, 2])
-
- @gen_test
- def test_multi_dict_future(self):
- results = yield dict(foo=self.async_future(1), bar=self.async_future(2))
- self.assertEqual(results, dict(foo=1, bar=2))
-
- @gen_test
- def test_multi_exceptions(self):
- with ExpectLog(app_log, "Multiple exceptions in yield list"):
- with self.assertRaises(RuntimeError) as cm:
- yield gen.Multi([self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))])
- self.assertEqual(str(cm.exception), "error 1")
-
- # With only one exception, no error is logged.
- with self.assertRaises(RuntimeError):
- yield gen.Multi([self.async_exception(RuntimeError("error 1")),
- self.async_future(2)])
-
- # Exception logging may be explicitly quieted.
- with self.assertRaises(RuntimeError):
- yield gen.Multi([self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))],
- quiet_exceptions=RuntimeError)
-
- @gen_test
- def test_multi_future_exceptions(self):
- with ExpectLog(app_log, "Multiple exceptions in yield list"):
- with self.assertRaises(RuntimeError) as cm:
- yield [self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))]
- self.assertEqual(str(cm.exception), "error 1")
-
- # With only one exception, no error is logged.
- with self.assertRaises(RuntimeError):
- yield [self.async_exception(RuntimeError("error 1")),
- self.async_future(2)]
-
- # Exception logging may be explicitly quieted.
- with self.assertRaises(RuntimeError):
- yield gen.multi_future(
- [self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))],
- quiet_exceptions=RuntimeError)
-
- def test_arguments(self):
- @gen.engine
- def f():
- (yield gen.Callback("noargs"))()
- self.assertEqual((yield gen.Wait("noargs")), None)
- (yield gen.Callback("1arg"))(42)
- self.assertEqual((yield gen.Wait("1arg")), 42)
-
- (yield gen.Callback("kwargs"))(value=42)
- result = yield gen.Wait("kwargs")
- self.assertTrue(isinstance(result, gen.Arguments))
- self.assertEqual(((), dict(value=42)), result)
- self.assertEqual(dict(value=42), result.kwargs)
-
- (yield gen.Callback("2args"))(42, 43)
- result = yield gen.Wait("2args")
- self.assertTrue(isinstance(result, gen.Arguments))
- self.assertEqual(((42, 43), {}), result)
- self.assertEqual((42, 43), result.args)
-
- def task_func(callback):
- callback(None, error="foo")
- result = yield gen.Task(task_func)
- self.assertTrue(isinstance(result, gen.Arguments))
- self.assertEqual(((None,), dict(error="foo")), result)
-
- self.stop()
- self.run_gen(f)
-
- def test_stack_context_leak(self):
- # regression test: repeated invocations of a gen-based
- # function should not result in accumulated stack_contexts
- def _stack_depth():
- head = stack_context._state.contexts[1]
- length = 0
-
- while head is not None:
- length += 1
- head = head.old_contexts[1]
-
- return length
-
- @gen.engine
- def inner(callback):
- yield gen.Task(self.io_loop.add_callback)
- callback()
-
- @gen.engine
- def outer():
- for i in range(10):
- yield gen.Task(inner)
-
- stack_increase = _stack_depth() - initial_stack_depth
- self.assertTrue(stack_increase <= 2)
- self.stop()
- initial_stack_depth = _stack_depth()
- self.run_gen(outer)
-
- def test_stack_context_leak_exception(self):
- # same as previous, but with a function that exits with an exception
- @gen.engine
- def inner(callback):
- yield gen.Task(self.io_loop.add_callback)
- 1 / 0
-
- @gen.engine
- def outer():
- for i in range(10):
- try:
- yield gen.Task(inner)
- except ZeroDivisionError:
- pass
- stack_increase = len(stack_context._state.contexts) - initial_stack_depth
- self.assertTrue(stack_increase <= 2)
- self.stop()
- initial_stack_depth = len(stack_context._state.contexts)
- self.run_gen(outer)
-
- def function_with_stack_context(self, callback):
- # Technically this function should stack_context.wrap its callback
- # upon entry. However, it is very common for this step to be
- # omitted.
- def step2():
- self.assertEqual(self.named_contexts, ['a'])
- self.io_loop.add_callback(callback)
-
- with stack_context.StackContext(self.named_context('a')):
- self.io_loop.add_callback(step2)
-
- @gen_test
- def test_wait_transfer_stack_context(self):
- # Wait should not pick up contexts from where callback was invoked,
- # even if that function improperly fails to wrap its callback.
- cb = yield gen.Callback('k1')
- self.function_with_stack_context(cb)
- self.assertEqual(self.named_contexts, [])
- yield gen.Wait('k1')
- self.assertEqual(self.named_contexts, [])
-
- @gen_test
- def test_task_transfer_stack_context(self):
- yield gen.Task(self.function_with_stack_context)
- self.assertEqual(self.named_contexts, [])
-
- def test_raise_after_stop(self):
- # This pattern will be used in the following tests so make sure
- # the exception propagates as expected.
- @gen.engine
- def f():
- self.stop()
- 1 / 0
-
- with self.assertRaises(ZeroDivisionError):
- self.run_gen(f)
-
- def test_sync_raise_return(self):
- # gen.Return is allowed in @gen.engine, but it may not be used
- # to return a value.
- @gen.engine
- def f():
- self.stop(42)
- raise gen.Return()
-
- result = self.run_gen(f)
- self.assertEqual(result, 42)
-
- def test_async_raise_return(self):
- @gen.engine
- def f():
- yield gen.Task(self.io_loop.add_callback)
- self.stop(42)
- raise gen.Return()
-
- result = self.run_gen(f)
- self.assertEqual(result, 42)
-
- def test_sync_raise_return_value(self):
- @gen.engine
- def f():
- raise gen.Return(42)
-
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
-
- def test_sync_raise_return_value_tuple(self):
- @gen.engine
- def f():
- raise gen.Return((1, 2))
-
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
-
- def test_async_raise_return_value(self):
- @gen.engine
- def f():
- yield gen.Task(self.io_loop.add_callback)
- raise gen.Return(42)
-
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
-
- def test_async_raise_return_value_tuple(self):
- @gen.engine
- def f():
- yield gen.Task(self.io_loop.add_callback)
- raise gen.Return((1, 2))
-
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
-
- def test_return_value(self):
- # It is an error to apply @gen.engine to a function that returns
- # a value.
- @gen.engine
- def f():
- return 42
-
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
-
- def test_return_value_tuple(self):
- # It is an error to apply @gen.engine to a function that returns
- # a value.
- @gen.engine
- def f():
- return (1, 2)
-
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
-
- @skipNotCPython
- def test_task_refcounting(self):
- # On CPython, tasks and their arguments should be released immediately
- # without waiting for garbage collection.
- @gen.engine
- def f():
- class Foo(object):
- pass
- arg = Foo()
- self.arg_ref = weakref.ref(arg)
- task = gen.Task(self.io_loop.add_callback, arg=arg)
- self.task_ref = weakref.ref(task)
- yield task
- self.stop()
-
- self.run_gen(f)
- self.assertIs(self.arg_ref(), None)
- self.assertIs(self.task_ref(), None)
-
-
-# GenBasicTest duplicates the non-deprecated portions of GenEngineTest
-# with gen.coroutine to ensure we don't lose coverage when gen.engine
-# goes away.
class GenBasicTest(AsyncTestCase):
@gen.coroutine
def delay(self, iterations, arg):
self.assertEqual(results, [42, 43])
self.finished = True
- @skipBefore35
- @gen_test
- def test_async_await_mixed_multi_native_yieldpoint(self):
- namespace = exec_test(globals(), locals(), """
- async def f1():
- await gen.Task(self.io_loop.add_callback)
- return 42
- """)
-
- @gen.coroutine
- def f2():
- yield gen.Task(self.io_loop.add_callback)
- raise gen.Return(43)
-
- with ignore_deprecation():
- f2(callback=(yield gen.Callback('cb')))
- results = yield [namespace['f1'](), gen.Wait('cb')]
- self.assertEqual(results, [42, 43])
- self.finished = True
-
@skipBefore35
@gen_test
def test_async_with_timeout(self):
yield future
self.finished = True
- @gen_test
- def test_pass_callback(self):
- with ignore_deprecation():
- @gen.coroutine
- def f():
- raise gen.Return(42)
- result = yield gen.Task(f)
- self.assertEqual(result, 42)
- self.finished = True
-
@gen_test
def test_replace_yieldpoint_exception(self):
# Test exception handling: a coroutine can catch one exception
self.assertEqual(result, 42)
self.finished = True
- @gen_test
- def test_replace_context_exception(self):
- with ignore_deprecation():
- # Test exception handling: exceptions thrown into the stack context
- # can be caught and replaced.
- # Note that this test and the following are for behavior that is
- # not really supported any more: coroutines no longer create a
- # stack context automatically; but one is created after the first
- # YieldPoint (i.e. not a Future).
- @gen.coroutine
- def f2():
- (yield gen.Callback(1))()
- yield gen.Wait(1)
- self.io_loop.add_callback(lambda: 1 / 0)
- try:
- yield gen.Task(self.io_loop.add_timeout,
- self.io_loop.time() + 10)
- except ZeroDivisionError:
- raise KeyError()
-
- future = f2()
- with self.assertRaises(KeyError):
- yield future
- self.finished = True
-
- @gen_test
- def test_swallow_context_exception(self):
- with ignore_deprecation():
- # Test exception handling: exceptions thrown into the stack context
- # can be caught and ignored.
- @gen.coroutine
- def f2():
- (yield gen.Callback(1))()
- yield gen.Wait(1)
- self.io_loop.add_callback(lambda: 1 / 0)
- try:
- yield gen.Task(self.io_loop.add_timeout,
- self.io_loop.time() + 10)
- except ZeroDivisionError:
- raise gen.Return(42)
-
- result = yield f2()
- self.assertEqual(result, 42)
- self.finished = True
-
@gen_test
def test_moment(self):
calls = []
self.finished = True
-class GenSequenceHandler(RequestHandler):
- with ignore_deprecation():
- @asynchronous
- @gen.engine
- def get(self):
- # The outer ignore_deprecation applies at definition time.
- # We need another for serving time.
- with ignore_deprecation():
- self.io_loop = self.request.connection.stream.io_loop
- self.io_loop.add_callback((yield gen.Callback("k1")))
- yield gen.Wait("k1")
- self.write("1")
- self.io_loop.add_callback((yield gen.Callback("k2")))
- yield gen.Wait("k2")
- self.write("2")
- # reuse an old key
- self.io_loop.add_callback((yield gen.Callback("k1")))
- yield gen.Wait("k1")
- self.finish("3")
-
-
class GenCoroutineSequenceHandler(RequestHandler):
@gen.coroutine
def get(self):
self.write("3")
-class GenTaskHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- client = AsyncHTTPClient()
- with ignore_deprecation():
- response = yield gen.Task(client.fetch, self.get_argument('url'))
- response.rethrow()
- self.finish(b"got response: " + response.body)
-
-
-class GenExceptionHandler(RequestHandler):
- with ignore_deprecation():
- @asynchronous
- @gen.engine
- def get(self):
- # This test depends on the order of the two decorators.
- io_loop = self.request.connection.stream.io_loop
- yield gen.Task(io_loop.add_callback)
- raise Exception("oops")
-
-
class GenCoroutineExceptionHandler(RequestHandler):
@gen.coroutine
def get(self):
raise Exception("oops")
-class GenYieldExceptionHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- io_loop = self.request.connection.stream.io_loop
- # Test the interaction of the two stack_contexts.
- with ignore_deprecation():
- def fail_task(callback):
- io_loop.add_callback(lambda: 1 / 0)
- try:
- yield gen.Task(fail_task)
- raise Exception("did not get expected exception")
- except ZeroDivisionError:
- self.finish('ok')
-
-
# "Undecorated" here refers to the absence of @asynchronous.
class UndecoratedCoroutinesHandler(RequestHandler):
@gen.coroutine
class GenWebTest(AsyncHTTPTestCase):
def get_app(self):
return Application([
- ('/sequence', GenSequenceHandler),
('/coroutine_sequence', GenCoroutineSequenceHandler),
('/coroutine_unfinished_sequence',
GenCoroutineUnfinishedSequenceHandler),
- ('/task', GenTaskHandler),
- ('/exception', GenExceptionHandler),
('/coroutine_exception', GenCoroutineExceptionHandler),
- ('/yield_exception', GenYieldExceptionHandler),
('/undecorated_coroutine', UndecoratedCoroutinesHandler),
('/async_prepare_error', AsyncPrepareErrorHandler),
('/native_coroutine', NativeCoroutineHandler),
])
- def test_sequence_handler(self):
- response = self.fetch('/sequence')
- self.assertEqual(response.body, b"123")
-
def test_coroutine_sequence_handler(self):
response = self.fetch('/coroutine_sequence')
self.assertEqual(response.body, b"123")
response = self.fetch('/coroutine_unfinished_sequence')
self.assertEqual(response.body, b"123")
- def test_task_handler(self):
- response = self.fetch('/task?url=%s' % url_escape(self.get_url('/sequence')))
- self.assertEqual(response.body, b"got response: 123")
-
- def test_exception_handler(self):
- # Make sure we get an error and not a timeout
- with ExpectLog(app_log, "Uncaught exception GET /exception"):
- response = self.fetch('/exception')
- self.assertEqual(500, response.code)
-
def test_coroutine_exception_handler(self):
# Make sure we get an error and not a timeout
with ExpectLog(app_log, "Uncaught exception GET /coroutine_exception"):
response = self.fetch('/coroutine_exception')
self.assertEqual(500, response.code)
- def test_yield_exception_handler(self):
- response = self.fetch('/yield_exception')
- self.assertEqual(response.body, b'ok')
-
def test_undecorated_coroutines(self):
response = self.fetch('/undecorated_coroutine')
self.assertEqual(response.body, b'123')
'tornado.test.queues_test',
'tornado.test.routing_test',
'tornado.test.simple_httpclient_test',
- 'tornado.test.stack_context_test',
'tornado.test.tcpclient_test',
'tornado.test.tcpserver_test',
'tornado.test.template_test',
+++ /dev/null
-from __future__ import absolute_import, division, print_function
-
-from tornado import gen
-from tornado.ioloop import IOLoop
-from tornado.log import app_log
-from tornado.stack_context import (StackContext, wrap, NullContext, StackContextInconsistentError,
- ExceptionStackContext, run_with_stack_context, _state)
-from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
-from tornado.test.util import unittest, ignore_deprecation
-from tornado.web import asynchronous, Application, RequestHandler
-import contextlib
-import functools
-import logging
-import warnings
-
-
-class TestRequestHandler(RequestHandler):
- def __init__(self, app, request):
- super(TestRequestHandler, self).__init__(app, request)
-
- with ignore_deprecation():
- @asynchronous
- def get(self):
- logging.debug('in get()')
- # call self.part2 without a self.async_callback wrapper. Its
- # exception should still get thrown
- IOLoop.current().add_callback(self.part2)
-
- def part2(self):
- logging.debug('in part2()')
- # Go through a third layer to make sure that contexts once restored
- # are again passed on to future callbacks
- IOLoop.current().add_callback(self.part3)
-
- def part3(self):
- logging.debug('in part3()')
- raise Exception('test exception')
-
- def write_error(self, status_code, **kwargs):
- if 'exc_info' in kwargs and str(kwargs['exc_info'][1]) == 'test exception':
- self.write('got expected exception')
- else:
- self.write('unexpected failure')
-
-
-class HTTPStackContextTest(AsyncHTTPTestCase):
- def get_app(self):
- return Application([('/', TestRequestHandler)])
-
- def test_stack_context(self):
- with ExpectLog(app_log, "Uncaught exception GET /"):
- with ignore_deprecation():
- self.http_client.fetch(self.get_url('/'), self.handle_response)
- self.wait()
- self.assertEqual(self.response.code, 500)
- self.assertTrue(b'got expected exception' in self.response.body)
-
- def handle_response(self, response):
- self.response = response
- self.stop()
-
-
-class StackContextTest(AsyncTestCase):
- def setUp(self):
- super(StackContextTest, self).setUp()
- self.active_contexts = []
- self.warning_catcher = warnings.catch_warnings()
- self.warning_catcher.__enter__()
- warnings.simplefilter('ignore', DeprecationWarning)
-
- def tearDown(self):
- self.warning_catcher.__exit__(None, None, None)
- super(StackContextTest, self).tearDown()
-
- @contextlib.contextmanager
- def context(self, name):
- self.active_contexts.append(name)
- yield
- self.assertEqual(self.active_contexts.pop(), name)
-
- # Simulates the effect of an asynchronous library that uses its own
- # StackContext internally and then returns control to the application.
- def test_exit_library_context(self):
- def library_function(callback):
- # capture the caller's context before introducing our own
- callback = wrap(callback)
- with StackContext(functools.partial(self.context, 'library')):
- self.io_loop.add_callback(
- functools.partial(library_inner_callback, callback))
-
- def library_inner_callback(callback):
- self.assertEqual(self.active_contexts[-2:],
- ['application', 'library'])
- callback()
-
- def final_callback():
- # implementation detail: the full context stack at this point
- # is ['application', 'library', 'application']. The 'library'
- # context was not removed, but is no longer innermost so
- # the application context takes precedence.
- self.assertEqual(self.active_contexts[-1], 'application')
- self.stop()
- with StackContext(functools.partial(self.context, 'application')):
- library_function(final_callback)
- self.wait()
-
- def test_deactivate(self):
- deactivate_callbacks = []
-
- def f1():
- with StackContext(functools.partial(self.context, 'c1')) as c1:
- deactivate_callbacks.append(c1)
- self.io_loop.add_callback(f2)
-
- def f2():
- with StackContext(functools.partial(self.context, 'c2')) as c2:
- deactivate_callbacks.append(c2)
- self.io_loop.add_callback(f3)
-
- def f3():
- with StackContext(functools.partial(self.context, 'c3')) as c3:
- deactivate_callbacks.append(c3)
- self.io_loop.add_callback(f4)
-
- def f4():
- self.assertEqual(self.active_contexts, ['c1', 'c2', 'c3'])
- deactivate_callbacks[1]()
- # deactivating a context doesn't remove it immediately,
- # but it will be missing from the next iteration
- self.assertEqual(self.active_contexts, ['c1', 'c2', 'c3'])
- self.io_loop.add_callback(f5)
-
- def f5():
- self.assertEqual(self.active_contexts, ['c1', 'c3'])
- self.stop()
- self.io_loop.add_callback(f1)
- self.wait()
-
- def test_deactivate_order(self):
- # Stack context deactivation has separate logic for deactivation at
- # the head and tail of the stack, so make sure it works in any order.
- def check_contexts():
- # Make sure that the full-context array and the exception-context
- # linked lists are consistent with each other.
- full_contexts, chain = _state.contexts
- exception_contexts = []
- while chain is not None:
- exception_contexts.append(chain)
- chain = chain.old_contexts[1]
- self.assertEqual(list(reversed(full_contexts)), exception_contexts)
- return list(self.active_contexts)
-
- def make_wrapped_function():
- """Wraps a function in three stack contexts, and returns
- the function along with the deactivation functions.
- """
- # Remove the test's stack context to make sure we can cover
- # the case where the last context is deactivated.
- with NullContext():
- partial = functools.partial
- with StackContext(partial(self.context, 'c0')) as c0:
- with StackContext(partial(self.context, 'c1')) as c1:
- with StackContext(partial(self.context, 'c2')) as c2:
- return (wrap(check_contexts), [c0, c1, c2])
-
- # First make sure the test mechanism works without any deactivations
- func, deactivate_callbacks = make_wrapped_function()
- self.assertEqual(func(), ['c0', 'c1', 'c2'])
-
- # Deactivate the tail
- func, deactivate_callbacks = make_wrapped_function()
- deactivate_callbacks[0]()
- self.assertEqual(func(), ['c1', 'c2'])
-
- # Deactivate the middle
- func, deactivate_callbacks = make_wrapped_function()
- deactivate_callbacks[1]()
- self.assertEqual(func(), ['c0', 'c2'])
-
- # Deactivate the head
- func, deactivate_callbacks = make_wrapped_function()
- deactivate_callbacks[2]()
- self.assertEqual(func(), ['c0', 'c1'])
-
- def test_isolation_nonempty(self):
- # f2 and f3 are a chain of operations started in context c1.
- # f2 is incidentally run under context c2, but that context should
- # not be passed along to f3.
- def f1():
- with StackContext(functools.partial(self.context, 'c1')):
- wrapped = wrap(f2)
- with StackContext(functools.partial(self.context, 'c2')):
- wrapped()
-
- def f2():
- self.assertIn('c1', self.active_contexts)
- self.io_loop.add_callback(f3)
-
- def f3():
- self.assertIn('c1', self.active_contexts)
- self.assertNotIn('c2', self.active_contexts)
- self.stop()
-
- self.io_loop.add_callback(f1)
- self.wait()
-
- def test_isolation_empty(self):
- # Similar to test_isolation_nonempty, but here the f2/f3 chain
- # is started without any context. Behavior should be equivalent
- # to the nonempty case (although historically it was not)
- def f1():
- with NullContext():
- wrapped = wrap(f2)
- with StackContext(functools.partial(self.context, 'c2')):
- wrapped()
-
- def f2():
- self.io_loop.add_callback(f3)
-
- def f3():
- self.assertNotIn('c2', self.active_contexts)
- self.stop()
-
- self.io_loop.add_callback(f1)
- self.wait()
-
- def test_yield_in_with(self):
- @gen.engine
- def f():
- self.callback = yield gen.Callback('a')
- with StackContext(functools.partial(self.context, 'c1')):
- # This yield is a problem: the generator will be suspended
- # and the StackContext's __exit__ is not called yet, so
- # the context will be left on _state.contexts for anything
- # that runs before the yield resolves.
- yield gen.Wait('a')
-
- with self.assertRaises(StackContextInconsistentError):
- f()
- self.wait()
- # Cleanup: to avoid GC warnings (which for some reason only seem
- # to show up on py33-asyncio), invoke the callback (which will do
- # nothing since the gen.Runner is already finished) and delete it.
- self.callback()
- del self.callback
-
- @gen_test
- def test_yield_outside_with(self):
- # This pattern avoids the problem in the previous test.
- cb = yield gen.Callback('k1')
- with StackContext(functools.partial(self.context, 'c1')):
- self.io_loop.add_callback(cb)
- yield gen.Wait('k1')
-
- def test_yield_in_with_exception_stack_context(self):
- # As above, but with ExceptionStackContext instead of StackContext.
- @gen.engine
- def f():
- with ExceptionStackContext(lambda t, v, tb: False):
- yield gen.Task(self.io_loop.add_callback)
-
- with self.assertRaises(StackContextInconsistentError):
- f()
- self.wait()
-
- @gen_test
- def test_yield_outside_with_exception_stack_context(self):
- cb = yield gen.Callback('k1')
- with ExceptionStackContext(lambda t, v, tb: False):
- self.io_loop.add_callback(cb)
- yield gen.Wait('k1')
-
- @gen_test
- def test_run_with_stack_context(self):
- @gen.coroutine
- def f1():
- self.assertEqual(self.active_contexts, ['c1'])
- yield run_with_stack_context(
- StackContext(functools.partial(self.context, 'c2')),
- f2)
- self.assertEqual(self.active_contexts, ['c1'])
-
- @gen.coroutine
- def f2():
- self.assertEqual(self.active_contexts, ['c1', 'c2'])
- yield gen.Task(self.io_loop.add_callback)
- self.assertEqual(self.active_contexts, ['c1', 'c2'])
-
- self.assertEqual(self.active_contexts, [])
- yield run_with_stack_context(
- StackContext(functools.partial(self.context, 'c1')),
- f1)
- self.assertEqual(self.active_contexts, [])
-
-
-if __name__ == '__main__':
- unittest.main()