]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
ensure correct lock type propagated in pool recreate
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 1 Jan 2024 21:54:58 +0000 (16:54 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 1 Jan 2024 22:24:03 +0000 (17:24 -0500)
Fixed critical issue in asyncio version of the connection pool where
calling :meth:`_asyncio.AsyncEngine.dispose` would produce a new connection
pool that did not fully re-establish the use of asyncio-compatible mutexes,
leading to the use of a plain ``threading.Lock()`` which would then cause
deadlocks in an asyncio context when using concurrency features like
``asyncio.gather()``.

Fixes: #10813
Change-Id: I95ec698b6a1ba79555aa0b28e6bce65fedf3b1fe
(cherry picked from commit 2ed32bbf891b8f7e6c151071b4711319d9aa84f0)
(cherry picked from commit c65e4f4471cd10051476caaadcc92d7a7eb557b4)

doc/build/changelog/unreleased_14/10813.rst [new file with mode: 0644]
lib/sqlalchemy/event/attr.py
test/ext/asyncio/test_engine_py3k.py

diff --git a/doc/build/changelog/unreleased_14/10813.rst b/doc/build/changelog/unreleased_14/10813.rst
new file mode 100644 (file)
index 0000000..d4f72d8
--- /dev/null
@@ -0,0 +1,11 @@
+.. change::
+    :tags: bug, asyncio
+    :tickets: 10813
+    :versions: 1.4.51, 2.0.25
+
+    Fixed critical issue in asyncio version of the connection pool where
+    calling :meth:`_asyncio.AsyncEngine.dispose` would produce a new connection
+    pool that did not fully re-establish the use of asyncio-compatible mutexes,
+    leading to the use of a plain ``threading.Lock()`` which would then cause
+    deadlocks in an asyncio context when using concurrency features like
+    ``asyncio.gather()``.
index 5a85cb91ee2879ae06d6a0851daa104339967540..7ed9d0ed90c8e2c03186c19a99fc1ce4aa8734cb 100644 (file)
@@ -259,13 +259,25 @@ class _EmptyListener(_InstanceLevelDispatch):
 
 
 class _CompoundListener(_InstanceLevelDispatch):
-    __slots__ = "_exec_once_mutex", "_exec_once", "_exec_w_sync_once"
+    __slots__ = (
+        "_exec_once_mutex",
+        "_exec_once",
+        "_exec_w_sync_once",
+        "_is_asyncio",
+    )
+
+    def __init__(self, *arg, **kw):
+        super(_CompoundListener, self).__init__(*arg, **kw)
+        self._is_asyncio = False
 
     def _set_asyncio(self):
-        self._exec_once_mutex = AsyncAdaptedLock()
+        self._is_asyncio = True
 
     def _memoized_attr__exec_once_mutex(self):
-        return threading.Lock()
+        if self._is_asyncio:
+            return AsyncAdaptedLock()
+        else:
+            return threading.Lock()
 
     def _exec_once_impl(self, retry_on_exception, *args, **kw):
         with self._exec_once_mutex:
@@ -365,6 +377,7 @@ class _ListenerCollection(_CompoundListener):
     )
 
     def __init__(self, parent, target_cls):
+        super(_ListenerCollection, self).__init__()
         if target_cls not in parent._clslevel:
             parent.update_subclass(target_cls)
         self._exec_once = False
@@ -401,6 +414,9 @@ class _ListenerCollection(_CompoundListener):
 
         existing_listeners.extend(other_listeners)
 
+        if other._is_asyncio:
+            self._set_asyncio()
+
         to_associate = other.propagate.union(other_listeners)
         registry._stored_in_collection_multi(self, other, to_associate)
 
index 7875b9aec4cd72c5927d553c43cf9ff4f844d1aa..cb79fa3826c99863b86e8a3cd64b4ef9f69c0214 100644 (file)
@@ -1169,3 +1169,23 @@ class AsyncProxyTest(EngineFixture, fixtures.TestBase):
 
         async_t2 = async_conn.get_transaction()
         is_(async_t1, async_t2)
+
+
+class PoolRegenTest(EngineFixture):
+    @testing.requires.queue_pool
+    @async_test
+    @testing.variation("do_dispose", [True, False])
+    async def test_gather_after_dispose(self, testing_engine, do_dispose):
+        engine = testing_engine(
+            asyncio=True, options=dict(pool_size=10, max_overflow=10)
+        )
+
+        async def thing(engine):
+            async with engine.connect() as conn:
+                await conn.exec_driver_sql("select 1")
+
+        if do_dispose:
+            await engine.dispose()
+
+        tasks = [thing(engine) for _ in range(10)]
+        await asyncio.gather(*tasks)