class Multi(YieldPoint):
"""Runs multiple asynchronous operations in parallel.
- Takes a list of ``Tasks`` or other ``YieldPoints`` and returns a list of
+ Takes a list of ``YieldPoints`` or ``Futures`` and returns a list of
their responses. It is not necessary to call `Multi` explicitly,
since the engine will do so automatically when the generator yields
- a list of ``YieldPoints``.
+ a list of ``YieldPoints`` or a mixture of ``YieldPoints`` and ``Futures``.
+
+ Instead of a list, the argument may also be a dictionary whose values are
+ Futures, in which case a parallel dictionary is returned mapping the same
+ keys to their results.
"""
def __init__(self, children):
self.keys = None
return list(result)
+def multi_future(children):
+ """Wait for multiple asynchronous futures in parallel.
+
+ Takes a list of ``Futures`` (but *not* other ``YieldPoints``) and returns
+ a new Future that resolves when all the other Futures are done.
+ If all the ``Futures`` succeeded, the returned Future's result is a list
+ of their results. If any failed, the returned Future raises the exception
+ of the first one to fail.
+
+ Instead of a list, the argument may also be a dictionary whose values are
+ Futures, in which case a parallel dictionary is returned mapping the same
+ keys to their results.
+
+ It is not necessary to call `multi_future` explcitly, since the engine will
+ do so automatically when the generator yields a list of `Futures`.
+ This function is faster than the `Multi` `YieldPoint` because it does not
+ require the creation of a stack context.
+
+ .. versionadded:: 3.3
+ """
+ if isinstance(children, dict):
+ keys = list(children.keys())
+ children = children.values()
+ else:
+ keys = None
+ assert all(is_future(i) for i in children)
+ unfinished_children = set(children)
+
+ future = Future()
+ if not children:
+ future.set_result({} if keys is not None else [])
+ def callback(f):
+ unfinished_children.remove(f)
+ if not unfinished_children:
+ try:
+ result_list = [i.result() for i in children]
+ except Exception:
+ future.set_exc_info(sys.exc_info())
+ else:
+ if keys is not None:
+ future.set_result(dict(zip(keys, result_list)))
+ else:
+ future.set_result(result_list)
+ for f in children:
+ f.add_done_callback(callback)
+ return future
+
+
def maybe_future(x):
"""Converts ``x`` into a `.Future`.
self.running = False
def handle_yield(self, yielded):
- if isinstance(yielded, (list, dict)):
- yielded = Multi(yielded)
+ if isinstance(yielded, list):
+ if all(is_future(f) for f in yielded):
+ yielded = multi_future(yielded)
+ else:
+ yielded = Multi(yielded)
+ elif isinstance(yielded, dict):
+ if all(is_future(f) for f in yielded.values()):
+ yielded = multi_future(yielded)
+ else:
+ yielded = Multi(yielded)
+
if isinstance(yielded, YieldPoint):
self.future = TracebackFuture()
def start_yield_point():
self.stop()
self.run_gen(f)
- def test_multi_delayed(self):
+ # The following tests explicitly run with both gen.Multi
+ # and gen.multi_future (Task returns a Future, so it can be used
+ # with either).
+ def test_multi_yieldpoint_delayed(self):
@gen.engine
def f():
# callbacks run at different times
- responses = yield [
+ responses = yield gen.Multi([
gen.Task(self.delay_callback, 3, arg="v1"),
gen.Task(self.delay_callback, 1, arg="v2"),
- ]
+ ])
self.assertEqual(responses, ["v1", "v2"])
self.stop()
self.run_gen(f)
- def test_multi_dict_delayed(self):
+ def test_multi_yieldpoint_dict_delayed(self):
@gen.engine
def f():
# callbacks run at different times
- responses = yield dict(
+ responses = yield gen.Multi(dict(
foo=gen.Task(self.delay_callback, 3, arg="v1"),
bar=gen.Task(self.delay_callback, 1, arg="v2"),
- )
+ ))
+ self.assertEqual(responses, dict(foo="v1", bar="v2"))
+ self.stop()
+ self.run_gen(f)
+
+ def test_multi_future_delayed(self):
+ @gen.engine
+ def f():
+ # callbacks run at different times
+ responses = yield gen.multi_future([
+ gen.Task(self.delay_callback, 3, arg="v1"),
+ gen.Task(self.delay_callback, 1, arg="v2"),
+ ])
+ self.assertEqual(responses, ["v1", "v2"])
+ self.stop()
+ self.run_gen(f)
+
+ def test_multi_future_dict_delayed(self):
+ @gen.engine
+ def f():
+ # callbacks run at different times
+ responses = yield gen.multi_future(dict(
+ foo=gen.Task(self.delay_callback, 3, arg="v1"),
+ bar=gen.Task(self.delay_callback, 1, arg="v2"),
+ ))
self.assertEqual(responses, dict(foo="v1", bar="v2"))
self.stop()
self.run_gen(f)
y = yield {}
self.assertTrue(isinstance(y, dict))
+ @gen_test
+ def test_multi_mixed_types(self):
+ # A YieldPoint (Wait) and Future (Task) can be combined
+ # (and use the YieldPoint codepath)
+ (yield gen.Callback("k1"))("v1")
+ responses = yield [gen.Wait("k1"),
+ gen.Task(self.delay_callback, 3, arg="v2")]
+ self.assertEqual(responses, ["v1", "v2"])
+
@gen_test
def test_future(self):
result = yield self.async_future(1)