From: Mike Bayer Date: Sun, 18 May 2025 17:54:09 +0000 (-0400) Subject: backport mysql / base portions of #10415 to 2.0 X-Git-Tag: rel_2_0_42~58 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=eca7a97de36ceb5ef58652a486dc6125aa5fa21a;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git backport mysql / base portions of #10415 to 2.0 For a backport of the mysql typing change in [1] to be useful, we need to have most of the asyncio typing installed as well. To make this easier include that we will backport aiomysql / asyncmy over to the connectors/asyncio.py connector which is already in use. [1] https://gerrit.sqlalchemy.org/c/sqlalchemy/sqlalchemy/+/5829 Change-Id: I0414ed1f736a329ecdc9a662dbee71d621a463ae --- diff --git a/lib/sqlalchemy/connectors/aioodbc.py b/lib/sqlalchemy/connectors/aioodbc.py index 39b2a8a238..6e4b864e7d 100644 --- a/lib/sqlalchemy/connectors/aioodbc.py +++ b/lib/sqlalchemy/connectors/aioodbc.py @@ -20,6 +20,7 @@ from .. import util from ..util.concurrency import await_fallback from ..util.concurrency import await_only + if TYPE_CHECKING: from ..engine.interfaces import ConnectArgsType from ..engine.url import URL @@ -58,6 +59,15 @@ class AsyncAdapt_aioodbc_connection(AsyncAdapt_dbapi_connection): self._connection._conn.autocommit = value + def ping(self, reconnect): + return self.await_(self._connection.ping(reconnect)) + + def add_output_converter(self, *arg, **kw): + self._connection.add_output_converter(*arg, **kw) + + def character_set_name(self): + return self._connection.character_set_name() + def cursor(self, server_side=False): # aioodbc sets connection=None when closed and just fails with # AttributeError here. Here we use the same ProgrammingError + diff --git a/lib/sqlalchemy/connectors/asyncio.py b/lib/sqlalchemy/connectors/asyncio.py index c4f0d71541..c036d3fc7e 100644 --- a/lib/sqlalchemy/connectors/asyncio.py +++ b/lib/sqlalchemy/connectors/asyncio.py @@ -4,18 +4,102 @@ # # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors """generic asyncio-adapted versions of DBAPI connection and cursor""" from __future__ import annotations +import asyncio import collections +import sys +from typing import Any +from typing import AsyncIterator +from typing import Deque +from typing import Iterator +from typing import NoReturn +from typing import Optional +from typing import Sequence from ..engine import AdaptedConnection -from ..util.concurrency import asyncio +from ..engine.interfaces import _DBAPICursorDescription +from ..engine.interfaces import _DBAPIMultiExecuteParams +from ..engine.interfaces import _DBAPISingleExecuteParams from ..util.concurrency import await_fallback from ..util.concurrency import await_only +from ..util.typing import Protocol +from ..util.typing import Self + + +class AsyncIODBAPIConnection(Protocol): + """protocol representing an async adapted version of a + :pep:`249` database connection. + + + """ + + async def close(self) -> None: ... + + async def commit(self) -> None: ... + + def cursor(self, *args: Any, **kwargs: Any) -> AsyncIODBAPICursor: ... + + async def rollback(self) -> None: ... + + +class AsyncIODBAPICursor(Protocol): + """protocol representing an async adapted version + of a :pep:`249` database cursor. + + + """ + + def __aenter__(self) -> Any: ... + + @property + def description( + self, + ) -> _DBAPICursorDescription: + """The description attribute of the Cursor.""" + ... + + @property + def rowcount(self) -> int: ... + + arraysize: int + + lastrowid: int + + async def close(self) -> None: ... + + async def execute( + self, + operation: Any, + parameters: Optional[_DBAPISingleExecuteParams] = None, + ) -> Any: ... + + async def executemany( + self, + operation: Any, + parameters: _DBAPIMultiExecuteParams, + ) -> Any: ... + + async def fetchone(self) -> Optional[Any]: ... + + async def fetchmany(self, size: Optional[int] = ...) -> Sequence[Any]: ... + + async def fetchall(self) -> Sequence[Any]: ... + + async def setinputsizes(self, sizes: Sequence[Any]) -> None: ... + + def setoutputsize(self, size: Any, column: Any) -> None: ... + + async def callproc( + self, procname: str, parameters: Sequence[Any] = ... + ) -> Any: ... + + async def nextset(self) -> Optional[bool]: ... + + def __aiter__(self) -> AsyncIterator[Any]: ... class AsyncAdapt_dbapi_cursor: @@ -28,96 +112,136 @@ class AsyncAdapt_dbapi_cursor: "_rows", ) - def __init__(self, adapt_connection): + _cursor: AsyncIODBAPICursor + _adapt_connection: AsyncAdapt_dbapi_connection + _connection: AsyncIODBAPIConnection + _rows: Deque[Any] + + def __init__(self, adapt_connection: AsyncAdapt_dbapi_connection): self._adapt_connection = adapt_connection self._connection = adapt_connection._connection + self.await_ = adapt_connection.await_ - cursor = self._connection.cursor() + cursor = self._make_new_cursor(self._connection) self._cursor = self._aenter_cursor(cursor) if not self.server_side: self._rows = collections.deque() - def _aenter_cursor(self, cursor): - return self.await_(cursor.__aenter__()) + def _aenter_cursor(self, cursor: AsyncIODBAPICursor) -> AsyncIODBAPICursor: + return self.await_(cursor.__aenter__()) # type: ignore[no-any-return] + + def _make_new_cursor( + self, connection: AsyncIODBAPIConnection + ) -> AsyncIODBAPICursor: + return connection.cursor() @property - def description(self): + def description(self) -> Optional[_DBAPICursorDescription]: return self._cursor.description @property - def rowcount(self): + def rowcount(self) -> int: return self._cursor.rowcount @property - def arraysize(self): + def arraysize(self) -> int: return self._cursor.arraysize @arraysize.setter - def arraysize(self, value): + def arraysize(self, value: int) -> None: self._cursor.arraysize = value @property - def lastrowid(self): + def lastrowid(self) -> int: return self._cursor.lastrowid - def close(self): + def close(self) -> None: # note we aren't actually closing the cursor here, # we are just letting GC do it. see notes in aiomysql dialect self._rows.clear() - def execute(self, operation, parameters=None): - return self.await_(self._execute_async(operation, parameters)) - - def executemany(self, operation, seq_of_parameters): - return self.await_( - self._executemany_async(operation, seq_of_parameters) - ) + def execute( + self, + operation: Any, + parameters: Optional[_DBAPISingleExecuteParams] = None, + ) -> Any: + try: + return self.await_(self._execute_async(operation, parameters)) + except Exception as error: + self._adapt_connection._handle_exception(error) + + def executemany( + self, + operation: Any, + seq_of_parameters: _DBAPIMultiExecuteParams, + ) -> Any: + try: + return self.await_( + self._executemany_async(operation, seq_of_parameters) + ) + except Exception as error: + self._adapt_connection._handle_exception(error) - async def _execute_async(self, operation, parameters): + async def _execute_async( + self, operation: Any, parameters: Optional[_DBAPISingleExecuteParams] + ) -> Any: async with self._adapt_connection._execute_mutex: - result = await self._cursor.execute(operation, parameters or ()) + if parameters is None: + result = await self._cursor.execute(operation) + else: + result = await self._cursor.execute(operation, parameters) if self._cursor.description and not self.server_side: self._rows = collections.deque(await self._cursor.fetchall()) return result - async def _executemany_async(self, operation, seq_of_parameters): + async def _executemany_async( + self, + operation: Any, + seq_of_parameters: _DBAPIMultiExecuteParams, + ) -> Any: async with self._adapt_connection._execute_mutex: return await self._cursor.executemany(operation, seq_of_parameters) - def nextset(self): + def nextset(self) -> None: self.await_(self._cursor.nextset()) if self._cursor.description and not self.server_side: self._rows = collections.deque( self.await_(self._cursor.fetchall()) ) - def setinputsizes(self, *inputsizes): + def setinputsizes(self, *inputsizes: Any) -> None: # NOTE: this is overrridden in aioodbc due to # see https://github.com/aio-libs/aioodbc/issues/451 # right now return self.await_(self._cursor.setinputsizes(*inputsizes)) - def __iter__(self): + def __enter__(self) -> Self: + return self + + def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: + self.close() + + def __iter__(self) -> Iterator[Any]: while self._rows: yield self._rows.popleft() - def fetchone(self): + def fetchone(self) -> Optional[Any]: if self._rows: return self._rows.popleft() else: return None - def fetchmany(self, size=None): + def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]: if size is None: size = self.arraysize rr = self._rows return [rr.popleft() for _ in range(min(size, len(rr)))] - def fetchall(self): + def fetchall(self) -> Sequence[Any]: retval = list(self._rows) self._rows.clear() return retval @@ -127,30 +251,21 @@ class AsyncAdapt_dbapi_ss_cursor(AsyncAdapt_dbapi_cursor): __slots__ = () server_side = True - def __init__(self, adapt_connection): - self._adapt_connection = adapt_connection - self._connection = adapt_connection._connection - self.await_ = adapt_connection.await_ - - cursor = self._connection.cursor() - - self._cursor = self.await_(cursor.__aenter__()) - - def close(self): + def close(self) -> None: if self._cursor is not None: self.await_(self._cursor.close()) - self._cursor = None + self._cursor = None # type: ignore - def fetchone(self): + def fetchone(self) -> Optional[Any]: return self.await_(self._cursor.fetchone()) - def fetchmany(self, size=None): + def fetchmany(self, size: Optional[int] = None) -> Any: return self.await_(self._cursor.fetchmany(size=size)) - def fetchall(self): + def fetchall(self) -> Sequence[Any]: return self.await_(self._cursor.fetchall()) - def __iter__(self): + def __iter__(self) -> Iterator[Any]: iterator = self._cursor.__aiter__() while True: try: @@ -164,46 +279,50 @@ class AsyncAdapt_dbapi_connection(AdaptedConnection): _ss_cursor_cls = AsyncAdapt_dbapi_ss_cursor await_ = staticmethod(await_only) + __slots__ = ("dbapi", "_execute_mutex") - def __init__(self, dbapi, connection): + _connection: AsyncIODBAPIConnection + + def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection): self.dbapi = dbapi self._connection = connection self._execute_mutex = asyncio.Lock() - def ping(self, reconnect): - return self.await_(self._connection.ping(reconnect)) - - def add_output_converter(self, *arg, **kw): - self._connection.add_output_converter(*arg, **kw) - - def character_set_name(self): - return self._connection.character_set_name() - - @property - def autocommit(self): - return self._connection.autocommit - - @autocommit.setter - def autocommit(self, value): - # https://github.com/aio-libs/aioodbc/issues/448 - # self._connection.autocommit = value - - self._connection._conn.autocommit = value - - def cursor(self, server_side=False): + def cursor(self, server_side: bool = False) -> AsyncAdapt_dbapi_cursor: if server_side: return self._ss_cursor_cls(self) else: return self._cursor_cls(self) - def rollback(self): - self.await_(self._connection.rollback()) - - def commit(self): - self.await_(self._connection.commit()) - - def close(self): + def execute( + self, + operation: Any, + parameters: Optional[_DBAPISingleExecuteParams] = None, + ) -> Any: + """lots of DBAPIs seem to provide this, so include it""" + cursor = self.cursor() + cursor.execute(operation, parameters) + return cursor + + def _handle_exception(self, error: Exception) -> NoReturn: + exc_info = sys.exc_info() + + raise error.with_traceback(exc_info[2]) + + def rollback(self) -> None: + try: + self.await_(self._connection.rollback()) + except Exception as error: + self._handle_exception(error) + + def commit(self) -> None: + try: + self.await_(self._connection.commit()) + except Exception as error: + self._handle_exception(error) + + def close(self) -> None: self.await_(self._connection.close()) diff --git a/lib/sqlalchemy/dialects/mysql/aiomysql.py b/lib/sqlalchemy/dialects/mysql/aiomysql.py index bd5e7de6b4..ea11f3bc87 100644 --- a/lib/sqlalchemy/dialects/mysql/aiomysql.py +++ b/lib/sqlalchemy/dialects/mysql/aiomysql.py @@ -29,162 +29,42 @@ This dialect should normally be used only with the ) """ # noqa -from collections import deque - from .pymysql import MySQLDialect_pymysql from ... import pool from ... import util -from ...engine import AdaptedConnection -from ...util.concurrency import asyncio +from ...connectors.asyncio import AsyncAdapt_dbapi_connection +from ...connectors.asyncio import AsyncAdapt_dbapi_cursor +from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor from ...util.concurrency import await_fallback from ...util.concurrency import await_only -class AsyncAdapt_aiomysql_cursor: - # TODO: base on connectors/asyncio.py - # see #10415 - server_side = False - __slots__ = ( - "_adapt_connection", - "_connection", - "await_", - "_cursor", - "_rows", - ) - - def __init__(self, adapt_connection): - self._adapt_connection = adapt_connection - self._connection = adapt_connection._connection - self.await_ = adapt_connection.await_ - - cursor = self._connection.cursor(adapt_connection.dbapi.Cursor) - - # see https://github.com/aio-libs/aiomysql/issues/543 - self._cursor = self.await_(cursor.__aenter__()) - self._rows = deque() - - @property - def description(self): - return self._cursor.description - - @property - def rowcount(self): - return self._cursor.rowcount - - @property - def arraysize(self): - return self._cursor.arraysize - - @arraysize.setter - def arraysize(self, value): - self._cursor.arraysize = value - - @property - def lastrowid(self): - return self._cursor.lastrowid - - def close(self): - # note we aren't actually closing the cursor here, - # we are just letting GC do it. to allow this to be async - # we would need the Result to change how it does "Safe close cursor". - # MySQL "cursors" don't actually have state to be "closed" besides - # exhausting rows, which we already have done for sync cursor. - # another option would be to emulate aiosqlite dialect and assign - # cursor only if we are doing server side cursor operation. - self._rows.clear() - - def execute(self, operation, parameters=None): - return self.await_(self._execute_async(operation, parameters)) - - def executemany(self, operation, seq_of_parameters): - return self.await_( - self._executemany_async(operation, seq_of_parameters) - ) - - async def _execute_async(self, operation, parameters): - async with self._adapt_connection._execute_mutex: - result = await self._cursor.execute(operation, parameters) - - if not self.server_side: - # aiomysql has a "fake" async result, so we have to pull it out - # of that here since our default result is not async. - # we could just as easily grab "_rows" here and be done with it - # but this is safer. - self._rows = deque(await self._cursor.fetchall()) - return result - - async def _executemany_async(self, operation, seq_of_parameters): - async with self._adapt_connection._execute_mutex: - return await self._cursor.executemany(operation, seq_of_parameters) - - def setinputsizes(self, *inputsizes): - pass - - def __iter__(self): - while self._rows: - yield self._rows.popleft() - - def fetchone(self): - if self._rows: - return self._rows.popleft() - else: - return None - - def fetchmany(self, size=None): - if size is None: - size = self.arraysize - - rr = self._rows - return [rr.popleft() for _ in range(min(size, len(rr)))] - - def fetchall(self): - retval = list(self._rows) - self._rows.clear() - return retval - - -class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor): - # TODO: base on connectors/asyncio.py - # see #10415 +class AsyncAdapt_aiomysql_cursor(AsyncAdapt_dbapi_cursor): __slots__ = () - server_side = True - - def __init__(self, adapt_connection): - self._adapt_connection = adapt_connection - self._connection = adapt_connection._connection - self.await_ = adapt_connection.await_ - cursor = self._connection.cursor(adapt_connection.dbapi.SSCursor) + def _make_new_cursor(self, connection): + return connection.cursor(self._adapt_connection.dbapi.Cursor) - self._cursor = self.await_(cursor.__aenter__()) - def close(self): - if self._cursor is not None: - self.await_(self._cursor.close()) - self._cursor = None - - def fetchone(self): - return self.await_(self._cursor.fetchone()) - - def fetchmany(self, size=None): - return self.await_(self._cursor.fetchmany(size=size)) +class AsyncAdapt_aiomysql_ss_cursor( + AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_aiomysql_cursor +): + __slots__ = () - def fetchall(self): - return self.await_(self._cursor.fetchall()) + def _make_new_cursor(self, connection): + return connection.cursor( + self._adapt_connection.dbapi.aiomysql.cursors.SSCursor + ) -class AsyncAdapt_aiomysql_connection(AdaptedConnection): - # TODO: base on connectors/asyncio.py - # see #10415 - await_ = staticmethod(await_only) - __slots__ = ("dbapi", "_execute_mutex") +class AsyncAdapt_aiomysql_connection(AsyncAdapt_dbapi_connection): + __slots__ = () - def __init__(self, dbapi, connection): - self.dbapi = dbapi - self._connection = connection - self._execute_mutex = asyncio.Lock() + _cursor_cls = AsyncAdapt_aiomysql_cursor + _ss_cursor_cls = AsyncAdapt_aiomysql_ss_cursor def ping(self, reconnect): + assert not reconnect return self.await_(self._connection.ping(reconnect)) def character_set_name(self): @@ -193,18 +73,6 @@ class AsyncAdapt_aiomysql_connection(AdaptedConnection): def autocommit(self, value): self.await_(self._connection.autocommit(value)) - def cursor(self, server_side=False): - if server_side: - return AsyncAdapt_aiomysql_ss_cursor(self) - else: - return AsyncAdapt_aiomysql_cursor(self) - - def rollback(self): - self.await_(self._connection.rollback()) - - def commit(self): - self.await_(self._connection.commit()) - def terminate(self): # it's not awaitable. self._connection.close() @@ -214,8 +82,6 @@ class AsyncAdapt_aiomysql_connection(AdaptedConnection): class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection): - # TODO: base on connectors/asyncio.py - # see #10415 __slots__ = () await_ = staticmethod(await_fallback) diff --git a/lib/sqlalchemy/dialects/mysql/asyncmy.py b/lib/sqlalchemy/dialects/mysql/asyncmy.py index 9ec54e694d..179d6c2035 100644 --- a/lib/sqlalchemy/dialects/mysql/asyncmy.py +++ b/lib/sqlalchemy/dialects/mysql/asyncmy.py @@ -27,183 +27,57 @@ This dialect should normally be used only with the ) """ # noqa -from collections import deque -from contextlib import asynccontextmanager +from __future__ import annotations from .pymysql import MySQLDialect_pymysql from ... import pool from ... import util -from ...engine import AdaptedConnection -from ...util.concurrency import asyncio +from ...connectors.asyncio import AsyncAdapt_dbapi_connection +from ...connectors.asyncio import AsyncAdapt_dbapi_cursor +from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor from ...util.concurrency import await_fallback from ...util.concurrency import await_only -class AsyncAdapt_asyncmy_cursor: - # TODO: base on connectors/asyncio.py - # see #10415 - server_side = False - __slots__ = ( - "_adapt_connection", - "_connection", - "await_", - "_cursor", - "_rows", - ) - - def __init__(self, adapt_connection): - self._adapt_connection = adapt_connection - self._connection = adapt_connection._connection - self.await_ = adapt_connection.await_ - - cursor = self._connection.cursor() - - self._cursor = self.await_(cursor.__aenter__()) - self._rows = deque() - - @property - def description(self): - return self._cursor.description - - @property - def rowcount(self): - return self._cursor.rowcount - - @property - def arraysize(self): - return self._cursor.arraysize - - @arraysize.setter - def arraysize(self, value): - self._cursor.arraysize = value - - @property - def lastrowid(self): - return self._cursor.lastrowid - - def close(self): - # note we aren't actually closing the cursor here, - # we are just letting GC do it. to allow this to be async - # we would need the Result to change how it does "Safe close cursor". - # MySQL "cursors" don't actually have state to be "closed" besides - # exhausting rows, which we already have done for sync cursor. - # another option would be to emulate aiosqlite dialect and assign - # cursor only if we are doing server side cursor operation. - self._rows.clear() - - def execute(self, operation, parameters=None): - return self.await_(self._execute_async(operation, parameters)) - - def executemany(self, operation, seq_of_parameters): - return self.await_( - self._executemany_async(operation, seq_of_parameters) - ) - - async def _execute_async(self, operation, parameters): - async with self._adapt_connection._mutex_and_adapt_errors(): - if parameters is None: - result = await self._cursor.execute(operation) - else: - result = await self._cursor.execute(operation, parameters) - - if not self.server_side: - # asyncmy has a "fake" async result, so we have to pull it out - # of that here since our default result is not async. - # we could just as easily grab "_rows" here and be done with it - # but this is safer. - self._rows = deque(await self._cursor.fetchall()) - return result - - async def _executemany_async(self, operation, seq_of_parameters): - async with self._adapt_connection._mutex_and_adapt_errors(): - return await self._cursor.executemany(operation, seq_of_parameters) - - def setinputsizes(self, *inputsizes): - pass - - def __iter__(self): - while self._rows: - yield self._rows.popleft() - - def fetchone(self): - if self._rows: - return self._rows.popleft() - else: - return None - - def fetchmany(self, size=None): - if size is None: - size = self.arraysize - - rr = self._rows - return [rr.popleft() for _ in range(min(size, len(rr)))] - - def fetchall(self): - retval = list(self._rows) - self._rows.clear() - return retval +class AsyncAdapt_asyncmy_cursor(AsyncAdapt_dbapi_cursor): + __slots__ = () -class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor): - # TODO: base on connectors/asyncio.py - # see #10415 +class AsyncAdapt_asyncmy_ss_cursor( + AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_asyncmy_cursor +): __slots__ = () - server_side = True - def __init__(self, adapt_connection): - self._adapt_connection = adapt_connection - self._connection = adapt_connection._connection - self.await_ = adapt_connection.await_ - - cursor = self._connection.cursor( - adapt_connection.dbapi.asyncmy.cursors.SSCursor + def _make_new_cursor(self, connection): + return connection.cursor( + self._adapt_connection.dbapi.asyncmy.cursors.SSCursor ) - self._cursor = self.await_(cursor.__aenter__()) - - def close(self): - if self._cursor is not None: - self.await_(self._cursor.close()) - self._cursor = None - - def fetchone(self): - return self.await_(self._cursor.fetchone()) - - def fetchmany(self, size=None): - return self.await_(self._cursor.fetchmany(size=size)) - - def fetchall(self): - return self.await_(self._cursor.fetchall()) +class AsyncAdapt_asyncmy_connection(AsyncAdapt_dbapi_connection): + __slots__ = () -class AsyncAdapt_asyncmy_connection(AdaptedConnection): - # TODO: base on connectors/asyncio.py - # see #10415 - await_ = staticmethod(await_only) - __slots__ = ("dbapi", "_execute_mutex") + _cursor_cls = AsyncAdapt_asyncmy_cursor + _ss_cursor_cls = AsyncAdapt_asyncmy_ss_cursor - def __init__(self, dbapi, connection): - self.dbapi = dbapi - self._connection = connection - self._execute_mutex = asyncio.Lock() + def _handle_exception(self, error): + if isinstance(error, AttributeError): + raise self.dbapi.InternalError( + "network operation failed due to asyncmy attribute error" + ) - @asynccontextmanager - async def _mutex_and_adapt_errors(self): - async with self._execute_mutex: - try: - yield - except AttributeError: - raise self.dbapi.InternalError( - "network operation failed due to asyncmy attribute error" - ) + raise error def ping(self, reconnect): assert not reconnect return self.await_(self._do_ping()) async def _do_ping(self): - async with self._mutex_and_adapt_errors(): - return await self._connection.ping(False) + try: + async with self._execute_mutex: + return await self._connection.ping(False) + except Exception as error: + self._handle_exception(error) def character_set_name(self): return self._connection.character_set_name() @@ -211,18 +85,6 @@ class AsyncAdapt_asyncmy_connection(AdaptedConnection): def autocommit(self, value): self.await_(self._connection.autocommit(value)) - def cursor(self, server_side=False): - if server_side: - return AsyncAdapt_asyncmy_ss_cursor(self) - else: - return AsyncAdapt_asyncmy_cursor(self) - - def rollback(self): - self.await_(self._connection.rollback()) - - def commit(self): - self.await_(self._connection.commit()) - def terminate(self): # it's not awaitable. self._connection.close()