pass
+def _set_timeout(future, timeout):
+ if timeout:
+ def on_timeout():
+ future.set_exception(gen.TimeoutError())
+ io_loop = ioloop.IOLoop.current()
+ timeout_handle = io_loop.add_timeout(timeout, on_timeout)
+ future.add_done_callback(
+ lambda _: io_loop.remove_timeout(timeout_handle))
+
+
class Queue(object):
"""Coordinate producer and consumer coroutines.
except QueueFull:
future = Future()
self._putters.append((item, future))
- if timeout:
- def on_timeout():
- future.set_exception(gen.TimeoutError())
- ioloop.IOLoop.current().add_timeout(timeout, on_timeout)
+ _set_timeout(future, timeout)
return future
else:
return gen._null_future
future.set_result(self.get_nowait())
except QueueEmpty:
self._getters.append(future)
- if timeout:
- def on_timeout():
- future.set_exception(gen.TimeoutError())
- ioloop.IOLoop.current().add_timeout(timeout, on_timeout)
+ _set_timeout(future, timeout)
return future
def get_nowait(self):
q.put_nowait(0)
self.assertEqual(0, (yield get))
+ @gen_test
+ def test_get_timeout_preempted(self):
+ q = queues.Queue()
+ get = q.get(timeout=timedelta(seconds=0.01))
+ q.put(0)
+ yield gen.sleep(0.02)
+ self.assertEqual(0, (yield get))
+
@gen_test
def test_get_clears_timed_out_putters(self):
q = queues.Queue(1)
# Final get() unblocked this putter.
yield put
+ @gen_test
+ def test_put_timeout_preempted(self):
+ q = queues.Queue(1)
+ q.put_nowait(0)
+ put = q.put(1, timeout=timedelta(seconds=0.01))
+ q.get()
+ yield gen.sleep(0.02)
+ yield put # No TimeoutError.
+
@gen_test
def test_put_clears_timed_out_putters(self):
q = queues.Queue(1)