--- /dev/null
+.. change::
+ :tags: usecase, mssql
+ :tickets: 6521
+
+ Added support for the ``aioodbc`` driver implemented for SQL Server,
+ which builds on top of the pyodbc and general aio* dialect architecture.
+
+ .. seealso::
+
+ :ref:`mssql_aioodbc` - in the SQL Server dialect documentation.
+
+
.. autoclass:: XML
:members: __init__
+.. _mssql_pyodbc:
PyODBC
------
-------
.. automodule:: sqlalchemy.dialects.mssql.pymssql
+
+.. _mssql_aioodbc:
+
+aioodbc
+-------
+
+.. automodule:: sqlalchemy.dialects.mssql.aioodbc
--- /dev/null
+# connectors/aioodbc.py
+# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from .asyncio import AsyncAdapt_dbapi_connection
+from .asyncio import AsyncAdapt_dbapi_cursor
+from .asyncio import AsyncAdapt_dbapi_ss_cursor
+from .asyncio import AsyncAdaptFallback_dbapi_connection
+from .pyodbc import PyODBCConnector
+from .. import pool
+from .. import util
+from ..util.concurrency import await_fallback
+from ..util.concurrency import await_only
+
+if TYPE_CHECKING:
+ from ..engine.interfaces import ConnectArgsType
+ from ..engine.url import URL
+
+
+class AsyncAdapt_aioodbc_cursor(AsyncAdapt_dbapi_cursor):
+ __slots__ = ()
+
+ def setinputsizes(self, *inputsizes):
+ # see https://github.com/aio-libs/aioodbc/issues/451
+ return self._cursor._impl.setinputsizes(*inputsizes)
+
+ # how it's supposed to work
+ # return self.await_(self._cursor.setinputsizes(*inputsizes))
+
+
+class AsyncAdapt_aioodbc_ss_cursor(
+ AsyncAdapt_aioodbc_cursor, AsyncAdapt_dbapi_ss_cursor
+):
+ __slots__ = ()
+
+
+class AsyncAdapt_aioodbc_connection(AsyncAdapt_dbapi_connection):
+ _cursor_cls = AsyncAdapt_aioodbc_cursor
+ _ss_cursor_cls = AsyncAdapt_aioodbc_ss_cursor
+ __slots__ = ()
+
+ @property
+ def autocommit(self):
+ return self._connection.autocommit
+
+ @autocommit.setter
+ def autocommit(self, value):
+ # https://github.com/aio-libs/aioodbc/issues/448
+ # self._connection.autocommit = value
+
+ self._connection._conn.autocommit = value
+
+ def cursor(self, server_side=False):
+ # aioodbc sets connection=None when closed and just fails with
+ # AttributeError here. Here we use the same ProgrammingError +
+ # message that pyodbc uses, so it triggers is_disconnect() as well.
+ if self._connection.closed:
+ raise self.dbapi.ProgrammingError(
+ "Attempt to use a closed connection."
+ )
+ return super().cursor(server_side=server_side)
+
+ def rollback(self):
+ # aioodbc sets connection=None when closed and just fails with
+ # AttributeError here. should be a no-op
+ if not self._connection.closed:
+ super().rollback()
+
+ def commit(self):
+ # aioodbc sets connection=None when closed and just fails with
+ # AttributeError here. should be a no-op
+ if not self._connection.closed:
+ super().commit()
+
+ def close(self):
+ # aioodbc sets connection=None when closed and just fails with
+ # AttributeError here. should be a no-op
+ if not self._connection.closed:
+ super().close()
+
+
+class AsyncAdaptFallback_aioodbc_connection(
+ AsyncAdaptFallback_dbapi_connection, AsyncAdapt_aioodbc_connection
+):
+ __slots__ = ()
+
+
+class AsyncAdapt_aioodbc_dbapi:
+ def __init__(self, aioodbc, pyodbc):
+ self.aioodbc = aioodbc
+ self.pyodbc = pyodbc
+ self.paramstyle = pyodbc.paramstyle
+ self._init_dbapi_attributes()
+ self.Cursor = AsyncAdapt_dbapi_cursor
+ self.version = pyodbc.version
+
+ def _init_dbapi_attributes(self):
+ for name in (
+ "Warning",
+ "Error",
+ "InterfaceError",
+ "DataError",
+ "DatabaseError",
+ "OperationalError",
+ "InterfaceError",
+ "IntegrityError",
+ "ProgrammingError",
+ "InternalError",
+ "NotSupportedError",
+ "NUMBER",
+ "STRING",
+ "DATETIME",
+ "BINARY",
+ "Binary",
+ "BinaryNull",
+ "SQL_VARCHAR",
+ "SQL_WVARCHAR",
+ ):
+ setattr(self, name, getattr(self.pyodbc, name))
+
+ def connect(self, *arg, **kw):
+ async_fallback = kw.pop("async_fallback", False)
+ creator_fn = kw.pop("async_creator_fn", self.aioodbc.connect)
+
+ if util.asbool(async_fallback):
+ return AsyncAdaptFallback_aioodbc_connection(
+ self,
+ await_fallback(creator_fn(*arg, **kw)),
+ )
+ else:
+ return AsyncAdapt_aioodbc_connection(
+ self,
+ await_only(creator_fn(*arg, **kw)),
+ )
+
+
+class aiodbcConnector(PyODBCConnector):
+ is_async = True
+ supports_statement_cache = True
+
+ supports_server_side_cursors = True
+
+ @classmethod
+ def import_dbapi(cls):
+ return AsyncAdapt_aioodbc_dbapi(
+ __import__("aioodbc"), __import__("pyodbc")
+ )
+
+ def create_connect_args(self, url: URL) -> ConnectArgsType:
+ arg, kw = super().create_connect_args(url)
+ if arg and arg[0]:
+ kw["dsn"] = arg[0]
+
+ return (), kw
+
+ @classmethod
+ def get_pool_class(cls, url):
+ async_fallback = url.query.get("async_fallback", False)
+
+ if util.asbool(async_fallback):
+ return pool.FallbackAsyncAdaptedQueuePool
+ else:
+ return pool.AsyncAdaptedQueuePool
+
+ def _do_isolation_level(self, connection, autocommit, isolation_level):
+ connection.set_autocommit(autocommit)
+ connection.set_isolation_level(isolation_level)
+
+ def _do_autocommit(self, connection, value):
+ connection.set_autocommit(value)
+
+ def set_readonly(self, connection, value):
+ connection.set_read_only(value)
+
+ def set_deferrable(self, connection, value):
+ connection.set_deferrable(value)
+
+ def get_driver_connection(self, connection):
+ return connection._connection
--- /dev/null
+# connectors/asyncio.py
+# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+"""generic asyncio-adapted versions of DBAPI connection and cursor"""
+
+from __future__ import annotations
+
+import collections
+import itertools
+
+from ..engine import AdaptedConnection
+from ..util.concurrency import asyncio
+from ..util.concurrency import await_fallback
+from ..util.concurrency import await_only
+
+
+class AsyncAdapt_dbapi_cursor:
+ server_side = False
+ __slots__ = (
+ "_adapt_connection",
+ "_connection",
+ "await_",
+ "_cursor",
+ "_rows",
+ )
+
+ def __init__(self, adapt_connection):
+ self._adapt_connection = adapt_connection
+ self._connection = adapt_connection._connection
+ self.await_ = adapt_connection.await_
+
+ cursor = self._connection.cursor()
+
+ self._cursor = self.await_(cursor.__aenter__())
+ self._rows = collections.deque()
+
+ @property
+ def description(self):
+ return self._cursor.description
+
+ @property
+ def rowcount(self):
+ return self._cursor.rowcount
+
+ @property
+ def arraysize(self):
+ return self._cursor.arraysize
+
+ @arraysize.setter
+ def arraysize(self, value):
+ self._cursor.arraysize = value
+
+ @property
+ def lastrowid(self):
+ return self._cursor.lastrowid
+
+ def close(self):
+ # note we aren't actually closing the cursor here,
+ # we are just letting GC do it. see notes in aiomysql dialect
+ self._rows.clear()
+
+ def execute(self, operation, parameters=None):
+ return self.await_(self._execute_async(operation, parameters))
+
+ def executemany(self, operation, seq_of_parameters):
+ return self.await_(
+ self._executemany_async(operation, seq_of_parameters)
+ )
+
+ async def _execute_async(self, operation, parameters):
+ async with self._adapt_connection._execute_mutex:
+ result = await self._cursor.execute(operation, parameters or ())
+
+ if self._cursor.description and not self.server_side:
+ # aioodbc has a "fake" async result, so we have to pull it out
+ # of that here since our default result is not async.
+ # we could just as easily grab "_rows" here and be done with it
+ # but this is safer.
+ self._rows = collections.deque(await self._cursor.fetchall())
+ return result
+
+ async def _executemany_async(self, operation, seq_of_parameters):
+ async with self._adapt_connection._execute_mutex:
+ return await self._cursor.executemany(operation, seq_of_parameters)
+
+ def nextset(self):
+ self.await_(self._cursor.nextset())
+ if self._cursor.description and not self.server_side:
+ self._rows = collections.deque(
+ self.await_(self._cursor.fetchall())
+ )
+
+ def setinputsizes(self, *inputsizes):
+ # NOTE: this is overrridden in aioodbc due to
+ # see https://github.com/aio-libs/aioodbc/issues/451
+ # right now
+
+ return self.await_(self._cursor.setinputsizes(*inputsizes))
+
+ def __iter__(self):
+ while self._rows:
+ yield self._rows.popleft()
+
+ def fetchone(self):
+ if self._rows:
+ return self._rows.popleft()
+ else:
+ return None
+
+ def fetchmany(self, size=None):
+ if size is None:
+ size = self.arraysize
+
+ rr = iter(self._rows)
+ retval = list(itertools.islice(rr, 0, size))
+ self._rows = collections.deque(rr)
+ return retval
+
+ def fetchall(self):
+ retval = list(self._rows)
+ self._rows.clear()
+ return retval
+
+
+class AsyncAdapt_dbapi_ss_cursor(AsyncAdapt_dbapi_cursor):
+ __slots__ = ()
+ server_side = True
+
+ def __init__(self, adapt_connection):
+ self._adapt_connection = adapt_connection
+ self._connection = adapt_connection._connection
+ self.await_ = adapt_connection.await_
+
+ cursor = self._connection.cursor()
+
+ self._cursor = self.await_(cursor.__aenter__())
+
+ def close(self):
+ if self._cursor is not None:
+ self.await_(self._cursor.close())
+ self._cursor = None
+
+ def fetchone(self):
+ return self.await_(self._cursor.fetchone())
+
+ def fetchmany(self, size=None):
+ return self.await_(self._cursor.fetchmany(size=size))
+
+ def fetchall(self):
+ return self.await_(self._cursor.fetchall())
+
+
+class AsyncAdapt_dbapi_connection(AdaptedConnection):
+ _cursor_cls = AsyncAdapt_dbapi_cursor
+ _ss_cursor_cls = AsyncAdapt_dbapi_ss_cursor
+
+ await_ = staticmethod(await_only)
+ __slots__ = ("dbapi", "_execute_mutex")
+
+ def __init__(self, dbapi, connection):
+ self.dbapi = dbapi
+ self._connection = connection
+ self._execute_mutex = asyncio.Lock()
+
+ def ping(self, reconnect):
+ return self.await_(self._connection.ping(reconnect))
+
+ def add_output_converter(self, *arg, **kw):
+ self._connection.add_output_converter(*arg, **kw)
+
+ def character_set_name(self):
+ return self._connection.character_set_name()
+
+ @property
+ def autocommit(self):
+ return self._connection.autocommit
+
+ @autocommit.setter
+ def autocommit(self, value):
+ # https://github.com/aio-libs/aioodbc/issues/448
+ # self._connection.autocommit = value
+
+ self._connection._conn.autocommit = value
+
+ def cursor(self, server_side=False):
+ if server_side:
+ return self._ss_cursor_cls(self)
+ else:
+ return self._cursor_cls(self)
+
+ def rollback(self):
+ self.await_(self._connection.rollback())
+
+ def commit(self):
+ self.await_(self._connection.commit())
+
+ def close(self):
+ self.await_(self._connection.close())
+
+
+class AsyncAdaptFallback_dbapi_connection(AsyncAdapt_dbapi_connection):
+ __slots__ = ()
+
+ await_ = staticmethod(await_fallback)
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors
+from . import aioodbc # noqa
from . import base # noqa
from . import pymssql # noqa
from . import pyodbc # noqa
--- /dev/null
+# mssql/aioodbc.py
+# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+r"""
+.. dialect:: mssql+aioodbc
+ :name: aioodbc
+ :dbapi: aioodbc
+ :connectstring: mssql+aioodbc://<username>:<password>@<dsnname>
+ :url: https://pypi.org/project/aioodbc/
+
+
+Support for the SQL Server database in asyncio style, using the aioodbc
+driver which itself is a thread-wrapper around pyodbc.
+
+.. versionadded:: 2.0.23 Added the mssql+aioodbc dialect which builds
+ on top of the pyodbc and general aio* dialect architecture.
+
+Using a special asyncio mediation layer, the aioodbc dialect is usable
+as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
+extension package.
+
+Most behaviors and caveats for this driver are the same as that of the
+pyodbc dialect used on SQL Server; see :ref:`mssql_pyodbc` for general
+background.
+
+This dialect should normally be used only with the
+:func:`_asyncio.create_async_engine` engine creation function; connection
+styles are otherwise equivalent to those documented in the pyodbc section::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ engine = create_async_engine(
+ "mssql+aioodbc://scott:tiger@mssql2017:1433/test?"
+ "driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
+ )
+
+
+
+"""
+
+from __future__ import annotations
+
+from .pyodbc import MSDialect_pyodbc
+from .pyodbc import MSExecutionContext_pyodbc
+from ...connectors.aioodbc import aiodbcConnector
+
+
+class MSExecutionContext_aioodbc(MSExecutionContext_pyodbc):
+ def create_server_side_cursor(self):
+ return self._dbapi_connection.cursor(server_side=True)
+
+
+class MSDialectAsync_aioodbc(aiodbcConnector, MSDialect_pyodbc):
+ driver = "aioodbc"
+
+ supports_statement_cache = True
+
+ execution_ctx_cls = MSExecutionContext_aioodbc
+
+
+dialect = MSDialectAsync_aioodbc
new_url = url.set(drivername="%s+%s" % (backend, driver))
- if driver != "pyodbc":
+ if driver not in ("pyodbc", "aioodbc"):
new_url = new_url.set(query="")
if query_str:
try:
# fetchall() ensures the cursor is consumed
# without closing it (FreeTDS particularly)
- row = self.cursor.fetchall()[0]
- break
+ rows = self.cursor.fetchall()
except self.dialect.dbapi.Error:
# no way around this - nextset() consumes the previous set
# so we need to just keep flipping
self.cursor.nextset()
+ else:
+ if not rows:
+ # async adapter drivers just return None here
+ self.cursor.nextset()
+ continue
+ row = rows[0]
+ break
self._lastrowid = int(row[0])
class AsyncAdapt_aiomysql_cursor:
+ # TODO: base on connectors/asyncio.py
+ # see #10415
server_side = False
__slots__ = (
"_adapt_connection",
class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor):
+ # TODO: base on connectors/asyncio.py
+ # see #10415
__slots__ = ()
server_side = True
class AsyncAdapt_aiomysql_connection(AdaptedConnection):
+ # TODO: base on connectors/asyncio.py
+ # see #10415
await_ = staticmethod(await_only)
__slots__ = ("dbapi", "_execute_mutex")
class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection):
+ # TODO: base on connectors/asyncio.py
+ # see #10415
__slots__ = ()
await_ = staticmethod(await_fallback)
class AsyncAdapt_asyncmy_cursor:
+ # TODO: base on connectors/asyncio.py
+ # see #10415
server_side = False
__slots__ = (
"_adapt_connection",
class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor):
+ # TODO: base on connectors/asyncio.py
+ # see #10415
__slots__ = ()
server_side = True
class AsyncAdapt_asyncmy_connection(AdaptedConnection):
+ # TODO: base on connectors/asyncio.py
+ # see #10415
await_ = staticmethod(await_only)
__slots__ = ("dbapi", "_execute_mutex")
class AsyncAdapt_aiosqlite_cursor:
+ # TODO: base on connectors/asyncio.py
+ # see #10415
+
__slots__ = (
"_adapt_connection",
"_connection",
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
+ # TODO: base on connectors/asyncio.py
+ # see #10415
__slots__ = "_cursor"
server_side = True
if self.statement != stmt.statement or (
self.params is not None and self.params != stmt.parameters
):
+ self.consume_statement = True
self.errormessage = (
"Testing for exact SQL %s parameters %s received %s %s"
% (
elif self.engine.dialect.driver == "pymysql":
sscursor = __import__("pymysql.cursors").cursors.SSCursor
return isinstance(cursor, sscursor)
- elif self.engine.dialect.driver in ("aiomysql", "asyncmy"):
+ elif self.engine.dialect.driver in ("aiomysql", "asyncmy", "aioodbc"):
return cursor.server_side
elif self.engine.dialect.driver == "mysqldb":
sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
True,
"SELECT 1 FOR UPDATE",
True,
- testing.skip_if("sqlite"),
+ testing.skip_if(["sqlite", "mssql"]),
),
("text_no_ss", False, text("select 42"), False),
(
aiomysql =
%(asyncio)s
aiomysql>=0.2.0
+aioodbc =
+ %(asyncio)s
+ aioodbc
asyncmy =
%(asyncio)s
asyncmy>=0.2.3,!=0.2.4,!=0.2.6
mariadb = mariadb+mysqldb://scott:tiger@127.0.0.1:3306/test
mariadb_connector = mariadb+mariadbconnector://scott:tiger@127.0.0.1:3306/test
mssql = mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes
+mssql_async = mssql+aioodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes
pymssql = mssql+pymssql://scott:tiger^5HHH@mssql2017:1433/test
docker_mssql = mssql+pyodbc://scott:tiger^5HHH@127.0.0.1:1433/test?driver=ODBC+Driver+18+for+SQL+Server
oracle = oracle+cx_oracle://scott:tiger@oracle18c/xe
def process_bind_param(self, value, dialect):
if value:
- value.stuff = "BIND" + value.stuff
+ value = pickleable.Foo(value.moredata, stuff="BIND" + value.stuff)
return value
def process_result_value(self, value, dialect):
oracle: oracle
oracle: oracle_oracledb
- py{3,37,38,39,310,311}-mssql: mssql
+ mssql: mssql
+ mssql: aioodbc
py{3,37,38,39,310,311}-mssql: mssql_pymssql
install_command=
mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver asyncmy --dbdriver aiomysql --dbdriver mariadbconnector}
mssql: MSSQL={env:TOX_MSSQL:--db mssql}
- py{3,37,38,39,310,311}-mssql: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver pymssql}
- py312-mssql: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc}
+ py{3,37,38,39,310,311}-mssql: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver aioodbc --dbdriver pymssql}
+ py312-mssql: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver aioodbc}
oracle,mssql,sqlite_file: IDENTS=--write-idents db_idents.txt