]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
backport mysql / base portions of #10415 to 2.0
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 18 May 2025 17:54:09 +0000 (13:54 -0400)
committerMichael Bayer <mike_mp@zzzcomputing.com>
Sun, 18 May 2025 19:37:54 +0000 (19:37 +0000)
For a backport of the mysql typing change in [1] to be useful, we need
to have most of the asyncio typing installed as well.  To make this
easier include that we will backport aiomysql / asyncmy over to the
connectors/asyncio.py connector which is already in use.

[1] https://gerrit.sqlalchemy.org/c/sqlalchemy/sqlalchemy/+/5829
Change-Id: I0414ed1f736a329ecdc9a662dbee71d621a463ae

lib/sqlalchemy/connectors/aioodbc.py
lib/sqlalchemy/connectors/asyncio.py
lib/sqlalchemy/dialects/mysql/aiomysql.py
lib/sqlalchemy/dialects/mysql/asyncmy.py

index 39b2a8a2382b6e197fef8133c14f9cd5461bf58e..6e4b864e7dcbf04caf998dcabfd6feec65382291 100644 (file)
@@ -20,6 +20,7 @@ from .. import util
 from ..util.concurrency import await_fallback
 from ..util.concurrency import await_only
 
+
 if TYPE_CHECKING:
     from ..engine.interfaces import ConnectArgsType
     from ..engine.url import URL
@@ -58,6 +59,15 @@ class AsyncAdapt_aioodbc_connection(AsyncAdapt_dbapi_connection):
 
         self._connection._conn.autocommit = value
 
+    def ping(self, reconnect):
+        return self.await_(self._connection.ping(reconnect))
+
+    def add_output_converter(self, *arg, **kw):
+        self._connection.add_output_converter(*arg, **kw)
+
+    def character_set_name(self):
+        return self._connection.character_set_name()
+
     def cursor(self, server_side=False):
         # aioodbc sets connection=None when closed and just fails with
         # AttributeError here.  Here we use the same ProgrammingError +
index c4f0d71541387f33ebabc268851c8bc79e706da0..c036d3fc7e6772303eab42d845bf265eacc615bc 100644 (file)
 #
 # This module is part of SQLAlchemy and is released under
 # the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
 
 """generic asyncio-adapted versions of DBAPI connection and cursor"""
 
 from __future__ import annotations
 
+import asyncio
 import collections
+import sys
+from typing import Any
+from typing import AsyncIterator
+from typing import Deque
+from typing import Iterator
+from typing import NoReturn
+from typing import Optional
+from typing import Sequence
 
 from ..engine import AdaptedConnection
-from ..util.concurrency import asyncio
+from ..engine.interfaces import _DBAPICursorDescription
+from ..engine.interfaces import _DBAPIMultiExecuteParams
+from ..engine.interfaces import _DBAPISingleExecuteParams
 from ..util.concurrency import await_fallback
 from ..util.concurrency import await_only
+from ..util.typing import Protocol
+from ..util.typing import Self
+
+
+class AsyncIODBAPIConnection(Protocol):
+    """protocol representing an async adapted version of a
+    :pep:`249` database connection.
+
+
+    """
+
+    async def close(self) -> None: ...
+
+    async def commit(self) -> None: ...
+
+    def cursor(self, *args: Any, **kwargs: Any) -> AsyncIODBAPICursor: ...
+
+    async def rollback(self) -> None: ...
+
+
+class AsyncIODBAPICursor(Protocol):
+    """protocol representing an async adapted version
+    of a :pep:`249` database cursor.
+
+
+    """
+
+    def __aenter__(self) -> Any: ...
+
+    @property
+    def description(
+        self,
+    ) -> _DBAPICursorDescription:
+        """The description attribute of the Cursor."""
+        ...
+
+    @property
+    def rowcount(self) -> int: ...
+
+    arraysize: int
+
+    lastrowid: int
+
+    async def close(self) -> None: ...
+
+    async def execute(
+        self,
+        operation: Any,
+        parameters: Optional[_DBAPISingleExecuteParams] = None,
+    ) -> Any: ...
+
+    async def executemany(
+        self,
+        operation: Any,
+        parameters: _DBAPIMultiExecuteParams,
+    ) -> Any: ...
+
+    async def fetchone(self) -> Optional[Any]: ...
+
+    async def fetchmany(self, size: Optional[int] = ...) -> Sequence[Any]: ...
+
+    async def fetchall(self) -> Sequence[Any]: ...
+
+    async def setinputsizes(self, sizes: Sequence[Any]) -> None: ...
+
+    def setoutputsize(self, size: Any, column: Any) -> None: ...
+
+    async def callproc(
+        self, procname: str, parameters: Sequence[Any] = ...
+    ) -> Any: ...
+
+    async def nextset(self) -> Optional[bool]: ...
+
+    def __aiter__(self) -> AsyncIterator[Any]: ...
 
 
 class AsyncAdapt_dbapi_cursor:
