]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add `asyncmy` support
authorlong2ice <long2ice@gmail.com>
Thu, 16 Sep 2021 15:08:25 +0000 (11:08 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 17 Sep 2021 15:20:19 +0000 (11:20 -0400)
Added initial support for the ``asyncmy`` asyncio database driver for MySQL
and MariaDB. This driver is very new, however appears to be the only
current alternative to the ``aiomysql`` driver which currently appears to
be unmaintained and is not working with current Python versions. Much
thanks to long2ice for the pull request for this dialect.

Fixes: #6993
Closes: #7000
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/7000
Pull-request-sha: f7d6c811fc72324a83c8af635bbca8b268b0098e

Change-Id: I4ef54b43334feff7e3a710fc4de6821437f3bb68

doc/build/changelog/unreleased_14/6993.rst [new file with mode: 0644]
doc/build/dialects/mysql.rst
lib/sqlalchemy/dialects/mysql/__init__.py
lib/sqlalchemy/dialects/mysql/aiomysql.py
lib/sqlalchemy/dialects/mysql/asyncmy.py [new file with mode: 0644]
lib/sqlalchemy/dialects/mysql/base.py
lib/sqlalchemy/testing/suite/test_results.py
setup.cfg
test/dialect/mysql/test_types.py
test/engine/test_reconnect.py
tox.ini

diff --git a/doc/build/changelog/unreleased_14/6993.rst b/doc/build/changelog/unreleased_14/6993.rst
new file mode 100644 (file)
index 0000000..fd2122e
--- /dev/null
@@ -0,0 +1,13 @@
+.. change::
+    :tags: feature, asyncio, mysql
+    :tickets: 6993
+
+    Added initial support for the ``asyncmy`` asyncio database driver for MySQL
+    and MariaDB. This driver is very new, however appears to be the only
+    current alternative to the ``aiomysql`` driver which currently appears to
+    be unmaintained and is not working with current Python versions. Much
+    thanks to long2ice for the pull request for this dialect.
+
+    .. seealso::
+
+        :ref:`asyncmy`
index 573c2598c0eda2fc8ab23763660c490ffcce55a0..9eb7f5a7405cce72a15f92b03db339de868924c0 100644 (file)
@@ -189,6 +189,14 @@ MySQL-Connector
 
 .. automodule:: sqlalchemy.dialects.mysql.mysqlconnector
 
+.. _asyncmy:
+
+asyncmy
+-------
+
+.. automodule:: sqlalchemy.dialects.mysql.asyncmy
+
+
 .. _aiomysql:
 
 aiomysql
@@ -210,4 +218,3 @@ pyodbc
 ------
 
 .. automodule:: sqlalchemy.dialects.mysql.pyodbc
-
index 8631ea3b933b720b491d62cb9df46cadbfffa3ea..c83fec0c394d5686f327e0f5f68d1b5f35a5dd48 100644 (file)
@@ -54,12 +54,11 @@ from ...util import compat
 
 if compat.py3k:
     from . import aiomysql  # noqa
-
+    from . import asyncmy  # noqa
 
 # default dialect
 base.dialect = dialect = mysqldb.dialect
 
-
 __all__ = (
     "BIGINT",
     "BINARY",
index 3275d1888c3f127a5bc1f3145643911ec73c19a6..c9a87145e89fcde3543e402fe60cf9df73ec03fb 100644 (file)
@@ -11,6 +11,10 @@ r"""
     :connectstring: mysql+aiomysql://user:password@host:port/dbname[?key=value&key=value...]
     :url: https://github.com/aio-libs/aiomysql
 
+.. warning:: The aiomysql dialect as of September, 2021 appears to be unmaintained
+   and no longer functions for Python version 3.10.   Please refer to the
+   :ref:`asyncmy` dialect for current MySQL asyncio functionality.
+
 The aiomysql dialect is SQLAlchemy's second Python asyncio dialect.
 
 Using a special asyncio mediation layer, the aiomysql dialect is usable
@@ -21,13 +25,7 @@ This dialect should normally be used only with the
 :func:`_asyncio.create_async_engine` engine creation function::
 
     from sqlalchemy.ext.asyncio import create_async_engine
-    engine = create_async_engine("mysql+aiomysql://user:pass@hostname/dbname")
-
-Unicode
--------
-
-Please see :ref:`mysql_unicode` for current recommendations on unicode
-handling.
+    engine = create_async_engine("mysql+aiomysql://user:pass@hostname/dbname?charset=utf8mb4")
 
 
 """  # noqa
diff --git a/lib/sqlalchemy/dialects/mysql/asyncmy.py b/lib/sqlalchemy/dialects/mysql/asyncmy.py
new file mode 100644 (file)
index 0000000..f312cf7
--- /dev/null
@@ -0,0 +1,340 @@
+# mysql/asyncmy.py
+# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors <see AUTHORS
+# file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+r"""
+.. dialect:: mysql+asyncmy
+    :name: asyncmy
+    :dbapi: asyncmy
+    :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...]
+    :url: https://github.com/long2ice/asyncmy
+
+.. note:: The asyncmy dialect as of September, 2021 was added to provide
+   MySQL/MariaDB asyncio compatibility given that the :ref:`aiomysql` database
+   driver has become unmaintained, however asyncmy is itself very new.
+
+Using a special asyncio mediation layer, the asyncmy dialect is usable
+as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
+extension package.
+
+This dialect should normally be used only with the
+:func:`_asyncio.create_async_engine` engine creation function::
+
+    from sqlalchemy.ext.asyncio import create_async_engine
+    engine = create_async_engine("mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4")
+
+
+"""  # noqa
+
+import contextlib
+
+from .pymysql import MySQLDialect_pymysql
+from ... import pool
+from ... import util
+from ...util.concurrency import asyncio
+from ...util.concurrency import await_fallback
+from ...util.concurrency import await_only
+
+
+class AsyncAdapt_asyncmy_cursor:
+    server_side = False
+    __slots__ = (
+        "_adapt_connection",
+        "_connection",
+        "await_",
+        "_cursor",
+        "_rows",
+    )
+
+    def __init__(self, adapt_connection):
+        self._adapt_connection = adapt_connection
+        self._connection = adapt_connection._connection
+        self.await_ = adapt_connection.await_
+
+        cursor = self._connection.cursor()
+
+        self._cursor = self.await_(cursor.__aenter__())
+        self._rows = []
+
+    @property
+    def description(self):
+        return self._cursor.description
+
+    @property
+    def rowcount(self):
+        return self._cursor.rowcount
+
+    @property
+    def arraysize(self):
+        return self._cursor.arraysize
+
+    @arraysize.setter
+    def arraysize(self, value):
+        self._cursor.arraysize = value
+
+    @property
+    def lastrowid(self):
+        return self._cursor.lastrowid
+
+    def close(self):
+        # note we aren't actually closing the cursor here,
+        # we are just letting GC do it.   to allow this to be async
+        # we would need the Result to change how it does "Safe close cursor".
+        # MySQL "cursors" don't actually have state to be "closed" besides
+        # exhausting rows, which we already have done for sync cursor.
+        # another option would be to emulate aiosqlite dialect and assign
+        # cursor only if we are doing server side cursor operation.
+        self._rows[:] = []
+
+    def execute(self, operation, parameters=None):
+        return self.await_(self._execute_async(operation, parameters))
+
+    def executemany(self, operation, seq_of_parameters):
+        return self.await_(
+            self._executemany_async(operation, seq_of_parameters)
+        )
+
+    async def _execute_async(self, operation, parameters):
+        async with self._adapt_connection._mutex_and_adapt_errors():
+            if parameters is None:
+                result = await self._cursor.execute(operation)
+            else:
+                result = await self._cursor.execute(operation, parameters)
+
+            if not self.server_side:
+                # asyncmy has a "fake" async result, so we have to pull it out
+                # of that here since our default result is not async.
+                # we could just as easily grab "_rows" here and be done with it
+                # but this is safer.
+                self._rows = list(await self._cursor.fetchall())
+            return result
+
+    async def _executemany_async(self, operation, seq_of_parameters):
+        async with self._adapt_connection._mutex_and_adapt_errors():
+            return await self._cursor.executemany(operation, seq_of_parameters)
+
+    def setinputsizes(self, *inputsizes):
+        pass
+
+    def __iter__(self):
+        while self._rows:
+            yield self._rows.pop(0)
+
+    def fetchone(self):
+        if self._rows:
+            return self._rows.pop(0)
+        else:
+            return None
+
+    def fetchmany(self, size=None):
+        if size is None:
+            size = self.arraysize
+
+        retval = self._rows[0:size]
+        self._rows[:] = self._rows[size:]
+        return retval
+
+    def fetchall(self):
+        retval = self._rows[:]
+        self._rows[:] = []
+        return retval
+
+
+class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor):
+    __slots__ = ()
+    server_side = True
+
+    def __init__(self, adapt_connection):
+        self._adapt_connection = adapt_connection
+        self._connection = adapt_connection._connection
+        self.await_ = adapt_connection.await_
+
+        adapt_connection._ss_cursors.add(self)
+
+        cursor = self._connection.cursor(
+            adapt_connection.dbapi.asyncmy.cursors.SSCursor
+        )
+
+        self._cursor = self.await_(cursor.__aenter__())
+
+    def close(self):
+        try:
+            if self._cursor is not None:
+                self.await_(self._cursor.fetchall())
+                self.await_(self._cursor.close())
+                self._cursor = None
+        finally:
+            self._adapt_connection._ss_cursors.discard(self)
+
+    def fetchone(self):
+        return self.await_(self._cursor.fetchone())
+
+    def fetchmany(self, size=None):
+        return self.await_(self._cursor.fetchmany(size=size))
+
+    def fetchall(self):
+        return self.await_(self._cursor.fetchall())
+
+
+class AsyncAdapt_asyncmy_connection:
+    await_ = staticmethod(await_only)
+    __slots__ = ("dbapi", "_connection", "_execute_mutex", "_ss_cursors")
+
+    def __init__(self, dbapi, connection):
+        self.dbapi = dbapi
+        self._connection = connection
+        self._execute_mutex = asyncio.Lock()
+        self._ss_cursors = set()
+
+    @contextlib.asynccontextmanager
+    async def _mutex_and_adapt_errors(self):
+        async with self._execute_mutex:
+            try:
+                yield
+            except AttributeError:
+                raise self.dbapi.InternalError(
+                    "network operation failed due to asyncmy attribute error"
+                )
+
+    def ping(self, reconnect):
+        assert not reconnect
+        return self.await_(self._do_ping())
+
+    async def _do_ping(self):
+        async with self._mutex_and_adapt_errors():
+            return await self._connection.ping(False)
+
+    def character_set_name(self):
+        return self._connection.character_set_name()
+
+    def autocommit(self, value):
+        self.await_(self._connection.autocommit(value))
+
+    def cursor(self, server_side=False):
+        if server_side:
+            return AsyncAdapt_asyncmy_ss_cursor(self)
+        else:
+            return AsyncAdapt_asyncmy_cursor(self)
+
+    def _shutdown_ss_cursors(self):
+        for curs in list(self._ss_cursors):
+            curs.close()
+
+    def rollback(self):
+        self._shutdown_ss_cursors()
+        self.await_(self._connection.rollback())
+
+    def commit(self):
+        self._shutdown_ss_cursors()
+        self.await_(self._connection.commit())
+
+    def close(self):
+        self._shutdown_ss_cursors()
+        # it's not awaitable.
+        self._connection.close()
+
+
+class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection):
+    __slots__ = ()
+
+    await_ = staticmethod(await_fallback)
+
+
+class AsyncAdapt_asyncmy_dbapi:
+    def __init__(self, asyncmy, pymysql):
+        self.asyncmy = asyncmy
+        self.pymysql = pymysql
+        self.paramstyle = "format"
+        self._init_dbapi_attributes()
+
+    def _init_dbapi_attributes(self):
+        for name in (
+            "Warning",
+            "Error",
+            "InterfaceError",
+            "DataError",
+            "DatabaseError",
+            "OperationalError",
+            "InterfaceError",
+            "IntegrityError",
+            "ProgrammingError",
+            "InternalError",
+            "NotSupportedError",
+        ):
+            setattr(self, name, getattr(self.asyncmy.errors, name))
+
+        for name in (
+            "NUMBER",
+            "STRING",
+            "DATETIME",
+            "BINARY",
+            "TIMESTAMP",
+            "Binary",
+        ):
+            setattr(self, name, getattr(self.pymysql, name))
+
+    def connect(self, *arg, **kw):
+        async_fallback = kw.pop("async_fallback", False)
+
+        if util.asbool(async_fallback):
+            return AsyncAdaptFallback_asyncmy_connection(
+                self,
+                await_fallback(self.asyncmy.connect(*arg, **kw)),
+            )
+        else:
+            return AsyncAdapt_asyncmy_connection(
+                self,
+                await_only(self.asyncmy.connect(*arg, **kw)),
+            )
+
+
+class MySQLDialect_asyncmy(MySQLDialect_pymysql):
+    driver = "asyncmy"
+    supports_statement_cache = True
+
+    supports_server_side_cursors = True
+    _sscursor = AsyncAdapt_asyncmy_ss_cursor
+
+    is_async = True
+
+    @classmethod
+    def dbapi(cls):
+        return AsyncAdapt_asyncmy_dbapi(
+            __import__("asyncmy"), __import__("pymysql")
+        )
+
+    @classmethod
+    def get_pool_class(cls, url):
+
+        async_fallback = url.query.get("async_fallback", False)
+
+        if util.asbool(async_fallback):
+            return pool.FallbackAsyncAdaptedQueuePool
+        else:
+            return pool.AsyncAdaptedQueuePool
+
+    def create_connect_args(self, url):
+        return super(MySQLDialect_asyncmy, self).create_connect_args(
+            url, _translate_args=dict(username="user", database="db")
+        )
+
+    def is_disconnect(self, e, connection, cursor):
+        if super(MySQLDialect_asyncmy, self).is_disconnect(
+            e, connection, cursor
+        ):
+            return True
+        else:
+            str_e = str(e).lower()
+            return (
+                "not connected" in str_e or "network operation failed" in str_e
+            )
+
+    def _found_rows_client_flag(self):
+        from pymysql.constants import CLIENT
+
+        return CLIENT.FOUND_ROWS
+
+
+dialect = MySQLDialect_asyncmy
index 9bf12e194c12e967053568b3023d8e2b5794967c..04b0c1b6d9eff7cf8cc718034f66d0c5b8a63ab9 100644 (file)
@@ -2915,7 +2915,10 @@ class MySQLDialect(default.DefaultDialect):
                 "WHERE TABLE_TYPE='SEQUENCE' and TABLE_NAME=:name AND "
                 "TABLE_SCHEMA=:schema_name"
             ),
-            dict(name=sequence_name, schema_name=schema),
+            dict(
+                name=util.text_type(sequence_name),
+                schema_name=util.text_type(schema),
+            ),
         )
         return cursor.first() is not None
 
index 982ac498d4017644feff72a6c1570779f935419d..c41a55025d68a46d65ed70e20e17aa70b822a5e7 100644 (file)
@@ -234,7 +234,7 @@ class ServerSideCursorsTest(
         elif self.engine.dialect.driver == "pymysql":
             sscursor = __import__("pymysql.cursors").cursors.SSCursor
             return isinstance(cursor, sscursor)
-        elif self.engine.dialect.driver == "aiomysql":
+        elif self.engine.dialect.driver in ("aiomysql", "asyncmy"):
             return cursor.server_side
         elif self.engine.dialect.driver == "mysqldb":
             sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
index 689539687b7c320887af0d49a4c174818ae3eb07..8512e7abc07523d4b169444e76dabb3929328124 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -74,6 +74,9 @@ pymysql =
 aiomysql =
     %(asyncio)s
     aiomysql;python_version>="3"
+asyncmy =
+    %(asyncio)s
+    asyncmy;python_version>="3"
 aiosqlite =
     %(asyncio)s
     aiosqlite;python_version>="3"
@@ -111,11 +114,11 @@ exclude = .venv,.git,.tox,dist,doc,*egg,build
 import-order-style = google
 application-import-names = sqlalchemy,test
 per-file-ignores =
-                **/__init__.py:F401
-                lib/sqlalchemy/events.py:F401
-                lib/sqlalchemy/schema.py:F401
-                lib/sqlalchemy/types.py:F401
-                lib/sqlalchemy/sql/expression.py:F401
+    **/__init__.py:F401
+    lib/sqlalchemy/events.py:F401
+    lib/sqlalchemy/schema.py:F401
+    lib/sqlalchemy/types.py:F401
+    lib/sqlalchemy/sql/expression.py:F401
 
 [mypy]
 # min mypy version 0.800
@@ -164,6 +167,8 @@ mysql = mysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
 pymysql = mysql+pymysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
 aiomysql = mysql+aiomysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
 aiomysql_fallback = mysql+aiomysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4&async_fallback=true
+asyncmy = mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
+asyncmy_fallback = mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4&async_fallback=true
 mariadb = mariadb://scott:tiger@127.0.0.1:3306/test
 mssql = mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server
 mssql_pymssql = mssql+pymssql://scott:tiger@ms_2008
index 0d466e26d63a9d31f998dd2e78c5a560a3015c3d..7bdf6f8ceb7557592ecad0cc3cbd99920986de6c 100644 (file)
@@ -549,7 +549,7 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults):
         ([0, 0, 0, 0, i, i, i, i], None),
         ([0, 0, 0, 0, 0, j, j, j], None),
         ([0, 0, 0, 0, 0, 0, k, k], None),
-        ([0, 0, 0, 0, 0, 0, 0, l], None),
+        ([0, 0, 0, 0, 0, 0, 0, l], None, testing.fails_if("+asyncmy")),
         argnames="store, expected",
     )
     def test_bit_50_roundtrip(self, connection, bit_table, store, expected):
@@ -569,7 +569,7 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults):
         ([0, 0, 0, 0, i, i, i, i], None),
         ([0, 0, 0, 0, 0, j, j, j], None),
         ([0, 0, 0, 0, 0, 0, k, k], None),
-        ([0, 0, 0, 0, 0, 0, 0, l], None),
+        ([0, 0, 0, 0, 0, 0, 0, l], None, testing.fails_if("+asyncmy")),
         argnames="store, expected",
     )
     def test_bit_50_roundtrip_reflected(
index 538ff8b896db0e9000054001a3a7c13ed13ac159..ebcb8d520f3d9855084f7d88c68bb84b728ec111 100644 (file)
@@ -1386,6 +1386,7 @@ class InvalidateDuringResultTest(fixtures.TestBase):
             "+asyncpg",
             "+aiosqlite",
             "+aiomysql",
+            "+asyncmy",
         ],
         "Buffers the result set and doesn't check for connection close",
     )
diff --git a/tox.ini b/tox.ini
index 544b27491ec5855b03af4fc2c1968c85d344ccc3..67731240aded08e4a734961f2a4d03971a92f7b4 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -31,6 +31,7 @@ deps=
      mysql: .[mysql]
      mysql: .[pymysql]
      mysql: git+https://github.com/sqlalchemy/aiomysql@sqlalchemy_tox; python_version >= '3'
+     mysql: .[asyncmy]; python_version >= '3'
      mysql: .[mariadb_connector]; python_version >= '3'
 
      oracle: .[oracle]
@@ -102,9 +103,9 @@ setenv=
     py2{,7}-mysql: MYSQL={env:TOX_MYSQL_PY2K:{env:TOX_MYSQL:--db mysql}}
     mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql}
 
-    py3{,5,6,7,8,9}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector --dbdriver aiomysql}
+    py3{,5,6,7,8,9}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector --dbdriver aiomysql --dbdriver asyncmy}
     # omit aiomysql for Python 3.10
-    py3{,10,11}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector}
+    py3{,10,11}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector --dbdriver asyncmy}
 
 
     mssql: MSSQL={env:TOX_MSSQL:--db mssql}