From 7b1a1a66cd36fdfac6541e6b771fd6c849b0bd7d Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Mon, 25 Jun 2012 11:15:50 -0400 Subject: [PATCH] - change notify to notify_all() so all waiters exit immediately, continuing [ticket:2522] --- lib/sqlalchemy/util/queue.py | 2 +- test/engine/test_pool.py | 45 +++++++++++++++++++++++++++++++----- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index ebf7363318..9e17527b7b 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -183,7 +183,7 @@ class Queue: if not self.not_full.acquire(False): return try: - self.not_empty.notify() + self.not_empty.notify_all() finally: self.not_full.release() diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 4d55728916..a6c2b6250b 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -814,13 +814,13 @@ class QueuePoolTest(PoolTestBase): success = [] for timeout in (None, 30): - for max_overflow in (-1, 0, 3): + for max_overflow in (0, -1, 3): p = pool.QueuePool(creator=creator, pool_size=2, timeout=timeout, max_overflow=max_overflow) def waiter(p): conn = p.connect() - time.sleep(.5) + time.sleep(1) success.append(True) conn.close() @@ -828,15 +828,48 @@ class QueuePoolTest(PoolTestBase): c1 = p.connect() c2 = p.connect() - t = threading.Thread(target=waiter, args=(p, )) - t.setDaemon(True) # so the tests dont hang if this fails - t.start() + for i in range(2): + t = threading.Thread(target=waiter, args=(p, )) + t.setDaemon(True) # so the tests dont hang if this fails + t.start() c1.invalidate() c2.invalidate() p2 = p._replace() time.sleep(1) - eq_(len(success), 6) + eq_(len(success), 12) + + def test_notify_waiters(self): + dbapi = MockDBAPI() + canary = [] + def creator1(): + canary.append(1) + return dbapi.connect() + def creator2(): + canary.append(2) + return dbapi.connect() + p1 = pool.QueuePool(creator=creator1, + pool_size=1, timeout=None, + max_overflow=0) + p2 = pool.QueuePool(creator=creator2, + pool_size=1, timeout=None, + max_overflow=-1) + def waiter(p): + conn = p.connect() + time.sleep(.5) + conn.close() + + c1 = p1.connect() + + for i in range(5): + t = threading.Thread(target=waiter, args=(p1, )) + t.setDaemon(True) + t.start() + time.sleep(.5) + eq_(canary, [1]) + p1._pool.abort(p2) + time.sleep(1) + eq_(canary, [1, 2, 2, 2, 2, 2]) def test_dispose_closes_pooled(self): dbapi = MockDBAPI() -- 2.47.3