@@ -28,96 +112,136 @@ class AsyncAdapt_dbapi_cursor:
         "_rows",
     )
 
-    def __init__(self, adapt_connection):
+    _cursor: AsyncIODBAPICursor
+    _adapt_connection: AsyncAdapt_dbapi_connection
+    _connection: AsyncIODBAPIConnection
+    _rows: Deque[Any]
+
+    def __init__(self, adapt_connection: AsyncAdapt_dbapi_connection):
         self._adapt_connection = adapt_connection
         self._connection = adapt_connection._connection
+
         self.await_ = adapt_connection.await_
 
-        cursor = self._connection.cursor()
+        cursor = self._make_new_cursor(self._connection)
         self._cursor = self._aenter_cursor(cursor)
 
         if not self.server_side:
             self._rows = collections.deque()
 
-    def _aenter_cursor(self, cursor):
-        return self.await_(cursor.__aenter__())
+    def _aenter_cursor(self, cursor: AsyncIODBAPICursor) -> AsyncIODBAPICursor:
+        return self.await_(cursor.__aenter__())  # type: ignore[no-any-return]
+
+    def _make_new_cursor(
+        self, connection: AsyncIODBAPIConnection
+    ) -> AsyncIODBAPICursor:
+        return connection.cursor()
 
     @property
-    def description(self):
+    def description(self) -> Optional[_DBAPICursorDescription]:
         return self._cursor.description
 
     @property
-    def rowcount(self):
+    def rowcount(self) -> int:
         return self._cursor.rowcount
 
     @property
-    def arraysize(self):
+    def arraysize(self) -> int:
         return self._cursor.arraysize
 
     @arraysize.setter
-    def arraysize(self, value):
+    def arraysize(self, value: int) -> None:
         self._cursor.arraysize = value
 
     @property
-    def lastrowid(self):
+    def lastrowid(self) -> int:
         return self._cursor.lastrowid
 
-    def close(self):
+    def close(self) -> None:
         # note we aren't actually closing the cursor here,
         # we are just letting GC do it.  see notes in aiomysql dialect
         self._rows.clear()
 
-    def execute(self, operation, parameters=None):
-        return self.await_(self._execute_async(operation, parameters))
-
-    def executemany(self, operation, seq_of_parameters):
-        return self.await_(
-            self._executemany_async(operation, seq_of_parameters)
-        )
+    def execute(
+        self,
+        operation: Any,
+        parameters: Optional[_DBAPISingleExecuteParams] = None,
+    ) -> Any:
+        try:
+            return self.await_(self._execute_async(operation, parameters))
+        except Exception as error:
+            self._adapt_connection._handle_exception(error)
+
+    def executemany(
+        self,
+        operation: Any,
+        seq_of_parameters: _DBAPIMultiExecuteParams,
+    ) -> Any:
+        try:
+            return self.await_(
+                self._executemany_async(operation, seq_of_parameters)
+            )
+        except Exception as error:
+            self._adapt_connection._handle_exception(error)
 
