From 11eecfacb7b36c209c1ad726f5e5b7525860977b Mon Sep 17 00:00:00 2001 From: long2ice Date: Thu, 16 Sep 2021 11:08:25 -0400 Subject: [PATCH] Add `asyncmy` support Added initial support for the ``asyncmy`` asyncio database driver for MySQL and MariaDB. This driver is very new, however appears to be the only current alternative to the ``aiomysql`` driver which currently appears to be unmaintained and is not working with current Python versions. Much thanks to long2ice for the pull request for this dialect. Fixes: #6993 Closes: #7000 Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/7000 Pull-request-sha: f7d6c811fc72324a83c8af635bbca8b268b0098e Change-Id: I4ef54b43334feff7e3a710fc4de6821437f3bb68 --- doc/build/changelog/unreleased_14/6993.rst | 13 + doc/build/dialects/mysql.rst | 9 +- lib/sqlalchemy/dialects/mysql/__init__.py | 3 +- lib/sqlalchemy/dialects/mysql/aiomysql.py | 12 +- lib/sqlalchemy/dialects/mysql/asyncmy.py | 340 +++++++++++++++++++ lib/sqlalchemy/dialects/mysql/base.py | 5 +- lib/sqlalchemy/testing/suite/test_results.py | 2 +- setup.cfg | 15 +- test/dialect/mysql/test_types.py | 4 +- test/engine/test_reconnect.py | 1 + tox.ini | 5 +- 11 files changed, 388 insertions(+), 21 deletions(-) create mode 100644 doc/build/changelog/unreleased_14/6993.rst create mode 100644 lib/sqlalchemy/dialects/mysql/asyncmy.py diff --git a/doc/build/changelog/unreleased_14/6993.rst b/doc/build/changelog/unreleased_14/6993.rst new file mode 100644 index 0000000000..fd2122eef7 --- /dev/null +++ b/doc/build/changelog/unreleased_14/6993.rst @@ -0,0 +1,13 @@ +.. change:: + :tags: feature, asyncio, mysql + :tickets: 6993 + + Added initial support for the ``asyncmy`` asyncio database driver for MySQL + and MariaDB. This driver is very new, however appears to be the only + current alternative to the ``aiomysql`` driver which currently appears to + be unmaintained and is not working with current Python versions. Much + thanks to long2ice for the pull request for this dialect. + + .. seealso:: + + :ref:`asyncmy` diff --git a/doc/build/dialects/mysql.rst b/doc/build/dialects/mysql.rst index 573c2598c0..9eb7f5a740 100644 --- a/doc/build/dialects/mysql.rst +++ b/doc/build/dialects/mysql.rst @@ -189,6 +189,14 @@ MySQL-Connector .. automodule:: sqlalchemy.dialects.mysql.mysqlconnector +.. _asyncmy: + +asyncmy +------- + +.. automodule:: sqlalchemy.dialects.mysql.asyncmy + + .. _aiomysql: aiomysql @@ -210,4 +218,3 @@ pyodbc ------ .. automodule:: sqlalchemy.dialects.mysql.pyodbc - diff --git a/lib/sqlalchemy/dialects/mysql/__init__.py b/lib/sqlalchemy/dialects/mysql/__init__.py index 8631ea3b93..c83fec0c39 100644 --- a/lib/sqlalchemy/dialects/mysql/__init__.py +++ b/lib/sqlalchemy/dialects/mysql/__init__.py @@ -54,12 +54,11 @@ from ...util import compat if compat.py3k: from . import aiomysql # noqa - + from . import asyncmy # noqa # default dialect base.dialect = dialect = mysqldb.dialect - __all__ = ( "BIGINT", "BINARY", diff --git a/lib/sqlalchemy/dialects/mysql/aiomysql.py b/lib/sqlalchemy/dialects/mysql/aiomysql.py index 3275d1888c..c9a87145e8 100644 --- a/lib/sqlalchemy/dialects/mysql/aiomysql.py +++ b/lib/sqlalchemy/dialects/mysql/aiomysql.py @@ -11,6 +11,10 @@ r""" :connectstring: mysql+aiomysql://user:password@host:port/dbname[?key=value&key=value...] :url: https://github.com/aio-libs/aiomysql +.. warning:: The aiomysql dialect as of September, 2021 appears to be unmaintained + and no longer functions for Python version 3.10. Please refer to the + :ref:`asyncmy` dialect for current MySQL asyncio functionality. + The aiomysql dialect is SQLAlchemy's second Python asyncio dialect. Using a special asyncio mediation layer, the aiomysql dialect is usable @@ -21,13 +25,7 @@ This dialect should normally be used only with the :func:`_asyncio.create_async_engine` engine creation function:: from sqlalchemy.ext.asyncio import create_async_engine - engine = create_async_engine("mysql+aiomysql://user:pass@hostname/dbname") - -Unicode -------- - -Please see :ref:`mysql_unicode` for current recommendations on unicode -handling. + engine = create_async_engine("mysql+aiomysql://user:pass@hostname/dbname?charset=utf8mb4") """ # noqa diff --git a/lib/sqlalchemy/dialects/mysql/asyncmy.py b/lib/sqlalchemy/dialects/mysql/asyncmy.py new file mode 100644 index 0000000000..f312cf79bd --- /dev/null +++ b/lib/sqlalchemy/dialects/mysql/asyncmy.py @@ -0,0 +1,340 @@ +# mysql/asyncmy.py +# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +r""" +.. dialect:: mysql+asyncmy + :name: asyncmy + :dbapi: asyncmy + :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...] + :url: https://github.com/long2ice/asyncmy + +.. note:: The asyncmy dialect as of September, 2021 was added to provide + MySQL/MariaDB asyncio compatibility given that the :ref:`aiomysql` database + driver has become unmaintained, however asyncmy is itself very new. + +Using a special asyncio mediation layer, the asyncmy dialect is usable +as the backend for the :ref:`SQLAlchemy asyncio ` +extension package. + +This dialect should normally be used only with the +:func:`_asyncio.create_async_engine` engine creation function:: + + from sqlalchemy.ext.asyncio import create_async_engine + engine = create_async_engine("mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4") + + +""" # noqa + +import contextlib + +from .pymysql import MySQLDialect_pymysql +from ... import pool +from ... import util +from ...util.concurrency import asyncio +from ...util.concurrency import await_fallback +from ...util.concurrency import await_only + + +class AsyncAdapt_asyncmy_cursor: + server_side = False + __slots__ = ( + "_adapt_connection", + "_connection", + "await_", + "_cursor", + "_rows", + ) + + def __init__(self, adapt_connection): + self._adapt_connection = adapt_connection + self._connection = adapt_connection._connection + self.await_ = adapt_connection.await_ + + cursor = self._connection.cursor() + + self._cursor = self.await_(cursor.__aenter__()) + self._rows = [] + + @property + def description(self): + return self._cursor.description + + @property + def rowcount(self): + return self._cursor.rowcount + + @property + def arraysize(self): + return self._cursor.arraysize + + @arraysize.setter + def arraysize(self, value): + self._cursor.arraysize = value + + @property + def lastrowid(self): + return self._cursor.lastrowid + + def close(self): + # note we aren't actually closing the cursor here, + # we are just letting GC do it. to allow this to be async + # we would need the Result to change how it does "Safe close cursor". + # MySQL "cursors" don't actually have state to be "closed" besides + # exhausting rows, which we already have done for sync cursor. + # another option would be to emulate aiosqlite dialect and assign + # cursor only if we are doing server side cursor operation. + self._rows[:] = [] + + def execute(self, operation, parameters=None): + return self.await_(self._execute_async(operation, parameters)) + + def executemany(self, operation, seq_of_parameters): + return self.await_( + self._executemany_async(operation, seq_of_parameters) + ) + + async def _execute_async(self, operation, parameters): + async with self._adapt_connection._mutex_and_adapt_errors(): + if parameters is None: + result = await self._cursor.execute(operation) + else: + result = await self._cursor.execute(operation, parameters) + + if not self.server_side: + # asyncmy has a "fake" async result, so we have to pull it out + # of that here since our default result is not async. + # we could just as easily grab "_rows" here and be done with it + # but this is safer. + self._rows = list(await self._cursor.fetchall()) + return result + + async def _executemany_async(self, operation, seq_of_parameters): + async with self._adapt_connection._mutex_and_adapt_errors(): + return await self._cursor.executemany(operation, seq_of_parameters) + + def setinputsizes(self, *inputsizes): + pass + + def __iter__(self): + while self._rows: + yield self._rows.pop(0) + + def fetchone(self): + if self._rows: + return self._rows.pop(0) + else: + return None + + def fetchmany(self, size=None): + if size is None: + size = self.arraysize + + retval = self._rows[0:size] + self._rows[:] = self._rows[size:] + return retval + + def fetchall(self): + retval = self._rows[:] + self._rows[:] = [] + return retval + + +class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor): + __slots__ = () + server_side = True + + def __init__(self, adapt_connection): + self._adapt_connection = adapt_connection + self._connection = adapt_connection._connection + self.await_ = adapt_connection.await_ + + adapt_connection._ss_cursors.add(self) + + cursor = self._connection.cursor( + adapt_connection.dbapi.asyncmy.cursors.SSCursor + ) + + self._cursor = self.await_(cursor.__aenter__()) + + def close(self): + try: + if self._cursor is not None: + self.await_(self._cursor.fetchall()) + self.await_(self._cursor.close()) + self._cursor = None + finally: + self._adapt_connection._ss_cursors.discard(self) + + def fetchone(self): + return self.await_(self._cursor.fetchone()) + + def fetchmany(self, size=None): + return self.await_(self._cursor.fetchmany(size=size)) + + def fetchall(self): + return self.await_(self._cursor.fetchall()) + + +class AsyncAdapt_asyncmy_connection: + await_ = staticmethod(await_only) + __slots__ = ("dbapi", "_connection", "_execute_mutex", "_ss_cursors") + + def __init__(self, dbapi, connection): + self.dbapi = dbapi + self._connection = connection + self._execute_mutex = asyncio.Lock() + self._ss_cursors = set() + + @contextlib.asynccontextmanager + async def _mutex_and_adapt_errors(self): + async with self._execute_mutex: + try: + yield + except AttributeError: + raise self.dbapi.InternalError( + "network operation failed due to asyncmy attribute error" + ) + + def ping(self, reconnect): + assert not reconnect + return self.await_(self._do_ping()) + + async def _do_ping(self): + async with self._mutex_and_adapt_errors(): + return await self._connection.ping(False) + + def character_set_name(self): + return self._connection.character_set_name() + + def autocommit(self, value): + self.await_(self._connection.autocommit(value)) + + def cursor(self, server_side=False): + if server_side: + return AsyncAdapt_asyncmy_ss_cursor(self) + else: + return AsyncAdapt_asyncmy_cursor(self) + + def _shutdown_ss_cursors(self): + for curs in list(self._ss_cursors): + curs.close() + + def rollback(self): + self._shutdown_ss_cursors() + self.await_(self._connection.rollback()) + + def commit(self): + self._shutdown_ss_cursors() + self.await_(self._connection.commit()) + + def close(self): + self._shutdown_ss_cursors() + # it's not awaitable. + self._connection.close() + + +class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection): + __slots__ = () + + await_ = staticmethod(await_fallback) + + +class AsyncAdapt_asyncmy_dbapi: + def __init__(self, asyncmy, pymysql): + self.asyncmy = asyncmy + self.pymysql = pymysql + self.paramstyle = "format" + self._init_dbapi_attributes() + + def _init_dbapi_attributes(self): + for name in ( + "Warning", + "Error", + "InterfaceError", + "DataError", + "DatabaseError", + "OperationalError", + "InterfaceError", + "IntegrityError", + "ProgrammingError", + "InternalError", + "NotSupportedError", + ): + setattr(self, name, getattr(self.asyncmy.errors, name)) + + for name in ( + "NUMBER", + "STRING", + "DATETIME", + "BINARY", + "TIMESTAMP", + "Binary", + ): + setattr(self, name, getattr(self.pymysql, name)) + + def connect(self, *arg, **kw): + async_fallback = kw.pop("async_fallback", False) + + if util.asbool(async_fallback): + return AsyncAdaptFallback_asyncmy_connection( + self, + await_fallback(self.asyncmy.connect(*arg, **kw)), + ) + else: + return AsyncAdapt_asyncmy_connection( + self, + await_only(self.asyncmy.connect(*arg, **kw)), + ) + + +class MySQLDialect_asyncmy(MySQLDialect_pymysql): + driver = "asyncmy" + supports_statement_cache = True + + supports_server_side_cursors = True + _sscursor = AsyncAdapt_asyncmy_ss_cursor + + is_async = True + + @classmethod + def dbapi(cls): + return AsyncAdapt_asyncmy_dbapi( + __import__("asyncmy"), __import__("pymysql") + ) + + @classmethod + def get_pool_class(cls, url): + + async_fallback = url.query.get("async_fallback", False) + + if util.asbool(async_fallback): + return pool.FallbackAsyncAdaptedQueuePool + else: + return pool.AsyncAdaptedQueuePool + + def create_connect_args(self, url): + return super(MySQLDialect_asyncmy, self).create_connect_args( + url, _translate_args=dict(username="user", database="db") + ) + + def is_disconnect(self, e, connection, cursor): + if super(MySQLDialect_asyncmy, self).is_disconnect( + e, connection, cursor + ): + return True + else: + str_e = str(e).lower() + return ( + "not connected" in str_e or "network operation failed" in str_e + ) + + def _found_rows_client_flag(self): + from pymysql.constants import CLIENT + + return CLIENT.FOUND_ROWS + + +dialect = MySQLDialect_asyncmy diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py index 9bf12e194c..04b0c1b6d9 100644 --- a/lib/sqlalchemy/dialects/mysql/base.py +++ b/lib/sqlalchemy/dialects/mysql/base.py @@ -2915,7 +2915,10 @@ class MySQLDialect(default.DefaultDialect): "WHERE TABLE_TYPE='SEQUENCE' and TABLE_NAME=:name AND " "TABLE_SCHEMA=:schema_name" ), - dict(name=sequence_name, schema_name=schema), + dict( + name=util.text_type(sequence_name), + schema_name=util.text_type(schema), + ), ) return cursor.first() is not None diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py index 982ac498d4..c41a55025d 100644 --- a/lib/sqlalchemy/testing/suite/test_results.py +++ b/lib/sqlalchemy/testing/suite/test_results.py @@ -234,7 +234,7 @@ class ServerSideCursorsTest( elif self.engine.dialect.driver == "pymysql": sscursor = __import__("pymysql.cursors").cursors.SSCursor return isinstance(cursor, sscursor) - elif self.engine.dialect.driver == "aiomysql": + elif self.engine.dialect.driver in ("aiomysql", "asyncmy"): return cursor.server_side elif self.engine.dialect.driver == "mysqldb": sscursor = __import__("MySQLdb.cursors").cursors.SSCursor diff --git a/setup.cfg b/setup.cfg index 689539687b..8512e7abc0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -74,6 +74,9 @@ pymysql = aiomysql = %(asyncio)s aiomysql;python_version>="3" +asyncmy = + %(asyncio)s + asyncmy;python_version>="3" aiosqlite = %(asyncio)s aiosqlite;python_version>="3" @@ -111,11 +114,11 @@ exclude = .venv,.git,.tox,dist,doc,*egg,build import-order-style = google application-import-names = sqlalchemy,test per-file-ignores = - **/__init__.py:F401 - lib/sqlalchemy/events.py:F401 - lib/sqlalchemy/schema.py:F401 - lib/sqlalchemy/types.py:F401 - lib/sqlalchemy/sql/expression.py:F401 + **/__init__.py:F401 + lib/sqlalchemy/events.py:F401 + lib/sqlalchemy/schema.py:F401 + lib/sqlalchemy/types.py:F401 + lib/sqlalchemy/sql/expression.py:F401 [mypy] # min mypy version 0.800 @@ -164,6 +167,8 @@ mysql = mysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4 pymysql = mysql+pymysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4 aiomysql = mysql+aiomysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4 aiomysql_fallback = mysql+aiomysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4&async_fallback=true +asyncmy = mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4 +asyncmy_fallback = mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4&async_fallback=true mariadb = mariadb://scott:tiger@127.0.0.1:3306/test mssql = mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server mssql_pymssql = mssql+pymssql://scott:tiger@ms_2008 diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py index 0d466e26d6..7bdf6f8ceb 100644 --- a/test/dialect/mysql/test_types.py +++ b/test/dialect/mysql/test_types.py @@ -549,7 +549,7 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults): ([0, 0, 0, 0, i, i, i, i], None), ([0, 0, 0, 0, 0, j, j, j], None), ([0, 0, 0, 0, 0, 0, k, k], None), - ([0, 0, 0, 0, 0, 0, 0, l], None), + ([0, 0, 0, 0, 0, 0, 0, l], None, testing.fails_if("+asyncmy")), argnames="store, expected", ) def test_bit_50_roundtrip(self, connection, bit_table, store, expected): @@ -569,7 +569,7 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults): ([0, 0, 0, 0, i, i, i, i], None), ([0, 0, 0, 0, 0, j, j, j], None), ([0, 0, 0, 0, 0, 0, k, k], None), - ([0, 0, 0, 0, 0, 0, 0, l], None), + ([0, 0, 0, 0, 0, 0, 0, l], None, testing.fails_if("+asyncmy")), argnames="store, expected", ) def test_bit_50_roundtrip_reflected( diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index 538ff8b896..ebcb8d520f 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -1386,6 +1386,7 @@ class InvalidateDuringResultTest(fixtures.TestBase): "+asyncpg", "+aiosqlite", "+aiomysql", + "+asyncmy", ], "Buffers the result set and doesn't check for connection close", ) diff --git a/tox.ini b/tox.ini index 544b27491e..67731240ad 100644 --- a/tox.ini +++ b/tox.ini @@ -31,6 +31,7 @@ deps= mysql: .[mysql] mysql: .[pymysql] mysql: git+https://github.com/sqlalchemy/aiomysql@sqlalchemy_tox; python_version >= '3' + mysql: .[asyncmy]; python_version >= '3' mysql: .[mariadb_connector]; python_version >= '3' oracle: .[oracle] @@ -102,9 +103,9 @@ setenv= py2{,7}-mysql: MYSQL={env:TOX_MYSQL_PY2K:{env:TOX_MYSQL:--db mysql}} mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql} - py3{,5,6,7,8,9}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector --dbdriver aiomysql} + py3{,5,6,7,8,9}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector --dbdriver aiomysql --dbdriver asyncmy} # omit aiomysql for Python 3.10 - py3{,10,11}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector} + py3{,10,11}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector --dbdriver asyncmy} mssql: MSSQL={env:TOX_MSSQL:--db mssql} -- 2.47.2