--- /dev/null
+.. change::
+ :tags: bug, asyncio
+ :tickets: 10813
+ :versions: 1.4.51, 2.0.25
+
+ Fixed critical issue in asyncio version of the connection pool where
+ calling :meth:`_asyncio.AsyncEngine.dispose` would produce a new connection
+ pool that did not fully re-establish the use of asyncio-compatible mutexes,
+ leading to the use of a plain ``threading.Lock()`` which would then cause
+ deadlocks in an asyncio context when using concurrency features like
+ ``asyncio.gather()``.
class _CompoundListener(_InstanceLevelDispatch):
- __slots__ = "_exec_once_mutex", "_exec_once", "_exec_w_sync_once"
+ __slots__ = (
+ "_exec_once_mutex",
+ "_exec_once",
+ "_exec_w_sync_once",
+ "_is_asyncio",
+ )
+
+ def __init__(self, *arg, **kw):
+ super(_CompoundListener, self).__init__(*arg, **kw)
+ self._is_asyncio = False
def _set_asyncio(self):
- self._exec_once_mutex = AsyncAdaptedLock()
+ self._is_asyncio = True
def _memoized_attr__exec_once_mutex(self):
- return threading.Lock()
+ if self._is_asyncio:
+ return AsyncAdaptedLock()
+ else:
+ return threading.Lock()
def _exec_once_impl(self, retry_on_exception, *args, **kw):
with self._exec_once_mutex:
)
def __init__(self, parent, target_cls):
+ super(_ListenerCollection, self).__init__()
if target_cls not in parent._clslevel:
parent.update_subclass(target_cls)
self._exec_once = False
existing_listeners.extend(other_listeners)
+ if other._is_asyncio:
+ self._set_asyncio()
+
to_associate = other.propagate.union(other_listeners)
registry._stored_in_collection_multi(self, other, to_associate)
async_t2 = async_conn.get_transaction()
is_(async_t1, async_t2)
+
+
+class PoolRegenTest(EngineFixture):
+ @testing.requires.queue_pool
+ @async_test
+ @testing.variation("do_dispose", [True, False])
+ async def test_gather_after_dispose(self, testing_engine, do_dispose):
+ engine = testing_engine(
+ asyncio=True, options=dict(pool_size=10, max_overflow=10)
+ )
+
+ async def thing(engine):
+ async with engine.connect() as conn:
+ await conn.exec_driver_sql("select 1")
+
+ if do_dispose:
+ await engine.dispose()
+
+ tasks = [thing(engine) for _ in range(10)]
+ await asyncio.gather(*tasks)