>>> @gen.coroutine
... def f():
... with (yield semaphore.acquire()):
- ... assert semaphore.locked()
+ ... # Do something holding the semaphore.
+ ... pass
...
- ... assert not semaphore.locked()
-
- .. note:: Unlike the standard `threading.Semaphore`, a Tornado `.Semaphore`
- can tell you the current value of its `.counter`, because code in a
- single-threaded Tornado application can check this value and act upon
- it without fear of interruption from another thread.
+ ... # Now the semaphore is released.
"""
def __init__(self, value=1):
if value < 0:
def __repr__(self):
res = super(Semaphore, self).__repr__()
- extra = 'locked' if self.locked() else 'unlocked,value:{0}'.format(
+ extra = 'locked' if self._value == 0 else 'unlocked,value:{0}'.format(
self._value)
if self._waiters:
extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
return '<{0} [{1}]>'.format(res[1:-1], extra)
- @property
- def counter(self):
- """An integer, the current semaphore value."""
- return self._value
-
- def locked(self):
- """True if the semaphore cannot be acquired immediately."""
- return self._value == 0
-
def release(self):
- """Increment `.counter` and wake one waiter."""
+ """Increment the counter and wake one waiter."""
self._value += 1
for waiter in self._waiters:
if not waiter.done():
break
def acquire(self, timeout=None):
- """Decrement `.counter`. Returns a Future.
+ """Decrement the counter. Returns a Future.
Block if the counter is zero and wait for a `.release`. The Future
raises `.TimeoutError` after the deadline.
def test_acquire(self):
sem = locks.Semaphore()
- self.assertFalse(sem.locked())
f0 = sem.acquire()
self.assertTrue(f0.done())
- self.assertTrue(sem.locked())
# Wait for release().
f1 = sem.acquire()
+ self.assertFalse(f1.done())
f2 = sem.acquire()
sem.release()
self.assertTrue(f1.done())
sem = locks.Semaphore()
sem.release()
sem.release()
- self.assertEqual(3, sem.counter)
+
+ # Now the counter is 3. We can acquire three times before blocking.
+ self.assertTrue(sem.acquire().done())
+ self.assertTrue(sem.acquire().done())
+ self.assertTrue(sem.acquire().done())
+ self.assertFalse(sem.acquire().done())
class SemaphoreContextManagerTest(AsyncTestCase):
def test_context_manager(self):
sem = locks.Semaphore()
with (yield sem.acquire()) as yielded:
- self.assertTrue(sem.locked())
self.assertTrue(yielded is None)
- self.assertFalse(sem.locked())
+ # Semaphore was released and can be acquired again.
+ self.assertTrue(sem.acquire().done())
@gen_test
def test_context_manager_exception(self):
with (yield sem.acquire()):
1 / 0
- # Context manager released semaphore.
- self.assertFalse(sem.locked())
+ # Semaphore was released and can be acquired again.
+ self.assertTrue(sem.acquire().done())
@gen_test
def test_context_manager_timeout(self):