# TODO: allow yield points in addition to futures?
# Tricky to do with stack_context semantics.
#
- # It would be more efficient to cancel the input future on timeout instead
- # of creating a new one, but we can't know if we are the only one waiting
- # on the input future, so cancelling it might disrupt other callers.
+ # It's tempting to optimize this by cancelling the input future on timeout
+ # instead of creating a new one, but A) we can't know if we are the only
+ # one waiting on the input future, so cancelling it might disrupt other
+ # callers and B) concurrent futures can only be cancelled while they are
+ # in the queue, so cancellation cannot reliably bound our waiting time.
result = Future()
chain_future(future, result)
if io_loop is None:
timeout_handle = io_loop.add_timeout(
timeout,
lambda: result.set_exception(TimeoutError("Timeout")))
- io_loop.add_future(future,
- lambda future: io_loop.remove_timeout(timeout_handle))
+ if isinstance(future, Future):
+ # We know this future will resolve on the IOLoop, so we don't
+ # need the extra thread-safety of IOLoop.add_future (and we also
+ # don't care about StackContext here.
+ future.add_done_callback(
+ lambda future: io_loop.remove_timeout(timeout_handle))
+ else:
+ # concurrent.futures.Futures may resolve on any thread, so we
+ # need to route them back to the IOLoop.
+ io_loop.add_future(
+ future, lambda future: io_loop.remove_timeout(timeout_handle))
return result
from tornado import gen
+try:
+ from concurrent import futures
+except ImportError:
+ futures = None
skipBefore33 = unittest.skipIf(sys.version_info < (3, 3), 'PEP 380 not available')
skipNotCPython = unittest.skipIf(platform.python_implementation() != 'CPython',
future)
self.assertEqual(result, 'asdf')
+ @unittest.skipIf(futures is None, 'futures module not present')
+ @gen_test
+ def test_timeout_concurrent_future(self):
+ with futures.ThreadPoolExecutor(1) as executor:
+ with self.assertRaises(gen.TimeoutError):
+ yield gen.with_timeout(self.io_loop.time(),
+ executor.submit(time.sleep, 0.1))
+
+ @unittest.skipIf(futures is None, 'futures module not present')
+ @gen_test
+ def test_completed_concurrent_future(self):
+ with futures.ThreadPoolExecutor(1) as executor:
+ yield gen.with_timeout(datetime.timedelta(seconds=3600),
+ executor.submit(lambda: None))
+
if __name__ == '__main__':
unittest.main()