self.assertTrue(future.done())
@gen_test
- def test_acquire_contended(self):
+ def test_acquire_fifo(self):
lock = locks.Lock()
self.assertTrue(lock.acquire().done())
N = 5
+ history = []
@gen.coroutine
- def f():
+ def f(idx):
with (yield lock.acquire()):
- pass
+ history.append(idx)
- futures = [f() for _ in range(N)]
+ futures = [f(i) for i in range(N)]
self.assertFalse(any(future.done() for future in futures))
lock.release()
yield futures
+ self.assertEqual(list(range(N)), history)
@gen_test
def test_acquire_timeout(self):