From: Federico Caselli Date: Thu, 3 Dec 2020 22:53:47 +0000 (+0100) Subject: Detect non compatible execution in async mode X-Git-Tag: rel_1_4_0b2~114^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=76d90152302461637cfecb6c0cac65a50975c570;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Detect non compatible execution in async mode The SQLAlchemy async mode now detects and raises an informative error when an non asyncio compatible :term:`DBAPI` is used. Using a standard ``DBAPI`` with async SQLAlchemy will cause it to block like any sync call, interrupting the executing asyncio loop. Change-Id: I9aed87dc1b0df53e8cb2109495237038aa2cb2d4 --- diff --git a/doc/build/changelog/unreleased_14/async_dbapi_detection.rst b/doc/build/changelog/unreleased_14/async_dbapi_detection.rst new file mode 100644 index 0000000000..4764f7ad4b --- /dev/null +++ b/doc/build/changelog/unreleased_14/async_dbapi_detection.rst @@ -0,0 +1,8 @@ +.. change:: + :tags: asyncio + + The SQLAlchemy async mode now detects and raises an informative + error when an non asyncio compatible :term:`DBAPI` is used. + Using a standard ``DBAPI`` with async SQLAlchemy will cause + it to block like any sync call, interrupting the executing asyncio + loop. diff --git a/doc/build/errors.rst b/doc/build/errors.rst index 42c0db9776..a52444766d 100644 --- a/doc/build/errors.rst +++ b/doc/build/errors.rst @@ -1114,6 +1114,24 @@ message for details. :ref:`error_bbf0` + +AsyncIO Exceptions +================== + +.. _error_xd1r: + +AwaitRequired +------------- + +The SQLAlchemy async mode requires an async driver to be used to connect to the db. +This error is usually raised when trying to use the async version of SQLAlchemy +with a non compatible :term:`DBAPI`. + +.. seealso:: + + :ref:`asyncio extension ` + + Core Exception Classes ====================== diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 7c1bbb18ef..c56c1f0203 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -419,8 +419,8 @@ From version 3.24.0 onwards, SQLite supports "upserts" (update or insert) of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A candidate row will only be inserted if that row does not violate any unique or primary key constraints. In the case of a unique constraint violation, a -secondary action can occur which can be either “DO UPDATE”, indicating that -the data in the target row should be updated, or “DO NOTHING”, which indicates +secondary action can occur which can be either "DO UPDATE", indicating that +the data in the target row should be updated, or "DO NOTHING", which indicates to silently skip this row. Conflicts are determined using columns that are part of existing unique @@ -469,7 +469,7 @@ and :meth:`_sqlite.Insert.on_conflict_do_nothing`: Specifying the Target ^^^^^^^^^^^^^^^^^^^^^ -Both methods supply the “target” of the conflict using column inference: +Both methods supply the "target" of the conflict using column inference: * The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument specifies a sequence containing string column names, :class:`_schema.Column` diff --git a/lib/sqlalchemy/exc.py b/lib/sqlalchemy/exc.py index 7ba2e369b6..63c56c34d8 100644 --- a/lib/sqlalchemy/exc.py +++ b/lib/sqlalchemy/exc.py @@ -285,6 +285,15 @@ class NoReferenceError(InvalidRequestError): """Raised by ``ForeignKey`` to indicate a reference cannot be resolved.""" +class AwaitRequired(InvalidRequestError): + """Error raised by the async greenlet spawn if no async operation + was awaited when it required one + + """ + + code = "xd1r" + + class NoReferencedTableError(NoReferenceError): """Raised by ``ForeignKey`` when the referred ``Table`` cannot be located. @@ -355,10 +364,6 @@ class DontWrapMixin(object): """ -# Moved to orm.exc; compatibility definition installed by orm import until 0.6 -UnmappedColumnError = None - - class StatementError(SQLAlchemyError): """An error occurred during execution of a SQL statement. diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py index 9e4851dfcd..16edcc2b2a 100644 --- a/lib/sqlalchemy/ext/asyncio/engine.py +++ b/lib/sqlalchemy/ext/asyncio/engine.py @@ -243,6 +243,7 @@ class AsyncConnection(StartableContext, AsyncConnectable): statement, parameters, execution_options, + _require_await=True, ) if result.context._is_server_side: raise async_exc.AsyncMethodRequired( @@ -272,6 +273,7 @@ class AsyncConnection(StartableContext, AsyncConnectable): util.EMPTY_DICT.merge_with( execution_options, {"stream_results": True} ), + _require_await=True, ) if not result.context._is_server_side: # TODO: real exception here @@ -322,6 +324,7 @@ class AsyncConnection(StartableContext, AsyncConnectable): statement, parameters, execution_options, + _require_await=True, ) if result.context._is_server_side: raise async_exc.AsyncMethodRequired( diff --git a/lib/sqlalchemy/testing/asyncio.py b/lib/sqlalchemy/testing/asyncio.py deleted file mode 100644 index 2e274de16f..0000000000 --- a/lib/sqlalchemy/testing/asyncio.py +++ /dev/null @@ -1,14 +0,0 @@ -from .assertions import assert_raises as _assert_raises -from .assertions import assert_raises_message as _assert_raises_message -from ..util import await_fallback as await_ -from ..util import greenlet_spawn - - -async def assert_raises_async(except_cls, msg, coroutine): - await greenlet_spawn(_assert_raises, except_cls, await_, coroutine) - - -async def assert_raises_message_async(except_cls, msg, coroutine): - await greenlet_spawn( - _assert_raises_message, except_cls, msg, await_, coroutine - ) diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py index dcee057134..8ad3be5439 100644 --- a/lib/sqlalchemy/util/_concurrency_py3k.py +++ b/lib/sqlalchemy/util/_concurrency_py3k.py @@ -79,7 +79,9 @@ def await_fallback(awaitable: Coroutine) -> Any: return current.driver.switch(awaitable) -async def greenlet_spawn(fn: Callable, *args, **kwargs) -> Any: +async def greenlet_spawn( + fn: Callable, *args, _require_await=False, **kwargs +) -> Any: """Runs a sync function ``fn`` in a new greenlet. The sync function can then use :func:`await_` to wait for async @@ -95,9 +97,11 @@ async def greenlet_spawn(fn: Callable, *args, **kwargs) -> Any: # is interrupted by await_, context is not dead and result is a # coroutine to wait. If the context is dead the function has # returned, and its result can be returned. + switch_occurred = False try: result = context.switch(*args, **kwargs) while not context.dead: + switch_occurred = True try: # wait for a coroutine from await_ and then return its # result back to it. @@ -112,6 +116,12 @@ async def greenlet_spawn(fn: Callable, *args, **kwargs) -> Any: finally: # clean up to avoid cycle resolution by gc del context.driver + if _require_await and not switch_occurred: + raise exc.AwaitRequired( + "The current operation required an async execution but none was " + "detected. This will usually happen when using a non compatible " + "DBAPI driver. Please ensure that an async DBAPI is used." + ) return result diff --git a/test/base/test_concurrency_py3k.py b/test/base/test_concurrency_py3k.py index ba53ea6352..cf1067667d 100644 --- a/test/base/test_concurrency_py3k.py +++ b/test/base/test_concurrency_py3k.py @@ -138,3 +138,16 @@ class TestAsyncioCompat(fixtures.TestBase): ) } eq_(values, set(range(concurrency))) + + @async_test + async def test_require_await(self): + def run(): + return 1 + 1 + + assert (await greenlet_spawn(run)) == 2 + + with expect_raises_message( + exc.AwaitRequired, + "The current operation required an async execution but none was", + ): + await greenlet_spawn(run, _require_await=True) diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py index 83987b06f1..a361ff835a 100644 --- a/test/ext/asyncio/test_engine_py3k.py +++ b/test/ext/asyncio/test_engine_py3k.py @@ -16,12 +16,14 @@ from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import engine as _async_engine from sqlalchemy.ext.asyncio import exc as asyncio_exc from sqlalchemy.testing import async_test +from sqlalchemy.testing import combinations from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises +from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_not from sqlalchemy.testing import mock -from sqlalchemy.testing.asyncio import assert_raises_message_async from sqlalchemy.util.concurrency import greenlet_spawn @@ -254,12 +256,12 @@ class AsyncEngineTest(EngineFixture): async with async_engine.connect() as conn: trans = conn.begin() - await assert_raises_message_async( + with expect_raises_message( asyncio_exc.AsyncContextNotStarted, "AsyncTransaction context has not been started " "and object has not been awaited.", - trans.rollback(), - ) + ): + await trans.rollback(), @async_test async def test_pool_exhausted(self, async_engine): @@ -270,11 +272,8 @@ class AsyncEngineTest(EngineFixture): pool_timeout=0.1, ) async with engine.connect(): - await assert_raises_message_async( - asyncio.TimeoutError, - "", - engine.connect(), - ) + with expect_raises(asyncio.TimeoutError): + await engine.connect() @async_test async def test_create_async_engine_server_side_cursor(self, async_engine): @@ -530,15 +529,11 @@ class AsyncResultTest(EngineFixture): select(users).where(users.c.user_name == "nonexistent") ) - async def go(): + with expect_raises_message( + exc.NoResultFound, "No row was found when one was required" + ): await result.one() - await assert_raises_message_async( - exc.NoResultFound, - "No row was found when one was required", - go(), - ) - @async_test async def test_one_multi_result(self, async_engine): users = self.tables.users @@ -547,11 +542,38 @@ class AsyncResultTest(EngineFixture): select(users).where(users.c.user_name.in_(["name3", "name5"])) ) - async def go(): - await result.one() - - await assert_raises_message_async( + with expect_raises_message( exc.MultipleResultsFound, "Multiple rows were found when exactly one was required", - go(), + ): + await result.one() + + +class TextSyncDBAPI(fixtures.TestBase): + @testing.fixture + def async_engine(self): + return create_async_engine("sqlite:///:memory:") + + @async_test + @combinations( + lambda conn: conn.exec_driver_sql("select 1"), + lambda conn: conn.stream(text("select 1")), + lambda conn: conn.execute(text("select 1")), + argnames="case", + ) + async def test_sync_driver_execution(self, async_engine, case): + with expect_raises_message( + exc.AwaitRequired, + "The current operation required an async execution but none was", + ): + async with async_engine.connect() as conn: + await case(conn) + + @async_test + async def test_sync_driver_run_sync(self, async_engine): + async with async_engine.connect() as conn: + res = await conn.run_sync( + lambda conn: conn.scalar(text("select 1")) ) + assert res == 1 + assert await conn.run_sync(lambda _: 2) == 2