else:
self.__put_internal(item)
- def get(self, timeout: Union[float, datetime.timedelta] = None) -> "Future[_T]":
+ def get(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[_T]:
"""Remove and return an item from the queue.
- Returns a Future which resolves once an item is available, or raises
+ Returns an awaitable which resolves once an item is available, or raises
`tornado.util.TimeoutError` after a timeout.
``timeout`` may be a number denoting a time (on the same
def join(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[None]:
"""Block until all items in the queue are processed.
- Returns a Future, which raises `tornado.util.TimeoutError` after a
+ Returns an awaitable, which raises `tornado.util.TimeoutError` after a
timeout.
"""
return self._finished.wait(timeout)
# License for the specific language governing permissions and limitations
# under the License.
+import asyncio
from datetime import timedelta
from random import random
import unittest
@gen_test
def test_get_clears_timed_out_getters(self):
q = queues.Queue() # type: queues.Queue[int]
- getters = [q.get(timedelta(seconds=0.01)) for _ in range(10)]
- get = q.get()
+ getters = [
+ asyncio.ensure_future(q.get(timedelta(seconds=0.01))) for _ in range(10)
+ ]
+ get = asyncio.ensure_future(q.get())
self.assertEqual(11, len(q._getters))
yield gen.sleep(0.02)
self.assertEqual(11, len(q._getters))
@gen_test
def test_put_clears_timed_out_getters(self):
q = queues.Queue() # type: queues.Queue[int]
- getters = [q.get(timedelta(seconds=0.01)) for _ in range(10)]
- get = q.get()
+ getters = [
+ asyncio.ensure_future(q.get(timedelta(seconds=0.01))) for _ in range(10)
+ ]
+ get = asyncio.ensure_future(q.get())
q.get()
self.assertEqual(12, len(q._getters))
yield gen.sleep(0.02)
ready.
"""
- future = self.read_queue.get()
+ awaitable = self.read_queue.get()
if callback is not None:
- self.io_loop.add_future(future, callback)
- return future
+ self.io_loop.add_future(asyncio.ensure_future(awaitable), callback)
+ return awaitable
def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]:
return self._on_message(message)