From 300264ab38def29a4a24a83b39aece0d0332e83d Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 6 Dec 2013 15:53:59 -0500 Subject: [PATCH] - Made a slight adjustment to the logic which waits for a pooled connection to be available, such that for a connection pool with no timeout specified, it will every half a second break out of the wait to check for the so-called "abort" flag, which allows the waiter to break out in case the whole connection pool was dumped; normally the waiter should break out due to a notify_all() but it's possible this notify_all() is missed in very slim cases. This is an extension of logic first introduced in 0.8.0, and the issue has only been observed occasionally in stress tests. --- doc/build/changelog/changelog_08.rst | 15 +++++++++++++++ lib/sqlalchemy/util/queue.py | 10 ++++++++-- test/engine/test_pool.py | 19 +++++++++++++------ 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/doc/build/changelog/changelog_08.rst b/doc/build/changelog/changelog_08.rst index fcad47c6f0..c5f882f7f6 100644 --- a/doc/build/changelog/changelog_08.rst +++ b/doc/build/changelog/changelog_08.rst @@ -11,6 +11,21 @@ .. changelog:: :version: 0.8.4 + .. change:: + :tags: bug, engine, pool + :versions: 0.9.0b2 + :tickets: 2522 + + Made a slight adjustment to the logic which waits for a pooled + connection to be available, such that for a connection pool + with no timeout specified, it will every half a second break out of + the wait to check for the so-called "abort" flag, which allows the + waiter to break out in case the whole connection pool was dumped; + normally the waiter should break out due to a notify_all() but it's + possible this notify_all() is missed in very slim cases. + This is an extension of logic first introduced in 0.8.0, and the + issue has only been observed occasionally in stress tests. + .. change:: :tags: bug, mssql :versions: 0.9.0b2 diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index b66738aff1..639b23a936 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -151,7 +151,6 @@ class Queue: return an item if one is immediately available, else raise the ``Empty`` exception (`timeout` is ignored in that case). """ - self.not_empty.acquire() try: if not block: @@ -159,7 +158,11 @@ class Queue: raise Empty elif timeout is None: while self._empty(): - self.not_empty.wait() + # wait for only half a second, then + # loop around, so that we can see a change in + # _sqla_abort_context in case we missed the notify_all() + # called by abort() + self.not_empty.wait(.5) if self._sqla_abort_context: raise SAAbort(self._sqla_abort_context) else: @@ -188,6 +191,9 @@ class Queue: if not self.not_full.acquire(False): return try: + # note that this is now optional + # as the waiters in get() both loop around + # to check the _sqla_abort_context flag periodically self.not_empty.notify_all() finally: self.not_full.release() diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index c395dd8258..3f05f661a1 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -879,9 +879,14 @@ class QueuePoolTest(PoolTestBase): handled when the pool is replaced. """ + mutex = threading.Lock() dbapi = MockDBAPI() def creator(): - return dbapi.connect() + mutex.acquire() + try: + return dbapi.connect() + finally: + mutex.release() success = [] for timeout in (None, 30): @@ -903,11 +908,15 @@ class QueuePoolTest(PoolTestBase): for i in range(2): t = threading.Thread(target=waiter, args=(p, timeout, max_overflow)) + t.daemon = True t.start() threads.append(t) - c1.invalidate() - c2.invalidate() + # this sleep makes sure that the + # two waiter threads hit upon wait() + # inside the queue, before we invalidate the other + # two conns + time.sleep(.2) p2 = p._replace() for t in threads: @@ -928,9 +937,7 @@ class QueuePoolTest(PoolTestBase): p1 = pool.QueuePool(creator=creator1, pool_size=1, timeout=None, max_overflow=0) - p2 = pool.QueuePool(creator=creator2, - pool_size=1, timeout=None, - max_overflow=-1) + p2 = pool.NullPool(creator=creator2) def waiter(p): conn = p.connect() time.sleep(.5) -- 2.47.3