]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- change notify to notify_all() so all waiters exit immediately,
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 25 Jun 2012 15:15:50 +0000 (11:15 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 25 Jun 2012 15:15:50 +0000 (11:15 -0400)
continuing [ticket:2522]

lib/sqlalchemy/util/queue.py
test/engine/test_pool.py

index ebf7363318f76ebfaf8c50067cdd49146ace4ab6..9e17527b7b21ed63d5690b4979f757ff175faec4 100644 (file)
@@ -183,7 +183,7 @@ class Queue:
         if not self.not_full.acquire(False):
             return
         try:
-            self.not_empty.notify()
+            self.not_empty.notify_all()
         finally:
             self.not_full.release()
 
index 4d55728916e9bbbcf5fc0a4bda9a79b8cbe508e1..a6c2b6250b666eaa3cdc3d0aa0c1b158523ed640 100644 (file)
@@ -814,13 +814,13 @@ class QueuePoolTest(PoolTestBase):
  
         success = []
         for timeout in (None, 30):
-            for max_overflow in (-1, 0, 3):
+            for max_overflow in (0, -1, 3):
                 p = pool.QueuePool(creator=creator,
                                    pool_size=2, timeout=timeout,
                                    max_overflow=max_overflow)
                 def waiter(p):
                     conn = p.connect()
-                    time.sleep(.5)
+                    time.sleep(1)
                     success.append(True)
                     conn.close()
 
@@ -828,15 +828,48 @@ class QueuePoolTest(PoolTestBase):
                 c1 = p.connect()
                 c2 = p.connect()
 
-                t = threading.Thread(target=waiter, args=(p, ))
-                t.setDaemon(True) # so the tests dont hang if this fails
-                t.start()
+                for i in range(2):
+                    t = threading.Thread(target=waiter, args=(p, ))
+                    t.setDaemon(True) # so the tests dont hang if this fails
+                    t.start()
 
                 c1.invalidate()
                 c2.invalidate()
                 p2 = p._replace()
         time.sleep(1)
-        eq_(len(success), 6)
+        eq_(len(success), 12)
+
+    def test_notify_waiters(self):
+        dbapi = MockDBAPI()
+        canary = []
+        def creator1():
+            canary.append(1)
+            return dbapi.connect()
+        def creator2():
+            canary.append(2)
+            return dbapi.connect()
+        p1 = pool.QueuePool(creator=creator1,
+                           pool_size=1, timeout=None,
+                           max_overflow=0)
+        p2 = pool.QueuePool(creator=creator2,
+                           pool_size=1, timeout=None,
+                           max_overflow=-1)
+        def waiter(p):
+            conn = p.connect()
+            time.sleep(.5)
+            conn.close()
+
+        c1 = p1.connect()
+
+        for i in range(5):
+            t = threading.Thread(target=waiter, args=(p1, ))
+            t.setDaemon(True)
+            t.start()
+        time.sleep(.5)
+        eq_(canary, [1])
+        p1._pool.abort(p2)
+        time.sleep(1)
+        eq_(canary, [1, 2, 2, 2, 2, 2])
 
     def test_dispose_closes_pooled(self):
         dbapi = MockDBAPI()