-    async def _execute_async(self, operation, parameters):
+    async def _execute_async(
+        self, operation: Any, parameters: Optional[_DBAPISingleExecuteParams]
+    ) -> Any:
         async with self._adapt_connection._execute_mutex:
-            result = await self._cursor.execute(operation, parameters or ())
+            if parameters is None:
+                result = await self._cursor.execute(operation)
+            else:
+                result = await self._cursor.execute(operation, parameters)
 
             if self._cursor.description and not self.server_side:
                 self._rows = collections.deque(await self._cursor.fetchall())
             return result
 
-    async def _executemany_async(self, operation, seq_of_parameters):
+    async def _executemany_async(
+        self,
+        operation: Any,
+        seq_of_parameters: _DBAPIMultiExecuteParams,
+    ) -> Any:
         async with self._adapt_connection._execute_mutex:
             return await self._cursor.executemany(operation, seq_of_parameters)
 
-    def nextset(self):
+    def nextset(self) -> None:
         self.await_(self._cursor.nextset())
         if self._cursor.description and not self.server_side:
             self._rows = collections.deque(
                 self.await_(self._cursor.fetchall())
             )
 
-    def setinputsizes(self, *inputsizes):
+    def setinputsizes(self, *inputsizes: Any) -> None:
         # NOTE: this is overrridden in aioodbc due to
         # see https://github.com/aio-libs/aioodbc/issues/451
         # right now
 
         return self.await_(self._cursor.setinputsizes(*inputsizes))
 
-    def __iter__(self):
+    def __enter__(self) -> Self:
+        return self
+
+    def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
+        self.close()
+
+    def __iter__(self) -> Iterator[Any]:
         while self._rows:
             yield self._rows.popleft()
 
-    def fetchone(self):
+    def fetchone(self) -> Optional[Any]:
         if self._rows:
             return self._rows.popleft()
         else:
             return None
 
-    def fetchmany(self, size=None):
+    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
         if size is None:
             size = self.arraysize
         rr = self._rows
         return [rr.popleft() for _ in range(min(size, len(rr)))]
 
-    def fetchall(self):
+    def fetchall(self) -> Sequence[Any]:
         retval = list(self._rows)
         self._rows.clear()
         return retval
@@ -127,30 +251,21 @@ class AsyncAdapt_dbapi_ss_cursor(AsyncAdapt_dbapi_cursor):
     __slots__ = ()
     server_side = True
 
-    def __init__(self, adapt_connection):
-        self._adapt_connection = adapt_connection
-        self._connection = adapt_connection._connection
-        self.await_ = adapt_connection.await_
-
-        cursor = self._connection.cursor()
-
-        self._cursor = self.await_(cursor.__aenter__())
-
-    def close(self):
+    def close(self) -> None:
         if self._cursor is not None:
             self.await_(self._cursor.close())
-            self._cursor = None
+            self._cursor = None  # type: ignore
 
-    def fetchone(self):
+    def fetchone(self) -> Optional[Any]:
         return self.await_(self._cursor.fetchone())
 
-    def fetchmany(self, size=None):
+    def fetchmany(self, size: Optional[int] = None) -> Any:
         return self.await_(self._cursor.fetchmany(size=size))
 
-    def fetchall(self):
+    def fetchall(self) -> Sequence[Any]:
         return self.await_(self._cursor.fetchall())
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[Any]:
         iterator = self._cursor.__aiter__()
         while True:
             try:
@@ -164,46 +279,50 @@ class AsyncAdapt_dbapi_connection(AdaptedConnection):
     _ss_cursor_cls = AsyncAdapt_dbapi_ss_cursor
 
     await_ = staticmethod(await_only)
+
     __slots__ = ("dbapi", "_execute_mutex")
 
-    def __init__(self, dbapi, connection):
+    _connection: AsyncIODBAPIConnection
+
+    def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection):
         self.dbapi = dbapi
         self._connection = connection
         self._execute_mutex = asyncio.Lock()
 
