from __future__ import absolute_import, division, print_function, with_statement
-__all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore']
+__all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore', 'Lock']
import collections
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
super(BoundedSemaphore, self).release()
+
+
+class Lock(object):
+ """A lock for coroutines.
+
+ A Lock begins unlocked, and `acquire` locks it immediately. While it is
+ locked, a coroutine that yields `acquire` waits until another coroutine
+ calls `release`.
+
+ Releasing an unlocked lock raises `RuntimeError`.
+
+ `acquire` supports the context manager protocol:
+
+ >>> from tornado import gen, locks
+ >>> lock = locks.Lock()
+ >>>
+ >>> @gen.coroutine
+ ... def f():
+ ... with (yield lock.acquire()):
+ ... # Do something holding the lock.
+ ... pass
+ ...
+ ... # Now the lock is released.
+
+ Coroutines waiting for `acquire` are granted the lock in first-in, first-out
+ order.
+ """
+ def __init__(self):
+ self._block = BoundedSemaphore(value=1)
+
+ def __repr__(self):
+ return "<%s _block=%s>" % (
+ self.__class__.__name__,
+ self._block)
+
+ def acquire(self, deadline=None):
+ """Attempt to lock. Returns a Future.
+
+ Returns a Future, which raises `tornado.gen.TimeoutError` after a
+ timeout.
+ """
+ return self._block.acquire(deadline)
+
+ def release(self):
+ """Unlock.
+
+ The first coroutine in line waiting for `acquire` gets the lock.
+
+ If not locked, raise a `RuntimeError`.
+ """
+ try:
+ self._block.release()
+ except ValueError:
+ raise RuntimeError('release unlocked lock')
+
+ def __enter__(self):
+ raise RuntimeError(
+ "Use Lock like 'with (yield lock)', not like 'with lock'")
+
+ __exit__ = __enter__
sem.release()
self.assertRaises(ValueError, sem.release)
+
+class LockTests(AsyncTestCase):
+ def test_repr(self):
+ lock = locks.Lock()
+ # No errors.
+ repr(lock)
+ lock.acquire()
+ repr(lock)
+
+ def test_acquire_release(self):
+ lock = locks.Lock()
+ self.assertTrue(lock.acquire().done())
+ future = lock.acquire()
+ self.assertFalse(future.done())
+ lock.release()
+ self.assertTrue(future.done())
+
+ @gen_test
+ def test_acquire_fifo(self):
+ lock = locks.Lock()
+ self.assertTrue(lock.acquire().done())
+ N = 5
+ history = []
+
+ @gen.coroutine
+ def f(idx):
+ with (yield lock.acquire()):
+ history.append(idx)
+
+ futures = [f(i) for i in range(N)]
+ self.assertFalse(any(future.done() for future in futures))
+ lock.release()
+ yield futures
+ self.assertEqual(range(N), history)
+
+ @gen_test
+ def test_acquire_timeout(self):
+ lock = locks.Lock()
+ lock.acquire()
+ with self.assertRaises(gen.TimeoutError):
+ yield lock.acquire(deadline=timedelta(seconds=0.01))
+
+ # Still locked.
+ self.assertFalse(lock.acquire().done())
+
+ def test_multi_release(self):
+ lock = locks.Lock()
+ self.assertRaises(RuntimeError, lock.release)
+ lock.acquire()
+ lock.release()
+ self.assertRaises(RuntimeError, lock.release)
+
+ @gen_test
+ def test_yield_lock(self):
+ # Ensure we catch a "with (yield lock)", which should be
+ # "with (yield lock.acquire())".
+ with self.assertRaises(gen.BadYieldError):
+ with (yield locks.Lock()):
+ pass
+
+ def test_context_manager_misuse(self):
+ # Ensure we catch a "with lock", which should be
+ # "with (yield lock.acquire())".
+ with self.assertRaises(RuntimeError):
+ with locks.Lock():
+ pass
+
+
if __name__ == '__main__':
unittest.main()