--- /dev/null
+.. change::
+ :tags: bug, mssql
+ :tickets: 12798
+
+ Improved the base implementation of the asyncio cursor such that it
+ includes the option for the underlying driver's cursor to be actively
+ closed in those cases where it requires ``await`` in order to complete the
+ close sequence, rather than relying on garbage collection to "close" it,
+ when a plain :class:`.Result` is returned that does not use ``await`` for
+ any of its methods. The previous approach of relying on gc was fine for
+ MySQL and SQLite dialects but has caused problems with the aioodbc
+ implementation on top of SQL Server. The new option is enabled
+ for those dialects which have an "awaitable" ``cursor.close()``, which
+ includes the aioodbc, aiomysql, and asyncmy dialects (aiosqlite is also
+ modified for 2.1 only).
from typing import TYPE_CHECKING
from ..engine import AdaptedConnection
+from ..util import EMPTY_DICT
from ..util.concurrency import await_
+from ..util.concurrency import in_greenlet
if TYPE_CHECKING:
from ..engine.interfaces import _DBAPICursorDescription
"_connection",
"_cursor",
"_rows",
+ "_soft_closed_memoized",
)
+ _awaitable_cursor_close: bool = True
+
_cursor: AsyncIODBAPICursor
_adapt_connection: AsyncAdapt_dbapi_connection
_connection: AsyncIODBAPIConnection
cursor = self._make_new_cursor(self._connection)
self._cursor = self._aenter_cursor(cursor)
-
+ self._soft_closed_memoized = EMPTY_DICT
if not self.server_side:
self._rows = collections.deque()
@property
def description(self) -> Optional[_DBAPICursorDescription]:
+ if "description" in self._soft_closed_memoized:
+ return self._soft_closed_memoized["description"] # type: ignore[no-any-return] # noqa: E501
return self._cursor.description
@property
def lastrowid(self) -> int:
return self._cursor.lastrowid
+ async def _async_soft_close(self) -> None:
+ """close the cursor but keep the results pending, and memoize the
+ description.
+
+ .. versionadded:: 2.0.44
+
+ """
+
+ if not self._awaitable_cursor_close or self.server_side:
+ return
+
+ self._soft_closed_memoized = self._soft_closed_memoized.union(
+ {
+ "description": self._cursor.description,
+ }
+ )
+ await self._cursor.close()
+
def close(self) -> None:
- # note we aren't actually closing the cursor here,
- # we are just letting GC do it. see notes in aiomysql dialect
self._rows.clear()
+ # updated as of 2.0.44
+ # try to "close" the cursor based on what we know about the driver
+ # and if we are able to. otherwise, hope that the asyncio
+ # extension called _async_soft_close() if the cursor is going into
+ # a sync context
+ if self._cursor is None or bool(self._soft_closed_memoized):
+ return
+
+ if not self._awaitable_cursor_close:
+ self._cursor.close() # type: ignore[unused-coroutine]
+ elif in_greenlet():
+ await_(self._cursor.close())
+
def execute(
self,
operation: Any,
class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor):
_cursor: AsyncCursor
+ _awaitable_cursor_close: bool = False
+
__slots__ = ()
@property
def var(self, *args, **kwargs):
return self._cursor.var(*args, **kwargs)
- def close(self):
- self._rows.clear()
- self._cursor.close()
-
def setinputsizes(self, *args: Any, **kwargs: Any) -> Any:
return self._cursor.setinputsizes(*args, **kwargs)
_adapt_connection: AsyncAdapt_asyncpg_connection
_connection: _AsyncpgConnection
_cursor: Optional[_AsyncpgCursor]
+ _awaitable_cursor_close: bool = False
def __init__(self, adapt_connection: AsyncAdapt_asyncpg_connection):
self._adapt_connection = adapt_connection
class AsyncAdapt_psycopg_cursor(AsyncAdapt_dbapi_cursor):
__slots__ = ()
+ _awaitable_cursor_close: bool = False
+
def close(self):
self._rows.clear()
# Normal cursor just call _close() in a non-sync way.
calling_method.__self__.__class__.__name__,
)
)
+
+ if is_cursor and cursor_result.cursor is not None:
+ await cursor_result.cursor._async_soft_close()
return result
else:
testing.fail(method)
+ @async_test
+ @testing.requires.async_dialect_with_await_close
+ async def test_active_await_close(self, async_engine):
+ select_one_sql = select(1).compile(async_engine.sync_engine).string
+
+ async with async_engine.connect() as conn:
+ result = await conn.exec_driver_sql(select_one_sql)
+ eq_(result.scalar_one(), 1)
+ driver_cursor = result.context.cursor._cursor
+
+ with expect_raises(Exception):
+ # because the cursor should be closed
+ await driver_cursor.execute(select_one_sql)
+
class AsyncCreatePoolTest(fixtures.TestBase):
@config.fixture
)
)
+ @property
+ def async_dialect_with_await_close(self):
+ """dialect's cursor has a close() method called with await"""
+
+ return only_on(["+aioodbc", "+aiosqlite", "+aiomysql", "+asyncmy"])
+
def _has_oracle_test_dblink(self, key):
def check(config):
assert config.db.dialect.name == "oracle"