]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Add a future-based alternative to gen.Multi to avoid stack_context overhead.
authorBen Darnell <ben@bendarnell.com>
Sat, 24 May 2014 21:40:23 +0000 (17:40 -0400)
committerBen Darnell <ben@bendarnell.com>
Sat, 24 May 2014 21:40:23 +0000 (17:40 -0400)
tornado/gen.py
tornado/test/gen_test.py

index 7b6cf0ce9a69d5280e1939d247de17f8c39fc52e..e136a6a7b0863345c28e4a838f275dc566fb2590 100644 (file)
@@ -406,10 +406,14 @@ class YieldFuture(YieldPoint):
 class Multi(YieldPoint):
     """Runs multiple asynchronous operations in parallel.
 
-    Takes a list of ``Tasks`` or other ``YieldPoints`` and returns a list of
+    Takes a list of ``YieldPoints`` or ``Futures`` and returns a list of
     their responses.  It is not necessary to call `Multi` explicitly,
     since the engine will do so automatically when the generator yields
-    a list of ``YieldPoints``.
+    a list of ``YieldPoints`` or a mixture of ``YieldPoints`` and ``Futures``.
+
+    Instead of a list, the argument may also be a dictionary whose values are
+    Futures, in which case a parallel dictionary is returned mapping the same
+    keys to their results.
     """
     def __init__(self, children):
         self.keys = None
@@ -442,6 +446,54 @@ class Multi(YieldPoint):
             return list(result)
 
 
+def multi_future(children):
+    """Wait for multiple asynchronous futures in parallel.
+
+    Takes a list of ``Futures`` (but *not* other ``YieldPoints``) and returns
+    a new Future that resolves when all the other Futures are done.
+    If all the ``Futures`` succeeded, the returned Future's result is a list
+    of their results.  If any failed, the returned Future raises the exception
+    of the first one to fail.
+
+    Instead of a list, the argument may also be a dictionary whose values are
+    Futures, in which case a parallel dictionary is returned mapping the same
+    keys to their results.
+
+    It is not necessary to call `multi_future` explcitly, since the engine will
+    do so automatically when the generator yields a list of `Futures`.
+    This function is faster than the `Multi` `YieldPoint` because it does not
+    require the creation of a stack context.
+
+    .. versionadded:: 3.3
+    """
+    if isinstance(children, dict):
+        keys = list(children.keys())
+        children = children.values()
+    else:
+        keys = None
+    assert all(is_future(i) for i in children)
+    unfinished_children = set(children)
+
+    future = Future()
+    if not children:
+        future.set_result({} if keys is not None else [])
+    def callback(f):
+        unfinished_children.remove(f)
+        if not unfinished_children:
+            try:
+                result_list = [i.result() for i in children]
+            except Exception:
+                future.set_exc_info(sys.exc_info())
+            else:
+                if keys is not None:
+                    future.set_result(dict(zip(keys, result_list)))
+                else:
+                    future.set_result(result_list)
+    for f in children:
+        f.add_done_callback(callback)
+    return future
+
+
 def maybe_future(x):
     """Converts ``x`` into a `.Future`.
 
@@ -631,8 +683,17 @@ class Runner(object):
             self.running = False
 
     def handle_yield(self, yielded):
-        if isinstance(yielded, (list, dict)):
-            yielded = Multi(yielded)
+        if isinstance(yielded, list):
+            if all(is_future(f) for f in yielded):
+                yielded = multi_future(yielded)
+            else:
+                yielded = Multi(yielded)
+        elif isinstance(yielded, dict):
+            if all(is_future(f) for f in yielded.values()):
+                yielded = multi_future(yielded)
+            else:
+                yielded = Multi(yielded)
+
         if isinstance(yielded, YieldPoint):
             self.future = TracebackFuture()
             def start_yield_point():
index b00ff735eef053c8715f7e8beef764e6d40031f5..254d5175ae0d479348ac562a08e7a40599cc622a 100644 (file)
@@ -296,26 +296,53 @@ class GenEngineTest(AsyncTestCase):
             self.stop()
         self.run_gen(f)
 
-    def test_multi_delayed(self):
+    # The following tests explicitly run with both gen.Multi
+    # and gen.multi_future (Task returns a Future, so it can be used
+    # with either).
+    def test_multi_yieldpoint_delayed(self):
         @gen.engine
         def f():
             # callbacks run at different times
-            responses = yield [
+            responses = yield gen.Multi([
                 gen.Task(self.delay_callback, 3, arg="v1"),
                 gen.Task(self.delay_callback, 1, arg="v2"),
-            ]
+            ])
             self.assertEqual(responses, ["v1", "v2"])
             self.stop()
         self.run_gen(f)
 
-    def test_multi_dict_delayed(self):
+    def test_multi_yieldpoint_dict_delayed(self):
         @gen.engine
         def f():
             # callbacks run at different times
-            responses = yield dict(
+            responses = yield gen.Multi(dict(
                 foo=gen.Task(self.delay_callback, 3, arg="v1"),
                 bar=gen.Task(self.delay_callback, 1, arg="v2"),
-            )
+            ))
+            self.assertEqual(responses, dict(foo="v1", bar="v2"))
+            self.stop()
+        self.run_gen(f)
+
+    def test_multi_future_delayed(self):
+        @gen.engine
+        def f():
+            # callbacks run at different times
+            responses = yield gen.multi_future([
+                gen.Task(self.delay_callback, 3, arg="v1"),
+                gen.Task(self.delay_callback, 1, arg="v2"),
+            ])
+            self.assertEqual(responses, ["v1", "v2"])
+            self.stop()
+        self.run_gen(f)
+
+    def test_multi_future_dict_delayed(self):
+        @gen.engine
+        def f():
+            # callbacks run at different times
+            responses = yield gen.multi_future(dict(
+                foo=gen.Task(self.delay_callback, 3, arg="v1"),
+                bar=gen.Task(self.delay_callback, 1, arg="v2"),
+            ))
             self.assertEqual(responses, dict(foo="v1", bar="v2"))
             self.stop()
         self.run_gen(f)
@@ -339,6 +366,15 @@ class GenEngineTest(AsyncTestCase):
         y = yield {}
         self.assertTrue(isinstance(y, dict))
 
+    @gen_test
+    def test_multi_mixed_types(self):
+        # A YieldPoint (Wait) and Future (Task) can be combined
+        # (and use the YieldPoint codepath)
+        (yield gen.Callback("k1"))("v1")
+        responses = yield [gen.Wait("k1"),
+                           gen.Task(self.delay_callback, 3, arg="v2")]
+        self.assertEqual(responses, ["v1", "v2"])
+
     @gen_test
     def test_future(self):
         result = yield self.async_future(1)