# We run many threads in the hope that existing threads ids won't
# be recycled.
Bunch(f, 15).wait_for_finished()
- self.assertEqual(n, len(threading.enumerate()))
+ if len(threading.enumerate()) != n:
+ # There is a small window during which a Thread instance's
+ # target function has finished running, but the Thread is still
+ # alive and registered. Avoid spurious failures by waiting a
+ # bit more (seen on a buildbot).
+ time.sleep(0.4)
+ self.assertEqual(n, len(threading.enumerate()))
+ def test_timeout(self):
+ lock = self.locktype()
+ # Can't set timeout if not blocking
+ self.assertRaises(ValueError, lock.acquire, 0, 1)
+ # Invalid timeout values
+ self.assertRaises(ValueError, lock.acquire, timeout=-100)
+ self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
+ self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
+ # TIMEOUT_MAX is ok
+ lock.acquire(timeout=TIMEOUT_MAX)
+ lock.release()
+ t1 = time.time()
+ self.assertTrue(lock.acquire(timeout=5))
+ t2 = time.time()
+ # Just a sanity test that it didn't actually wait for the timeout.
+ self.assertLess(t2 - t1, 5)
+ results = []
+ def f():
+ t1 = time.time()
+ results.append(lock.acquire(timeout=0.5))
+ t2 = time.time()
+ results.append(t2 - t1)
+ Bunch(f, 1).wait_for_finished()
+ self.assertFalse(results[0])
+ self.assertTimeout(results[1], 0.5)
+
class LockTests(BaseLockTests):
"""