@gen_test
def test_context_manager_timeout(self):
+ sem = locks.Semaphore()
+ with (yield sem.acquire(timedelta(seconds=0.01))):
+ pass
+
+ # Semaphore was released and can be acquired again.
+ self.assertTrue(sem.acquire().done())
+
+ @gen_test
+ def test_context_manager_timeout_error(self):
sem = locks.Semaphore(value=0)
with self.assertRaises(gen.TimeoutError):
with (yield sem.acquire(timedelta(seconds=0.01))):
pass
+ # Counter is still 0.
+ self.assertFalse(sem.acquire().done())
+
@gen_test
def test_context_manager_contended(self):
sem = locks.Semaphore()