from __future__ import absolute_import, division, print_function, with_statement
-__all__ = ['Condition', 'Event', 'Semaphore']
+__all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore']
import collections
" 'with semaphore'")
__exit__ = __enter__
+
+
+class BoundedSemaphore(Semaphore):
+ """A semaphore that prevents release() being called too many times.
+
+ If `.release` would increment the semaphore's value past the initial
+ value, it raises `ValueError`. Semaphores are mostly used to guard
+ resources with limited capacity, so a semaphore released too many times
+ is a sign of a bug.
+ """
+ def __init__(self, value=1):
+ super(BoundedSemaphore, self).__init__(value=value)
+ self._initial_value = value
+
+ def release(self):
+ if self._value >= self._initial_value:
+ raise ValueError("Semaphore released too many times")
+ super(BoundedSemaphore, self).release()
pass
+class BoundedSemaphoreTest(AsyncTestCase):
+ def test_release_unacquired(self):
+ sem = locks.BoundedSemaphore()
+ self.assertRaises(ValueError, sem.release)
+ # Value is 0.
+ sem.acquire()
+ # Block on acquire().
+ future = sem.acquire()
+ self.assertFalse(future.done())
+ sem.release()
+ self.assertTrue(future.done())
+ # Value is 1.
+ sem.release()
+ self.assertRaises(ValueError, sem.release)
+
if __name__ == '__main__':
unittest.main()