From: Mike Bayer Date: Thu, 15 Feb 2024 02:10:20 +0000 (-0500) Subject: raise for asyncio-incompatible pool classes X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c449505f651ebf4b73aaa7d7aec99b038ea34cb6;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git raise for asyncio-incompatible pool classes An error is raised if a :class:`.QueuePool` or other non-asyncio pool class is passed to :func:`_asyncio.create_async_engine`. This engine only accepts asyncio-compatible pool classes including :class:`.AsyncAdaptedQueuePool`. Other pool classes such as :class:`.NullPool` are compatible with both synchronous and asynchronous engines as they do not perform any locking. Fixes: #8771 Change-Id: I5843ccea7d824488492d1a9d46207b9f05330ae3 --- diff --git a/doc/build/changelog/unreleased_20/8771.rst b/doc/build/changelog/unreleased_20/8771.rst new file mode 100644 index 0000000000..9f501fcb8d --- /dev/null +++ b/doc/build/changelog/unreleased_20/8771.rst @@ -0,0 +1,15 @@ +.. change:: + :tags: bug, asyncio + :tickets: 8771 + + An error is raised if a :class:`.QueuePool` or other non-asyncio pool class + is passed to :func:`_asyncio.create_async_engine`. This engine only + accepts asyncio-compatible pool classes including + :class:`.AsyncAdaptedQueuePool`. Other pool classes such as + :class:`.NullPool` are compatible with both synchronous and asynchronous + engines as they do not perform any locking. + + .. seealso:: + + :ref:`pool_api` + diff --git a/doc/build/core/pooling.rst b/doc/build/core/pooling.rst index 78bbdcb1af..f3ea6e8623 100644 --- a/doc/build/core/pooling.rst +++ b/doc/build/core/pooling.rst @@ -50,6 +50,13 @@ queued up - the pool would only grow to that size if the application actually used five connections concurrently, in which case the usage of a small pool is an entirely appropriate default behavior. +.. note:: The :class:`.QueuePool` class is **not compatible with asyncio**. + When using :class:`_asyncio.create_async_engine` to create an instance of + :class:`.AsyncEngine`, the :class:`_pool.AsyncAdaptedQueuePool` class, + which makes use of an asyncio-compatible queue implementation, is used + instead. + + .. _pool_switching: Switching Pool Implementations @@ -713,6 +720,8 @@ like in the following example:: my_pool = create_pool_from_url("mysql+mysqldb://", poolclass=NullPool) +.. _pool_api: + API Documentation - Available Pool Implementations -------------------------------------------------- @@ -722,6 +731,9 @@ API Documentation - Available Pool Implementations .. autoclass:: sqlalchemy.pool.QueuePool :members: +.. autoclass:: sqlalchemy.pool.AsyncAdaptedQueuePool + :members: + .. autoclass:: SingletonThreadPool :members: diff --git a/doc/build/errors.rst b/doc/build/errors.rst index 55ac40ae5f..d664512315 100644 --- a/doc/build/errors.rst +++ b/doc/build/errors.rst @@ -188,6 +188,28 @@ sooner. :ref:`connections_toplevel` +.. _error_pcls: + +Pool class cannot be used with asyncio engine (or vice versa) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The :class:`_pool.QueuePool` pool class uses a ``thread.Lock`` object internally +and is not compatible with asyncio. If using the :func:`_asyncio.create_async_engine` +function to create an :class:`.AsyncEngine`, the appropriate queue pool class +is :class:`_pool.AsyncAdaptedQueuePool`, which is used automatically and does +not need to be specified. + +In addition to :class:`_pool.AsyncAdaptedQueuePool`, the :class:`_pool.NullPool` +and :class:`_pool.StaticPool` pool classes do not use locks and are also +suitable for use with async engines. + +This error is also raised in reverse in the unlikely case that the +:class:`_pool.AsyncAdaptedQueuePool` pool class is indicated explicitly with +the :func:`_sa.create_engine` function. + +.. seealso:: + + :ref:`pooling_toplevel` .. _error_8s2b: diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py index e04057d44c..722a10ed05 100644 --- a/lib/sqlalchemy/engine/create.py +++ b/lib/sqlalchemy/engine/create.py @@ -655,6 +655,17 @@ def create_engine(url: Union[str, _url.URL], **kwargs: Any) -> Engine: else: pool._dialect = dialect + if ( + hasattr(pool, "_is_asyncio") + and pool._is_asyncio is not dialect.is_async + ): + raise exc.ArgumentError( + f"Pool class {pool.__class__.__name__} cannot be " + f"used with {'non-' if not dialect.is_async else ''}" + "asyncio engine", + code="pcls", + ) + # create engine. if not pop_kwarg("future", True): raise exc.ArgumentError( diff --git a/lib/sqlalchemy/pool/impl.py b/lib/sqlalchemy/pool/impl.py index e2bb81bf0d..d046d9f63e 100644 --- a/lib/sqlalchemy/pool/impl.py +++ b/lib/sqlalchemy/pool/impl.py @@ -47,8 +47,18 @@ class QueuePool(Pool): that imposes a limit on the number of open connections. :class:`.QueuePool` is the default pooling implementation used for - all :class:`_engine.Engine` objects, unless the SQLite dialect is - in use with a ``:memory:`` database. + all :class:`_engine.Engine` objects other than SQLite with a ``:memory:`` + database. + + The :class:`.QueuePool` class **is not compatible** with asyncio and + :func:`_asyncio.create_async_engine`. The + :class:`.AsyncAdaptedQueuePool` class is used automatically when + using :func:`_asyncio.create_async_engine`, if no other kind of pool + is specified. + + .. seealso:: + + :class:`.AsyncAdaptedQueuePool` """ @@ -123,6 +133,7 @@ class QueuePool(Pool): :class:`_pool.Pool` constructor. """ + Pool.__init__(self, creator, **kw) self._pool = self._queue_class(pool_size, use_lifo=use_lifo) self._overflow = 0 - pool_size @@ -248,6 +259,18 @@ class QueuePool(Pool): class AsyncAdaptedQueuePool(QueuePool): + """An asyncio-compatible version of :class:`.QueuePool`. + + This pool is used by default when using :class:`.AsyncEngine` engines that + were generated from :func:`_asyncio.create_async_engine`. It uses an + asyncio-compatible queue implementation that does not use + ``threading.Lock``. + + The arguments and operation of :class:`.AsyncAdaptedQueuePool` are + otherwise identical to that of :class:`.QueuePool`. + + """ + _is_asyncio = True # type: ignore[assignment] _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = ( sqla_queue.AsyncAdaptedQueue @@ -266,6 +289,9 @@ class NullPool(Pool): invalidation are not supported by this Pool implementation, since no connections are held persistently. + The :class:`.NullPool` class **is compatible** with asyncio and + :func:`_asyncio.create_async_engine`. + """ def status(self) -> str: @@ -313,6 +339,9 @@ class SingletonThreadPool(Pool): scenarios using a SQLite ``:memory:`` database and is not recommended for production use. + The :class:`.SingletonThreadPool` class **is not compatible** with asyncio + and :func:`_asyncio.create_async_engine`. + Options are the same as those of :class:`_pool.Pool`, as well as: @@ -421,6 +450,8 @@ class StaticPool(Pool): invalidation (which is also used to support auto-reconnect) are only partially supported right now and may not yield good results. + The :class:`.StaticPool` class **is compatible** with asyncio and + :func:`_asyncio.create_async_engine`. """ @@ -485,6 +516,9 @@ class AssertionPool(Pool): at a time. Useful for debugging code that is using more connections than desired. + The :class:`.AssertionPool` class **is compatible** with asyncio and + :func:`_asyncio.create_async_engine`. + """ _conn: Optional[ConnectionPoolEntry] diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py index 6b3f32c2b7..bbb85890d0 100644 --- a/lib/sqlalchemy/testing/engines.py +++ b/lib/sqlalchemy/testing/engines.py @@ -368,7 +368,12 @@ def testing_engine( True # enable event blocks, helps with profiling ) - if isinstance(engine.pool, pool.QueuePool): + if ( + isinstance(engine.pool, pool.QueuePool) + and "pool" not in options + and "pool_timeout" not in options + and "max_overflow" not in options + ): engine.pool._timeout = 0 engine.pool._max_overflow = 0 if use_reaper: diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 4618dfff8d..122c08461d 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -34,6 +34,7 @@ from sqlalchemy.engine import BindTyping from sqlalchemy.engine import default from sqlalchemy.engine.base import Connection from sqlalchemy.engine.base import Engine +from sqlalchemy.pool import AsyncAdaptedQueuePool from sqlalchemy.pool import NullPool from sqlalchemy.pool import QueuePool from sqlalchemy.sql import column @@ -2411,7 +2412,15 @@ class EngineEventsTest(fixtures.TestBase): @testing.combinations(True, False, argnames="close") def test_close_parameter(self, testing_engine, close): eng = testing_engine( - options=dict(pool_size=1, max_overflow=0, poolclass=QueuePool) + options=dict( + pool_size=1, + max_overflow=0, + poolclass=( + QueuePool + if not testing.db.dialect.is_async + else AsyncAdaptedQueuePool + ), + ) ) conn = eng.connect() diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index a70e8e05d0..68650d6d2b 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -12,6 +12,8 @@ from sqlalchemy.engine import base from sqlalchemy.engine import characteristics from sqlalchemy.engine import default from sqlalchemy.engine import url +from sqlalchemy.pool import AsyncAdaptedQueuePool +from sqlalchemy.pool import QueuePool from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_warnings @@ -1345,10 +1347,17 @@ class IsolationLevelTest(fixtures.TestBase): eq_(c2.get_isolation_level(), self._default_isolation_level()) def test_per_connection(self): - from sqlalchemy.pool import QueuePool eng = testing_engine( - options=dict(poolclass=QueuePool, pool_size=2, max_overflow=0) + options=dict( + poolclass=( + QueuePool + if not testing.db.dialect.is_async + else AsyncAdaptedQueuePool + ), + pool_size=2, + max_overflow=0, + ) ) c1 = eng.connect() diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py index 15a0ebfd7f..c3d1e4835a 100644 --- a/test/ext/asyncio/test_engine_py3k.py +++ b/test/ext/asyncio/test_engine_py3k.py @@ -3,6 +3,7 @@ import contextlib import inspect as stdlib_inspect from unittest.mock import patch +from sqlalchemy import AssertionPool from sqlalchemy import Column from sqlalchemy import create_engine from sqlalchemy import delete @@ -11,7 +12,11 @@ from sqlalchemy import exc from sqlalchemy import func from sqlalchemy import inspect from sqlalchemy import Integer +from sqlalchemy import NullPool +from sqlalchemy import QueuePool from sqlalchemy import select +from sqlalchemy import SingletonThreadPool +from sqlalchemy import StaticPool from sqlalchemy import String from sqlalchemy import Table from sqlalchemy import testing @@ -520,6 +525,77 @@ class AsyncEngineTest(EngineFixture): eq_(isolation_level, "SERIALIZABLE") + @testing.combinations( + ( + AsyncAdaptedQueuePool, + True, + ), + ( + QueuePool, + False, + ), + (NullPool, True), + (SingletonThreadPool, False), + (StaticPool, True), + (AssertionPool, True), + argnames="pool_cls,should_work", + ) + @testing.variation("instantiate", [True, False]) + @async_test + async def test_pool_classes( + self, async_testing_engine, pool_cls, instantiate, should_work + ): + """test #8771""" + if instantiate: + if pool_cls in (QueuePool, AsyncAdaptedQueuePool): + pool = pool_cls(creator=testing.db.pool._creator, timeout=10) + else: + pool = pool_cls( + creator=testing.db.pool._creator, + ) + + options = {"pool": pool} + else: + if pool_cls in (QueuePool, AsyncAdaptedQueuePool): + options = {"poolclass": pool_cls, "pool_timeout": 10} + else: + options = {"poolclass": pool_cls} + + if not should_work: + with expect_raises_message( + exc.ArgumentError, + f"Pool class {pool_cls.__name__} " + "cannot be used with asyncio engine", + ): + async_testing_engine(options=options) + return + + e = async_testing_engine(options=options) + + if pool_cls is AssertionPool: + async with e.connect() as conn: + result = await conn.scalar(select(1)) + eq_(result, 1) + return + + async def go(): + async with e.connect() as conn: + result = await conn.scalar(select(1)) + eq_(result, 1) + return result + + eq_(await asyncio.gather(*[go() for i in range(10)]), [1] * 10) + + def test_cant_use_async_pool_w_create_engine(self): + """supplemental test for #8771""" + + with expect_raises_message( + exc.ArgumentError, + "Pool class AsyncAdaptedQueuePool " + "cannot be used with non-asyncio engine", + ): + create_engine("sqlite://", poolclass=AsyncAdaptedQueuePool) + @testing.requires.queue_pool @async_test async def test_dispose(self, async_engine):