from ..util.concurrency import await_fallback
from ..util.concurrency import await_only
+
if TYPE_CHECKING:
from ..engine.interfaces import ConnectArgsType
from ..engine.url import URL
self._connection._conn.autocommit = value
+ def ping(self, reconnect):
+ return self.await_(self._connection.ping(reconnect))
+
+ def add_output_converter(self, *arg, **kw):
+ self._connection.add_output_converter(*arg, **kw)
+
+ def character_set_name(self):
+ return self._connection.character_set_name()
+
def cursor(self, server_side=False):
# aioodbc sets connection=None when closed and just fails with
# AttributeError here. Here we use the same ProgrammingError +
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
"""generic asyncio-adapted versions of DBAPI connection and cursor"""
from __future__ import annotations
+import asyncio
import collections
+import sys
+from typing import Any
+from typing import AsyncIterator
+from typing import Deque
+from typing import Iterator
+from typing import NoReturn
+from typing import Optional
+from typing import Sequence
from ..engine import AdaptedConnection
-from ..util.concurrency import asyncio
+from ..engine.interfaces import _DBAPICursorDescription
+from ..engine.interfaces import _DBAPIMultiExecuteParams
+from ..engine.interfaces import _DBAPISingleExecuteParams
from ..util.concurrency import await_fallback
from ..util.concurrency import await_only
+from ..util.typing import Protocol
+from ..util.typing import Self
+
+
+class AsyncIODBAPIConnection(Protocol):
+ """protocol representing an async adapted version of a
+ :pep:`249` database connection.
+
+
+ """
+
+ async def close(self) -> None: ...
+
+ async def commit(self) -> None: ...
+
+ def cursor(self, *args: Any, **kwargs: Any) -> AsyncIODBAPICursor: ...
+
+ async def rollback(self) -> None: ...
+
+
+class AsyncIODBAPICursor(Protocol):
+ """protocol representing an async adapted version
+ of a :pep:`249` database cursor.
+
+
+ """
+
+ def __aenter__(self) -> Any: ...
+
+ @property
+ def description(
+ self,
+ ) -> _DBAPICursorDescription:
+ """The description attribute of the Cursor."""
+ ...
+
+ @property
+ def rowcount(self) -> int: ...
+
+ arraysize: int
+
+ lastrowid: int
+
+ async def close(self) -> None: ...
+
+ async def execute(
+ self,
+ operation: Any,
+ parameters: Optional[_DBAPISingleExecuteParams] = None,
+ ) -> Any: ...
+
+ async def executemany(
+ self,
+ operation: Any,
+ parameters: _DBAPIMultiExecuteParams,
+ ) -> Any: ...
+
+ async def fetchone(self) -> Optional[Any]: ...
+
+ async def fetchmany(self, size: Optional[int] = ...) -> Sequence[Any]: ...
+
+ async def fetchall(self) -> Sequence[Any]: ...
+
+ async def setinputsizes(self, sizes: Sequence[Any]) -> None: ...
+
+ def setoutputsize(self, size: Any, column: Any) -> None: ...
+
+ async def callproc(
+ self, procname: str, parameters: Sequence[Any] = ...
+ ) -> Any: ...
+
+ async def nextset(self) -> Optional[bool]: ...
+
+ def __aiter__(self) -> AsyncIterator[Any]: ...
class AsyncAdapt_dbapi_cursor:
"_rows",
)
- def __init__(self, adapt_connection):
+ _cursor: AsyncIODBAPICursor
+ _adapt_connection: AsyncAdapt_dbapi_connection
+ _connection: AsyncIODBAPIConnection
+ _rows: Deque[Any]
+
+ def __init__(self, adapt_connection: AsyncAdapt_dbapi_connection):
self._adapt_connection = adapt_connection
self._connection = adapt_connection._connection
+
self.await_ = adapt_connection.await_
- cursor = self._connection.cursor()
+ cursor = self._make_new_cursor(self._connection)
self._cursor = self._aenter_cursor(cursor)
if not self.server_side:
self._rows = collections.deque()
- def _aenter_cursor(self, cursor):
- return self.await_(cursor.__aenter__())
+ def _aenter_cursor(self, cursor: AsyncIODBAPICursor) -> AsyncIODBAPICursor:
+ return self.await_(cursor.__aenter__()) # type: ignore[no-any-return]
+
+ def _make_new_cursor(
+ self, connection: AsyncIODBAPIConnection
+ ) -> AsyncIODBAPICursor:
+ return connection.cursor()
@property
- def description(self):
+ def description(self) -> Optional[_DBAPICursorDescription]:
return self._cursor.description
@property
- def rowcount(self):
+ def rowcount(self) -> int:
return self._cursor.rowcount
@property
- def arraysize(self):
+ def arraysize(self) -> int:
return self._cursor.arraysize
@arraysize.setter
- def arraysize(self, value):
+ def arraysize(self, value: int) -> None:
self._cursor.arraysize = value
@property
- def lastrowid(self):
+ def lastrowid(self) -> int:
return self._cursor.lastrowid
- def close(self):
+ def close(self) -> None:
# note we aren't actually closing the cursor here,
# we are just letting GC do it. see notes in aiomysql dialect
self._rows.clear()
- def execute(self, operation, parameters=None):
- return self.await_(self._execute_async(operation, parameters))
-
- def executemany(self, operation, seq_of_parameters):
- return self.await_(
- self._executemany_async(operation, seq_of_parameters)
- )
+ def execute(
+ self,
+ operation: Any,
+ parameters: Optional[_DBAPISingleExecuteParams] = None,
+ ) -> Any:
+ try:
+ return self.await_(self._execute_async(operation, parameters))
+ except Exception as error:
+ self._adapt_connection._handle_exception(error)
+
+ def executemany(
+ self,
+ operation: Any,
+ seq_of_parameters: _DBAPIMultiExecuteParams,
+ ) -> Any:
+ try:
+ return self.await_(
+ self._executemany_async(operation, seq_of_parameters)
+ )
+ except Exception as error:
+ self._adapt_connection._handle_exception(error)
- async def _execute_async(self, operation, parameters):
+ async def _execute_async(
+ self, operation: Any, parameters: Optional[_DBAPISingleExecuteParams]
+ ) -> Any:
async with self._adapt_connection._execute_mutex:
- result = await self._cursor.execute(operation, parameters or ())
+ if parameters is None:
+ result = await self._cursor.execute(operation)
+ else:
+ result = await self._cursor.execute(operation, parameters)
if self._cursor.description and not self.server_side:
self._rows = collections.deque(await self._cursor.fetchall())
return result
- async def _executemany_async(self, operation, seq_of_parameters):
+ async def _executemany_async(
+ self,
+ operation: Any,
+ seq_of_parameters: _DBAPIMultiExecuteParams,
+ ) -> Any:
async with self._adapt_connection._execute_mutex:
return await self._cursor.executemany(operation, seq_of_parameters)
- def nextset(self):
+ def nextset(self) -> None:
self.await_(self._cursor.nextset())
if self._cursor.description and not self.server_side:
self._rows = collections.deque(
self.await_(self._cursor.fetchall())
)
- def setinputsizes(self, *inputsizes):
+ def setinputsizes(self, *inputsizes: Any) -> None:
# NOTE: this is overrridden in aioodbc due to
# see https://github.com/aio-libs/aioodbc/issues/451
# right now
return self.await_(self._cursor.setinputsizes(*inputsizes))
- def __iter__(self):
+ def __enter__(self) -> Self:
+ return self
+
+ def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
+ self.close()
+
+ def __iter__(self) -> Iterator[Any]:
while self._rows:
yield self._rows.popleft()
- def fetchone(self):
+ def fetchone(self) -> Optional[Any]:
if self._rows:
return self._rows.popleft()
else:
return None
- def fetchmany(self, size=None):
+ def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
if size is None:
size = self.arraysize
rr = self._rows
return [rr.popleft() for _ in range(min(size, len(rr)))]
- def fetchall(self):
+ def fetchall(self) -> Sequence[Any]:
retval = list(self._rows)
self._rows.clear()
return retval
__slots__ = ()
server_side = True
- def __init__(self, adapt_connection):
- self._adapt_connection = adapt_connection
- self._connection = adapt_connection._connection
- self.await_ = adapt_connection.await_
-
- cursor = self._connection.cursor()
-
- self._cursor = self.await_(cursor.__aenter__())
-
- def close(self):
+ def close(self) -> None:
if self._cursor is not None:
self.await_(self._cursor.close())
- self._cursor = None
+ self._cursor = None # type: ignore
- def fetchone(self):
+ def fetchone(self) -> Optional[Any]:
return self.await_(self._cursor.fetchone())
- def fetchmany(self, size=None):
+ def fetchmany(self, size: Optional[int] = None) -> Any:
return self.await_(self._cursor.fetchmany(size=size))
- def fetchall(self):
+ def fetchall(self) -> Sequence[Any]:
return self.await_(self._cursor.fetchall())
- def __iter__(self):
+ def __iter__(self) -> Iterator[Any]:
iterator = self._cursor.__aiter__()
while True:
try:
_ss_cursor_cls = AsyncAdapt_dbapi_ss_cursor
await_ = staticmethod(await_only)
+
__slots__ = ("dbapi", "_execute_mutex")
- def __init__(self, dbapi, connection):
+ _connection: AsyncIODBAPIConnection
+
+ def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection):
self.dbapi = dbapi
self._connection = connection
self._execute_mutex = asyncio.Lock()
- def ping(self, reconnect):
- return self.await_(self._connection.ping(reconnect))
-
- def add_output_converter(self, *arg, **kw):
- self._connection.add_output_converter(*arg, **kw)
-
- def character_set_name(self):
- return self._connection.character_set_name()
-
- @property
- def autocommit(self):
- return self._connection.autocommit
-
- @autocommit.setter
- def autocommit(self, value):
- # https://github.com/aio-libs/aioodbc/issues/448
- # self._connection.autocommit = value
-
- self._connection._conn.autocommit = value
-
- def cursor(self, server_side=False):
+ def cursor(self, server_side: bool = False) -> AsyncAdapt_dbapi_cursor:
if server_side:
return self._ss_cursor_cls(self)
else:
return self._cursor_cls(self)
- def rollback(self):
- self.await_(self._connection.rollback())
-
- def commit(self):
- self.await_(self._connection.commit())
-
- def close(self):
+ def execute(
+ self,
+ operation: Any,
+ parameters: Optional[_DBAPISingleExecuteParams] = None,
+ ) -> Any:
+ """lots of DBAPIs seem to provide this, so include it"""
+ cursor = self.cursor()
+ cursor.execute(operation, parameters)
+ return cursor
+
+ def _handle_exception(self, error: Exception) -> NoReturn:
+ exc_info = sys.exc_info()
+
+ raise error.with_traceback(exc_info[2])
+
+ def rollback(self) -> None:
+ try:
+ self.await_(self._connection.rollback())
+ except Exception as error:
+ self._handle_exception(error)
+
+ def commit(self) -> None:
+ try:
+ self.await_(self._connection.commit())
+ except Exception as error:
+ self._handle_exception(error)
+
+ def close(self) -> None:
self.await_(self._connection.close())
)
""" # noqa
-from collections import deque
-
from .pymysql import MySQLDialect_pymysql
from ... import pool
from ... import util
-from ...engine import AdaptedConnection
-from ...util.concurrency import asyncio
+from ...connectors.asyncio import AsyncAdapt_dbapi_connection
+from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
+from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
-class AsyncAdapt_aiomysql_cursor:
- # TODO: base on connectors/asyncio.py
- # see #10415
- server_side = False
- __slots__ = (
- "_adapt_connection",
- "_connection",
- "await_",
- "_cursor",
- "_rows",
- )
-
- def __init__(self, adapt_connection):
- self._adapt_connection = adapt_connection
- self._connection = adapt_connection._connection
- self.await_ = adapt_connection.await_
-
- cursor = self._connection.cursor(adapt_connection.dbapi.Cursor)
-
- # see https://github.com/aio-libs/aiomysql/issues/543
- self._cursor = self.await_(cursor.__aenter__())
- self._rows = deque()
-
- @property
- def description(self):
- return self._cursor.description
-
- @property
- def rowcount(self):
- return self._cursor.rowcount
-
- @property
- def arraysize(self):
- return self._cursor.arraysize
-
- @arraysize.setter
- def arraysize(self, value):
- self._cursor.arraysize = value
-
- @property
- def lastrowid(self):
- return self._cursor.lastrowid
-
- def close(self):
- # note we aren't actually closing the cursor here,
- # we are just letting GC do it. to allow this to be async
- # we would need the Result to change how it does "Safe close cursor".
- # MySQL "cursors" don't actually have state to be "closed" besides
- # exhausting rows, which we already have done for sync cursor.
- # another option would be to emulate aiosqlite dialect and assign
- # cursor only if we are doing server side cursor operation.
- self._rows.clear()
-
- def execute(self, operation, parameters=None):
- return self.await_(self._execute_async(operation, parameters))
-
- def executemany(self, operation, seq_of_parameters):
- return self.await_(
- self._executemany_async(operation, seq_of_parameters)
- )
-
- async def _execute_async(self, operation, parameters):
- async with self._adapt_connection._execute_mutex:
- result = await self._cursor.execute(operation, parameters)
-
- if not self.server_side:
- # aiomysql has a "fake" async result, so we have to pull it out
- # of that here since our default result is not async.
- # we could just as easily grab "_rows" here and be done with it
- # but this is safer.
- self._rows = deque(await self._cursor.fetchall())
- return result
-
- async def _executemany_async(self, operation, seq_of_parameters):
- async with self._adapt_connection._execute_mutex:
- return await self._cursor.executemany(operation, seq_of_parameters)
-
- def setinputsizes(self, *inputsizes):
- pass
-
- def __iter__(self):
- while self._rows:
- yield self._rows.popleft()
-
- def fetchone(self):
- if self._rows:
- return self._rows.popleft()
- else:
- return None
-
- def fetchmany(self, size=None):
- if size is None:
- size = self.arraysize
-
- rr = self._rows
- return [rr.popleft() for _ in range(min(size, len(rr)))]
-
- def fetchall(self):
- retval = list(self._rows)
- self._rows.clear()
- return retval
-
-
-class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor):
- # TODO: base on connectors/asyncio.py
- # see #10415
+class AsyncAdapt_aiomysql_cursor(AsyncAdapt_dbapi_cursor):
__slots__ = ()
- server_side = True
-
- def __init__(self, adapt_connection):
- self._adapt_connection = adapt_connection
- self._connection = adapt_connection._connection
- self.await_ = adapt_connection.await_
- cursor = self._connection.cursor(adapt_connection.dbapi.SSCursor)
+ def _make_new_cursor(self, connection):
+ return connection.cursor(self._adapt_connection.dbapi.Cursor)
- self._cursor = self.await_(cursor.__aenter__())
- def close(self):
- if self._cursor is not None:
- self.await_(self._cursor.close())
- self._cursor = None
-
- def fetchone(self):
- return self.await_(self._cursor.fetchone())
-
- def fetchmany(self, size=None):
- return self.await_(self._cursor.fetchmany(size=size))
+class AsyncAdapt_aiomysql_ss_cursor(
+ AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_aiomysql_cursor
+):
+ __slots__ = ()
- def fetchall(self):
- return self.await_(self._cursor.fetchall())
+ def _make_new_cursor(self, connection):
+ return connection.cursor(
+ self._adapt_connection.dbapi.aiomysql.cursors.SSCursor
+ )
-class AsyncAdapt_aiomysql_connection(AdaptedConnection):
- # TODO: base on connectors/asyncio.py
- # see #10415
- await_ = staticmethod(await_only)
- __slots__ = ("dbapi", "_execute_mutex")
+class AsyncAdapt_aiomysql_connection(AsyncAdapt_dbapi_connection):
+ __slots__ = ()
- def __init__(self, dbapi, connection):
- self.dbapi = dbapi
- self._connection = connection
- self._execute_mutex = asyncio.Lock()
+ _cursor_cls = AsyncAdapt_aiomysql_cursor
+ _ss_cursor_cls = AsyncAdapt_aiomysql_ss_cursor
def ping(self, reconnect):
+ assert not reconnect
return self.await_(self._connection.ping(reconnect))
def character_set_name(self):
def autocommit(self, value):
self.await_(self._connection.autocommit(value))
- def cursor(self, server_side=False):
- if server_side:
- return AsyncAdapt_aiomysql_ss_cursor(self)
- else:
- return AsyncAdapt_aiomysql_cursor(self)
-
- def rollback(self):
- self.await_(self._connection.rollback())
-
- def commit(self):
- self.await_(self._connection.commit())
-
def terminate(self):
# it's not awaitable.
self._connection.close()
class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection):
- # TODO: base on connectors/asyncio.py
- # see #10415
__slots__ = ()
await_ = staticmethod(await_fallback)
)
""" # noqa
-from collections import deque
-from contextlib import asynccontextmanager
+from __future__ import annotations
from .pymysql import MySQLDialect_pymysql
from ... import pool
from ... import util
-from ...engine import AdaptedConnection
-from ...util.concurrency import asyncio
+from ...connectors.asyncio import AsyncAdapt_dbapi_connection
+from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
+from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
-class AsyncAdapt_asyncmy_cursor:
- # TODO: base on connectors/asyncio.py
- # see #10415
- server_side = False
- __slots__ = (
- "_adapt_connection",
- "_connection",
- "await_",
- "_cursor",
- "_rows",
- )
-
- def __init__(self, adapt_connection):
- self._adapt_connection = adapt_connection
- self._connection = adapt_connection._connection
- self.await_ = adapt_connection.await_
-
- cursor = self._connection.cursor()
-
- self._cursor = self.await_(cursor.__aenter__())
- self._rows = deque()
-
- @property
- def description(self):
- return self._cursor.description
-
- @property
- def rowcount(self):
- return self._cursor.rowcount
-
- @property
- def arraysize(self):
- return self._cursor.arraysize
-
- @arraysize.setter
- def arraysize(self, value):
- self._cursor.arraysize = value
-
- @property
- def lastrowid(self):
- return self._cursor.lastrowid
-
- def close(self):
- # note we aren't actually closing the cursor here,
- # we are just letting GC do it. to allow this to be async
- # we would need the Result to change how it does "Safe close cursor".
- # MySQL "cursors" don't actually have state to be "closed" besides
- # exhausting rows, which we already have done for sync cursor.
- # another option would be to emulate aiosqlite dialect and assign
- # cursor only if we are doing server side cursor operation.
- self._rows.clear()
-
- def execute(self, operation, parameters=None):
- return self.await_(self._execute_async(operation, parameters))
-
- def executemany(self, operation, seq_of_parameters):
- return self.await_(
- self._executemany_async(operation, seq_of_parameters)
- )
-
- async def _execute_async(self, operation, parameters):
- async with self._adapt_connection._mutex_and_adapt_errors():
- if parameters is None:
- result = await self._cursor.execute(operation)
- else:
- result = await self._cursor.execute(operation, parameters)
-
- if not self.server_side:
- # asyncmy has a "fake" async result, so we have to pull it out
- # of that here since our default result is not async.
- # we could just as easily grab "_rows" here and be done with it
- # but this is safer.
- self._rows = deque(await self._cursor.fetchall())
- return result
-
- async def _executemany_async(self, operation, seq_of_parameters):
- async with self._adapt_connection._mutex_and_adapt_errors():
- return await self._cursor.executemany(operation, seq_of_parameters)
-
- def setinputsizes(self, *inputsizes):
- pass
-
- def __iter__(self):
- while self._rows:
- yield self._rows.popleft()
-
- def fetchone(self):
- if self._rows:
- return self._rows.popleft()
- else:
- return None
-
- def fetchmany(self, size=None):
- if size is None:
- size = self.arraysize
-
- rr = self._rows
- return [rr.popleft() for _ in range(min(size, len(rr)))]
-
- def fetchall(self):
- retval = list(self._rows)
- self._rows.clear()
- return retval
+class AsyncAdapt_asyncmy_cursor(AsyncAdapt_dbapi_cursor):
+ __slots__ = ()
-class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor):
- # TODO: base on connectors/asyncio.py
- # see #10415
+class AsyncAdapt_asyncmy_ss_cursor(
+ AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_asyncmy_cursor
+):
__slots__ = ()
- server_side = True
- def __init__(self, adapt_connection):
- self._adapt_connection = adapt_connection
- self._connection = adapt_connection._connection
- self.await_ = adapt_connection.await_
-
- cursor = self._connection.cursor(
- adapt_connection.dbapi.asyncmy.cursors.SSCursor
+ def _make_new_cursor(self, connection):
+ return connection.cursor(
+ self._adapt_connection.dbapi.asyncmy.cursors.SSCursor
)
- self._cursor = self.await_(cursor.__aenter__())
-
- def close(self):
- if self._cursor is not None:
- self.await_(self._cursor.close())
- self._cursor = None
-
- def fetchone(self):
- return self.await_(self._cursor.fetchone())
-
- def fetchmany(self, size=None):
- return self.await_(self._cursor.fetchmany(size=size))
-
- def fetchall(self):
- return self.await_(self._cursor.fetchall())
+class AsyncAdapt_asyncmy_connection(AsyncAdapt_dbapi_connection):
+ __slots__ = ()
-class AsyncAdapt_asyncmy_connection(AdaptedConnection):
- # TODO: base on connectors/asyncio.py
- # see #10415
- await_ = staticmethod(await_only)
- __slots__ = ("dbapi", "_execute_mutex")
+ _cursor_cls = AsyncAdapt_asyncmy_cursor
+ _ss_cursor_cls = AsyncAdapt_asyncmy_ss_cursor
- def __init__(self, dbapi, connection):
- self.dbapi = dbapi
- self._connection = connection
- self._execute_mutex = asyncio.Lock()
+ def _handle_exception(self, error):
+ if isinstance(error, AttributeError):
+ raise self.dbapi.InternalError(
+ "network operation failed due to asyncmy attribute error"
+ )
- @asynccontextmanager
- async def _mutex_and_adapt_errors(self):
- async with self._execute_mutex:
- try:
- yield
- except AttributeError:
- raise self.dbapi.InternalError(
- "network operation failed due to asyncmy attribute error"
- )
+ raise error
def ping(self, reconnect):
assert not reconnect
return self.await_(self._do_ping())
async def _do_ping(self):
- async with self._mutex_and_adapt_errors():
- return await self._connection.ping(False)
+ try:
+ async with self._execute_mutex:
+ return await self._connection.ping(False)
+ except Exception as error:
+ self._handle_exception(error)
def character_set_name(self):
return self._connection.character_set_name()
def autocommit(self, value):
self.await_(self._connection.autocommit(value))
- def cursor(self, server_side=False):
- if server_side:
- return AsyncAdapt_asyncmy_ss_cursor(self)
- else:
- return AsyncAdapt_asyncmy_cursor(self)
-
- def rollback(self):
- self.await_(self._connection.rollback())
-
- def commit(self):
- self.await_(self._connection.commit())
-
def terminate(self):
# it's not awaitable.
self._connection.close()