]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Made a slight adjustment to the logic which waits for a pooled
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 6 Dec 2013 20:53:59 +0000 (15:53 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 6 Dec 2013 21:33:09 +0000 (16:33 -0500)
connection to be available, such that for a connection pool
with no timeout specified, it will every half a second break out of
the wait to check for the so-called "abort" flag, which allows the
waiter to break out in case the whole connection pool was dumped;
normally the waiter should break out due to a notify_all() but it's
possible this notify_all() is missed in very slim cases.
This is an extension of logic first introduced in 0.8.0, and the
issue has only been observed occasionally in stress tests.

doc/build/changelog/changelog_08.rst
lib/sqlalchemy/util/queue.py
test/engine/test_pool.py

index fcad47c6f0ac631dc9c11275185bb531a8f4d881..c5f882f7f625b89d0fee9e3bccef8fd6c43da526 100644 (file)
 .. changelog::
     :version: 0.8.4
 
+     .. change::
+        :tags: bug, engine, pool
+        :versions: 0.9.0b2
+        :tickets: 2522
+
+        Made a slight adjustment to the logic which waits for a pooled
+        connection to be available, such that for a connection pool
+        with no timeout specified, it will every half a second break out of
+        the wait to check for the so-called "abort" flag, which allows the
+        waiter to break out in case the whole connection pool was dumped;
+        normally the waiter should break out due to a notify_all() but it's
+        possible this notify_all() is missed in very slim cases.
+        This is an extension of logic first introduced in 0.8.0, and the
+        issue has only been observed occasionally in stress tests.
+
      .. change::
         :tags: bug, mssql
         :versions: 0.9.0b2
index 537526beffaf56549e00d5c9bcacc88969edae81..6223641d1b4d00a4c3c948128a4c4ccde966cf5f 100644 (file)
@@ -158,7 +158,6 @@ class Queue:
         return an item if one is immediately available, else raise the
         ``Empty`` exception (`timeout` is ignored in that case).
         """
-
         self.not_empty.acquire()
         try:
             if not block:
@@ -166,7 +165,11 @@ class Queue:
                     raise Empty
             elif timeout is None:
                 while self._empty():
-                    self.not_empty.wait()
+                    # wait for only half a second, then
+                    # loop around, so that we can see a change in
+                    # _sqla_abort_context in case we missed the notify_all()
+                    # called by abort()
+                    self.not_empty.wait(.5)
                     if self._sqla_abort_context:
                         raise SAAbort(self._sqla_abort_context)
             else:
@@ -195,6 +198,9 @@ class Queue:
         if not self.not_full.acquire(False):
             return
         try:
+            # note that this is now optional
+            # as the waiters in get() both loop around
+            # to check the _sqla_abort_context flag periodically
             notify_all(self.not_empty)
         finally:
             self.not_full.release()
index a261a2aebb6586aae34c5f6717020351c9d234c6..e0a9c602488275557735d6e2cbb44f5952c237cf 100644 (file)
@@ -881,9 +881,14 @@ class QueuePoolTest(PoolTestBase):
         handled when the pool is replaced.
 
         """
+        mutex = threading.Lock()
         dbapi = MockDBAPI()
         def creator():
-            return dbapi.connect()
+            mutex.acquire()
+            try:
+                return dbapi.connect()
+            finally:
+                mutex.release()
 
         success = []
         for timeout in (None, 30):
@@ -905,11 +910,15 @@ class QueuePoolTest(PoolTestBase):
                 for i in range(2):
                     t = threading.Thread(target=waiter,
                                     args=(p, timeout, max_overflow))
+                    t.daemon = True
                     t.start()
                     threads.add(t)
 
-                c1.invalidate()
-                c2.invalidate()
+                # this sleep makes sure that the
+                # two waiter threads hit upon wait()
+                # inside the queue, before we invalidate the other
+                # two conns
+                time.sleep(.2)
                 p2 = p._replace()
 
                 for t in threads:
@@ -931,9 +940,7 @@ class QueuePoolTest(PoolTestBase):
         p1 = pool.QueuePool(creator=creator1,
                            pool_size=1, timeout=None,
                            max_overflow=0)
-        p2 = pool.QueuePool(creator=creator2,
-                           pool_size=1, timeout=None,
-                           max_overflow=-1)
+        p2 = pool.NullPool(creator=creator2)
         def waiter(p):
             conn = p.connect()
             time.sleep(.5)