# wait for the timeout on queue.get(). the fix involves checking the
# timeout again within the mutex, and if so, unlocking and throwing
# them back to the start of do_get()
- p = pool.QueuePool(creator = lambda: mock_dbapi.connect(delay=.05), pool_size = 2, max_overflow = 1, use_threadlocal = False, timeout=3)
+ p = pool.QueuePool(
+ creator = lambda: mock_dbapi.connect(delay=.05),
+ pool_size = 2,
+ max_overflow = 1, use_threadlocal = False, timeout=3)
timeouts = []
def checkout():
for x in xrange(1):
print timeouts
assert len(timeouts) > 0
for t in timeouts:
- assert abs(t - 3) < 1, "Not all timeouts were 3 seconds: " + repr(timeouts)
+ assert abs(t - 3) < 1.5, "Not all timeouts were within 50% of 3 seconds: "\
+ + repr(timeouts)
def _test_overflow(self, thread_count, max_overflow):
def creator():