--- /dev/null
+.. change::
+ :tags: bug, asyncio
+ :tickets: 7667
+
+ Fixed issue where the :meth:`_asyncio.AsyncSession.execute` method failed
+ to raise an informative exception if the ``stream_results`` execution
+ option were used, which is incompatible with a sync-style
+ :class:`_result.Result` object. An exception is now raised in this scenario
+ in the same way one is already raised when using ``stream_results`` in
+ conjunction with the :meth:`_asyncio.AsyncConnection.execute` method.
+ Additionally, for improved stability with state-sensitive dialects such as
+ asyncmy, the cursor is now closed when this error condition is raised;
+ previously with the asyncmy dialect, the connection would go into an
+ invalid state with unconsumed server side results remaining.
+
_cursor_metadata = CursorResultMetaData
_cursor_strategy_cls = CursorFetchStrategy
_no_result_metadata = _NO_RESULT_METADATA
+ _is_cursor = True
def _fetchiter_impl(self):
fetchone = self.cursor_strategy.fetchone
_generate_rows = True
_unique_filter_state = None
_post_creational_filter = None
+ _is_cursor = False
@HasMemoized.memoized_attribute
def _row_getter(self):
from . import exc as async_exc
from .base import ProxyComparable
from .base import StartableContext
+from .result import _ensure_sync_result
from .result import AsyncResult
from ... import exc
from ... import inspection
execution_options,
_require_await=True,
)
- if result.context._is_server_side:
- raise async_exc.AsyncMethodRequired(
- "Can't use the connection.exec_driver_sql() method with a "
- "server-side cursor."
- "Use the connection.stream() method for an async "
- "streaming result set."
- )
- return result
+ return await _ensure_sync_result(result, self.exec_driver_sql)
async def stream(
self,
execution_options,
_require_await=True,
)
- if result.context._is_server_side:
- raise async_exc.AsyncMethodRequired(
- "Can't use the connection.execute() method with a "
- "server-side cursor."
- "Use the connection.stream() method for an async "
- "streaming result set."
- )
- return result
+ return await _ensure_sync_result(result, self.execute)
async def scalar(
self,
import operator
+from . import exc as async_exc
from ...engine.result import _NO_ROW
from ...engine.result import FilterResult
from ...engine.result import FrozenResult
"""
return await greenlet_spawn(self._only_one_row, True, True, False)
+
+
+async def _ensure_sync_result(result, calling_method):
+ if not result._is_cursor:
+ cursor_result = getattr(result, "raw", None)
+ else:
+ cursor_result = result
+ if cursor_result and cursor_result.context._is_server_side:
+ await greenlet_spawn(cursor_result.close)
+ raise async_exc.AsyncMethodRequired(
+ "Can't use the %s.%s() method with a "
+ "server-side cursor. "
+ "Use the %s.stream() method for an async "
+ "streaming result set."
+ % (
+ calling_method.__self__.__class__.__name__,
+ calling_method.__name__,
+ calling_method.__self__.__class__.__name__,
+ )
+ )
+ return result
from . import result as _result
from .base import ReversibleProxy
from .base import StartableContext
+from .result import _ensure_sync_result
from ... import util
from ...orm import object_session
from ...orm import Session
else:
execution_options = _EXECUTE_OPTIONS
- return await greenlet_spawn(
+ result = await greenlet_spawn(
self.sync_session.execute,
statement,
params=params,
bind_arguments=bind_arguments,
**kw,
)
+ return await _ensure_sync_result(result, self.execute)
async def scalar(
self,
from sqlalchemy.ext.asyncio import async_engine_from_config
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import engine as _async_engine
+from sqlalchemy.ext.asyncio import exc as async_exc
from sqlalchemy.ext.asyncio import exc as asyncio_exc
from sqlalchemy.ext.asyncio.base import ReversibleProxy
from sqlalchemy.ext.asyncio.engine import AsyncConnection
class AsyncResultTest(EngineFixture):
+ @async_test
+ async def test_no_ss_cursor_w_execute(self, async_engine):
+ users = self.tables.users
+ async with async_engine.connect() as conn:
+ conn = await conn.execution_options(stream_results=True)
+ with expect_raises_message(
+ async_exc.AsyncMethodRequired,
+ r"Can't use the AsyncConnection.execute\(\) method with a "
+ r"server-side cursor. Use the AsyncConnection.stream\(\) "
+ r"method for an async streaming result set.",
+ ):
+ await conn.execute(select(users))
+
+ @async_test
+ async def test_no_ss_cursor_w_exec_driver_sql(self, async_engine):
+ async with async_engine.connect() as conn:
+ conn = await conn.execution_options(stream_results=True)
+ with expect_raises_message(
+ async_exc.AsyncMethodRequired,
+ r"Can't use the AsyncConnection.exec_driver_sql\(\) "
+ r"method with a "
+ r"server-side cursor. Use the AsyncConnection.stream\(\) "
+ r"method for an async streaming result set.",
+ ):
+ await conn.exec_driver_sql("SELECT * FROM users")
+
@testing.combinations(
(None,), ("scalars",), ("mappings",), argnames="filter_"
)
from sqlalchemy import update
from sqlalchemy.ext.asyncio import async_object_session
from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.ext.asyncio import exc as async_exc
from sqlalchemy.ext.asyncio.base import ReversibleProxy
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.testing import async_test
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
],
)
+ @testing.combinations("statement", "execute", argnames="location")
+ @async_test
+ async def test_no_ss_cursor_w_execute(self, async_session, location):
+ User = self.classes.User
+
+ stmt = select(User)
+ if location == "statement":
+ stmt = stmt.execution_options(stream_results=True)
+
+ with expect_raises_message(
+ async_exc.AsyncMethodRequired,
+ r"Can't use the AsyncSession.execute\(\) method with a "
+ r"server-side cursor. Use the AsyncSession.stream\(\) "
+ r"method for an async streaming result set.",
+ ):
+ if location == "execute":
+ await async_session.execute(
+ stmt, execution_options={"stream_results": True}
+ )
+ else:
+ await async_session.execute(stmt)
+
class AsyncSessionTransactionTest(AsyncFixture):
run_inserts = None