--- /dev/null
+.. change::
+    :tags: bug, asyncio
+    :tickets: 10813
+    :versions: 1.4.51, 2.0.25
+
+    Fixed critical issue in the asyncio version of the connection pool where
+    calling :meth:`_asyncio.AsyncEngine.dispose` would produce a new connection
+    pool that did not fully re-establish the use of asyncio-compatible mutexes,
+    leading to the use of a plain ``threading.Lock()`` which would then cause
+    deadlocks in an asyncio context when using concurrency features like
+    ``asyncio.gather()``.
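To make the failure mode concrete, here is a minimal standalone sketch (not SQLAlchemy code) of how a plain ``threading.Lock()`` deadlocks a single-threaded asyncio program under ``asyncio.gather()``::

    import asyncio
    import threading

    lock = threading.Lock()

    async def worker() -> None:
        lock.acquire()  # blocking acquire on the event loop's only thread
        try:
            # yields while still holding the lock; a second worker then
            # blocks the whole loop inside lock.acquire(), so this sleep
            # never completes and the lock is never released
            await asyncio.sleep(0.1)
        finally:
            lock.release()

    async def main() -> None:
        await asyncio.gather(worker(), worker())

    # asyncio.run(main())  # commented out: this hangs forever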
class _CompoundListener(_InstanceLevelDispatch[_ET]):
-    __slots__ = "_exec_once_mutex", "_exec_once", "_exec_w_sync_once"
+    __slots__ = (
+        "_exec_once_mutex",
+        "_exec_once",
+        "_exec_w_sync_once",
+        "_is_asyncio",
+    )

    _exec_once_mutex: _MutexProtocol
    parent_listeners: Collection[_ListenerFnType]
    _exec_once: bool
    _exec_w_sync_once: bool
+    def __init__(self, *arg: Any, **kw: Any):
+        super().__init__(*arg, **kw)
+        self._is_asyncio = False
+
    def _set_asyncio(self) -> None:
-        self._exec_once_mutex = AsyncAdaptedLock()
+        self._is_asyncio = True

    def _memoized_attr__exec_once_mutex(self) -> _MutexProtocol:
-        return threading.Lock()
+        if self._is_asyncio:
+            return AsyncAdaptedLock()
+        else:
+            return threading.Lock()

    def _exec_once_impl(
        self, retry_on_exception: bool, *args: Any, **kw: Any
    propagate: Set[_ListenerFnType]

    def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
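+        # call up to the base __init__, which now initializes the
+        # _is_asyncio flag to False for this listener collection as well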
+        super().__init__()
        if target_cls not in parent._clslevel:
            parent.update_subclass(target_cls)
        self._exec_once = False
        existing_listeners.extend(other_listeners)
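+        # a collection merged in from an asyncio dispatcher marks this one
+        # as asyncio too, so its lazily created _exec_once_mutex will be an
+        # asyncio-compatible lock rather than a plain threading.Lock()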
+        if other._is_asyncio:
+            self._set_asyncio()
+
        to_associate = other.propagate.union(other_listeners)
        registry._stored_in_collection_multi(self, other, to_associate)
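The shape of the fix is easier to see in isolation: rather than assigning the asyncio-compatible mutex eagerly (state that the regenerated pool failed to re-establish, per the changelog above), only a boolean flag is stored and the mutex is built on first use. A rough, hypothetical sketch of that pattern, with ``functools.cached_property`` and ``asyncio.Lock`` standing in for SQLAlchemy's memoized-attribute helper and ``AsyncAdaptedLock``::

    import asyncio
    import threading
    from functools import cached_property

    class LazyMutexHolder:
        """Hypothetical illustration only, not SQLAlchemy's implementation."""

        def __init__(self, is_asyncio: bool = False) -> None:
            # only a plain flag is stored up front
            self._is_asyncio = is_asyncio

        def set_asyncio(self) -> None:
            self._is_asyncio = True

        @cached_property
        def mutex(self):
            # the concrete lock type is decided lazily, at first access,
            # based on whatever the flag says at that point
            return asyncio.Lock() if self._is_asyncio else threading.Lock()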
        async_t2 = async_conn.get_transaction()
        is_(async_t1, async_t2)
+
+
+class PoolRegenTest(EngineFixture):
+    @testing.requires.queue_pool
+    @async_test
+    @testing.variation("do_dispose", [True, False])
+    async def test_gather_after_dispose(self, testing_engine, do_dispose):
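+        # regression test: concurrent connection use via asyncio.gather()
+        # must not deadlock, with or without a prior engine.dispose()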
+        engine = testing_engine(
+            asyncio=True, options=dict(pool_size=10, max_overflow=10)
+        )
+
+        async def thing(engine):
+            async with engine.connect() as conn:
+                await conn.exec_driver_sql("select 1")
+
+        if do_dispose:
+            await engine.dispose()
+
+        tasks = [thing(engine) for _ in range(10)]
+        await asyncio.gather(*tasks)
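For context, a hedged sketch of the user-facing pattern the test exercises; the ``sqlite+aiosqlite`` URL is an arbitrary choice and assumes the ``aiosqlite`` driver is installed::

    import asyncio

    from sqlalchemy.ext.asyncio import create_async_engine

    async def main() -> None:
        engine = create_async_engine("sqlite+aiosqlite://")

        async def ping() -> None:
            async with engine.connect() as conn:
                await conn.exec_driver_sql("select 1")

        # dispose() regenerates the connection pool; per the fix above, the
        # new pool keeps using asyncio-compatible mutexes, so gather() is safe
        await engine.dispose()
        await asyncio.gather(*[ping() for _ in range(10)])
        await engine.dispose()

    asyncio.run(main())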