Improved the base implementation of the asyncio cursor such that it
includes the option for the underlying driver's cursor to be actively
closed in those cases where it requires ``await`` in order to complete the
close sequence, rather than relying on garbage collection to "close" it,
when a plain :class:`.Result` is returned that does not use ``await`` for
any of its methods. The previous approach of relying on gc was fine for
MySQL and SQLite dialects but has caused problems with the aioodbc
implementation on top of SQL Server. The new option is enabled
for those dialects which have an "awaitable" ``cursor.close()``, which
includes the aioodbc, aiomysql, and asyncmy dialects (aiosqlite is also
modified for 2.1 only).
Fixes: #12798
Change-Id: Ib17d611201fedf9780dfe3d760760ace99a8835c
(cherry picked from commit
5dbb5ec0e4ce71f7b806b87808a504083a7e8ffa)
--- /dev/null
+.. change::
+ :tags: bug, mssql
+ :tickets: 12798
+
+ Improved the base implementation of the asyncio cursor such that it
+ includes the option for the underlying driver's cursor to be actively
+ closed in those cases where it requires ``await`` in order to complete the
+ close sequence, rather than relying on garbage collection to "close" it,
+ when a plain :class:`.Result` is returned that does not use ``await`` for
+ any of its methods. The previous approach of relying on gc was fine for
+ MySQL and SQLite dialects but has caused problems with the aioodbc
+ implementation on top of SQL Server. The new option is enabled
+ for those dialects which have an "awaitable" ``cursor.close()``, which
+ includes the aioodbc, aiomysql, and asyncmy dialects (aiosqlite is also
+ modified for 2.1 only).
from typing import TYPE_CHECKING
from ..engine import AdaptedConnection
+from ..util import EMPTY_DICT
from ..util.concurrency import await_fallback
from ..util.concurrency import await_only
+from ..util.concurrency import in_greenlet
from ..util.typing import Protocol
if TYPE_CHECKING:
"await_",
"_cursor",
"_rows",
+ "_soft_closed_memoized",
)
+ _awaitable_cursor_close: bool = True
+
_cursor: AsyncIODBAPICursor
_adapt_connection: AsyncAdapt_dbapi_connection
_connection: AsyncIODBAPIConnection
cursor = self._make_new_cursor(self._connection)
self._cursor = self._aenter_cursor(cursor)
-
+ self._soft_closed_memoized = EMPTY_DICT
if not self.server_side:
self._rows = collections.deque()
@property
def description(self) -> Optional[_DBAPICursorDescription]:
+ if "description" in self._soft_closed_memoized:
+ return self._soft_closed_memoized["description"] # type: ignore[no-any-return] # noqa: E501
return self._cursor.description
@property
def lastrowid(self) -> int:
return self._cursor.lastrowid
+ async def _async_soft_close(self) -> None:
+ """close the cursor but keep the results pending, and memoize the
+ description.
+
+ .. versionadded:: 2.0.44
+
+ """
+
+ if not self._awaitable_cursor_close or self.server_side:
+ return
+
+ self._soft_closed_memoized = self._soft_closed_memoized.union(
+ {
+ "description": self._cursor.description,
+ }
+ )
+ await self._cursor.close()
+
def close(self) -> None:
- # note we aren't actually closing the cursor here,
- # we are just letting GC do it. see notes in aiomysql dialect
self._rows.clear()
+ # updated as of 2.0.44
+ # try to "close" the cursor based on what we know about the driver
+ # and if we are able to. otherwise, hope that the asyncio
+ # extension called _async_soft_close() if the cursor is going into
+ # a sync context
+ if self._cursor is None or bool(self._soft_closed_memoized):
+ return
+
+ if not self._awaitable_cursor_close:
+ self._cursor.close() # type: ignore[unused-coroutine]
+ elif in_greenlet():
+ self.await_(self._cursor.close())
+
def execute(
self,
operation: Any,
class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor):
_cursor: AsyncCursor
+ _awaitable_cursor_close: bool = False
+
__slots__ = ()
@property
def var(self, *args, **kwargs):
return self._cursor.var(*args, **kwargs)
- def close(self):
- self._rows.clear()
- self._cursor.close()
-
def setinputsizes(self, *args: Any, **kwargs: Any) -> Any:
return self._cursor.setinputsizes(*args, **kwargs)
)
server_side = False
+ _awaitable_cursor_close: bool = False
def __init__(self, adapt_connection):
self._adapt_connection = adapt_connection
self.rowcount = -1
self._invalidate_schema_cache_asof = 0
+ async def _async_soft_close(self) -> None:
+ return
+
def close(self):
self._rows.clear()
def arraysize(self, value):
self._cursor.arraysize = value
+ async def _async_soft_close(self) -> None:
+ return
+
def close(self):
self._rows.clear()
# Normal cursor just call _close() in a non-sync way.
self.description: Optional[_DBAPICursorDescription] = None
self._rows: Deque[Any] = deque()
+ async def _async_soft_close(self) -> None:
+ return
+
def close(self) -> None:
self._rows.clear()
calling_method.__self__.__class__.__name__,
)
)
+
+ if is_cursor and cursor_result.cursor is not None:
+ await cursor_result.cursor._async_soft_close()
return result
else:
testing.fail(method)
+ @async_test
+ @testing.requires.async_dialect_with_await_close
+ async def test_active_await_close(self, async_engine):
+ select_one_sql = select(1).compile(async_engine.sync_engine).string
+
+ async with async_engine.connect() as conn:
+ result = await conn.exec_driver_sql(select_one_sql)
+ eq_(result.scalar_one(), 1)
+ driver_cursor = result.context.cursor._cursor
+
+ with expect_raises(Exception):
+ # because the cursor should be closed
+ await driver_cursor.execute(select_one_sql)
+
class AsyncCreatePoolTest(fixtures.TestBase):
@config.fixture
)
)
+ @property
+ def async_dialect_with_await_close(self):
+ """dialect's cursor has a close() method called with await"""
+
+ return only_on(["+aioodbc", "+aiomysql", "+asyncmy"])
+
def _has_oracle_test_dblink(self, key):
def check(config):
assert config.db.dialect.name == "oracle"