Returns a Future, which raises `tornado.gen.TimeoutError` after a
timeout.
"""
- self._consume_expired()
- if self._getters:
- assert not self._queue, "queue non-empty, why are getters waiting?"
- getter = self._getters.popleft()
- self._put(item)
- getter.set_result(self._get())
- return gen._null_future
- elif self.full():
+ try:
+ self.put_nowait(item)
+ except QueueFull:
future = Future()
self._putters.append((item, future))
if timeout:
ioloop.IOLoop.current().add_timeout(timeout, on_timeout)
return future
else:
- self._put(item)
return gen._null_future
def put_nowait(self, item):
if self._getters:
assert self.empty(), "queue non-empty, why are getters waiting?"
getter = self._getters.popleft()
-
self._put(item)
getter.set_result(self._get())
elif self.full():
Returns a Future which resolves once an item is available, or raises
`tornado.gen.TimeoutError` after a timeout.
"""
- self._consume_expired()
- if self._putters:
- assert self.full(), "queue not full, why are putters waiting?"
- item, putter = self._putters.popleft()
- self._put(item)
- putter.set_result(None)
-
- if self.qsize():
- future = Future()
- future.set_result(self._get())
- return future
- else:
- future = Future()
+ future = Future()
+ try:
+ future.set_result(self.get_nowait())
+ except QueueEmpty:
self._getters.append(future)
if timeout:
def on_timeout():
future.set_exception(gen.TimeoutError())
ioloop.IOLoop.current().add_timeout(timeout, on_timeout)
- return future
+ return future
def get_nowait(self):
"""Remove and return an item from the queue without blocking.