-    def ping(self, reconnect):
-        return self.await_(self._connection.ping(reconnect))
-
-    def add_output_converter(self, *arg, **kw):
-        self._connection.add_output_converter(*arg, **kw)
-
-    def character_set_name(self):
-        return self._connection.character_set_name()
-
-    @property
-    def autocommit(self):
-        return self._connection.autocommit
-
-    @autocommit.setter
-    def autocommit(self, value):
-        # https://github.com/aio-libs/aioodbc/issues/448
-        # self._connection.autocommit = value
-
-        self._connection._conn.autocommit = value
-
-    def cursor(self, server_side=False):
+    def cursor(self, server_side: bool = False) -> AsyncAdapt_dbapi_cursor:
         if server_side:
             return self._ss_cursor_cls(self)
         else:
             return self._cursor_cls(self)
 
-    def rollback(self):
-        self.await_(self._connection.rollback())
-
-    def commit(self):
-        self.await_(self._connection.commit())
-
-    def close(self):
+    def execute(
+        self,
+        operation: Any,
+        parameters: Optional[_DBAPISingleExecuteParams] = None,
+    ) -> Any:
+        """lots of DBAPIs seem to provide this, so include it"""
+        cursor = self.cursor()
+        cursor.execute(operation, parameters)
+        return cursor
+
+    def _handle_exception(self, error: Exception) -> NoReturn:
+        exc_info = sys.exc_info()
+
+        raise error.with_traceback(exc_info[2])
+
+    def rollback(self) -> None:
+        try:
+            self.await_(self._connection.rollback())
+        except Exception as error:
+            self._handle_exception(error)
+
+    def commit(self) -> None:
+        try:
+            self.await_(self._connection.commit())
+        except Exception as error:
+            self._handle_exception(error)
+
+    def close(self) -> None:
         self.await_(self._connection.close())
 
 
