success = []
for timeout in (None, 30):
- for max_overflow in (-1, 0, 3):
+ for max_overflow in (0, -1, 3):
p = pool.QueuePool(creator=creator,
pool_size=2, timeout=timeout,
max_overflow=max_overflow)
def waiter(p):
conn = p.connect()
- time.sleep(.5)
+ time.sleep(1)
success.append(True)
conn.close()
c1 = p.connect()
c2 = p.connect()
- t = threading.Thread(target=waiter, args=(p, ))
- t.setDaemon(True) # so the tests dont hang if this fails
- t.start()
+ for i in range(2):
+ t = threading.Thread(target=waiter, args=(p, ))
+ t.setDaemon(True) # so the tests dont hang if this fails
+ t.start()
c1.invalidate()
c2.invalidate()
p2 = p._replace()
time.sleep(1)
- eq_(len(success), 6)
+ eq_(len(success), 12)
+
+ def test_notify_waiters(self):
+ dbapi = MockDBAPI()
+ canary = []
+ def creator1():
+ canary.append(1)
+ return dbapi.connect()
+ def creator2():
+ canary.append(2)
+ return dbapi.connect()
+ p1 = pool.QueuePool(creator=creator1,
+ pool_size=1, timeout=None,
+ max_overflow=0)
+ p2 = pool.QueuePool(creator=creator2,
+ pool_size=1, timeout=None,
+ max_overflow=-1)
+ def waiter(p):
+ conn = p.connect()
+ time.sleep(.5)
+ conn.close()
+
+ c1 = p1.connect()
+
+ for i in range(5):
+ t = threading.Thread(target=waiter, args=(p1, ))
+ t.setDaemon(True)
+ t.start()
+ time.sleep(.5)
+ eq_(canary, [1])
+ p1._pool.abort(p2)
+ time.sleep(1)
+ eq_(canary, [1, 2, 2, 2, 2, 2])
def test_dispose_closes_pooled(self):
dbapi = MockDBAPI()