]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Fix a couple of bugs in the asyncio implementation
authorFederico Caselli <cfederico87@gmail.com>
Sat, 9 Jan 2021 12:25:55 +0000 (13:25 +0100)
committerFederico Caselli <cfederico87@gmail.com>
Thu, 21 Jan 2021 20:42:58 +0000 (21:42 +0100)
Log an informative message if a connection is not closed
and the gc is reclaiming it when using an async dbapi that
does not support running IO at that stage.

The ``AsyncAdaptedQueue`` used by default on async dbapis
should instantiate a queue only when it's first used
to avoid binding it to a possibly wrong event loop.

Fixes: #5823
Change-Id: Ibfc50e209b1937ae3d6599ae7997f028c7a92c33

doc/build/changelog/unreleased_14/5823.rst
doc/build/orm/extensions/asyncio.rst
lib/sqlalchemy/ext/asyncio/engine.py
lib/sqlalchemy/pool/base.py
lib/sqlalchemy/pool/impl.py
lib/sqlalchemy/util/queue.py
test/base/test_concurrency_py3k.py
test/engine/test_pool.py

index 74debdaa93b172ffe1dbade2d2a7c8ad7e9be0cb..e5f43db4526e24f8eeebcec699811340169e3a44 100644 (file)
@@ -10,4 +10,6 @@
     the connection including transaction rollback or connection close as this
     will often be outside of the event loop.
 
-
+    The ``AsyncAdaptedQueue`` used by default on async dbapis
+    should instantiate a queue only when it's first used
+    to avoid binding it to a possibly wrong event loop.
index aed01678af28ccfcfd7414e28418f3f525701d00..2fa274fcdd40cd0f6abd710e2c988ddd64e53d19 100644 (file)
@@ -255,6 +255,29 @@ differences are as follows:
    concepts, no third party networking libraries as ``gevent`` and ``eventlet``
    provides are in use.
 
+Using multiple asyncio event loops
+----------------------------------
+
+An application that makes use of multiple event loops, for example by combining asyncio
+with multithreading, should not share the same :class:`_asyncio.AsyncEngine`
+with different event loops when using the default pool implementation.
+
+If an :class:`_asyncio.AsyncEngine` is passed from one event loop to another,
+the method :meth:`_asyncio.AsyncEngine.dispose()` should be called before it's
+re-used on a new event loop. Failing to do so may lead to a ``RuntimeError``
+along the lines of
+``Task <Task pending ...> got Future attached to a different loop``
+
+If the same engine must be shared between different loops, it should be configured
+to disable pooling using :class:`~sqlalchemy.pool.NullPool`, preventing the Engine
+from using any connection more than once:
+
+    from sqlalchemy.pool import NullPool
+    engine = create_async_engine(
+        "postgresql+asyncpg://user:pass@host/dbname", poolclass=NullPool
+    )
+
+
 .. currentmodule:: sqlalchemy.ext.asyncio
 
 Engine API Documentation
index 829e89b71673889142fc3b0a111a401532c44854..aa7e60dfb5526d7814f084bb959330e20723c7dd 100644 (file)
@@ -494,7 +494,6 @@ class AsyncEngine(ProxyComparable, AsyncConnectable):
 
     .. versionadded:: 1.4
 
-
     """  # noqa
 
     # AsyncEngine is a thin proxy; no state should be added here
index 47d9e2cbad7728b2ee47c08735d49fe4164cf3e0..9b4e61fc3fd336688b267017e92789e6cf8e8771 100644 (file)
@@ -26,7 +26,6 @@ reset_none = util.symbol("reset_none")
 
 
 class _ConnDialect(object):
-
     """partial implementation of :class:`.Dialect`
     which provides DBAPI connection methods.
 
@@ -36,6 +35,8 @@ class _ConnDialect(object):
 
     """
 
+    is_async = False
+
     def do_rollback(self, dbapi_connection):
         dbapi_connection.rollback()
 
@@ -606,11 +607,20 @@ class _ConnectionRecord(object):
 
 
 def _finalize_fairy(
-    connection, connection_record, pool, ref, echo, fairy=None
+    connection,
+    connection_record,
+    pool,
+    ref,  # this is None when called directly, not by the gc
+    echo,
+    fairy=None,
 ):
     """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
     been garbage collected.
 
