From: Federico Caselli Date: Sat, 9 Jan 2021 12:25:55 +0000 (+0100) Subject: Fix a couple of bugs in the asyncio implementation X-Git-Tag: rel_1_4_0b2~33 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e56534995de2a97210d9c3d58183e8d245cdae94;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Fix a couple of bugs in the asyncio implementation Log an informative message if a connection is not closed and the gc is reclaiming it when using an async DBAPI, which does not support running IO at that stage. The ``AsyncAdaptedQueue`` used by default on async DBAPIs should instantiate a queue only when it's first used to avoid binding it to a possibly wrong event loop. Fixes: #5823 Change-Id: Ibfc50e209b1937ae3d6599ae7997f028c7a92c33 --- diff --git a/doc/build/changelog/unreleased_14/5823.rst b/doc/build/changelog/unreleased_14/5823.rst index 74debdaa93..e5f43db452 100644 --- a/doc/build/changelog/unreleased_14/5823.rst +++ b/doc/build/changelog/unreleased_14/5823.rst @@ -10,4 +10,6 @@ the connection including transaction rollback or connection close as this will often be outside of the event loop. - + The ``AsyncAdaptedQueue`` used by default on async DBAPIs + should instantiate a queue only when it's first used + to avoid binding it to a possibly wrong event loop. diff --git a/doc/build/orm/extensions/asyncio.rst b/doc/build/orm/extensions/asyncio.rst index aed01678af..2fa274fcdd 100644 --- a/doc/build/orm/extensions/asyncio.rst +++ b/doc/build/orm/extensions/asyncio.rst @@ -255,6 +255,29 @@ differences are as follows: concepts, no third party networking libraries as ``gevent`` and ``eventlet`` provides are in use. +Using multiple asyncio event loops +---------------------------------- + +An application that makes use of multiple event loops, for example by combining asyncio +with multithreading, should not share the same :class:`_asyncio.AsyncEngine` +with different event loops when using the default pool implementation. + +If an :class:`_asyncio.AsyncEngine` is passed from one event loop to another, +the method :meth:`_asyncio.AsyncEngine.dispose()` should be called before it's +re-used on a new event loop. Failing to do so may lead to a ``RuntimeError`` +along the lines of +``Task got Future attached to a different loop``. + +If the same engine must be shared between different loops, it should be configured +to disable pooling using :class:`~sqlalchemy.pool.NullPool`, preventing the Engine +from using any connection more than once:: + + from sqlalchemy.pool import NullPool + engine = create_async_engine( + "postgresql+asyncpg://user:pass@host/dbname", poolclass=NullPool + ) + + .. currentmodule:: sqlalchemy.ext.asyncio Engine API Documentation diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py index 829e89b716..aa7e60dfb5 100644 --- a/lib/sqlalchemy/ext/asyncio/engine.py +++ b/lib/sqlalchemy/ext/asyncio/engine.py @@ -494,7 +494,6 @@ class AsyncEngine(ProxyComparable, AsyncConnectable): .. versionadded:: 1.4 - """ # noqa # AsyncEngine is a thin proxy; no state should be added here diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index 47d9e2cbad..9b4e61fc3f 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -26,7 +26,6 @@ reset_none = util.symbol("reset_none") class _ConnDialect(object): - """partial implementation of :class:`.Dialect` which provides DBAPI connection methods. 
@@ -36,6 +35,8 @@ class _ConnDialect(object): """ + is_async = False + def do_rollback(self, dbapi_connection): dbapi_connection.rollback() @@ -606,11 +607,20 @@ class _ConnectionRecord(object): def _finalize_fairy( - connection, connection_record, pool, ref, echo, fairy=None + connection, + connection_record, + pool, + ref, # this is None when called directly, not by the gc + echo, + fairy=None, ): """Cleanup for a :class:`._ConnectionFairy` whether or not it's already been garbage collected. + When using an async dialect, no IO can happen here (without using + a dedicated thread), since this is called outside the greenlet + context and with an already running loop. In this case the function + will only log a message and emit a warning. """ if ref: @@ -624,7 +634,8 @@ def _finalize_fairy( assert connection is None connection = connection_record.connection - dont_restore_gced = pool._is_asyncio + # NullPool is not _is_asyncio but can also be used with async dialects + dont_restore_gced = pool._dialect.is_async if dont_restore_gced: detach = not connection_record or ref @@ -658,11 +669,17 @@ def _finalize_fairy( pool._close_connection(connection) else: - util.warn( - "asyncio connection is being garbage " - "collected without being properly closed: %r" - % connection - ) + message = ( + "The garbage collector is trying to clean up " + "connection %r. This feature is unsupported on async " + "dbapi, since no IO can be performed at this stage to " + "reset the connection. Please close out all " + "connections when they are no longer used, calling " + "``close()`` or using a context manager to " + "manage their lifetime." + ) % connection + pool.logger.error(message) + util.warn(message) except BaseException as e: pool.logger.error( diff --git a/lib/sqlalchemy/pool/impl.py b/lib/sqlalchemy/pool/impl.py index 825ac03070..08371a31a4 100644 --- a/lib/sqlalchemy/pool/impl.py +++ b/lib/sqlalchemy/pool/impl.py @@ -13,6 +13,7 @@ import traceback import weakref +from .base import _ConnDialect from .base import _ConnectionFairy from .base import _ConnectionRecord from .base import Pool @@ -221,9 +222,14 @@ class QueuePool(Pool): return self._pool.maxsize - self._pool.qsize() + self._overflow +class _AsyncConnDialect(_ConnDialect): + is_async = True + + class AsyncAdaptedQueuePool(QueuePool): _is_asyncio = True _queue_class = sqla_queue.AsyncAdaptedQueue + _dialect = _AsyncConnDialect() class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool): diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index ca5a3abded..30e3882480 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -26,6 +26,7 @@ from .compat import threading from .concurrency import asyncio from .concurrency import await_fallback from .concurrency import await_only +from .langhelpers import memoized_property __all__ = ["Empty", "Full", "Queue"] @@ -206,15 +207,32 @@ class AsyncAdaptedQueue: await_ = staticmethod(await_only) def __init__(self, maxsize=0, use_lifo=False): - if use_lifo: - self._queue = asyncio.LifoQueue(maxsize=maxsize) - else: - self._queue = asyncio.Queue(maxsize=maxsize) self.use_lifo = use_lifo self.maxsize = maxsize - self.empty = self._queue.empty - self.full = self._queue.full - self.qsize = self._queue.qsize + + def empty(self): + return self._queue.empty() + + def full(self): + return self._queue.full() + + def qsize(self): + return self._queue.qsize() + + @memoized_property + def _queue(self): + # Delay creation of the queue until it is first used, to avoid + # binding it 
to a possibly wrong event loop. + # By delaying the creation of the queue we accommodate the common + # usage pattern of instantiating the engine at module level, where a + # different event loop is present compared to when the application + # is actually run. + + if self.use_lifo: + queue = asyncio.LifoQueue(maxsize=self.maxsize) + else: + queue = asyncio.Queue(maxsize=self.maxsize) + return queue def put_nowait(self, item): try: diff --git a/test/base/test_concurrency_py3k.py b/test/base/test_concurrency_py3k.py index e7ae8c9ad2..8eabece92a 100644 --- a/test/base/test_concurrency_py3k.py +++ b/test/base/test_concurrency_py3k.py @@ -1,12 +1,18 @@ +import threading + from sqlalchemy import exc from sqlalchemy import testing from sqlalchemy.testing import async_test from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import fixtures +from sqlalchemy.testing import is_true +from sqlalchemy.util import asyncio from sqlalchemy.util import await_fallback from sqlalchemy.util import await_only from sqlalchemy.util import greenlet_spawn +from sqlalchemy.util import queue try: from greenlet import greenlet @@ -152,3 +158,47 @@ class TestAsyncioCompat(fixtures.TestBase): "The current operation required an async execution but none was", ): await greenlet_spawn(run, _require_await=True) + + +class TestAsyncAdaptedQueue(fixtures.TestBase): + def test_lazy_init(self): + run = [False] + + def thread_go(q): + def go(): + q.get(timeout=0.1) + + with expect_raises(queue.Empty): + asyncio.run(greenlet_spawn(go)) + run[0] = True + + t = threading.Thread( + target=thread_go, args=[queue.AsyncAdaptedQueue()] + ) + t.start() + t.join() + + is_true(run[0]) + + def test_error_other_loop(self): + run = [False] + + def thread_go(q): + def go(): + eq_(q.get(block=False), 1) + q.get(timeout=0.1) + + with expect_raises_message( + RuntimeError, "Task .* attached to a different loop" + ): + asyncio.run(greenlet_spawn(go)) + + run[0] = True + + q = queue.AsyncAdaptedQueue() + q.put_nowait(1) + t = threading.Thread(target=thread_go, args=[q]) + t.start() + t.join() + + is_true(run[0]) diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index b43a29fae8..f29373e955 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -10,6 +10,7 @@ from sqlalchemy import pool from sqlalchemy import select from sqlalchemy import testing from sqlalchemy.engine import default +from sqlalchemy.pool.impl import _AsyncConnDialect from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_context_ok from sqlalchemy.testing import assert_raises_message @@ -89,10 +90,12 @@ class PoolTestBase(fixtures.TestBase): def _queuepool_dbapi_fixture(self, **kw): dbapi = MockDBAPI() - return ( - dbapi, - pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw), - ) + _is_asyncio = kw.pop("_is_asyncio", False) + p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw) + if _is_asyncio: + p._is_asyncio = True + p._dialect = _AsyncConnDialect() + return dbapi, p class PoolTest(PoolTestBase): @@ -283,6 +286,8 @@ class PoolDialectTest(PoolTestBase): canary = [] class PoolDialect(object): + is_async = False + def do_rollback(self, dbapi_connection): canary.append("R") dbapi_connection.rollback() @@ -361,8 +366,8 @@ class PoolEventsTest(PoolTestBase): return p, canary - def _checkin_event_fixture(self): - p = self._queuepool_fixture() + def _checkin_event_fixture(self, 
_is_asyncio=False): + p = self._queuepool_fixture(_is_asyncio=_is_asyncio) canary = [] @event.listens_for(p, "checkin") @@ -639,10 +644,7 @@ class PoolEventsTest(PoolTestBase): @testing.combinations((True, testing.requires.python3), (False,)) def test_checkin_event_gc(self, detach_gced): - p, canary = self._checkin_event_fixture() - - if detach_gced: - p._is_asyncio = True + p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced) c1 = p.connect() @@ -1517,11 +1519,11 @@ class QueuePoolTest(PoolTestBase): @testing.combinations((True, testing.requires.python3), (False,)) def test_userspace_disconnectionerror_weakref_finalizer(self, detach_gced): dbapi, pool = self._queuepool_dbapi_fixture( - pool_size=1, max_overflow=2 + pool_size=1, max_overflow=2, _is_asyncio=detach_gced ) if detach_gced: - pool._is_asyncio = True + pool._dialect.is_async = True @event.listens_for(pool, "checkout") def handle_checkout_event(dbapi_con, con_record, con_proxy):
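
A minimal sketch of the ``AsyncEngine.dispose()`` pattern described in the "Using multiple asyncio event loops" documentation added above; it is not part of the patch, and the connection URL and the ``SELECT 1`` statement are placeholders::

    import asyncio

    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import create_async_engine

    # Placeholder URL; engines are commonly created at module level,
    # before any event loop is running, which is the usage pattern the
    # lazy queue creation in AsyncAdaptedQueue accommodates.
    engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")


    async def run_queries():
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        # Release the pooled connections while the loop that created them
        # is still running, so the engine can be re-used on another loop.
        await engine.dispose()


    # Each asyncio.run() call uses a distinct event loop; without the
    # dispose() above, the second call may fail with
    # "Task got Future attached to a different loop".
    asyncio.run(run_queries())
    asyncio.run(run_queries())

The alternative shown in the documentation, passing ``poolclass=NullPool`` to ``create_async_engine()``, avoids the need for this step at the cost of opening a new connection for every checkout.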