From: Mike Bayer Date: Mon, 1 Jan 2024 21:54:58 +0000 (-0500) Subject: ensure correct lock type propagated in pool recreate X-Git-Tag: rel_1_4_51~3^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6eca5a1a59858763fc7386c8d75a86968587d02c;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git ensure correct lock type propagated in pool recreate Fixed critical issue in asyncio version of the connection pool where calling :meth:`_asyncio.AsyncEngine.dispose` would produce a new connection pool that did not fully re-establish the use of asyncio-compatible mutexes, leading to the use of a plain ``threading.Lock()`` which would then cause deadlocks in an asyncio context when using concurrency features like ``asyncio.gather()``. Fixes: #10813 Change-Id: I95ec698b6a1ba79555aa0b28e6bce65fedf3b1fe (cherry picked from commit 2ed32bbf891b8f7e6c151071b4711319d9aa84f0) (cherry picked from commit c65e4f4471cd10051476caaadcc92d7a7eb557b4) --- diff --git a/doc/build/changelog/unreleased_14/10813.rst b/doc/build/changelog/unreleased_14/10813.rst new file mode 100644 index 0000000000..d4f72d8e0b --- /dev/null +++ b/doc/build/changelog/unreleased_14/10813.rst @@ -0,0 +1,11 @@ +.. change:: + :tags: bug, asyncio + :tickets: 10813 + :versions: 1.4.51, 2.0.25 + + Fixed critical issue in asyncio version of the connection pool where + calling :meth:`_asyncio.AsyncEngine.dispose` would produce a new connection + pool that did not fully re-establish the use of asyncio-compatible mutexes, + leading to the use of a plain ``threading.Lock()`` which would then cause + deadlocks in an asyncio context when using concurrency features like + ``asyncio.gather()``. diff --git a/lib/sqlalchemy/event/attr.py b/lib/sqlalchemy/event/attr.py index 5a85cb91ee..7ed9d0ed90 100644 --- a/lib/sqlalchemy/event/attr.py +++ b/lib/sqlalchemy/event/attr.py @@ -259,13 +259,25 @@ class _EmptyListener(_InstanceLevelDispatch): class _CompoundListener(_InstanceLevelDispatch): - __slots__ = "_exec_once_mutex", "_exec_once", "_exec_w_sync_once" + __slots__ = ( + "_exec_once_mutex", + "_exec_once", + "_exec_w_sync_once", + "_is_asyncio", + ) + + def __init__(self, *arg, **kw): + super(_CompoundListener, self).__init__(*arg, **kw) + self._is_asyncio = False def _set_asyncio(self): - self._exec_once_mutex = AsyncAdaptedLock() + self._is_asyncio = True def _memoized_attr__exec_once_mutex(self): - return threading.Lock() + if self._is_asyncio: + return AsyncAdaptedLock() + else: + return threading.Lock() def _exec_once_impl(self, retry_on_exception, *args, **kw): with self._exec_once_mutex: @@ -365,6 +377,7 @@ class _ListenerCollection(_CompoundListener): ) def __init__(self, parent, target_cls): + super(_ListenerCollection, self).__init__() if target_cls not in parent._clslevel: parent.update_subclass(target_cls) self._exec_once = False @@ -401,6 +414,9 @@ class _ListenerCollection(_CompoundListener): existing_listeners.extend(other_listeners) + if other._is_asyncio: + self._set_asyncio() + to_associate = other.propagate.union(other_listeners) registry._stored_in_collection_multi(self, other, to_associate) diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py index 7875b9aec4..cb79fa3826 100644 --- a/test/ext/asyncio/test_engine_py3k.py +++ b/test/ext/asyncio/test_engine_py3k.py @@ -1169,3 +1169,23 @@ class AsyncProxyTest(EngineFixture, fixtures.TestBase): async_t2 = async_conn.get_transaction() is_(async_t1, async_t2) + + +class PoolRegenTest(EngineFixture): + @testing.requires.queue_pool + @async_test + @testing.variation("do_dispose", [True, False]) + async def test_gather_after_dispose(self, testing_engine, do_dispose): + engine = testing_engine( + asyncio=True, options=dict(pool_size=10, max_overflow=10) + ) + + async def thing(engine): + async with engine.connect() as conn: + await conn.exec_driver_sql("select 1") + + if do_dispose: + await engine.dispose() + + tasks = [thing(engine) for _ in range(10)] + await asyncio.gather(*tasks)