From: Ben Darnell Date: Sat, 24 May 2014 21:40:23 +0000 (-0400) Subject: Add a future-based alternative to gen.Multi to avoid stack_context overhead. X-Git-Tag: v4.0.0b1~50 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=da552790c3f65a8b62f05be9a008ae44950790c6;p=thirdparty%2Ftornado.git Add a future-based alternative to gen.Multi to avoid stack_context overhead. --- diff --git a/tornado/gen.py b/tornado/gen.py index 7b6cf0ce9..e136a6a7b 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -406,10 +406,14 @@ class YieldFuture(YieldPoint): class Multi(YieldPoint): """Runs multiple asynchronous operations in parallel. - Takes a list of ``Tasks`` or other ``YieldPoints`` and returns a list of + Takes a list of ``YieldPoints`` or ``Futures`` and returns a list of their responses. It is not necessary to call `Multi` explicitly, since the engine will do so automatically when the generator yields - a list of ``YieldPoints``. + a list of ``YieldPoints`` or a mixture of ``YieldPoints`` and ``Futures``. + + Instead of a list, the argument may also be a dictionary whose values are + Futures, in which case a parallel dictionary is returned mapping the same + keys to their results. """ def __init__(self, children): self.keys = None @@ -442,6 +446,54 @@ class Multi(YieldPoint): return list(result) +def multi_future(children): + """Wait for multiple asynchronous futures in parallel. + + Takes a list of ``Futures`` (but *not* other ``YieldPoints``) and returns + a new Future that resolves when all the other Futures are done. + If all the ``Futures`` succeeded, the returned Future's result is a list + of their results. If any failed, the returned Future raises the exception + of the first one to fail. + + Instead of a list, the argument may also be a dictionary whose values are + Futures, in which case a parallel dictionary is returned mapping the same + keys to their results. + + It is not necessary to call `multi_future` explcitly, since the engine will + do so automatically when the generator yields a list of `Futures`. + This function is faster than the `Multi` `YieldPoint` because it does not + require the creation of a stack context. + + .. versionadded:: 3.3 + """ + if isinstance(children, dict): + keys = list(children.keys()) + children = children.values() + else: + keys = None + assert all(is_future(i) for i in children) + unfinished_children = set(children) + + future = Future() + if not children: + future.set_result({} if keys is not None else []) + def callback(f): + unfinished_children.remove(f) + if not unfinished_children: + try: + result_list = [i.result() for i in children] + except Exception: + future.set_exc_info(sys.exc_info()) + else: + if keys is not None: + future.set_result(dict(zip(keys, result_list))) + else: + future.set_result(result_list) + for f in children: + f.add_done_callback(callback) + return future + + def maybe_future(x): """Converts ``x`` into a `.Future`. @@ -631,8 +683,17 @@ class Runner(object): self.running = False def handle_yield(self, yielded): - if isinstance(yielded, (list, dict)): - yielded = Multi(yielded) + if isinstance(yielded, list): + if all(is_future(f) for f in yielded): + yielded = multi_future(yielded) + else: + yielded = Multi(yielded) + elif isinstance(yielded, dict): + if all(is_future(f) for f in yielded.values()): + yielded = multi_future(yielded) + else: + yielded = Multi(yielded) + if isinstance(yielded, YieldPoint): self.future = TracebackFuture() def start_yield_point(): diff --git a/tornado/test/gen_test.py b/tornado/test/gen_test.py index b00ff735e..254d5175a 100644 --- a/tornado/test/gen_test.py +++ b/tornado/test/gen_test.py @@ -296,26 +296,53 @@ class GenEngineTest(AsyncTestCase): self.stop() self.run_gen(f) - def test_multi_delayed(self): + # The following tests explicitly run with both gen.Multi + # and gen.multi_future (Task returns a Future, so it can be used + # with either). + def test_multi_yieldpoint_delayed(self): @gen.engine def f(): # callbacks run at different times - responses = yield [ + responses = yield gen.Multi([ gen.Task(self.delay_callback, 3, arg="v1"), gen.Task(self.delay_callback, 1, arg="v2"), - ] + ]) self.assertEqual(responses, ["v1", "v2"]) self.stop() self.run_gen(f) - def test_multi_dict_delayed(self): + def test_multi_yieldpoint_dict_delayed(self): @gen.engine def f(): # callbacks run at different times - responses = yield dict( + responses = yield gen.Multi(dict( foo=gen.Task(self.delay_callback, 3, arg="v1"), bar=gen.Task(self.delay_callback, 1, arg="v2"), - ) + )) + self.assertEqual(responses, dict(foo="v1", bar="v2")) + self.stop() + self.run_gen(f) + + def test_multi_future_delayed(self): + @gen.engine + def f(): + # callbacks run at different times + responses = yield gen.multi_future([ + gen.Task(self.delay_callback, 3, arg="v1"), + gen.Task(self.delay_callback, 1, arg="v2"), + ]) + self.assertEqual(responses, ["v1", "v2"]) + self.stop() + self.run_gen(f) + + def test_multi_future_dict_delayed(self): + @gen.engine + def f(): + # callbacks run at different times + responses = yield gen.multi_future(dict( + foo=gen.Task(self.delay_callback, 3, arg="v1"), + bar=gen.Task(self.delay_callback, 1, arg="v2"), + )) self.assertEqual(responses, dict(foo="v1", bar="v2")) self.stop() self.run_gen(f) @@ -339,6 +366,15 @@ class GenEngineTest(AsyncTestCase): y = yield {} self.assertTrue(isinstance(y, dict)) + @gen_test + def test_multi_mixed_types(self): + # A YieldPoint (Wait) and Future (Task) can be combined + # (and use the YieldPoint codepath) + (yield gen.Callback("k1"))("v1") + responses = yield [gen.Wait("k1"), + gen.Task(self.delay_callback, 3, arg="v2")] + self.assertEqual(responses, ["v1", "v2"]) + @gen_test def test_future(self): result = yield self.async_future(1)