]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Optimize gen.with_timeout for tornado futures.
authorBen Darnell <ben@bendarnell.com>
Sun, 20 Apr 2014 01:22:43 +0000 (21:22 -0400)
committerBen Darnell <ben@bendarnell.com>
Sun, 20 Apr 2014 01:22:43 +0000 (21:22 -0400)
We can avoid some IOLoop and stack_context interactions if we know
that the Future will resolve on the IOLoop's thread.

tornado/gen.py
tornado/test/gen_test.py

index 28c031170068516c77c4cf23f5d00c99a885dda1..c3fee01f04b4e1fa524613587fe2aaa68da131a1 100644 (file)
@@ -471,9 +471,11 @@ def with_timeout(timeout, future, io_loop=None):
     # TODO: allow yield points in addition to futures?
     # Tricky to do with stack_context semantics.
     #
-    # It would be more efficient to cancel the input future on timeout instead
-    # of creating a new one, but we can't know if we are the only one waiting
-    # on the input future, so cancelling it might disrupt other callers.
+    # It's tempting to optimize this by cancelling the input future on timeout
+    # instead of creating a new one, but A) we can't know if we are the only
+    # one waiting on the input future, so cancelling it might disrupt other
+    # callers and B) concurrent futures can only be cancelled while they are
+    # in the queue, so cancellation cannot reliably bound our waiting time.
     result = Future()
     chain_future(future, result)
     if io_loop is None:
@@ -481,8 +483,17 @@ def with_timeout(timeout, future, io_loop=None):
     timeout_handle = io_loop.add_timeout(
         timeout,
         lambda: result.set_exception(TimeoutError("Timeout")))
-    io_loop.add_future(future,
-                        lambda future: io_loop.remove_timeout(timeout_handle))
+    if isinstance(future, Future):
+        # We know this future will resolve on the IOLoop, so we don't
+        # need the extra thread-safety of IOLoop.add_future (and we also
+        # don't care about StackContext here.
+        future.add_done_callback(
+            lambda future: io_loop.remove_timeout(timeout_handle))
+    else:
+        # concurrent.futures.Futures may resolve on any thread, so we
+        # need to route them back to the IOLoop.
+        io_loop.add_future(
+            future, lambda future: io_loop.remove_timeout(timeout_handle))
     return result
 
 
index 3045e58bf3abf0dd44607e87d9134e3d9fa53144..ffe1c7d3c30380baf75c76dfb2f1f75c10c40ad9 100644 (file)
@@ -21,6 +21,10 @@ from tornado.web import Application, RequestHandler, asynchronous, HTTPError
 
 from tornado import gen
 
+try:
+    from concurrent import futures
+except ImportError:
+    futures = None
 
 skipBefore33 = unittest.skipIf(sys.version_info < (3, 3), 'PEP 380 not available')
 skipNotCPython = unittest.skipIf(platform.python_implementation() != 'CPython',
@@ -984,6 +988,21 @@ class WithTimeoutTest(AsyncTestCase):
                                         future)
         self.assertEqual(result, 'asdf')
 
+    @unittest.skipIf(futures is None, 'futures module not present')
+    @gen_test
+    def test_timeout_concurrent_future(self):
+        with futures.ThreadPoolExecutor(1) as executor:
+            with self.assertRaises(gen.TimeoutError):
+                yield gen.with_timeout(self.io_loop.time(),
+                                       executor.submit(time.sleep, 0.1))
+
+    @unittest.skipIf(futures is None, 'futures module not present')
+    @gen_test
+    def test_completed_concurrent_future(self):
+        with futures.ThreadPoolExecutor(1) as executor:
+            yield gen.with_timeout(datetime.timedelta(seconds=3600),
+                                   executor.submit(lambda: None))
+
 
 if __name__ == '__main__':
     unittest.main()