index bd5e7de6b4fcb2d2d0ce9f8e0d77176f5c151cc8..ea11f3bc87d1c02634d6a6a51e790a5ef07b81b1 100644 (file)
@@ -29,162 +29,42 @@ This dialect should normally be used only with the
     )
 
 """  # noqa
-from collections import deque
-
 from .pymysql import MySQLDialect_pymysql
 from ... import pool
 from ... import util
-from ...engine import AdaptedConnection
-from ...util.concurrency import asyncio
+from ...connectors.asyncio import AsyncAdapt_dbapi_connection
+from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
+from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
 from ...util.concurrency import await_fallback
 from ...util.concurrency import await_only
 
 
-class AsyncAdapt_aiomysql_cursor:
-    # TODO: base on connectors/asyncio.py
-    # see #10415
-    server_side = False
-    __slots__ = (
-        "_adapt_connection",
-        "_connection",
-        "await_",
-        "_cursor",
-        "_rows",
-    )
-
-    def __init__(self, adapt_connection):
-        self._adapt_connection = adapt_connection
-        self._connection = adapt_connection._connection
-        self.await_ = adapt_connection.await_
-
-        cursor = self._connection.cursor(adapt_connection.dbapi.Cursor)
-
-        # see https://github.com/aio-libs/aiomysql/issues/543
-        self._cursor = self.await_(cursor.__aenter__())
-        self._rows = deque()
-
-    @property
-    def description(self):
-        return self._cursor.description
-
-    @property
-    def rowcount(self):
-        return self._cursor.rowcount
-
-    @property
-    def arraysize(self):
-        return self._cursor.arraysize
-
-    @arraysize.setter
-    def arraysize(self, value):
-        self._cursor.arraysize = value
-
-    @property
-    def lastrowid(self):
-        return self._cursor.lastrowid
-
-    def close(self):
-        # note we aren't actually closing the cursor here,
-        # we are just letting GC do it.   to allow this to be async
-        # we would need the Result to change how it does "Safe close cursor".
-        # MySQL "cursors" don't actually have state to be "closed" besides
-        # exhausting rows, which we already have done for sync cursor.
-        # another option would be to emulate aiosqlite dialect and assign
-        # cursor only if we are doing server side cursor operation.
-        self._rows.clear()
-
-    def execute(self, operation, parameters=None):
-        return self.await_(self._execute_async(operation, parameters))
-
-    def executemany(self, operation, seq_of_parameters):
-        return self.await_(
-            self._executemany_async(operation, seq_of_parameters)
-        )
-
-    async def _execute_async(self, operation, parameters):
-        async with self._adapt_connection._execute_mutex:
-            result = await self._cursor.execute(operation, parameters)
-
-            if not self.server_side:
-                # aiomysql has a "fake" async result, so we have to pull it out
-                # of that here since our default result is not async.
-                # we could just as easily grab "_rows" here and be done with it
-                # but this is safer.
-                self._rows = deque(await self._cursor.fetchall())
-            return result
-
-    async def _executemany_async(self, operation, seq_of_parameters):
-        async with self._adapt_connection._execute_mutex:
-            return await self._cursor.executemany(operation, seq_of_parameters)
-
-    def setinputsizes(self, *inputsizes):
-        pass
-
-    def __iter__(self):
-        while self._rows:
-            yield self._rows.popleft()
-
-    def fetchone(self):
-        if self._rows:
-            return self._rows.popleft()
-        else:
-            return None
-
-    def fetchmany(self, size=None):
-        if size is None:
-            size = self.arraysize
-
-        rr = self._rows
-        return [rr.popleft() for _ in range(min(size, len(rr)))]
-
-    def fetchall(self):
-        retval = list(self._rows)
-        self._rows.clear()
-        return retval
-
-
-class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor):
-    # TODO: base on connectors/asyncio.py
-    # see #10415
+class AsyncAdapt_aiomysql_cursor(AsyncAdapt_dbapi_cursor):
     __slots__ = ()
-    server_side = True
-
-    def __init__(self, adapt_connection):
-        self._adapt_connection = adapt_connection
-        self._connection = adapt_connection._connection
-        self.await_ = adapt_connection.await_
 
-        cursor = self._connection.cursor(adapt_connection.dbapi.SSCursor)
+    def _make_new_cursor(self, connection):
+        return connection.cursor(self._adapt_connection.dbapi.Cursor)
 
-        self._cursor = self.await_(cursor.__aenter__())
 
-    def close(self):
-        if self._cursor is not None:
-            self.await_(self._cursor.close())
-            self._cursor = None
-
-    def fetchone(self):
-        return self.await_(self._cursor.fetchone())
-
-    def fetchmany(self, size=None):
-        return self.await_(self._cursor.fetchmany(size=size))
+class AsyncAdapt_aiomysql_ss_cursor(
+    AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_aiomysql_cursor
+):
+    __slots__ = ()
 
-    def fetchall(self):
-        return self.await_(self._cursor.fetchall())
+    def _make_new_cursor(self, connection):
+        return connection.cursor(
+            self._adapt_connection.dbapi.aiomysql.cursors.SSCursor
+        )
 
 
-class AsyncAdapt_aiomysql_connection(AdaptedConnection):
-    # TODO: base on connectors/asyncio.py
-    # see #10415
-    await_ = staticmethod(await_only)
-    __slots__ = ("dbapi", "_execute_mutex")
+class AsyncAdapt_aiomysql_connection(AsyncAdapt_dbapi_connection):
+    __slots__ = ()
 
-    def __init__(self, dbapi, connection):
-        self.dbapi = dbapi
-        self._connection = connection
-        self._execute_mutex = asyncio.Lock()
+    _cursor_cls = AsyncAdapt_aiomysql_cursor
+    _ss_cursor_cls = AsyncAdapt_aiomysql_ss_cursor
 
     def ping(self, reconnect):
+        assert not reconnect
         return self.await_(self._connection.ping(reconnect))
 
     def character_set_name(self):
@@ -193,18 +73,6 @@ class AsyncAdapt_aiomysql_connection(AdaptedConnection):
     def autocommit(self, value):
         self.await_(self._connection.autocommit(value))
 
-    def cursor(self, server_side=False):
-        if server_side:
-            return AsyncAdapt_aiomysql_ss_cursor(self)
-        else:
-            return AsyncAdapt_aiomysql_cursor(self)
-
-    def rollback(self):
-        self.await_(self._connection.rollback())
-
-    def commit(self):
-        self.await_(self._connection.commit())
-
     def terminate(self):
         # it's not awaitable.
         self._connection.close()
@@ -214,8 +82,6 @@ class AsyncAdapt_aiomysql_connection(AdaptedConnection):
 
 
 class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection):
-    # TODO: base on connectors/asyncio.py
-    # see #10415
     __slots__ = ()
 
     await_ = staticmethod(await_fallback)
index 9ec54e694da067d91825c7f4ad35967060e1c1fc..179d6c2035b8e5dd0b0ac0399d1fd1ef8a375ea5 100644 (file)
@@ -27,183 +27,57 @@ This dialect should normally be used only with the
     )
 
 """  # noqa
