p = pool.QueuePool(creator=creator,
pool_size=2, timeout=timeout,
max_overflow=max_overflow)
- def waiter(p):
+ def waiter(p, timeout, max_overflow):
success_key = (timeout, max_overflow)
conn = p.connect()
time.sleep(.5)
c2 = p.connect()
for i in range(2):
- t = threading.Thread(target=waiter, args=(p, ))
+ t = threading.Thread(target=waiter,
+ args=(p, timeout, max_overflow))
t.setDaemon(True) # so the tests dont hang if this fails
t.start()
c2.invalidate()
p2 = p._replace()
time.sleep(1)
+
eq_(len(success), 12, "successes: %s" % success)
@testing.requires.threading_with_mock