from tornado import gen, ioloop
from tornado.concurrent import Future, future_set_result_unless_cancelled
-from typing import Union, Optional, Type, Any
+from typing import Union, Optional, Type, Any, Awaitable
import typing
if typing.TYPE_CHECKING:
result += " waiters[%s]" % len(self._waiters)
return result + ">"
- def wait(self, timeout: Union[float, datetime.timedelta] = None) -> "Future[bool]":
+ def wait(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[bool]:
"""Wait for `.notify`.
Returns a `.Future` that resolves ``True`` if the condition is notified,
"""
self._value = False
- def wait(self, timeout: Union[float, datetime.timedelta] = None) -> "Future[None]":
+ def wait(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[None]:
"""Block until the internal flag is true.
- Returns a Future, which raises `tornado.util.TimeoutError` after a
+ Returns an awaitable, which raises `tornado.util.TimeoutError` after a
timeout.
"""
fut = Future() # type: Future[None]
def acquire(
self, timeout: Union[float, datetime.timedelta] = None
- ) -> "Future[_ReleasingContextManager]":
- """Decrement the counter. Returns a Future.
+ ) -> Awaitable[_ReleasingContextManager]:
+ """Decrement the counter. Returns an awaitable.
- Block if the counter is zero and wait for a `.release`. The Future
+ Block if the counter is zero and wait for a `.release`. The awaitable
raises `.TimeoutError` after the deadline.
"""
waiter = Future() # type: Future[_ReleasingContextManager]
return waiter
def __enter__(self) -> None:
- raise RuntimeError(
- "Use Semaphore like 'with (yield semaphore.acquire())', not like"
- " 'with semaphore'"
- )
+ raise RuntimeError("Use 'async with' instead of 'with' for Semaphore")
def __exit__(
self,
def acquire(
self, timeout: Union[float, datetime.timedelta] = None
- ) -> "Future[_ReleasingContextManager]":
- """Attempt to lock. Returns a Future.
+ ) -> Awaitable[_ReleasingContextManager]:
+ """Attempt to lock. Returns an awaitable.
- Returns a Future, which raises `tornado.util.TimeoutError` after a
+ Returns an awaitable, which raises `tornado.util.TimeoutError` after a
timeout.
"""
return self._block.acquire(timeout)
raise RuntimeError("release unlocked lock")
def __enter__(self) -> None:
- raise RuntimeError("Use Lock like 'with (yield lock)', not like 'with lock'")
+ raise RuntimeError("Use `async with` instead of `with` for Lock")
def __exit__(
self,
# License for the specific language governing permissions and limitations
# under the License.
+import asyncio
from datetime import timedelta
import typing # noqa: F401
import unittest
c = locks.Condition()
# Three waiters.
- futures = [c.wait() for _ in range(3)]
+ futures = [asyncio.ensure_future(c.wait()) for _ in range(3)]
# First and second futures resolved. Second future reenters notify(),
# resolving third future.
for _ in range(101):
c.wait(timedelta(seconds=0.01))
- future = c.wait()
+ future = asyncio.ensure_future(c.wait())
self.assertEqual(102, len(c._waiters))
# Let first 101 waiters time out, triggering a collection.
def test_event(self):
e = locks.Event()
- future_0 = e.wait()
+ future_0 = asyncio.ensure_future(e.wait())
e.set()
- future_1 = e.wait()
+ future_1 = asyncio.ensure_future(e.wait())
e.clear()
- future_2 = e.wait()
+ future_2 = asyncio.ensure_future(e.wait())
self.assertTrue(future_0.done())
self.assertTrue(future_1.done())
def test_event_wait_clear(self):
e = locks.Event()
- f0 = e.wait()
+ f0 = asyncio.ensure_future(e.wait())
e.clear()
- f1 = e.wait()
+ f1 = asyncio.ensure_future(e.wait())
e.set()
self.assertTrue(f0.done())
self.assertTrue(f1.done())
def test_acquire(self):
sem = locks.Semaphore()
- f0 = sem.acquire()
+ f0 = asyncio.ensure_future(sem.acquire())
self.assertTrue(f0.done())
# Wait for release().
- f1 = sem.acquire()
+ f1 = asyncio.ensure_future(sem.acquire())
self.assertFalse(f1.done())
- f2 = sem.acquire()
+ f2 = asyncio.ensure_future(sem.acquire())
sem.release()
self.assertTrue(f1.done())
self.assertFalse(f2.done())
sem.release()
# Now acquire() is instant.
- self.assertTrue(sem.acquire().done())
+ self.assertTrue(asyncio.ensure_future(sem.acquire()).done())
self.assertEqual(0, len(sem._waiters))
@gen_test
yield acquire
sem.acquire()
- f = sem.acquire()
+ f = asyncio.ensure_future(sem.acquire())
self.assertFalse(f.done())
sem.release()
self.assertTrue(f.done())
sem.release()
# Now the counter is 3. We can acquire three times before blocking.
- self.assertTrue(sem.acquire().done())
- self.assertTrue(sem.acquire().done())
- self.assertTrue(sem.acquire().done())
- self.assertFalse(sem.acquire().done())
+ self.assertTrue(asyncio.ensure_future(sem.acquire()).done())
+ self.assertTrue(asyncio.ensure_future(sem.acquire()).done())
+ self.assertTrue(asyncio.ensure_future(sem.acquire()).done())
+ self.assertFalse(asyncio.ensure_future(sem.acquire()).done())
@gen_test
def test_garbage_collection(self):
# Test that timed-out waiters are occasionally cleaned from the queue.
sem = locks.Semaphore(value=0)
- futures = [sem.acquire(timedelta(seconds=0.01)) for _ in range(101)]
+ futures = [
+ asyncio.ensure_future(sem.acquire(timedelta(seconds=0.01)))
+ for _ in range(101)
+ ]
- future = sem.acquire()
+ future = asyncio.ensure_future(sem.acquire())
self.assertEqual(102, len(sem._waiters))
# Let first 101 waiters time out, triggering a collection.
self.assertTrue(yielded is None)
# Semaphore was released and can be acquired again.
- self.assertTrue(sem.acquire().done())
+ self.assertTrue(asyncio.ensure_future(sem.acquire()).done())
@gen_test
def test_context_manager_async_await(self):
yield f()
# Semaphore was released and can be acquired again.
- self.assertTrue(sem.acquire().done())
+ self.assertTrue(asyncio.ensure_future(sem.acquire()).done())
@gen_test
def test_context_manager_exception(self):
1 / 0
# Semaphore was released and can be acquired again.
- self.assertTrue(sem.acquire().done())
+ self.assertTrue(asyncio.ensure_future(sem.acquire()).done())
@gen_test
def test_context_manager_timeout(self):
pass
# Semaphore was released and can be acquired again.
- self.assertTrue(sem.acquire().done())
+ self.assertTrue(asyncio.ensure_future(sem.acquire()).done())
@gen_test
def test_context_manager_timeout_error(self):
pass
# Counter is still 0.
- self.assertFalse(sem.acquire().done())
+ self.assertFalse(asyncio.ensure_future(sem.acquire()).done())
@gen_test
def test_context_manager_contended(self):
# Value is 0.
sem.acquire()
# Block on acquire().
- future = sem.acquire()
+ future = asyncio.ensure_future(sem.acquire())
self.assertFalse(future.done())
sem.release()
self.assertTrue(future.done())
def test_acquire_release(self):
lock = locks.Lock()
- self.assertTrue(lock.acquire().done())
- future = lock.acquire()
+ self.assertTrue(asyncio.ensure_future(lock.acquire()).done())
+ future = asyncio.ensure_future(lock.acquire())
self.assertFalse(future.done())
lock.release()
self.assertTrue(future.done())
@gen_test
def test_acquire_fifo(self):
lock = locks.Lock()
- self.assertTrue(lock.acquire().done())
+ self.assertTrue(asyncio.ensure_future(lock.acquire()).done())
N = 5
history = []
# Repeat the above test using `async with lock:`
# instead of `with (yield lock.acquire()):`.
lock = locks.Lock()
- self.assertTrue(lock.acquire().done())
+ self.assertTrue(asyncio.ensure_future(lock.acquire()).done())
N = 5
history = []
yield lock.acquire(timeout=timedelta(seconds=0.01))
# Still locked.
- self.assertFalse(lock.acquire().done())
+ self.assertFalse(asyncio.ensure_future(lock.acquire()).done())
def test_multi_release(self):
lock = locks.Lock()