if self.qsize():
future = Future()
- # Defer unblocking the getter, which might do task_done() without
- # yielding first. We want putters to have a chance to run first and
- # keep join() blocked. See test_producer_consumer().
- ioloop.IOLoop.current().add_callback(
- partial(future.set_result, self._get()))
+ future.set_result(self._get())
return future
else:
future = Future()
yield q.join()
self.assertEqual(sum(range(100)), self.accumulator)
+ @gen_test
+ def test_task_done_delay(self):
+ # Verify it is task_done(), not get(), that unblocks join().
+ q = queues.Queue()
+ q.put_nowait(0)
+ join = q.join()
+ self.assertFalse(join.done())
+ yield q.get()
+ self.assertFalse(join.done())
+ yield gen.moment
+ self.assertFalse(join.done())
+ q.task_done()
+ self.assertTrue(join.done())
+
@gen_test
def test_join_empty_queue(self):
q = queues.Queue()
# We don't yield between get() and task_done(), so get() must wait for
# the next tick. Otherwise we'd immediately call task_done and unblock
# join() before q.put() resumes, and we'd only process the first four
- # items. Consumers would normally yield in the course of processing an
- # item, but it's worthwhile testing the degenerate case.
+ # items.
@gen.coroutine
def consumer():
while True:
for item in range(10):
yield q.put(item)
- producer()
consumer()
+ yield producer()
yield q.join()
self.assertEqual(list(range(10)), history)