.. testsetup:: semaphore
- from tornado import gen
+ from collections import deque
- # Ensure reliable doctest output.
- waits = [0.1, 0.2, 0.1]
+ from tornado import gen, ioloop
+ from tornado.concurrent import Future
+
+ # Ensure reliable doctest output: resolve Futures one at a time.
+ futures_q = deque([Future() for _ in range(3)])
@gen.coroutine
+ def simulator(futures):
+ for f in futures:
+ yield gen.moment
+ f.set_result(None)
+
+ ioloop.IOLoop.current().add_callback(simulator, list(futures_q))
+
def use_some_resource():
- yield gen.sleep(waits.pop())
+ return futures_q.popleft()
.. testcode:: semaphore