--- /dev/null
+.. change::
+ :tags: bug, asyncio
+ :tickets: 8771
+
+ An error is raised if a :class:`.QueuePool` or other non-asyncio pool class
+ is passed to :func:`_asyncio.create_async_engine`. This engine only
+ accepts asyncio-compatible pool classes including
+ :class:`.AsyncAdaptedQueuePool`. Other pool classes such as
+ :class:`.NullPool` are compatible with both synchronous and asynchronous
+ engines as they do not perform any locking.
+
+ .. seealso::
+
+ :ref:`pool_api`
+
actually used five connections concurrently, in which case the usage of a
small pool is an entirely appropriate default behavior.
+.. note:: The :class:`.QueuePool` class is **not compatible with asyncio**.
+ When using :class:`_asyncio.create_async_engine` to create an instance of
+ :class:`.AsyncEngine`, the :class:`_pool.AsyncAdaptedQueuePool` class,
+ which makes use of an asyncio-compatible queue implementation, is used
+ instead.
+
+
.. _pool_switching:
Switching Pool Implementations
my_pool = create_pool_from_url("mysql+mysqldb://", poolclass=NullPool)
+.. _pool_api:
+
API Documentation - Available Pool Implementations
--------------------------------------------------
.. autoclass:: sqlalchemy.pool.QueuePool
:members:
+.. autoclass:: sqlalchemy.pool.AsyncAdaptedQueuePool
+ :members:
+
.. autoclass:: SingletonThreadPool
:members:
:ref:`connections_toplevel`
+.. _error_pcls:
+
+Pool class cannot be used with asyncio engine (or vice versa)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The :class:`_pool.QueuePool` pool class uses a ``thread.Lock`` object internally
+and is not compatible with asyncio. If using the :func:`_asyncio.create_async_engine`
+function to create an :class:`.AsyncEngine`, the appropriate queue pool class
+is :class:`_pool.AsyncAdaptedQueuePool`, which is used automatically and does
+not need to be specified.
+
+In addition to :class:`_pool.AsyncAdaptedQueuePool`, the :class:`_pool.NullPool`
+and :class:`_pool.StaticPool` pool classes do not use locks and are also
+suitable for use with async engines.
+
+This error is also raised in reverse in the unlikely case that the
+:class:`_pool.AsyncAdaptedQueuePool` pool class is indicated explicitly with
+the :func:`_sa.create_engine` function.
+
+.. seealso::
+
+ :ref:`pooling_toplevel`
.. _error_8s2b:
else:
pool._dialect = dialect
+ if (
+ hasattr(pool, "_is_asyncio")
+ and pool._is_asyncio is not dialect.is_async
+ ):
+ raise exc.ArgumentError(
+ f"Pool class {pool.__class__.__name__} cannot be "
+ f"used with {'non-' if not dialect.is_async else ''}"
+ "asyncio engine",
+ code="pcls",
+ )
+
# create engine.
if not pop_kwarg("future", True):
raise exc.ArgumentError(
that imposes a limit on the number of open connections.
:class:`.QueuePool` is the default pooling implementation used for
- all :class:`_engine.Engine` objects, unless the SQLite dialect is
- in use with a ``:memory:`` database.
+ all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
+ database.
+
+ The :class:`.QueuePool` class **is not compatible** with asyncio and
+ :func:`_asyncio.create_async_engine`. The
+ :class:`.AsyncAdaptedQueuePool` class is used automatically when
+ using :func:`_asyncio.create_async_engine`, if no other kind of pool
+ is specified.
+
+ .. seealso::
+
+ :class:`.AsyncAdaptedQueuePool`
"""
:class:`_pool.Pool` constructor.
"""
+
Pool.__init__(self, creator, **kw)
self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
self._overflow = 0 - pool_size
class AsyncAdaptedQueuePool(QueuePool):
+ """An asyncio-compatible version of :class:`.QueuePool`.
+
+ This pool is used by default when using :class:`.AsyncEngine` engines that
+ were generated from :func:`_asyncio.create_async_engine`. It uses an
+ asyncio-compatible queue implementation that does not use
+ ``threading.Lock``.
+
+ The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
+ otherwise identical to that of :class:`.QueuePool`.
+
+ """
+
_is_asyncio = True # type: ignore[assignment]
_queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
sqla_queue.AsyncAdaptedQueue
invalidation are not supported by this Pool implementation, since
no connections are held persistently.
+ The :class:`.NullPool` class **is compatible** with asyncio and
+ :func:`_asyncio.create_async_engine`.
+
"""
def status(self) -> str:
scenarios using a SQLite ``:memory:`` database and is not recommended
for production use.
+ The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
+ and :func:`_asyncio.create_async_engine`.
+
Options are the same as those of :class:`_pool.Pool`, as well as:
invalidation (which is also used to support auto-reconnect) are only
partially supported right now and may not yield good results.
+ The :class:`.StaticPool` class **is compatible** with asyncio and
+ :func:`_asyncio.create_async_engine`.
"""
at a time. Useful for debugging code that is using more connections
than desired.
+ The :class:`.AssertionPool` class **is compatible** with asyncio and
+ :func:`_asyncio.create_async_engine`.
+
"""
_conn: Optional[ConnectionPoolEntry]
True # enable event blocks, helps with profiling
)
- if isinstance(engine.pool, pool.QueuePool):
+ if (
+ isinstance(engine.pool, pool.QueuePool)
+ and "pool" not in options
+ and "pool_timeout" not in options
+ and "max_overflow" not in options
+ ):
engine.pool._timeout = 0
engine.pool._max_overflow = 0
if use_reaper:
from sqlalchemy.engine import default
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.base import Engine
+from sqlalchemy.pool import AsyncAdaptedQueuePool
from sqlalchemy.pool import NullPool
from sqlalchemy.pool import QueuePool
from sqlalchemy.sql import column
@testing.combinations(True, False, argnames="close")
def test_close_parameter(self, testing_engine, close):
eng = testing_engine(
- options=dict(pool_size=1, max_overflow=0, poolclass=QueuePool)
+ options=dict(
+ pool_size=1,
+ max_overflow=0,
+ poolclass=(
+ QueuePool
+ if not testing.db.dialect.is_async
+ else AsyncAdaptedQueuePool
+ ),
+ )
)
conn = eng.connect()
from sqlalchemy.engine import characteristics
from sqlalchemy.engine import default
from sqlalchemy.engine import url
+from sqlalchemy.pool import AsyncAdaptedQueuePool
+from sqlalchemy.pool import QueuePool
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
eq_(c2.get_isolation_level(), self._default_isolation_level())
def test_per_connection(self):
- from sqlalchemy.pool import QueuePool
eng = testing_engine(
- options=dict(poolclass=QueuePool, pool_size=2, max_overflow=0)
+ options=dict(
+ poolclass=(
+ QueuePool
+ if not testing.db.dialect.is_async
+ else AsyncAdaptedQueuePool
+ ),
+ pool_size=2,
+ max_overflow=0,
+ )
)
c1 = eng.connect()
import inspect as stdlib_inspect
from unittest.mock import patch
+from sqlalchemy import AssertionPool
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import delete
from sqlalchemy import func
from sqlalchemy import inspect
from sqlalchemy import Integer
+from sqlalchemy import NullPool
+from sqlalchemy import QueuePool
from sqlalchemy import select
+from sqlalchemy import SingletonThreadPool
+from sqlalchemy import StaticPool
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
eq_(isolation_level, "SERIALIZABLE")
+ @testing.combinations(
+ (
+ AsyncAdaptedQueuePool,
+ True,
+ ),
+ (
+ QueuePool,
+ False,
+ ),
+ (NullPool, True),
+ (SingletonThreadPool, False),
+ (StaticPool, True),
+ (AssertionPool, True),
+ argnames="pool_cls,should_work",
+ )
+ @testing.variation("instantiate", [True, False])
+ @async_test
+ async def test_pool_classes(
+ self, async_testing_engine, pool_cls, instantiate, should_work
+ ):
+ """test #8771"""
+ if instantiate:
+ if pool_cls in (QueuePool, AsyncAdaptedQueuePool):
+ pool = pool_cls(creator=testing.db.pool._creator, timeout=10)
+ else:
+ pool = pool_cls(
+ creator=testing.db.pool._creator,
+ )
+
+ options = {"pool": pool}
+ else:
+ if pool_cls in (QueuePool, AsyncAdaptedQueuePool):
+ options = {"poolclass": pool_cls, "pool_timeout": 10}
+ else:
+ options = {"poolclass": pool_cls}
+
+ if not should_work:
+ with expect_raises_message(
+ exc.ArgumentError,
+ f"Pool class {pool_cls.__name__} "
+ "cannot be used with asyncio engine",
+ ):
+ async_testing_engine(options=options)
+ return
+
+ e = async_testing_engine(options=options)
+
+ if pool_cls is AssertionPool:
+ async with e.connect() as conn:
+ result = await conn.scalar(select(1))
+ eq_(result, 1)
+ return
+
+ async def go():
+ async with e.connect() as conn:
+ result = await conn.scalar(select(1))
+ eq_(result, 1)
+ return result
+
+ eq_(await asyncio.gather(*[go() for i in range(10)]), [1] * 10)
+
+ def test_cant_use_async_pool_w_create_engine(self):
+ """supplemental test for #8771"""
+
+ with expect_raises_message(
+ exc.ArgumentError,
+ "Pool class AsyncAdaptedQueuePool "
+ "cannot be used with non-asyncio engine",
+ ):
+ create_engine("sqlite://", poolclass=AsyncAdaptedQueuePool)
+
@testing.requires.queue_pool
@async_test
async def test_dispose(self, async_engine):