-from collections import deque
-from contextlib import asynccontextmanager
+from __future__ import annotations
 
 from .pymysql import MySQLDialect_pymysql
 from ... import pool
 from ... import util
-from ...engine import AdaptedConnection
-from ...util.concurrency import asyncio
+from ...connectors.asyncio import AsyncAdapt_dbapi_connection
+from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
+from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
 from ...util.concurrency import await_fallback
 from ...util.concurrency import await_only
 
 
-class AsyncAdapt_asyncmy_cursor:
-    # TODO: base on connectors/asyncio.py
-    # see #10415
-    server_side = False
-    __slots__ = (
-        "_adapt_connection",
-        "_connection",
-        "await_",
-        "_cursor",
-        "_rows",
-    )
-
-    def __init__(self, adapt_connection):
-        self._adapt_connection = adapt_connection
-        self._connection = adapt_connection._connection
-        self.await_ = adapt_connection.await_
-
-        cursor = self._connection.cursor()
-
-        self._cursor = self.await_(cursor.__aenter__())
-        self._rows = deque()
-
-    @property
-    def description(self):
-        return self._cursor.description
-
-    @property
-    def rowcount(self):
-        return self._cursor.rowcount
-
-    @property
-    def arraysize(self):
-        return self._cursor.arraysize
-
-    @arraysize.setter
-    def arraysize(self, value):
-        self._cursor.arraysize = value
-
-    @property
-    def lastrowid(self):
-        return self._cursor.lastrowid
-
-    def close(self):
-        # note we aren't actually closing the cursor here,
-        # we are just letting GC do it.   to allow this to be async
-        # we would need the Result to change how it does "Safe close cursor".
-        # MySQL "cursors" don't actually have state to be "closed" besides
-        # exhausting rows, which we already have done for sync cursor.
-        # another option would be to emulate aiosqlite dialect and assign
-        # cursor only if we are doing server side cursor operation.
-        self._rows.clear()
-
-    def execute(self, operation, parameters=None):
-        return self.await_(self._execute_async(operation, parameters))
-
-    def executemany(self, operation, seq_of_parameters):
-        return self.await_(
-            self._executemany_async(operation, seq_of_parameters)
-        )
-
-    async def _execute_async(self, operation, parameters):
-        async with self._adapt_connection._mutex_and_adapt_errors():
-            if parameters is None:
-                result = await self._cursor.execute(operation)
-            else:
-                result = await self._cursor.execute(operation, parameters)
-
-            if not self.server_side:
-                # asyncmy has a "fake" async result, so we have to pull it out
-                # of that here since our default result is not async.
-                # we could just as easily grab "_rows" here and be done with it
-                # but this is safer.
-                self._rows = deque(await self._cursor.fetchall())
-            return result
-
-    async def _executemany_async(self, operation, seq_of_parameters):
-        async with self._adapt_connection._mutex_and_adapt_errors():
-            return await self._cursor.executemany(operation, seq_of_parameters)
-
-    def setinputsizes(self, *inputsizes):
-        pass
-
-    def __iter__(self):
-        while self._rows:
-            yield self._rows.popleft()
-
-    def fetchone(self):
-        if self._rows:
-            return self._rows.popleft()
-        else:
-            return None
-
-    def fetchmany(self, size=None):
-        if size is None:
-            size = self.arraysize
-
-        rr = self._rows
-        return [rr.popleft() for _ in range(min(size, len(rr)))]
-
-    def fetchall(self):
-        retval = list(self._rows)
-        self._rows.clear()
-        return retval
+class AsyncAdapt_asyncmy_cursor(AsyncAdapt_dbapi_cursor):
+    __slots__ = ()
 
 
-class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor):
-    # TODO: base on connectors/asyncio.py
-    # see #10415
+class AsyncAdapt_asyncmy_ss_cursor(
+    AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_asyncmy_cursor
+):
     __slots__ = ()