+    When using an async dialect no IO can happen here (without using
+    a dedicated thread), since this is called outside the greenlet
+    context and with an already running loop. In this case the
+    function will only log a message and raise a warning.
     """
 
     if ref:
@@ -624,7 +634,8 @@ def _finalize_fairy(
         assert connection is None
         connection = connection_record.connection
 
-    dont_restore_gced = pool._is_asyncio
+    # null pool is not _is_asyncio but can be used also with async dialects
+    dont_restore_gced = pool._dialect.is_async
 
     if dont_restore_gced:
         detach = not connection_record or ref
@@ -658,11 +669,17 @@ def _finalize_fairy(
 
                     pool._close_connection(connection)
                 else:
-                    util.warn(
-                        "asyncio connection is being garbage "
-                        "collected without being properly closed: %r"
-                        % connection
-                    )
+                    message = (
+                        "The garbage collector is trying to clean up "
+                        "connection %r. This feature is unsupported on async "
+                        "dbapi, since no IO can be performed at this stage to "
+                        "reset the connection. Please close out all "
+                        "connections when they are no longer used, calling "
+                        "``close()`` or using a context manager to "
+                        "manage their lifetime."
+                    ) % connection
+                    pool.logger.error(message)
+                    util.warn(message)
 
         except BaseException as e:
             pool.logger.error(
index 825ac03070184950ca325da8a01bf6b2ef1ee278..08371a31a4f9c905628cf615580065daa2835ef7 100644 (file)
@@ -13,6 +13,7 @@
 import traceback
 import weakref
 
+from .base import _ConnDialect
 from .base import _ConnectionFairy
 from .base import _ConnectionRecord
 from .base import Pool
@@ -221,9 +222,14 @@ class QueuePool(Pool):
         return self._pool.maxsize - self._pool.qsize() + self._overflow
 
 
+class _AsyncConnDialect(_ConnDialect):
+    is_async = True
+
+
 class AsyncAdaptedQueuePool(QueuePool):
     _is_asyncio = True
     _queue_class = sqla_queue.AsyncAdaptedQueue
+    _dialect = _AsyncConnDialect()
 
 
 class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
index ca5a3abded150ec449333ef5d616f5962b300375..30e388248066f4404d4d52eb6cacc9cba7e59fb7 100644 (file)
@@ -26,6 +26,7 @@ from .compat import threading
 from .concurrency import asyncio
 from .concurrency import await_fallback
 from .concurrency import await_only
+from .langhelpers import memoized_property
 
 
 __all__ = ["Empty", "Full", "Queue"]
@@ -206,15 +207,32 @@ class AsyncAdaptedQueue:
     await_ = staticmethod(await_only)
 
     def __init__(self, maxsize=0, use_lifo=False):
-        if use_lifo:
-            self._queue = asyncio.LifoQueue(maxsize=maxsize)
-        else:
-            self._queue = asyncio.Queue(maxsize=maxsize)
         self.use_lifo = use_lifo
         self.maxsize = maxsize
-        self.empty = self._queue.empty
-        self.full = self._queue.full
-        self.qsize = self._queue.qsize
+
+    def empty(self):
+        return self._queue.empty()
+
+    def full(self):
+        return self._queue.full()
+
+    def qsize(self):
+        return self._queue.qsize()
+
+    @memoized_property
+    def _queue(self):
+        # Delay creation of the queue until it is first used, to avoid
+        # binding it to a possibly wrong event loop.
+        # By delaying the creation of the pool we accommodate the common
+        # usage pattern of instantiating the engine at module level, where a
+        # different event loop is present compared to when the application
+        # is actually run.
+
+        if self.use_lifo:
+            queue = asyncio.LifoQueue(maxsize=self.maxsize)
+        else:
+            queue = asyncio.Queue(maxsize=self.maxsize)
+        return queue
 
     def put_nowait(self, item):
         try:
index e7ae8c9ad203ffd8aadb4c0bfbde2a1b78301958..8eabece92a50b6d43d78ec9eb1e3ad5237537338 100644 (file)
@@ -1,12 +1,18 @@
+import threading
+
 from sqlalchemy import exc
 from sqlalchemy import testing
 from sqlalchemy.testing import async_test
 from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
 from sqlalchemy.testing import expect_raises_message
 from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_true
+from sqlalchemy.util import asyncio
 from sqlalchemy.util import await_fallback
 from sqlalchemy.util import await_only
 from sqlalchemy.util import greenlet_spawn
+from sqlalchemy.util import queue
 
 try:
     from greenlet import greenlet
@@ -152,3 +158,47 @@ class TestAsyncioCompat(fixtures.TestBase):
             "The current operation required an async execution but none was",
         ):
             await greenlet_spawn(run, _require_await=True)
+
+
+class TestAsyncAdaptedQueue(fixtures.TestBase):
+    def test_lazy_init(self):
+        run = [False]
+
+        def thread_go(q):
+            def go():
+                q.get(timeout=0.1)
+
+            with expect_raises(queue.Empty):
+                asyncio.run(greenlet_spawn(go))
+            run[0] = True
+
+        t = threading.Thread(
+            target=thread_go, args=[queue.AsyncAdaptedQueue()]
+        )
+        t.start()
+        t.join()
+
+        is_true(run[0])
+
+    def test_error_other_loop(self):
+        run = [False]
+
+        def thread_go(q):
+            def go():
+                eq_(q.get(block=False), 1)
+                q.get(timeout=0.1)
+
+            with expect_raises_message(
+                RuntimeError, "Task .* attached to a different loop"
+            ):
+                asyncio.run(greenlet_spawn(go))
+
+            run[0] = True
+
+        q = queue.AsyncAdaptedQueue()
+        q.put_nowait(1)
+        t = threading.Thread(target=thread_go, args=[q])
+        t.start()
+        t.join()
+
+        is_true(run[0])
index b43a29fae84dda841ea385df343ce98baaa0bcac..f29373e955dcf7f026f86a2aad65cf812c85fda3 100644 (file)
@@ -10,6 +10,7 @@ from sqlalchemy import pool
 from sqlalchemy import select
 from sqlalchemy import testing
 from sqlalchemy.engine import default
+from sqlalchemy.pool.impl import _AsyncConnDialect
 from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import assert_raises_context_ok
 from sqlalchemy.testing import assert_raises_message
@@ -89,10 +90,12 @@ class PoolTestBase(fixtures.TestBase):
 
     def _queuepool_dbapi_fixture(self, **kw):
         dbapi = MockDBAPI()
-        return (
-            dbapi,
-            pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw),
-        )
+        _is_asyncio = kw.pop("_is_asyncio", False)
+        p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw)
+        if _is_asyncio:
+            p._is_asyncio = True
+            p._dialect = _AsyncConnDialect()
+        return dbapi, p
 
 
 class PoolTest(PoolTestBase):
@@ -283,6 +286,8 @@ class PoolDialectTest(PoolTestBase):
         canary = []
 
         class PoolDialect(object):
+            is_async = False
+
             def do_rollback(self, dbapi_connection):
                 canary.append("R")
                 dbapi_connection.rollback()
@@ -361,8 +366,8 @@ class PoolEventsTest(PoolTestBase):
 
         return p, canary
 
-    def _checkin_event_fixture(self):
-        p = self._queuepool_fixture()
+    def _checkin_event_fixture(self, _is_asyncio=False):
+        p = self._queuepool_fixture(_is_asyncio=_is_asyncio)
         canary = []
 
         @event.listens_for(p, "checkin")
@@ -639,10 +644,7 @@ class PoolEventsTest(PoolTestBase):
 
     @testing.combinations((True, testing.requires.python3), (False,))
     def test_checkin_event_gc(self, detach_gced):
-        p, canary = self._checkin_event_fixture()
-
-        if detach_gced:
-            p._is_asyncio = True
+        p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced)
 
         c1 = p.connect()
 
@@ -1517,11 +1519,11 @@ class QueuePoolTest(PoolTestBase):
     @testing.combinations((True, testing.requires.python3), (False,))
     def test_userspace_disconnectionerror_weakref_finalizer(self, detach_gced):
         dbapi, pool = self._queuepool_dbapi_fixture(
-            pool_size=1, max_overflow=2
+            pool_size=1, max_overflow=2, _is_asyncio=detach_gced
         )
 
         if detach_gced:
-            pool._is_asyncio = True
+            pool._dialect.is_async = True
 
         @event.listens_for(pool, "checkout")
         def handle_checkout_event(dbapi_con, con_record, con_proxy):