from __future__ import absolute_import, division, print_function
import collections
+from concurrent.futures import CancelledError
from tornado import gen, ioloop
from tornado.concurrent import Future, future_set_result_unless_cancelled
Done
"""
def __init__(self):
- self._future = Future()
+ self._value = False
+ self._waiters = set()
def __repr__(self):
return '<%s %s>' % (
def is_set(self):
"""Return ``True`` if the internal flag is true."""
- return self._future.done()
+ return self._value
def set(self):
"""Set the internal flag to ``True``. All waiters are awakened.
Calling `.wait` once the flag is set will not block.
"""
- if not self._future.done():
- self._future.set_result(None)
+ if not self._value:
+ self._value = True
+
+ for fut in self._waiters:
+ if not fut.done():
+ fut.set_result(None)
def clear(self):
"""Reset the internal flag to ``False``.
Calls to `.wait` will block until `.set` is called.
"""
- if self._future.done():
- self._future = Future()
+ self._value = False
def wait(self, timeout=None):
"""Block until the internal flag is true.
Returns a Future, which raises `tornado.util.TimeoutError` after a
timeout.
"""
+ fut = Future()
+ if self._value:
+ fut.set_result(None)
+ return fut
+ self._waiters.add(fut)
+ fut.add_done_callback(lambda fut: self._waiters.remove(fut))
if timeout is None:
- return self._future
+ return fut
else:
- return gen.with_timeout(timeout, self._future)
+ timeout_fut = gen.with_timeout(timeout, fut, quiet_exceptions=(CancelledError,))
+ # This is a slightly clumsy workaround for the fact that
+ # gen.with_timeout doesn't cancel its futures. Cancelling
+ # fut will remove it from the waiters list.
+ timeout_fut.add_done_callback(lambda tf: fut.cancel() if not fut.done() else None)
+ return timeout_fut
class _ReleasingContextManager(object):