lambda _: io_loop.remove_timeout(timeout_handle))
+class _QueueIterator(object):
+ def __init__(self, q):
+ self.q = q
+
+ def __anext__(self):
+ return self.q.get()
+
+
class Queue(object):
"""Coordinate producer and consumer coroutines.
Doing work on 3
Doing work on 4
Done
+
+ In Python 3.5, `Queue` implements the async iterator protocol, so
+ ``consumer()`` could be rewritten as::
+
+ async def consumer():
+ async for item in q:
+ try:
+ print('Doing work on %s' % item)
+ yield gen.sleep(0.01)
+ finally:
+ q.task_done()
+
"""
def __init__(self, maxsize=0):
if maxsize is None:
"""
return self._finished.wait(timeout)
+ @gen.coroutine
+ def __aiter__(self):
+ return _QueueIterator(self)
+
# These three are overridable in subclasses.
def _init(self):
self._queue = collections.deque()
from datetime import timedelta
from random import random
+import sys
+import textwrap
from tornado import gen, queues
from tornado.gen import TimeoutError
from tornado.testing import gen_test, AsyncTestCase
-from tornado.test.util import unittest
+from tornado.test.util import unittest, skipBefore35, exec_test
class QueueBasicTest(AsyncTestCase):
get = q.get()
with self.assertRaises(TimeoutError):
yield get_timeout
-
+
q.put_nowait(0)
self.assertEqual(0, (yield get))
for getter in getters:
self.assertRaises(TimeoutError, getter.result)
+ @skipBefore35
+ @gen_test
+ def test_async_for(self):
+ q = queues.Queue()
+ for i in range(5):
+ q.put(i)
+
+ namespace = exec_test(globals(), locals(), """
+ async def f():
+ results = []
+ async for i in q:
+ results.append(i)
+ if i == 4:
+ return results
+ """)
+ results = yield namespace['f']()
+ self.assertEqual(results, list(range(5)))
+
class QueuePutTest(AsyncTestCase):
@gen_test
self.assertEqual(0, (yield get0))
yield q.put(1)
self.assertEqual(1, (yield get1))
-
+
@gen_test
def test_nonblocking_put_with_getters(self):
q = queues.Queue()
put = q.put(2)
with self.assertRaises(TimeoutError):
yield put_timeout
-
+
self.assertEqual(0, q.get_nowait())
# 1 was never put in the queue.
self.assertEqual(2, (yield q.get()))
class QueueJoinTest(AsyncTestCase):
queue_class = queues.Queue
-
+
def test_task_done_underflow(self):
q = self.queue_class()
self.assertRaises(ValueError, q.task_done)
class PriorityQueueJoinTest(QueueJoinTest):
queue_class = queues.PriorityQueue
-
+
@gen_test
def test_order(self):
q = self.queue_class(maxsize=2)