-    server_side = True
 
-    def __init__(self, adapt_connection):
-        self._adapt_connection = adapt_connection
-        self._connection = adapt_connection._connection
-        self.await_ = adapt_connection.await_
-
-        cursor = self._connection.cursor(
-            adapt_connection.dbapi.asyncmy.cursors.SSCursor
+    def _make_new_cursor(self, connection):
+        return connection.cursor(
+            self._adapt_connection.dbapi.asyncmy.cursors.SSCursor
         )
 
-        self._cursor = self.await_(cursor.__aenter__())
-
-    def close(self):
-        if self._cursor is not None:
-            self.await_(self._cursor.close())
-            self._cursor = None
-
-    def fetchone(self):
-        return self.await_(self._cursor.fetchone())
-
-    def fetchmany(self, size=None):
-        return self.await_(self._cursor.fetchmany(size=size))
-
-    def fetchall(self):
-        return self.await_(self._cursor.fetchall())
 
+class AsyncAdapt_asyncmy_connection(AsyncAdapt_dbapi_connection):
+    __slots__ = ()
 
-class AsyncAdapt_asyncmy_connection(AdaptedConnection):
-    # TODO: base on connectors/asyncio.py
-    # see #10415
-    await_ = staticmethod(await_only)
-    __slots__ = ("dbapi", "_execute_mutex")
+    _cursor_cls = AsyncAdapt_asyncmy_cursor
+    _ss_cursor_cls = AsyncAdapt_asyncmy_ss_cursor
 
-    def __init__(self, dbapi, connection):
-        self.dbapi = dbapi
-        self._connection = connection
-        self._execute_mutex = asyncio.Lock()
+    def _handle_exception(self, error):
+        if isinstance(error, AttributeError):
+            raise self.dbapi.InternalError(
+                "network operation failed due to asyncmy attribute error"
+            )
 
-    @asynccontextmanager
-    async def _mutex_and_adapt_errors(self):
-        async with self._execute_mutex:
-            try:
-                yield
-            except AttributeError:
-                raise self.dbapi.InternalError(
-                    "network operation failed due to asyncmy attribute error"
-                )
+        raise error
 
     def ping(self, reconnect):
         assert not reconnect
         return self.await_(self._do_ping())
 
     async def _do_ping(self):
-        async with self._mutex_and_adapt_errors():
-            return await self._connection.ping(False)
+        try:
+            async with self._execute_mutex:
+                return await self._connection.ping(False)
+        except Exception as error:
+            self._handle_exception(error)
 
     def character_set_name(self):
         return self._connection.character_set_name()
@@ -211,18 +85,6 @@ class AsyncAdapt_asyncmy_connection(AdaptedConnection):
     def autocommit(self, value):
         self.await_(self._connection.autocommit(value))
 
-    def cursor(self, server_side=False):
-        if server_side:
-            return AsyncAdapt_asyncmy_ss_cursor(self)
-        else:
-            return AsyncAdapt_asyncmy_cursor(self)
-
-    def rollback(self):
-        self.await_(self._connection.rollback())
-
-    def commit(self):
-        self.await_(self._connection.commit())
-
     def terminate(self):
         # it's not awaitable.
         self._connection.close()