]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
raise for asyncio-incompatible pool classes
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 15 Feb 2024 02:10:20 +0000 (21:10 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 16 Feb 2024 15:02:07 +0000 (10:02 -0500)
An error is raised if a :class:`.QueuePool` or other non-asyncio pool class
is passed to :func:`_asyncio.create_async_engine`.  This engine only
accepts asyncio-compatible pool classes including
:class:`.AsyncAdaptedQueuePool`. Other pool classes such as
:class:`.NullPool` are compatible with both synchronous and asynchronous
engines as they do not perform any locking.

Fixes: #8771
Change-Id: I5843ccea7d824488492d1a9d46207b9f05330ae3

doc/build/changelog/unreleased_20/8771.rst [new file with mode: 0644]
doc/build/core/pooling.rst
doc/build/errors.rst
lib/sqlalchemy/engine/create.py
lib/sqlalchemy/pool/impl.py
lib/sqlalchemy/testing/engines.py
test/engine/test_execute.py
test/engine/test_transaction.py
test/ext/asyncio/test_engine_py3k.py

diff --git a/doc/build/changelog/unreleased_20/8771.rst b/doc/build/changelog/unreleased_20/8771.rst
new file mode 100644 (file)
index 0000000..9f501fc
--- /dev/null
@@ -0,0 +1,15 @@
+.. change::
+    :tags: bug, asyncio
+    :tickets: 8771
+
+    An error is raised if a :class:`.QueuePool` or other non-asyncio pool class
+    is passed to :func:`_asyncio.create_async_engine`.  This engine only
+    accepts asyncio-compatible pool classes including
+    :class:`.AsyncAdaptedQueuePool`. Other pool classes such as
+    :class:`.NullPool` are compatible with both synchronous and asynchronous
+    engines as they do not perform any locking.
+
+    .. seealso::
+
+        :ref:`pool_api`
+
index 78bbdcb1af86bbea818bfac3048ec4ee9e9e94d0..f3ea6e862383c6f678c70c748d478c991ef58350 100644 (file)
@@ -50,6 +50,13 @@ queued up - the pool would only grow to that size if the application
 actually used five connections concurrently, in which case the usage of a
 small pool is an entirely appropriate default behavior.
 
+.. note:: The :class:`.QueuePool` class is **not compatible with asyncio**.
+   When using :class:`_asyncio.create_async_engine` to create an instance of
+   :class:`.AsyncEngine`, the :class:`_pool.AsyncAdaptedQueuePool` class,
+   which makes use of an asyncio-compatible queue implementation, is used
+   instead.
+
+
 .. _pool_switching:
 
 Switching Pool Implementations
@@ -713,6 +720,8 @@ like in the following example::
 
     my_pool = create_pool_from_url("mysql+mysqldb://", poolclass=NullPool)
 
+.. _pool_api:
+
 API Documentation - Available Pool Implementations
 --------------------------------------------------
 
@@ -722,6 +731,9 @@ API Documentation - Available Pool Implementations
 .. autoclass:: sqlalchemy.pool.QueuePool
     :members:
 
+.. autoclass:: sqlalchemy.pool.AsyncAdaptedQueuePool
+    :members:
+
 .. autoclass:: SingletonThreadPool
     :members:
 
index 55ac40ae5f6f167eae29baa3f317947297920158..d6645123154c800c3057eb09cf9ff4d595e09a13 100644 (file)
@@ -188,6 +188,28 @@ sooner.
 
  :ref:`connections_toplevel`
 
+.. _error_pcls:
+
+Pool class cannot be used with asyncio engine (or vice versa)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The :class:`_pool.QueuePool` pool class uses a ``thread.Lock`` object internally
+and is not compatible with asyncio.  If using the :func:`_asyncio.create_async_engine`
+function to create an :class:`.AsyncEngine`, the appropriate queue pool class
+is :class:`_pool.AsyncAdaptedQueuePool`, which is used automatically and does
+not need to be specified.
+
+In addition to :class:`_pool.AsyncAdaptedQueuePool`, the :class:`_pool.NullPool`
+and :class:`_pool.StaticPool` pool classes do not use locks and are also
+suitable for use with async engines.
+
+This error is also raised in reverse in the unlikely case that the
+:class:`_pool.AsyncAdaptedQueuePool` pool class is indicated explicitly with
+the :func:`_sa.create_engine` function.
+
+.. seealso::
+
+    :ref:`pooling_toplevel`
 
 .. _error_8s2b:
 
index e04057d44c7e13e584f8d49aa17b87223bac853b..722a10ed0525265e748aa8602f8ae73c602b153d 100644 (file)
@@ -655,6 +655,17 @@ def create_engine(url: Union[str, _url.URL], **kwargs: Any) -> Engine:
     else:
         pool._dialect = dialect
 
+    if (
+        hasattr(pool, "_is_asyncio")
+        and pool._is_asyncio is not dialect.is_async
+    ):
+        raise exc.ArgumentError(
+            f"Pool class {pool.__class__.__name__} cannot be "
+            f"used with {'non-' if not dialect.is_async else ''}"
+            "asyncio engine",
+            code="pcls",
+        )
+
     # create engine.
     if not pop_kwarg("future", True):
         raise exc.ArgumentError(
index e2bb81bf0de814efe21eb3184f8863c27448e779..d046d9f63e4230c7da069274f78220dc294579e5 100644 (file)
@@ -47,8 +47,18 @@ class QueuePool(Pool):
     that imposes a limit on the number of open connections.
 
     :class:`.QueuePool` is the default pooling implementation used for
-    all :class:`_engine.Engine` objects, unless the SQLite dialect is
-    in use with a ``:memory:`` database.
+    all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
+    database.
+
+    The :class:`.QueuePool` class **is not compatible** with asyncio and
+    :func:`_asyncio.create_async_engine`.  The
+    :class:`.AsyncAdaptedQueuePool` class is used automatically when
+    using :func:`_asyncio.create_async_engine`, if no other kind of pool
+    is specified.
+
+    .. seealso::
+
+        :class:`.AsyncAdaptedQueuePool`
 
     """
 
@@ -123,6 +133,7 @@ class QueuePool(Pool):
           :class:`_pool.Pool` constructor.
 
         """
+
         Pool.__init__(self, creator, **kw)
         self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
         self._overflow = 0 - pool_size
@@ -248,6 +259,18 @@ class QueuePool(Pool):
 
 
 class AsyncAdaptedQueuePool(QueuePool):
+    """An asyncio-compatible version of :class:`.QueuePool`.
+
+    This pool is used by default when using :class:`.AsyncEngine` engines that
+    were generated from :func:`_asyncio.create_async_engine`.   It uses an
+    asyncio-compatible queue implementation that does not use
+    ``threading.Lock``.
+
+    The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
+    otherwise identical to that of :class:`.QueuePool`.
+
+    """
+
     _is_asyncio = True  # type: ignore[assignment]
     _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
         sqla_queue.AsyncAdaptedQueue
@@ -266,6 +289,9 @@ class NullPool(Pool):
     invalidation are not supported by this Pool implementation, since
     no connections are held persistently.
 
+    The :class:`.NullPool` class **is compatible** with asyncio and
+    :func:`_asyncio.create_async_engine`.
+
     """
 
     def status(self) -> str:
@@ -313,6 +339,9 @@ class SingletonThreadPool(Pool):
        scenarios using a SQLite ``:memory:`` database and is not recommended
        for production use.
 
+    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
+    and :func:`_asyncio.create_async_engine`.
+
 
     Options are the same as those of :class:`_pool.Pool`, as well as:
 
@@ -421,6 +450,8 @@ class StaticPool(Pool):
     invalidation (which is also used to support auto-reconnect) are only
     partially supported right now and may not yield good results.
 
+    The :class:`.StaticPool` class **is compatible** with asyncio and
+    :func:`_asyncio.create_async_engine`.
 
     """
 
@@ -485,6 +516,9 @@ class AssertionPool(Pool):
     at a time.  Useful for debugging code that is using more connections
     than desired.
 
+    The :class:`.AssertionPool` class **is compatible** with asyncio and
+    :func:`_asyncio.create_async_engine`.
+
     """
 
     _conn: Optional[ConnectionPoolEntry]
index 6b3f32c2b762ab9839a53f4259e2689488c0557b..bbb85890d007cc9f1ccf301e3a86bc9aa25bad81 100644 (file)
@@ -368,7 +368,12 @@ def testing_engine(
                 True  # enable event blocks, helps with profiling
             )
 
-    if isinstance(engine.pool, pool.QueuePool):
+    if (
+        isinstance(engine.pool, pool.QueuePool)
+        and "pool" not in options
+        and "pool_timeout" not in options
+        and "max_overflow" not in options
+    ):
         engine.pool._timeout = 0
         engine.pool._max_overflow = 0
     if use_reaper:
index 4618dfff8d57322da9f8c2fe2eac77a5c4c9bbae..122c08461d159db1bf030186f7f199abeff7ebfc 100644 (file)
@@ -34,6 +34,7 @@ from sqlalchemy.engine import BindTyping
 from sqlalchemy.engine import default
 from sqlalchemy.engine.base import Connection
 from sqlalchemy.engine.base import Engine
+from sqlalchemy.pool import AsyncAdaptedQueuePool
 from sqlalchemy.pool import NullPool
 from sqlalchemy.pool import QueuePool
 from sqlalchemy.sql import column
@@ -2411,7 +2412,15 @@ class EngineEventsTest(fixtures.TestBase):
     @testing.combinations(True, False, argnames="close")
     def test_close_parameter(self, testing_engine, close):
         eng = testing_engine(
-            options=dict(pool_size=1, max_overflow=0, poolclass=QueuePool)
+            options=dict(
+                pool_size=1,
+                max_overflow=0,
+                poolclass=(
+                    QueuePool
+                    if not testing.db.dialect.is_async
+                    else AsyncAdaptedQueuePool
+                ),
+            )
         )
 
         conn = eng.connect()
index a70e8e05d0fe2f480dba2a0e6135a8e82bfc8059..68650d6d2bc7767027f43f0e8e10f080a5c4630f 100644 (file)
@@ -12,6 +12,8 @@ from sqlalchemy.engine import base
 from sqlalchemy.engine import characteristics
 from sqlalchemy.engine import default
 from sqlalchemy.engine import url
+from sqlalchemy.pool import AsyncAdaptedQueuePool
+from sqlalchemy.pool import QueuePool
 from sqlalchemy.testing import assert_raises_message
 from sqlalchemy.testing import eq_
 from sqlalchemy.testing import expect_warnings
@@ -1345,10 +1347,17 @@ class IsolationLevelTest(fixtures.TestBase):
             eq_(c2.get_isolation_level(), self._default_isolation_level())
 
     def test_per_connection(self):
-        from sqlalchemy.pool import QueuePool
 
         eng = testing_engine(
-            options=dict(poolclass=QueuePool, pool_size=2, max_overflow=0)
+            options=dict(
+                poolclass=(
+                    QueuePool
+                    if not testing.db.dialect.is_async
+                    else AsyncAdaptedQueuePool
+                ),
+                pool_size=2,
+                max_overflow=0,
+            )
         )
 
         c1 = eng.connect()
index 15a0ebfd7f22df416e818854575a195654db01c5..c3d1e4835a02f3287aa99a4e8bd1a0269ee8cc05 100644 (file)
@@ -3,6 +3,7 @@ import contextlib
 import inspect as stdlib_inspect
 from unittest.mock import patch
 
+from sqlalchemy import AssertionPool
 from sqlalchemy import Column
 from sqlalchemy import create_engine
 from sqlalchemy import delete
@@ -11,7 +12,11 @@ from sqlalchemy import exc
 from sqlalchemy import func
 from sqlalchemy import inspect
 from sqlalchemy import Integer
+from sqlalchemy import NullPool
+from sqlalchemy import QueuePool
 from sqlalchemy import select
+from sqlalchemy import SingletonThreadPool
+from sqlalchemy import StaticPool
 from sqlalchemy import String
 from sqlalchemy import Table
 from sqlalchemy import testing
@@ -520,6 +525,77 @@ class AsyncEngineTest(EngineFixture):
 
         eq_(isolation_level, "SERIALIZABLE")
 
+    @testing.combinations(
+        (
+            AsyncAdaptedQueuePool,
+            True,
+        ),
+        (
+            QueuePool,
+            False,
+        ),
+        (NullPool, True),
+        (SingletonThreadPool, False),
+        (StaticPool, True),
+        (AssertionPool, True),
+        argnames="pool_cls,should_work",
+    )
+    @testing.variation("instantiate", [True, False])
+    @async_test
+    async def test_pool_classes(
+        self, async_testing_engine, pool_cls, instantiate, should_work
+    ):
+        """test #8771"""
+        if instantiate:
+            if pool_cls in (QueuePool, AsyncAdaptedQueuePool):
+                pool = pool_cls(creator=testing.db.pool._creator, timeout=10)
+            else:
+                pool = pool_cls(
+                    creator=testing.db.pool._creator,
+                )
+
+            options = {"pool": pool}
+        else:
+            if pool_cls in (QueuePool, AsyncAdaptedQueuePool):
+                options = {"poolclass": pool_cls, "pool_timeout": 10}
+            else:
+                options = {"poolclass": pool_cls}
+
+        if not should_work:
+            with expect_raises_message(
+                exc.ArgumentError,
+                f"Pool class {pool_cls.__name__} "
+                "cannot be used with asyncio engine",
+            ):
+                async_testing_engine(options=options)
+            return
+
+        e = async_testing_engine(options=options)
+
+        if pool_cls is AssertionPool:
+            async with e.connect() as conn:
+                result = await conn.scalar(select(1))
+                eq_(result, 1)
+            return
+
+        async def go():
+            async with e.connect() as conn:
+                result = await conn.scalar(select(1))
+                eq_(result, 1)
+                return result
+
+        eq_(await asyncio.gather(*[go() for i in range(10)]), [1] * 10)
+
+    def test_cant_use_async_pool_w_create_engine(self):
+        """supplemental test for #8771"""
+
+        with expect_raises_message(
+            exc.ArgumentError,
+            "Pool class AsyncAdaptedQueuePool "
+            "cannot be used with non-asyncio engine",
+        ):
+            create_engine("sqlite://", poolclass=AsyncAdaptedQueuePool)
+
     @testing.requires.queue_pool
     @async_test
     async def test_dispose(self, async_engine):