From 26140c08111da9833dd2eff0b5091494f253db46 Mon Sep 17 00:00:00 2001
From: Federico Caselli
Date: Tue, 31 Aug 2021 23:03:18 +0200
Subject: [PATCH] Surface driver connection object when using a proxied
 dialect

Improve the interface used by adapted drivers, like the asyncio ones,
to access the actual connection object returned by the driver.

The :class:`_engine._ConnectionRecord` and :class:`_engine._ConnectionFairy`
now have two new attributes:

* ``dbapi_connection`` always represents a DBAPI compatible object. For
  pep-249 drivers, this is the DBAPI connection as it always has been,
  previously accessed under the ``.connection`` attribute. For asyncio
  drivers that SQLAlchemy adapts into a pep-249 interface, the returned
  object will normally be a SQLAlchemy adaption object called
  :class:`_engine.AdaptedConnection`.

* ``driver_connection`` always represents the actual connection object
  maintained by the third party pep-249 DBAPI or async driver in use. For
  standard pep-249 DBAPIs, this will always be the same object as that of
  the ``dbapi_connection``. For an asyncio driver, it will be the
  underlying asyncio-only connection object.

The ``.connection`` attribute remains available and is now a legacy alias
of ``.dbapi_connection``.

Fixes: #6832
Change-Id: Ib72f97deefca96dce4e61e7c38ba430068d6a82e
---
 README.dialects.rst                           |  12 +
 doc/build/changelog/unreleased_14/6832.rst    |  28 +++
 doc/build/core/connections.rst                |   6 +
 doc/build/core/internals.rst                  |   2 +
 doc/build/core/pooling.rst                    |   3 +-
 doc/build/faq/connections.rst                 |  95 +++++++-
 lib/sqlalchemy/connectors/pyodbc.py           |   4 +-
 lib/sqlalchemy/dialects/mysql/aiomysql.py     |   8 +-
 lib/sqlalchemy/dialects/mysql/asyncmy.py      |   6 +-
 lib/sqlalchemy/dialects/mysql/base.py         |   4 +-
 lib/sqlalchemy/dialects/oracle/cx_oracle.py   |   4 +-
 lib/sqlalchemy/dialects/postgresql/asyncpg.py |   6 +-
 lib/sqlalchemy/dialects/postgresql/pg8000.py  |   8 +-
 .../dialects/postgresql/psycopg2.py           |   4 +-
 lib/sqlalchemy/dialects/sqlite/aiosqlite.py   |   6 +-
 lib/sqlalchemy/dialects/sqlite/pysqlite.py    |   8 +-
 lib/sqlalchemy/engine/__init__.py             |   1 +
 lib/sqlalchemy/engine/base.py                 |   5 +
 lib/sqlalchemy/engine/default.py              |   5 +-
 lib/sqlalchemy/engine/interfaces.py           |  37 ++-
 lib/sqlalchemy/ext/asyncio/engine.py          |  12 +-
 lib/sqlalchemy/pool/base.py                   | 217 +++++++++++++-----
 lib/sqlalchemy/pool/events.py                 |  30 +++
 lib/sqlalchemy/pool/impl.py                   |   4 +-
 lib/sqlalchemy/testing/engines.py             |   2 +-
 lib/sqlalchemy/testing/plugin/pytestplugin.py |  45 +++-
 lib/sqlalchemy/util/langhelpers.py            |   2 +-
 test/dialect/postgresql/test_dialect.py       |  10 +-
 test/engine/test_execute.py                   |  18 +-
 test/engine/test_pool.py                      | 196 ++++++++++------
 test/engine/test_reconnect.py                 |  24 +-
 test/ext/asyncio/test_engine_py3k.py          |   4 +-
 32 files changed, 621 insertions(+), 195 deletions(-)
 create mode 100644 doc/build/changelog/unreleased_14/6832.rst

diff --git a/README.dialects.rst b/README.dialects.rst
index 50fcdb7f58..810267a20c 100644
--- a/README.dialects.rst
+++ b/README.dialects.rst
@@ -183,6 +183,18 @@ Key aspects of this file layout include:
         #   [ODBC Microsoft Access Driver] Optional feature not implemented.
         return
 
+AsyncIO dialects
+----------------
+
+As of version 1.4, SQLAlchemy also supports dialects that use
+asyncio drivers to interface with the database backend.
+
+SQLAlchemy's approach to asyncio drivers is that the connection and cursor
+objects of the driver (if any) are adapted into a pep-249 compliant interface,
+using the ``AdaptedConnection`` interface class.
Refer to the internal asyncio +driver implementations such as that of ``asyncpg``, ``asyncmy`` and +``aiosqlite`` for examples. + Going Forward ============== diff --git a/doc/build/changelog/unreleased_14/6832.rst b/doc/build/changelog/unreleased_14/6832.rst new file mode 100644 index 0000000000..fcc5b6f6ef --- /dev/null +++ b/doc/build/changelog/unreleased_14/6832.rst @@ -0,0 +1,28 @@ +.. change:: + :tags: engine, asyncio + :tickets: 6832 + + Improve the interface used by adapted drivers, like the asyncio ones, + to access the actual connection object returned by the driver. + + The :class:`._ConnectionFairy` object has two new attributes: + + * :attr:`._ConnectionFairy.dbapi_connection` always represents a DBAPI + compatible object. For pep-249 drivers, this is the DBAPI connection as + it always has been, previously accessed under the ``.connection`` + attribute. For asyncio drivers that SQLAlchemy adapts into a pep-249 + interface, the returned object will normally be a SQLAlchemy adaption + object called :class:`_engine.AdaptedConnection`. + * :attr:`._ConnectionFairy.driver_connection` always represents the actual + connection object maintained by the third party pep-249 DBAPI or async + driver in use. For standard pep-249 DBAPIs, this will always be the same + object as that of the ``dbapi_connection``. For an asyncio driver, it + will be the underlying asyncio-only connection object. + + The ``.connection`` attribute remains available and is now a legacy alias + of ``.dbapi_connection``. + + .. seealso:: + + :ref:`faq_dbapi_connection` + diff --git a/doc/build/core/connections.rst b/doc/build/core/connections.rst index ac9ee717fb..52197a7958 100644 --- a/doc/build/core/connections.rst +++ b/doc/build/core/connections.rst @@ -1743,6 +1743,12 @@ needed and they also vary highly dependent on the type of DBAPI in use, so in any case the direct DBAPI calling pattern is always there for those cases where it is needed. +.. seealso:: + + :ref:`faq_dbapi_connection` - includes additional details about how + the DBAPI connection is accessed as well as the "driver" connection + when using asyncio drivers. + Some recipes for DBAPI connection use follow. .. _stored_procedures: diff --git a/doc/build/core/internals.rst b/doc/build/core/internals.rst index 34bf0407cd..074acc798d 100644 --- a/doc/build/core/internals.rst +++ b/doc/build/core/internals.rst @@ -49,3 +49,5 @@ Some key internal constructs are listed here. :members: +.. autoclass:: sqlalchemy.engine.AdaptedConnection + :members: diff --git a/doc/build/core/pooling.rst b/doc/build/core/pooling.rst index 0c01d3f453..878a9ccab6 100644 --- a/doc/build/core/pooling.rst +++ b/doc/build/core/pooling.rst @@ -154,6 +154,7 @@ to disable under the following conditions: * If the pool itself doesn't maintain a connection after it's checked in, such as when using :class:`.NullPool`, the behavior can be disabled. * Otherwise, it must be ensured that: + * the application ensures that all :class:`_engine.Connection` objects are explicitly closed out using a context manager (i.e. 
      ``with`` block) or a ``try/finally`` style block
@@ -518,7 +519,7 @@ are three general approaches to this:
         def checkout(dbapi_connection, connection_record, connection_proxy):
             pid = os.getpid()
             if connection_record.info['pid'] != pid:
-                connection_record.connection = connection_proxy.connection = None
+                connection_record.dbapi_connection = connection_proxy.dbapi_connection = None
                 raise exc.DisconnectionError(
                         "Connection record belongs to pid %s, "
                         "attempting to check out in pid %s" %
diff --git a/doc/build/faq/connections.rst b/doc/build/faq/connections.rst
index 79e8804c17..1bee24c324 100644
--- a/doc/build/faq/connections.rst
+++ b/doc/build/faq/connections.rst
@@ -406,28 +406,107 @@ current SQLAlchemy versions.
 
     :ref:`pysqlite_threading_pooling` - info on PySQLite's behavior.
 
+.. _faq_dbapi_connection:
+
 How do I get at the raw DBAPI connection when using an Engine?
 ---------------------------------------------------------------
 
 With a regular SA engine-level Connection, you can get at a pool-proxied
 version of the DBAPI connection via the :attr:`_engine.Connection.connection`
 attribute on :class:`_engine.Connection`, and for the really-real DBAPI
 connection you can call the
-:attr:`.ConnectionFairy.connection` attribute on that - but there should never be any need to access
-the non-pool-proxied DBAPI connection, as all methods are proxied through::
+:attr:`._ConnectionFairy.dbapi_connection` attribute on that. On regular sync drivers
+there is usually no need to access the non-pool-proxied DBAPI connection,
+as all methods are proxied through::
 
     engine = create_engine(...)
     conn = engine.connect()
-    conn.connection.<do DBAPI things>
-    cursor_obj = conn.connection.cursor(<DBAPI specific arguments..>)
+
+    # pep-249 style ConnectionFairy connection pool proxy object
+    connection_fairy = conn.connection
+
+    # typically to run statements one would get a cursor() from this
+    # object
+    cursor_obj = connection_fairy.cursor()
+    # ... work with cursor_obj
+
+    # to bypass "connection_fairy", such as to set attributes on the
+    # unproxied pep-249 DBAPI connection, use .dbapi_connection
+    raw_dbapi_connection = connection_fairy.dbapi_connection
+
+    # the same thing is available as .driver_connection (more on this
+    # in the next section)
+    also_raw_dbapi_connection = connection_fairy.driver_connection
+
+.. versionchanged:: 1.4.24 Added the
+   :attr:`._ConnectionFairy.dbapi_connection` attribute,
+   which supersedes the previous
+   :attr:`._ConnectionFairy.connection` attribute which still remains
+   available; this attribute always provides a pep-249 synchronous style
+   connection object. The :attr:`._ConnectionFairy.driver_connection`
+   attribute is also added which will always refer to the real driver-level
+   connection regardless of what API it presents.
+
+Accessing the underlying connection for an asyncio driver
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When an asyncio driver is in use, there are two changes to the above
+scheme. The first is that when using an :class:`_asyncio.AsyncConnection`,
+the :class:`._ConnectionFairy` must be accessed using the awaitable method
+:meth:`_asyncio.AsyncConnection.get_raw_connection`. The
+returned :class:`._ConnectionFairy` in this case retains a sync-style
+pep-249 usage pattern, and the :attr:`._ConnectionFairy.dbapi_connection`
+attribute refers to a SQLAlchemy-adapted connection object which adapts the
+asyncio connection to a sync-style pep-249 API; in other words, there are
+*two* levels of proxying going on when using an asyncio driver.
The actual asyncio connection +is available from the :class:`._ConnectionFairy.driver_connection` attribute. +To restate the previous example in terms of asyncio looks like:: + + async def main(): + engine = create_async_engine(...) + conn = await engine.connect() + + # pep-249 style ConnectionFairy connection pool proxy object + # presents a sync interface + connection_fairy = await conn.get_raw_connection() + + # beneath that proxy is a second proxy which adapts the + # asyncio driver into a pep-249 connection object, accessible + # via .dbapi_connection as is the same with a sync API + sqla_sync_conn = connection_fairy.dbapi_connection + + # the really-real innermost driver connection is available + # from the .driver_connection attribute + raw_asyncio_connection = connection_fairy.driver_connection + + # work with raw asyncio connection + result = await raw_asyncio_connection.execute(...) + +.. versionchanged:: 1.4.24 Added the + :attr:`._ConnectionFairy.dbapi_connection` + and :attr:`._ConnectionFairy.driver_connection` attributes to allow access + to pep-249 connections, pep-249 adaption layers, and underlying driver + connections using a consistent interface. + +When using asyncio drivers, the above "DBAPI" connection is actually a +SQLAlchemy-adapted form of connection which presents a synchronous-style +pep-249 style API. To access the actual +asyncio driver connection, which will present the original asyncio API +of the driver in use, this can be accessed via the +:attr:`._ConnectionFairy.driver_connection` attribute of +:class:`._ConnectionFairy`. +For a standard pep-249 driver, :attr:`._ConnectionFairy.dbapi_connection` +and :attr:`._ConnectionFairy.driver_connection` are synonymous. You must ensure that you revert any isolation level settings or other operation-specific settings on the connection back to normal before returning it to the pool. -As an alternative to reverting settings, you can call the :meth:`_engine.Connection.detach` method on -either :class:`_engine.Connection` or the proxied connection, which will de-associate -the connection from the pool such that it will be closed and discarded -when :meth:`_engine.Connection.close` is called:: +As an alternative to reverting settings, you can call the +:meth:`_engine.Connection.detach` method on either :class:`_engine.Connection` +or the proxied connection, which will de-associate the connection from the pool +such that it will be closed and discarded when :meth:`_engine.Connection.close` +is called:: conn = engine.connect() conn.detach() # detaches the DBAPI connection from the connection pool diff --git a/lib/sqlalchemy/connectors/pyodbc.py b/lib/sqlalchemy/connectors/pyodbc.py index ed7260d6b0..c2bbdf7ce9 100644 --- a/lib/sqlalchemy/connectors/pyodbc.py +++ b/lib/sqlalchemy/connectors/pyodbc.py @@ -183,8 +183,8 @@ class PyODBCConnector(Connector): # adjust for ConnectionFairy being present # allows attribute set e.g. "connection.autocommit = True" # to work properly - if hasattr(connection, "connection"): - connection = connection.connection + if hasattr(connection, "dbapi_connection"): + connection = connection.dbapi_connection if level == "AUTOCOMMIT": connection.autocommit = True diff --git a/lib/sqlalchemy/dialects/mysql/aiomysql.py b/lib/sqlalchemy/dialects/mysql/aiomysql.py index c9a87145e8..c5ba635c24 100644 --- a/lib/sqlalchemy/dialects/mysql/aiomysql.py +++ b/lib/sqlalchemy/dialects/mysql/aiomysql.py @@ -13,7 +13,7 @@ r""" .. 
warning:: The aiomysql dialect as of September, 2021 appears to be unmaintained and no longer functions for Python version 3.10. Please refer to the - :ref:`asyncmy` dialect for current MySQL asyncio functionality. + :ref:`asyncmy` dialect for current MySQL/MariaDD asyncio functionality. The aiomysql dialect is SQLAlchemy's second Python asyncio dialect. @@ -33,6 +33,7 @@ This dialect should normally be used only with the from .pymysql import MySQLDialect_pymysql from ... import pool from ... import util +from ...engine import AdaptedConnection from ...util.concurrency import asyncio from ...util.concurrency import await_fallback from ...util.concurrency import await_only @@ -173,7 +174,7 @@ class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor): return self.await_(self._cursor.fetchall()) -class AsyncAdapt_aiomysql_connection: +class AsyncAdapt_aiomysql_connection(AdaptedConnection): await_ = staticmethod(await_only) __slots__ = ("dbapi", "_connection", "_execute_mutex") @@ -306,5 +307,8 @@ class MySQLDialect_aiomysql(MySQLDialect_pymysql): return CLIENT.FOUND_ROWS + def get_driver_connection(self, connection): + return connection._connection + dialect = MySQLDialect_aiomysql diff --git a/lib/sqlalchemy/dialects/mysql/asyncmy.py b/lib/sqlalchemy/dialects/mysql/asyncmy.py index cde43398d2..fb96cd6866 100644 --- a/lib/sqlalchemy/dialects/mysql/asyncmy.py +++ b/lib/sqlalchemy/dialects/mysql/asyncmy.py @@ -31,6 +31,7 @@ This dialect should normally be used only with the from .pymysql import MySQLDialect_pymysql from ... import pool from ... import util +from ...engine import AdaptedConnection from ...util.concurrency import asynccontextmanager from ...util.concurrency import asyncio from ...util.concurrency import await_fallback @@ -177,7 +178,7 @@ class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor): return self.await_(self._cursor.fetchall()) -class AsyncAdapt_asyncmy_connection: +class AsyncAdapt_asyncmy_connection(AdaptedConnection): await_ = staticmethod(await_only) __slots__ = ("dbapi", "_connection", "_execute_mutex", "_ss_cursors") @@ -335,5 +336,8 @@ class MySQLDialect_asyncmy(MySQLDialect_pymysql): return CLIENT.FOUND_ROWS + def get_driver_connection(self, connection): + return connection._connection + dialect = MySQLDialect_asyncmy diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py index 04b0c1b6d9..2bba2f81a7 100644 --- a/lib/sqlalchemy/dialects/mysql/base.py +++ b/lib/sqlalchemy/dialects/mysql/base.py @@ -2687,8 +2687,8 @@ class MySQLDialect(default.DefaultDialect): # adjust for ConnectionFairy being present # allows attribute set e.g. 
"connection.autocommit = True" # to work properly - if hasattr(connection, "connection"): - connection = connection.connection + if hasattr(connection, "dbapi_connection"): + connection = connection.dbapi_connection self._set_isolation_level(connection, level) diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py index aab2018bf9..3e705dced3 100644 --- a/lib/sqlalchemy/dialects/oracle/cx_oracle.py +++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py @@ -1085,8 +1085,8 @@ class OracleDialect_cx_oracle(OracleDialect): return result def set_isolation_level(self, connection, level): - if hasattr(connection, "connection"): - dbapi_connection = connection.connection + if hasattr(connection, "dbapi_connection"): + dbapi_connection = connection.dbapi_connection else: dbapi_connection = connection if level == "AUTOCOMMIT": diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 825558f26a..dc3da224ca 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -121,6 +121,7 @@ from ... import exc from ... import pool from ... import processors from ... import util +from ...engine import AdaptedConnection from ...sql import sqltypes from ...util.concurrency import asyncio from ...util.concurrency import await_fallback @@ -566,7 +567,7 @@ class AsyncAdapt_asyncpg_ss_cursor(AsyncAdapt_asyncpg_cursor): ) -class AsyncAdapt_asyncpg_connection: +class AsyncAdapt_asyncpg_connection(AdaptedConnection): __slots__ = ( "dbapi", "_connection", @@ -1045,5 +1046,8 @@ class PGDialect_asyncpg(PGDialect): return connect + def get_driver_connection(self, connection): + return connection._connection + dialect = PGDialect_asyncpg diff --git a/lib/sqlalchemy/dialects/postgresql/pg8000.py b/lib/sqlalchemy/dialects/postgresql/pg8000.py index 3d1051b7dd..d42dd9560d 100644 --- a/lib/sqlalchemy/dialects/postgresql/pg8000.py +++ b/lib/sqlalchemy/dialects/postgresql/pg8000.py @@ -433,8 +433,8 @@ class PGDialect_pg8000(PGDialect): level = level.replace("_", " ") # adjust for ConnectionFairy possibly being present - if hasattr(connection, "connection"): - connection = connection.connection + if hasattr(connection, "dbapi_connection"): + connection = connection.dbapi_connection if level == "AUTOCOMMIT": connection.autocommit = True @@ -498,8 +498,8 @@ class PGDialect_pg8000(PGDialect): def set_client_encoding(self, connection, client_encoding): # adjust for ConnectionFairy possibly being present - if hasattr(connection, "connection"): - connection = connection.connection + if hasattr(connection, "dbapi_connection"): + connection = connection.dbapi_connection cursor = connection.cursor() cursor.execute("SET CLIENT_ENCODING TO '" + client_encoding + "'") diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index c80198825e..a5a56cb6b0 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -982,8 +982,8 @@ class PGDialect_psycopg2(PGDialect): @util.memoized_instancemethod def _hstore_oids(self, conn): extras = self._psycopg2_extras() - if hasattr(conn, "connection"): - conn = conn.connection + if hasattr(conn, "dbapi_connection"): + conn = conn.dbapi_connection oids = extras.HstoreAdapter.get_oids(conn) if oids is not None and oids[0]: return oids[0:2] diff --git a/lib/sqlalchemy/dialects/sqlite/aiosqlite.py b/lib/sqlalchemy/dialects/sqlite/aiosqlite.py index 
eb750b0e7f..4319e26611 100644 --- a/lib/sqlalchemy/dialects/sqlite/aiosqlite.py +++ b/lib/sqlalchemy/dialects/sqlite/aiosqlite.py @@ -41,6 +41,7 @@ from .base import SQLiteExecutionContext from .pysqlite import SQLiteDialect_pysqlite from ... import pool from ... import util +from ...engine import AdaptedConnection from ...util.concurrency import await_fallback from ...util.concurrency import await_only @@ -162,7 +163,7 @@ class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor): return self.await_(self._cursor.fetchall()) -class AsyncAdapt_aiosqlite_connection: +class AsyncAdapt_aiosqlite_connection(AdaptedConnection): await_ = staticmethod(await_only) __slots__ = ("dbapi", "_connection") @@ -328,5 +329,8 @@ class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): return super().is_disconnect(e, connection, cursor) + def get_driver_connection(self, connection): + return connection._connection + dialect = SQLiteDialect_aiosqlite diff --git a/lib/sqlalchemy/dialects/sqlite/pysqlite.py b/lib/sqlalchemy/dialects/sqlite/pysqlite.py index 0f96e88303..e9d5d96827 100644 --- a/lib/sqlalchemy/dialects/sqlite/pysqlite.py +++ b/lib/sqlalchemy/dialects/sqlite/pysqlite.py @@ -499,8 +499,8 @@ class SQLiteDialect_pysqlite(SQLiteDialect): ) def set_isolation_level(self, connection, level): - if hasattr(connection, "connection"): - dbapi_connection = connection.connection + if hasattr(connection, "dbapi_connection"): + dbapi_connection = connection.dbapi_connection else: dbapi_connection = connection @@ -521,8 +521,8 @@ class SQLiteDialect_pysqlite(SQLiteDialect): return re.search(a, b) is not None def set_regexp(connection): - if hasattr(connection, "connection"): - dbapi_connection = connection.connection + if hasattr(connection, "dbapi_connection"): + dbapi_connection = connection.dbapi_connection else: dbapi_connection = connection dbapi_connection.create_function( diff --git a/lib/sqlalchemy/engine/__init__.py b/lib/sqlalchemy/engine/__init__.py index 3761f5005a..6306e201d0 100644 --- a/lib/sqlalchemy/engine/__init__.py +++ b/lib/sqlalchemy/engine/__init__.py @@ -33,6 +33,7 @@ from .cursor import CursorResult from .cursor import FullyBufferedResultProxy from .cursor import LegacyCursorResult from .cursor import ResultProxy +from .interfaces import AdaptedConnection from .interfaces import Compiled from .interfaces import Connectable from .interfaces import CreateEnginePlugin diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 25ced0343e..2444b5c7fe 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -440,6 +440,11 @@ class Connection(Connectable): def connection(self): """The underlying DB-API connection managed by this Connection. + This is a SQLAlchemy connection-pool proxied connection + which then has the attribute + :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the + actual driver connection. + .. 
seealso::

diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 8bd8a121b3..eff28e3400 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -646,7 +646,7 @@ class DefaultDialect(interfaces.Dialect):
                 % (", ".join(name for name, obj in trans_objs))
             )
-        dbapi_connection = connection.connection.connection
+        dbapi_connection = connection.connection.dbapi_connection
         for name, characteristic, value in characteristic_values:
             characteristic.set_characteristic(self, dbapi_connection, value)
         connection.connection._connection_record.finalize_callback.append(
@@ -779,6 +779,9 @@ class DefaultDialect(interfaces.Dialect):
             name = unicode(name)  # noqa
         return name
 
+    def get_driver_connection(self, connection):
+        return connection
+
 
 class _RendersLiteral(object):
     def literal_processor(self, dialect):
diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py
index 8379c731a8..d1484718eb 100644
--- a/lib/sqlalchemy/engine/interfaces.py
+++ b/lib/sqlalchemy/engine/interfaces.py
@@ -1056,7 +1056,21 @@ class Dialect(object):
         .. versionadded:: 1.0.3
 
         """
-        pass
+
+    def get_driver_connection(self, connection):
+        """Return the connection object as returned by the external driver
+        package.
+
+        For normal dialects that use a DBAPI compliant driver, this call
+        will just return the ``connection`` passed as argument.
+        For dialects that instead adapt a non DBAPI compliant driver, like
+        when adapting an asyncio driver, this call will return the
+        connection-like object as returned by the driver.
+
+        .. versionadded:: 1.4.24
+
+        """
+        raise NotImplementedError()
 
 
 class CreateEnginePlugin(object):
@@ -1719,3 +1733,24 @@ class ExceptionContext(object):
     .. versionadded:: 1.0.3
 
     """
+
+
+class AdaptedConnection(object):
+    """Interface of an adapted connection object to support the DBAPI protocol.
+
+    Used by asyncio dialects to provide a sync-style pep-249 facade on top
+    of the asyncio connection/cursor API provided by the driver.
+
+    .. versionadded:: 1.4.24
+
+    """
+
+    __slots__ = ("_connection",)
+
+    @property
+    def driver_connection(self):
+        """The connection object as returned by the driver after a connect."""
+        return self._connection
+
+    def __repr__(self):
+        return "<AdaptedConnection %s>" % self._connection
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
index ab29438ed0..a9e43a65f8 100644
--- a/lib/sqlalchemy/ext/asyncio/engine.py
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -113,7 +113,6 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
     def connection(self):
         """Not implemented for async; call
         :meth:`_asyncio.AsyncConnection.get_raw_connection`.
-
         """
         raise exc.InvalidRequestError(
             "AsyncConnection.connection accessor is not implemented as the "
@@ -125,9 +124,14 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
         """Return the pooled DBAPI-level connection in use by this
         :class:`_asyncio.AsyncConnection`.
 
-        This is typically the SQLAlchemy connection-pool proxied connection
-        which then has an attribute .connection that refers to the actual
-        DBAPI-level connection.
+        This is a SQLAlchemy connection-pool proxied connection
+        which then has the attribute
+        :attr:`_pool._ConnectionFairy.driver_connection` that refers to the
+        actual driver connection. Its
+        :attr:`_pool._ConnectionFairy.dbapi_connection` refers instead
+        to an :class:`_engine.AdaptedConnection` instance that
+        adapts the driver connection to the DBAPI protocol.
+ """ conn = self._sync_connection() diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index db63dfec88..38b0f67cb8 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -52,6 +52,9 @@ class _ConnDialect(object): "passed to the connection pool." ) + def get_driver_connection(self, connection): + return connection + class _AsyncConnDialect(_ConnDialect): is_async = True @@ -374,15 +377,63 @@ class _ConnectionRecord(object): starttime = None - connection = None + dbapi_connection = None """A reference to the actual DBAPI connection being tracked. May be ``None`` if this :class:`._ConnectionRecord` has been marked as invalidated; a new DBAPI connection may replace it if the owning pool calls upon this :class:`._ConnectionRecord` to reconnect. + For adapted drivers, like the Asyncio implementations, this is a + :class:`.AdaptedConnection` that adapts the driver connection + to the DBAPI protocol. + Use :attr:`._ConnectionRecord.driver_connection` to obtain the + connection objected returned by the driver. + + .. versionadded:: 1.4.24 + """ + @property + def driver_connection(self): + """The connection object as returned by the driver after a connect. + + For normal sync drivers that support the DBAPI protocol, this object + is the same as the one referenced by + :attr:`._ConnectionRecord.dbapi_connection`. + + For adapted drivers, like the Asyncio ones, this is the actual object + that was returned by the driver ``connect`` call. + + As :attr:`._ConnectionRecord.dbapi_connection` it may be ``None`` + if this :class:`._ConnectionRecord` has been marked as invalidated. + + .. versionadded:: 1.4.24 + + """ + + if self.dbapi_connection is None: + return None + else: + return self.__pool._dialect.get_driver_connection( + self.dbapi_connection + ) + + @property + def connection(self): + """An alias to :attr:`._ConnectionRecord.dbapi_connection`. + + This alias is deprecated, please use the new name. + + .. deprecated:: 1.4.24 + + """ + return self.dbapi_connection + + @connection.setter + def connection(self, value): + self.dbapi_connection = value + _soft_invalidate_time = 0 @util.memoized_property @@ -461,7 +512,7 @@ class _ConnectionRecord(object): util.warn("Double checkin attempted on %s" % self) return self.fairy_ref = None - connection = self.connection + connection = self.dbapi_connection pool = self.__pool while self.finalize_callback: finalizer = self.finalize_callback.pop() @@ -480,11 +531,12 @@ class _ConnectionRecord(object): return self.starttime def close(self): - if self.connection is not None: + if self.dbapi_connection is not None: self.__close() def invalidate(self, e=None, soft=False): - """Invalidate the DBAPI connection held by this :class:`._ConnectionRecord`. + """Invalidate the DBAPI connection held by this + :class:`._ConnectionRecord`. This method is called for all connection invalidations, including when the :meth:`._ConnectionFairy.invalidate` or @@ -492,10 +544,11 @@ class _ConnectionRecord(object): as well as when any so-called "automatic invalidation" condition occurs. - :param e: an exception object indicating a reason for the invalidation. + :param e: an exception object indicating a reason for the + invalidation. :param soft: if True, the connection isn't closed; instead, this - connection will be recycled on next checkout. + connection will be recycled on next checkout. .. 
versionadded:: 1.0.3 @@ -505,17 +558,19 @@ class _ConnectionRecord(object): """ # already invalidated - if self.connection is None: + if self.dbapi_connection is None: return if soft: - self.__pool.dispatch.soft_invalidate(self.connection, self, e) + self.__pool.dispatch.soft_invalidate( + self.dbapi_connection, self, e + ) else: - self.__pool.dispatch.invalidate(self.connection, self, e) + self.__pool.dispatch.invalidate(self.dbapi_connection, self, e) if e is not None: self.__pool.logger.info( "%sInvalidate connection %r (reason: %s:%s)", "Soft " if soft else "", - self.connection, + self.dbapi_connection, e.__class__.__name__, e, ) @@ -523,14 +578,14 @@ class _ConnectionRecord(object): self.__pool.logger.info( "%sInvalidate connection %r", "Soft " if soft else "", - self.connection, + self.dbapi_connection, ) if soft: self._soft_invalidate_time = time.time() else: self.__close() - self.connection = None + self.dbapi_connection = None def get_connection(self): recycle = False @@ -547,7 +602,7 @@ class _ConnectionRecord(object): # within 16 milliseconds accuracy, so unit tests for connection # invalidation need a sleep of at least this long between initial start # time and invalidation for the logic below to work reliably. - if self.connection is None: + if self.dbapi_connection is None: self.info.clear() self.__connect() elif ( @@ -555,21 +610,22 @@ class _ConnectionRecord(object): and time.time() - self.starttime > self.__pool._recycle ): self.__pool.logger.info( - "Connection %r exceeded timeout; recycling", self.connection + "Connection %r exceeded timeout; recycling", + self.dbapi_connection, ) recycle = True elif self.__pool._invalidate_time > self.starttime: self.__pool.logger.info( "Connection %r invalidated due to pool invalidation; " + "recycling", - self.connection, + self.dbapi_connection, ) recycle = True elif self._soft_invalidate_time > self.starttime: self.__pool.logger.info( "Connection %r invalidated due to local soft invalidation; " + "recycling", - self.connection, + self.dbapi_connection, ) recycle = True @@ -578,11 +634,11 @@ class _ConnectionRecord(object): self.info.clear() self.__connect() - return self.connection + return self.dbapi_connection def _is_hard_or_soft_invalidated(self): return ( - self.connection is None + self.dbapi_connection is None or self.__pool._invalidate_time > self.starttime or (self._soft_invalidate_time > self.starttime) ) @@ -590,21 +646,20 @@ class _ConnectionRecord(object): def __close(self): self.finalize_callback.clear() if self.__pool.dispatch.close: - self.__pool.dispatch.close(self.connection, self) - self.__pool._close_connection(self.connection) - self.connection = None + self.__pool.dispatch.close(self.dbapi_connection, self) + self.__pool._close_connection(self.dbapi_connection) + self.dbapi_connection = None def __connect(self): pool = self.__pool # ensure any existing connection is removed, so that if # creator fails, this attribute stays None - self.connection = None + self.dbapi_connection = None try: self.starttime = time.time() - connection = pool._invoke_creator(self) + self.dbapi_connection = connection = pool._invoke_creator(self) pool.logger.debug("Created new connection %r", connection) - self.connection = connection self.fresh = True except Exception as e: with util.safe_reraise(): @@ -615,17 +670,17 @@ class _ConnectionRecord(object): if pool.dispatch.first_connect: pool.dispatch.first_connect.for_modify( pool.dispatch - ).exec_once_unless_exception(self.connection, self) + 
).exec_once_unless_exception(self.dbapi_connection, self) # init of the dialect now takes place within the connect # event, so ensure a mutex is used on the first run pool.dispatch.connect.for_modify( pool.dispatch - )._exec_w_sync_on_first_run(self.connection, self) + )._exec_w_sync_on_first_run(self.dbapi_connection, self) def _finalize_fairy( - connection, + dbapi_connection, connection_record, pool, ref, # this is None when called directly, not by the gc @@ -650,8 +705,8 @@ def _finalize_fairy( if ref is not None: if connection_record.fairy_ref is not ref: return - assert connection is None - connection = connection_record.connection + assert dbapi_connection is None + dbapi_connection = connection_record.dbapi_connection # null pool is not _is_asyncio but can be used also with async dialects dont_restore_gced = pool._dialect.is_async @@ -663,11 +718,11 @@ def _finalize_fairy( detach = not connection_record can_manipulate_connection = True - if connection is not None: + if dbapi_connection is not None: if connection_record and echo: pool.logger.debug( "Connection %r being returned to pool%s", - connection, + dbapi_connection, ", transaction state was already reset by caller" if not reset else "", @@ -675,9 +730,11 @@ def _finalize_fairy( try: fairy = fairy or _ConnectionFairy( - connection, connection_record, echo + dbapi_connection, + connection_record, + echo, ) - assert fairy.connection is connection + assert fairy.dbapi_connection is dbapi_connection if reset and can_manipulate_connection: fairy._reset(pool) @@ -688,9 +745,9 @@ def _finalize_fairy( if can_manipulate_connection: if pool.dispatch.close_detached: - pool.dispatch.close_detached(connection) + pool.dispatch.close_detached(dbapi_connection) - pool._close_connection(connection) + pool._close_connection(dbapi_connection) else: message = ( "The garbage collector is trying to clean up " @@ -700,7 +757,7 @@ def _finalize_fairy( "connections when they are no longer used, calling " "``close()`` or using a context manager to " "manage their lifetime." - ) % connection + ) % dbapi_connection pool.logger.error(message) util.warn(message) @@ -746,12 +803,24 @@ class _ConnectionFairy(object): """ def __init__(self, dbapi_connection, connection_record, echo): - self.connection = dbapi_connection + self.dbapi_connection = dbapi_connection self._connection_record = connection_record self._echo = echo - connection = None - """A reference to the actual DBAPI connection being tracked.""" + dbapi_connection = None + """A reference to the actual DBAPI connection being tracked. + + .. versionadded:: 1.4.24 + + .. seealso:: + + :attr:`._ConnectionFairy.driver_connection` + + :attr:`._ConnectionRecord.dbapi_connection` + + :ref:`faq_dbapi_connection` + + """ _connection_record = None """A reference to the :class:`._ConnectionRecord` object associated @@ -761,6 +830,38 @@ class _ConnectionFairy(object): """ + @property + def driver_connection(self): + """The connection object as returned by the driver after a connect. + + .. versionadded:: 1.4.24 + + .. seealso:: + + :attr:`._ConnectionFairy.dbapi_connection` + + :attr:`._ConnectionRecord.driver_connection` + + :ref:`faq_dbapi_connection` + + """ + return self._connection_record.driver_connection + + @property + def connection(self): + """An alias to :attr:`._ConnectionFairy.dbapi_connection`. + + This alias is deprecated, please use the new name. + + .. 
deprecated:: 1.4.24 + + """ + return self.dbapi_connection + + @connection.setter + def connection(self, value): + self.dbapi_connection = value + @classmethod def _checkout(cls, pool, threadconns=None, fairy=None): if not fairy: @@ -772,7 +873,7 @@ class _ConnectionFairy(object): if threadconns is not None: threadconns.current = weakref.ref(fairy) - if fairy.connection is None: + if fairy.dbapi_connection is None: raise exc.InvalidRequestError("This connection is closed") fairy._counter += 1 if ( @@ -795,25 +896,25 @@ class _ConnectionFairy(object): if fairy._echo: pool.logger.debug( "Pool pre-ping on connection %s", - fairy.connection, + fairy.dbapi_connection, ) - result = pool._dialect.do_ping(fairy.connection) + result = pool._dialect.do_ping(fairy.dbapi_connection) if not result: if fairy._echo: pool.logger.debug( "Pool pre-ping on connection %s failed, " "will invalidate pool", - fairy.connection, + fairy.dbapi_connection, ) raise exc.InvalidatePoolError() elif fairy._echo: pool.logger.debug( "Connection %s is fresh, skipping pre-ping", - fairy.connection, + fairy.dbapi_connection, ) pool.dispatch.checkout( - fairy.connection, fairy._connection_record, fairy + fairy.dbapi_connection, fairy._connection_record, fairy ) return fairy except exc.DisconnectionError as e: @@ -830,12 +931,12 @@ class _ConnectionFairy(object): pool.logger.info( "Disconnection detected on checkout, " "invalidating individual connection %s (reason: %r)", - fairy.connection, + fairy.dbapi_connection, e, ) fairy._connection_record.invalidate(e) try: - fairy.connection = ( + fairy.dbapi_connection = ( fairy._connection_record.get_connection() ) except Exception as err: @@ -863,7 +964,7 @@ class _ConnectionFairy(object): def _checkin(self, reset=True): _finalize_fairy( - self.connection, + self.dbapi_connection, self._connection_record, self._pool, None, @@ -871,7 +972,7 @@ class _ConnectionFairy(object): reset=reset, fairy=self, ) - self.connection = None + self.dbapi_connection = None self._connection_record = None _close = _checkin @@ -882,14 +983,14 @@ class _ConnectionFairy(object): if pool._reset_on_return is reset_rollback: if self._echo: pool.logger.debug( - "Connection %s rollback-on-return", self.connection + "Connection %s rollback-on-return", self.dbapi_connection ) pool._dialect.do_rollback(self) elif pool._reset_on_return is reset_commit: if self._echo: pool.logger.debug( "Connection %s commit-on-return", - self.connection, + self.dbapi_connection, ) pool._dialect.do_commit(self) @@ -902,7 +1003,7 @@ class _ConnectionFairy(object): """Return True if this :class:`._ConnectionFairy` still refers to an active DBAPI connection.""" - return self.connection is not None + return self.dbapi_connection is not None @util.memoized_property def info(self): @@ -963,13 +1064,13 @@ class _ConnectionFairy(object): """ - if self.connection is None: + if self.dbapi_connection is None: util.warn("Can't invalidate an already-closed connection.") return if self._connection_record: self._connection_record.invalidate(e=e, soft=soft) if not soft: - self.connection = None + self.dbapi_connection = None self._checkin() def cursor(self, *args, **kwargs): @@ -979,10 +1080,10 @@ class _ConnectionFairy(object): method. """ - return self.connection.cursor(*args, **kwargs) + return self.dbapi_connection.cursor(*args, **kwargs) def __getattr__(self, key): - return getattr(self.connection, key) + return getattr(self.dbapi_connection, key) def detach(self): """Separate this connection from its Pool. 
@@ -1000,14 +1101,14 @@ class _ConnectionFairy(object): if self._connection_record is not None: rec = self._connection_record rec.fairy_ref = None - rec.connection = None + rec.dbapi_connection = None # TODO: should this be _return_conn? self._pool._do_return_conn(self._connection_record) self.info = self.info.copy() self._connection_record = None if self._pool.dispatch.detach: - self._pool.dispatch.detach(self.connection, rec) + self._pool.dispatch.detach(self.dbapi_connection, rec) def close(self): self._counter -= 1 diff --git a/lib/sqlalchemy/pool/events.py b/lib/sqlalchemy/pool/events.py index 18ef28fa55..7c2cae7c5e 100644 --- a/lib/sqlalchemy/pool/events.py +++ b/lib/sqlalchemy/pool/events.py @@ -71,6 +71,7 @@ class PoolEvents(event.Events): to produce a new DBAPI connection. :param dbapi_connection: a DBAPI connection. + The :attr:`._ConnectionRecord.dbapi_connection` attribute. :param connection_record: the :class:`._ConnectionRecord` managing the DBAPI connection. @@ -95,6 +96,7 @@ class PoolEvents(event.Events): encoding settings, collation settings, and many others. :param dbapi_connection: a DBAPI connection. + The :attr:`._ConnectionRecord.dbapi_connection` attribute. :param connection_record: the :class:`._ConnectionRecord` managing the DBAPI connection. @@ -105,6 +107,7 @@ class PoolEvents(event.Events): """Called when a connection is retrieved from the Pool. :param dbapi_connection: a DBAPI connection. + The :attr:`._ConnectionRecord.dbapi_connection` attribute. :param connection_record: the :class:`._ConnectionRecord` managing the DBAPI connection. @@ -132,6 +135,7 @@ class PoolEvents(event.Events): for detached connections. (They do not return to the pool.) :param dbapi_connection: a DBAPI connection. + The :attr:`._ConnectionRecord.dbapi_connection` attribute. :param connection_record: the :class:`._ConnectionRecord` managing the DBAPI connection. @@ -153,6 +157,7 @@ class PoolEvents(event.Events): cases where the connection is discarded immediately after reset. :param dbapi_connection: a DBAPI connection. + The :attr:`._ConnectionRecord.dbapi_connection` attribute. :param connection_record: the :class:`._ConnectionRecord` managing the DBAPI connection. @@ -176,6 +181,7 @@ class PoolEvents(event.Events): connection occurs. :param dbapi_connection: a DBAPI connection. + The :attr:`._ConnectionRecord.dbapi_connection` attribute. :param connection_record: the :class:`._ConnectionRecord` managing the DBAPI connection. @@ -205,6 +211,15 @@ class PoolEvents(event.Events): .. versionadded:: 1.0.3 + :param dbapi_connection: a DBAPI connection. + The :attr:`._ConnectionRecord.dbapi_connection` attribute. + + :param connection_record: the :class:`._ConnectionRecord` managing the + DBAPI connection. + + :param exception: the exception object corresponding to the reason + for this invalidation, if any. May be ``None``. + """ def close(self, dbapi_connection, connection_record): @@ -222,6 +237,12 @@ class PoolEvents(event.Events): .. versionadded:: 1.1 + :param dbapi_connection: a DBAPI connection. + The :attr:`._ConnectionRecord.dbapi_connection` attribute. + + :param connection_record: the :class:`._ConnectionRecord` managing the + DBAPI connection. + """ def detach(self, dbapi_connection, connection_record): @@ -232,6 +253,12 @@ class PoolEvents(event.Events): .. versionadded:: 1.1 + :param dbapi_connection: a DBAPI connection. + The :attr:`._ConnectionRecord.dbapi_connection` attribute. + + :param connection_record: the :class:`._ConnectionRecord` managing the + DBAPI connection. 
+ """ def close_detached(self, dbapi_connection): @@ -245,4 +272,7 @@ class PoolEvents(event.Events): .. versionadded:: 1.1 + :param dbapi_connection: a DBAPI connection. + The :attr:`._ConnectionRecord.dbapi_connection` attribute. + """ diff --git a/lib/sqlalchemy/pool/impl.py b/lib/sqlalchemy/pool/impl.py index 8a34123854..3ef33d02d2 100644 --- a/lib/sqlalchemy/pool/impl.py +++ b/lib/sqlalchemy/pool/impl.py @@ -410,7 +410,7 @@ class StaticPool(Pool): def dispose(self): if ( "connection" in self.__dict__ - and self.connection.connection is not None + and self.connection.dbapi_connection is not None ): self.connection.close() del self.__dict__["connection"] @@ -432,7 +432,7 @@ class StaticPool(Pool): # used by the test suite to make a new engine / pool without # losing the state of an existing SQLite :memory: connection self._invoke_creator = ( - lambda crec: other_static_pool.connection.connection + lambda crec: other_static_pool.connection.dbapi_connection ) def _create_connection(self): diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py index 7d78dcc1af..a54f70c5e0 100644 --- a/lib/sqlalchemy/testing/engines.py +++ b/lib/sqlalchemy/testing/engines.py @@ -65,7 +65,7 @@ class ConnectionKiller(object): for rec in list(self.proxy_refs): if rec is not None and rec.is_valid: - self.dbapi_connections.discard(rec.connection) + self.dbapi_connections.discard(rec.dbapi_connection) self._safe(rec._checkin) # for fairy refs that were GCed and could not close the connection, diff --git a/lib/sqlalchemy/testing/plugin/pytestplugin.py b/lib/sqlalchemy/testing/plugin/pytestplugin.py index 4e82f10c19..d28048f70f 100644 --- a/lib/sqlalchemy/testing/plugin/pytestplugin.py +++ b/lib/sqlalchemy/testing/plugin/pytestplugin.py @@ -358,6 +358,7 @@ _current_class = None def pytest_runtest_setup(item): from sqlalchemy.testing import asyncio + from sqlalchemy.util import string_types if not isinstance(item, pytest.Function): return @@ -378,13 +379,38 @@ def pytest_runtest_setup(item): _current_class = item.parent.parent def finalize(): - global _current_class + global _current_class, _current_report _current_class = None - asyncio._maybe_async_provisioning( - plugin_base.stop_test_class_outside_fixtures, - item.parent.parent.cls, - ) + try: + asyncio._maybe_async_provisioning( + plugin_base.stop_test_class_outside_fixtures, + item.parent.parent.cls, + ) + except Exception as e: + # in case of an exception during teardown attach the original + # error to the exception message, otherwise it will get lost + if _current_report.failed: + if not e.args: + e.args = ( + "__Original test failure__:\n" + + _current_report.longreprtext, + ) + elif e.args[-1] and isinstance(e.args[-1], string_types): + args = list(e.args) + args[-1] += ( + "\n__Original test failure__:\n" + + _current_report.longreprtext + ) + e.args = tuple(args) + else: + e.args += ( + "__Original test failure__", + _current_report.longreprtext, + ) + raise + finally: + _current_report = None item.parent.parent.addfinalizer(finalize) @@ -404,6 +430,15 @@ def pytest_runtest_call(item): ) +_current_report = None + + +def pytest_runtest_logreport(report): + global _current_report + if report.when == "call": + _current_report = report + + def pytest_runtest_teardown(item, nextitem): # runs inside of pytest function fixture scope # after test function runs diff --git a/lib/sqlalchemy/util/langhelpers.py b/lib/sqlalchemy/util/langhelpers.py index 8036ea3e21..89ca4c1ebf 100644 --- a/lib/sqlalchemy/util/langhelpers.py +++ 
b/lib/sqlalchemy/util/langhelpers.py @@ -1453,7 +1453,7 @@ class hybridmethod(object): """Decorate a function as cls- or instance- level.""" def __init__(self, func): - self.func = func + self.func = self.__func__ = func self.clslevel = func def __get__(self, instance, owner): diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index a86da8ce7d..155019aaea 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -906,7 +906,7 @@ class MiscBackendTest( with testing.db.connect().execution_options( isolation_level="SERIALIZABLE" ) as conn: - dbapi_conn = conn.connection.connection + dbapi_conn = conn.connection.dbapi_connection is_false(dbapi_conn.autocommit) @@ -959,7 +959,7 @@ class MiscBackendTest( conn.execute(text("select 1")).scalar() - dbapi_conn = conn.connection.connection + dbapi_conn = conn.connection.dbapi_connection cursor = dbapi_conn.cursor() cursor.execute("show transaction_read_only") @@ -994,7 +994,7 @@ class MiscBackendTest( conn.execute(text("Select 1")).scalar() - dbapi_conn = conn.connection.connection + dbapi_conn = conn.connection.dbapi_connection cursor = dbapi_conn.cursor() cursor.execute("show transaction_deferrable") @@ -1026,7 +1026,7 @@ class MiscBackendTest( with engine.connect() as conn: conn.execute(text("select 1")).scalar() - dbapi_conn = conn.connection.connection + dbapi_conn = conn.connection.dbapi_connection cursor = dbapi_conn.cursor() cursor.execute("show transaction_read_only") @@ -1056,7 +1056,7 @@ class MiscBackendTest( with engine.connect() as conn: # asyncpg but not for deferrable? which the PG docs actually # state. weird - dbapi_conn = conn.connection.connection + dbapi_conn = conn.connection.dbapi_connection cursor = dbapi_conn.cursor() cursor.execute("show transaction_deferrable") diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 791e42bc0f..4a14cbcca0 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1577,7 +1577,8 @@ class EngineEventsTest(fixtures.TestBase): # but sqlite file backend will also have independent # connections here. 
its_the_same_connection = ( - c1.connection.connection is c2.connection.connection + c1.connection.dbapi_connection + is c2.connection.dbapi_connection ) c1.close() c2.close() @@ -2791,9 +2792,9 @@ class HandleErrorTest(fixtures.TestBase): for crec in crecs: if crec is target_crec or not set_to_false: - is_not(crec.connection, crec.get_connection()) + is_not(crec.dbapi_connection, crec.get_connection()) else: - is_(crec.connection, crec.get_connection()) + is_(crec.dbapi_connection, crec.get_connection()) def test_alter_invalidate_pool_to_false(self): self._test_alter_invalidate_pool_to_false(True) @@ -3136,11 +3137,10 @@ class OnConnectTest(fixtures.TestBase): ), ) eng = create_engine(u1, poolclass=QueuePool) - eq_( - eng.name, "sqlite" - ) # make sure other dialects aren't getting pulled in here + # make sure other dialects aren't getting pulled in here + eq_(eng.name, "sqlite") c = eng.connect() - dbapi_conn_one = c.connection.connection + dbapi_conn_one = c.connection.dbapi_connection c.close() eq_( @@ -3156,7 +3156,7 @@ class OnConnectTest(fixtures.TestBase): ) c2 = eng.connect() - dbapi_conn_two = c2.connection.connection + dbapi_conn_two = c2.connection.dbapi_connection is_not(dbapi_conn_one, dbapi_conn_two) @@ -3421,7 +3421,7 @@ class DialectEventTest(fixtures.TestBase): eq_(conn.info["bap"], "two") # returned our mock connection - is_(conn.connection.connection, m1.our_connect()) + is_(conn.connection.dbapi_connection, m1.our_connect()) def test_connect_do_connect_info_there_after_recycle(self): # test that info is maintained after the do_connect() diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index fa5bd8cffb..43ec9cc3ff 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -159,7 +159,7 @@ class PoolTest(PoolTestBase): self.assert_("foo2" in c.info) c2 = p.connect() - is_not(c.connection, c2.connection) + is_not(c.dbapi_connection, c2.dbapi_connection) assert not c2.info assert "foo2" in c.info @@ -200,9 +200,11 @@ class PoolTest(PoolTestBase): r1 = pool._ConnectionRecord(p1, connect=False) - assert not r1.connection + assert not r1.dbapi_connection c1 = r1.get_connection() + is_(c1, r1.dbapi_connection) is_(c1, r1.connection) + is_(c1, r1.driver_connection) def test_rec_close_reopen(self): # test that _ConnectionRecord.close() allows @@ -212,19 +214,19 @@ class PoolTest(PoolTestBase): r1 = pool._ConnectionRecord(p1) - c1 = r1.connection + c1 = r1.dbapi_connection c2 = r1.get_connection() is_(c1, c2) r1.close() - assert not r1.connection + assert not r1.dbapi_connection eq_(c1.mock_calls, [call.close()]) c2 = r1.get_connection() is_not(c1, c2) - is_(c2, r1.connection) + is_(c2, r1.dbapi_connection) eq_(c2.mock_calls, []) @@ -290,17 +292,17 @@ class PoolTest(PoolTestBase): (pool.StaticPool, None), (pool.AssertionPool, None), ) - def test_is_asyncio_from_dialect(self, pool_cls, is_async_king): + def test_is_asyncio_from_dialect(self, pool_cls, is_async_kind): p = pool_cls(creator=object()) for is_async in (True, False): if is_async: p._dialect = _AsyncConnDialect() else: - p._dialect = _ConnDialect - if is_async_king is None: + p._dialect = _ConnDialect() + if is_async_kind is None: eq_(p._is_asyncio, is_async) else: - eq_(p._is_asyncio, is_async_king) + eq_(p._is_asyncio, is_async_kind) @testing.combinations( (pool.QueuePool, False), @@ -314,6 +316,69 @@ class PoolTest(PoolTestBase): def test_is_asyncio_from_dialect_cls(self, pool_cls, is_async): eq_(pool_cls._is_asyncio, is_async) + def test_rec_fairy_default_dialect(self): + dbapi = 
MockDBAPI()
+        p1 = pool.Pool(creator=lambda: dbapi.connect("foo.db"))
+
+        rec = pool._ConnectionRecord(p1)
+
+        is_not_none(rec.dbapi_connection)
+
+        is_(rec.connection, rec.dbapi_connection)
+        is_(rec.driver_connection, rec.dbapi_connection)
+
+        fairy = pool._ConnectionFairy(rec.dbapi_connection, rec, False)
+
+        is_not_none(fairy.dbapi_connection)
+        is_(fairy.connection, fairy.dbapi_connection)
+        is_(fairy.driver_connection, fairy.dbapi_connection)
+
+        is_(fairy.dbapi_connection, rec.dbapi_connection)
+        is_(fairy.driver_connection, rec.driver_connection)
+
+    def test_rec_fairy_adapted_dialect(self):
+        dbapi = MockDBAPI()
+
+        mock_dc = object()
+
+        class _AdaptedDialect(_ConnDialect):
+            def get_driver_connection(self, connection):
+                return mock_dc
+
+        p1 = pool.Pool(
+            creator=lambda: dbapi.connect("foo.db"), dialect=_AdaptedDialect()
+        )
+
+        rec = pool._ConnectionRecord(p1)
+
+        is_not_none(rec.dbapi_connection)
+
+        is_(rec.connection, rec.dbapi_connection)
+        is_(rec.driver_connection, mock_dc)
+
+        fairy = pool._ConnectionFairy(rec.dbapi_connection, rec, False)
+
+        is_not_none(fairy.dbapi_connection)
+        is_(fairy.connection, fairy.dbapi_connection)
+        is_(fairy.driver_connection, mock_dc)
+
+        is_(fairy.dbapi_connection, rec.dbapi_connection)
+        is_(fairy.driver_connection, mock_dc)
+
+    def test_connection_setter(self):
+        dbapi = MockDBAPI()
+        p1 = pool.Pool(creator=lambda: dbapi.connect("foo.db"))
+
+        rec = pool._ConnectionRecord(p1)
+
+        is_not_none(rec.dbapi_connection)
+
+        is_(rec.connection, rec.dbapi_connection)
+        rec.connection = 42
+        is_(rec.connection, rec.dbapi_connection)
+        rec.dbapi_connection = 99
+        is_(rec.connection, rec.dbapi_connection)
+
 
 class PoolDialectTest(PoolTestBase):
     def _dialect(self):
@@ -334,6 +399,9 @@ class PoolDialectTest(PoolTestBase):
                 canary.append("CL")
                 dbapi_connection.close()
 
+            def get_driver_connection(self, connection):
+                return connection
+
         return PoolDialect(), canary
 
     def _do_test(self, pool_cls, assertion):
@@ -469,7 +537,7 @@ class PoolEventsTest(PoolTestBase):
 
         c1 = p.connect()
 
-        connection = c1.connection
+        connection = c1.dbapi_connection
         rec = c1._connection_record
 
         c1.close()
@@ -484,7 +552,7 @@ class PoolEventsTest(PoolTestBase):
 
         c1 = p.connect()
 
-        connection = c1.connection
+        connection = c1.dbapi_connection
         rec = c1._connection_record
 
         c1.detach()
@@ -496,7 +564,7 @@ class PoolEventsTest(PoolTestBase):
 
         c1 = p.connect()
 
-        connection = c1.connection
+        connection = c1.dbapi_connection
 
         c1.detach()
 
@@ -633,7 +701,7 @@ class PoolEventsTest(PoolTestBase):
         c1.close()
         assert not canary.called
         c1 = p.connect()
-        dbapi_con = c1.connection
+        dbapi_con = c1.dbapi_connection
         c1.invalidate(soft=True)
         assert canary.call_args_list[0][0][0] is dbapi_con
         assert canary.call_args_list[0][0][2] is None
@@ -645,7 +713,7 @@ class PoolEventsTest(PoolTestBase):
         c1.close()
         assert not canary.called
         c1 = p.connect()
-        dbapi_con = c1.connection
+        dbapi_con = c1.dbapi_connection
         exc = Exception("hi")
         c1.invalidate(exc, soft=True)
         assert canary.call_args_list[0][0][0] is dbapi_con
@@ -658,7 +726,7 @@ class PoolEventsTest(PoolTestBase):
         c1.close()
         assert not canary.called
         c1 = p.connect()
-        dbapi_con = c1.connection
+        dbapi_con = c1.dbapi_connection
         c1.invalidate()
         assert canary.call_args_list[0][0][0] is dbapi_con
         assert canary.call_args_list[0][0][2] is None
@@ -670,7 +738,7 @@ class PoolEventsTest(PoolTestBase):
         c1.close()
         assert not canary.called
         c1 = p.connect()
-        dbapi_con = c1.connection
+        dbapi_con = c1.dbapi_connection
         exc = Exception("hi")
         c1.invalidate(exc)
         assert canary.call_args_list[0][0][0] is dbapi_con
@@ -682,7 +750,7 @@ class PoolEventsTest(PoolTestBase):
 
         c1 = p.connect()
 
-        dbapi_connection = weakref.ref(c1.connection)
+        dbapi_connection = weakref.ref(c1.dbapi_connection)
 
         eq_(canary, [])
         del c1
@@ -1199,7 +1267,7 @@ class QueuePoolTest(PoolTestBase):
         """Test that a concurrent ConnectionRecord.invalidate() which
         occurs after the ConnectionFairy has called
         _ConnectionRecord.checkout()
-        but before the ConnectionFairy tests "fairy.connection is None"
+        but before the ConnectionFairy tests "fairy.dbapi_connection is None"
         will not result in an InvalidRequestError.
 
         This use case assumes that a listener on the checkout() event
@@ -1231,7 +1299,7 @@ class QueuePoolTest(PoolTestBase):
             _decorate_existing_checkout,
         ):
             conn = p.connect()
-            is_(conn._connection_record.connection, None)
+            is_(conn._connection_record.dbapi_connection, None)
             conn.close()
 
     @testing.requires.threading_with_mock
@@ -1282,8 +1350,8 @@ class QueuePoolTest(PoolTestBase):
         )
         c1 = p.connect()
         c2 = p.connect()
-        c1_con = c1.connection
-        c2_con = c2.connection
+        c1_con = c1.dbapi_connection
+        c2_con = c2.dbapi_connection
 
         c1.close()
 
@@ -1306,7 +1374,7 @@ class QueuePoolTest(PoolTestBase):
 
         # ...and that's the one we'll get back next.
         c3 = p.connect()
-        assert c3.connection is c2_con
+        assert c3.dbapi_connection is c2_con
 
     @testing.requires.threading_with_mock
     @testing.requires.timing_intensive
@@ -1327,7 +1395,7 @@ class QueuePoolTest(PoolTestBase):
 
         def _conn():
             c = p.connect()
-            strong_refs.add(c.connection)
+            strong_refs.add(c.dbapi_connection)
             return c
 
         for j in range(5):
@@ -1355,44 +1423,44 @@ class QueuePoolTest(PoolTestBase):
                 pool_size=1, max_overflow=0, recycle=30
             )
             c1 = p.connect()
-            c_ref = weakref.ref(c1.connection)
+            c_ref = weakref.ref(c1.dbapi_connection)
             c1.close()
             mock.return_value = 10001
             c2 = p.connect()
 
-            is_(c2.connection, c_ref())
+            is_(c2.dbapi_connection, c_ref())
             c2.close()
 
             mock.return_value = 10035
             c3 = p.connect()
-            is_not(c3.connection, c_ref())
+            is_not(c3.dbapi_connection, c_ref())
 
     @testing.requires.timing_intensive
     def test_recycle_on_invalidate(self):
         p = self._queuepool_fixture(pool_size=1, max_overflow=0)
         c1 = p.connect()
-        c_ref = weakref.ref(c1.connection)
+        c_ref = weakref.ref(c1.dbapi_connection)
         c1.close()
         c2 = p.connect()
 
-        is_(c2.connection, c_ref())
+        is_(c2.dbapi_connection, c_ref())
 
         c2_rec = c2._connection_record
         p._invalidate(c2)
-        assert c2_rec.connection is None
+        assert c2_rec.dbapi_connection is None
         c2.close()
         time.sleep(0.5)
         c3 = p.connect()
-        is_not(c3.connection, c_ref())
+        is_not(c3.dbapi_connection, c_ref())
 
     @testing.requires.timing_intensive
     def test_recycle_on_soft_invalidate(self):
         p = self._queuepool_fixture(pool_size=1, max_overflow=0)
         c1 = p.connect()
-        c_ref = weakref.ref(c1.connection)
+        c_ref = weakref.ref(c1.dbapi_connection)
         c1.close()
         c2 = p.connect()
 
-        is_(c2.connection, c_ref())
+        is_(c2.dbapi_connection, c_ref())
 
         c2_rec = c2._connection_record
@@ -1401,14 +1469,14 @@ class QueuePoolTest(PoolTestBase):
 
         time.sleep(0.1)
         c2.invalidate(soft=True)
 
-        is_(c2_rec.connection, c2.connection)
+        is_(c2_rec.dbapi_connection, c2.dbapi_connection)
         c2.close()
 
         c3 = p.connect()
-        is_not(c3.connection, c_ref())
+        is_not(c3.dbapi_connection, c_ref())
         is_(c3._connection_record, c2_rec)
-        is_(c2_rec.connection, c3.connection)
+        is_(c2_rec.dbapi_connection, c3.dbapi_connection)
 
     def _no_wr_finalize(self):
         finalize_fairy = pool._finalize_fairy
@@ -1500,7 +1568,7 @@ class QueuePoolTest(PoolTestBase):
         assert_raises(Exception, p.connect)
 
         p._pool.queue = collections.deque(
-            [c for c in p._pool.queue if c.connection is not None]
+            [c for c in p._pool.queue if c.dbapi_connection is not None]
        )
 
         dbapi.shutdown(False)
@@ -1546,7 +1614,7 @@ class QueuePoolTest(PoolTestBase):
         assert_raises(Exception, p.connect)
 
         p._pool.queue = collections.deque(
-            [c for c in p._pool.queue if c.connection is not None]
+            [c for c in p._pool.queue if c.dbapi_connection is not None]
        )
 
         dbapi.shutdown(False)
@@ -1581,7 +1649,7 @@ class QueuePoolTest(PoolTestBase):
                 raise tsa.exc.DisconnectionError()
 
         conn = pool.connect()
-        old_dbapi_conn = conn.connection
+        old_dbapi_conn = conn.dbapi_connection
         conn.close()
 
         eq_(old_dbapi_conn.mock_calls, [call.rollback()])
@@ -1589,7 +1657,7 @@ class QueuePoolTest(PoolTestBase):
 
         old_dbapi_conn.boom = "yes"
 
         conn = pool.connect()
-        dbapi_conn = conn.connection
+        dbapi_conn = conn.dbapi_connection
         del conn
         gc_collect()
@@ -1681,15 +1749,15 @@ class QueuePoolTest(PoolTestBase):
     def test_invalidate(self):
         p = self._queuepool_fixture(pool_size=1, max_overflow=0)
         c1 = p.connect()
-        c_id = c1.connection.id
+        c_id = c1.dbapi_connection.id
         c1.close()
         c1 = None
         c1 = p.connect()
-        assert c1.connection.id == c_id
+        assert c1.dbapi_connection.id == c_id
         c1.invalidate()
         c1 = None
         c1 = p.connect()
-        assert c1.connection.id != c_id
+        assert c1.dbapi_connection.id != c_id
 
     def test_recreate(self):
         p = self._queuepool_fixture(
@@ -1707,16 +1775,16 @@ class QueuePoolTest(PoolTestBase):
         dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
 
         c1 = p.connect()
-        c_id = c1.connection.id
+        c_id = c1.dbapi_connection.id
         c1.close()
         c1 = None
         c1 = p.connect()
-        assert c1.connection.id == c_id
+        assert c1.dbapi_connection.id == c_id
         dbapi.raise_error = True
         c1.invalidate()
         c1 = None
         c1 = p.connect()
-        assert c1.connection.id != c_id
+        assert c1.dbapi_connection.id != c_id
 
     def test_detach(self):
         dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
@@ -1725,7 +1793,7 @@ class QueuePoolTest(PoolTestBase):
         c2 = p.connect()  # noqa
 
         eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")])
-        c1_con = c1.connection
+        c1_con = c1.dbapi_connection
         assert c1_con is not None
         eq_(c1_con.close.call_count, 0)
         c1.close()
@@ -1735,14 +1803,14 @@ class QueuePoolTest(PoolTestBase):
         dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
 
         c1 = p.connect()
-        c1_con = c1.connection
+        c1_con = c1.dbapi_connection
         c1.invalidate()
-        assert c1.connection is None
+        assert c1.dbapi_connection is None
         eq_(c1_con.close.call_count, 1)
 
         c2 = p.connect()
-        assert c2.connection is not c1_con
-        c2_con = c2.connection
+        assert c2.dbapi_connection is not c1_con
+        c2_con = c2.dbapi_connection
         c2.close()
         eq_(c2_con.close.call_count, 0)
 
@@ -1776,21 +1844,21 @@ class QueuePoolTest(PoolTestBase):
 
         for i in range(5):
             pc1 = p.connect()
-            is_(pc1.connection, c3)
+            is_(pc1.dbapi_connection, c3)
             pc1.close()
 
         pc1 = p.connect()
-        is_(pc1.connection, c3)
+        is_(pc1.dbapi_connection, c3)
 
         pc2 = p.connect()
-        is_(pc2.connection, c2)
+        is_(pc2.dbapi_connection, c2)
         pc2.close()
 
         pc3 = p.connect()
-        is_(pc3.connection, c2)
+        is_(pc3.dbapi_connection, c2)
 
         pc2 = p.connect()
-        is_(pc2.connection, c1)
+        is_(pc2.dbapi_connection, c1)
         pc2.close()
 
         pc3.close()
@@ -1814,21 +1882,21 @@ class QueuePoolTest(PoolTestBase):
         pc3.close()
 
         pc1 = p.connect()
-        is_(pc1.connection, c1)
+        is_(pc1.dbapi_connection, c1)
         pc1.close()
 
         pc1 = p.connect()
-        is_(pc1.connection, c2)
+        is_(pc1.dbapi_connection, c2)
 
         pc2 = p.connect()
-        is_(pc2.connection, c3)
+        is_(pc2.dbapi_connection, c3)
         pc2.close()
 
         pc3 = p.connect()
-        is_(pc3.connection, c1)
+        is_(pc3.dbapi_connection, c1)
 
         pc2 = p.connect()
-        is_(pc2.connection, c3)
+        is_(pc2.dbapi_connection, c3)
         pc2.close()
 
         pc3.close()
@@ -1904,7 +1972,7 @@ class SingletonThreadPoolTest(PoolTestBase):
 
            def _conn():
                c = p.connect()
-                sr.add(c.connection)
+                sr.add(c.dbapi_connection)
                return c
 
        else:
@@ -1948,7 +2016,7 @@ class SingletonThreadPoolTest(PoolTestBase):
         p = pool.SingletonThreadPool(creator=creator, pool_size=3)
 
         c1 = p.connect()
-        mock_conn = c1.connection
+        mock_conn = c1.dbapi_connection
 
         c2 = p.connect()
         is_(c1, c2)
@@ -2019,11 +2087,11 @@ class StaticPoolTest(PoolTestBase):
         p = pool.StaticPool(creator)
 
         c1 = p.connect()
-        conn = c1.connection
+        conn = c1.dbapi_connection
         c1.close()
 
         c2 = p.connect()
-        is_(conn, c2.connection)
+        is_(conn, c2.dbapi_connection)
 
 
 class CreatorCompatibilityTest(PoolTestBase):
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index ebcb8d520f..51da845b39 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -191,7 +191,7 @@ class PrePingMockTest(fixtures.TestBase):
         )
 
         conn = pool.connect()
-        dbapi_conn = conn.connection
+        dbapi_conn = conn.dbapi_connection
 
         eq_(dbapi_conn.mock_calls, [])
         conn.close()
@@ -199,7 +199,7 @@ class PrePingMockTest(fixtures.TestBase):
         eq_(dbapi_conn.mock_calls, [call.rollback()])
 
         conn = pool.connect()
-        is_(conn.connection, dbapi_conn)
+        is_(conn.dbapi_connection, dbapi_conn)
 
         # ping, so cursor() call.
         eq_(dbapi_conn.mock_calls, [call.rollback(), call.cursor()])
@@ -207,7 +207,7 @@ class PrePingMockTest(fixtures.TestBase):
         conn.close()
 
         conn = pool.connect()
-        is_(conn.connection, dbapi_conn)
+        is_(conn.dbapi_connection, dbapi_conn)
 
         # ping, so cursor() call.
         eq_(
@@ -223,33 +223,33 @@ class PrePingMockTest(fixtures.TestBase):
         )
 
         conn = pool.connect()
-        dbapi_conn = conn.connection
+        dbapi_conn = conn.dbapi_connection
         conn_rec = conn._connection_record
         eq_(dbapi_conn.mock_calls, [])
         conn.close()
 
         conn = pool.connect()
-        is_(conn.connection, dbapi_conn)
+        is_(conn.dbapi_connection, dbapi_conn)
 
         # ping, so cursor() call.
         eq_(dbapi_conn.mock_calls, [call.rollback(), call.cursor()])
 
         conn.invalidate()
-        is_(conn.connection, None)
+        is_(conn.dbapi_connection, None)
 
         # connect again, make sure we're on the same connection record
         conn = pool.connect()
         is_(conn._connection_record, conn_rec)
 
         # no ping
-        dbapi_conn = conn.connection
+        dbapi_conn = conn.dbapi_connection
         eq_(dbapi_conn.mock_calls, [])
 
     def test_connect_across_restart(self):
         pool = self._pool_fixture(pre_ping=True)
 
         conn = pool.connect()
-        stale_connection = conn.connection
+        stale_connection = conn.dbapi_connection
         conn.close()
 
         self.dbapi.shutdown("execute")
@@ -316,7 +316,7 @@ class PrePingMockTest(fixtures.TestBase):
         pool = self._pool_fixture(pre_ping=True)
 
         conn = pool.connect()
-        old_dbapi_conn = conn.connection
+        old_dbapi_conn = conn.dbapi_connection
         conn.close()
 
         # no cursor() because no pre ping
@@ -335,7 +335,7 @@ class PrePingMockTest(fixtures.TestBase):
         self.dbapi.restart()
 
         conn = pool.connect()
-        dbapi_conn = conn.connection
+        dbapi_conn = conn.dbapi_connection
         del conn
         gc_collect()
 
@@ -1111,7 +1111,7 @@ class RealReconnectTest(fixtures.TestBase):
     def test_ensure_is_disconnect_gets_connection(self):
         def is_disconnect(e, conn, cursor):
             # connection is still present
-            assert conn.connection is not None
+            assert conn.dbapi_connection is not None
             # the error usually occurs on connection.cursor(),
             # though MySQLdb we get a non-working cursor.
             # assert cursor is None
@@ -1308,7 +1308,7 @@ class PrePingRealTest(fixtures.TestBase):
         conn = engine.connect()
         eq_(conn.execute(select(1)).scalar(), 1)
 
-        stale_connection = conn.connection.connection
+        stale_connection = conn.connection.dbapi_connection
         conn.close()
 
         engine.test_shutdown()
diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py
index 01e3e3040f..3c260f9e5d 100644
--- a/test/ext/asyncio/test_engine_py3k.py
+++ b/test/ext/asyncio/test_engine_py3k.py
@@ -391,7 +391,7 @@ class AsyncEngineTest(EngineFixture):
         connection_fairy = await conn.get_raw_connection()
         is_(connection_fairy.is_valid, True)
 
-        dbapi_connection = connection_fairy.connection
+        dbapi_connection = connection_fairy.dbapi_connection
 
         await conn.invalidate()
 
@@ -399,7 +399,7 @@ class AsyncEngineTest(EngineFixture):
         assert dbapi_connection._connection.is_closed()
 
         new_fairy = await conn.get_raw_connection()
-        is_not(new_fairy.connection, dbapi_connection)
+        is_not(new_fairy.dbapi_connection, dbapi_connection)
         is_not(new_fairy, connection_fairy)
         is_(new_fairy.is_valid, True)
         is_(connection_fairy.is_valid, False)
-- 
2.47.2
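
A minimal sketch of the two accessors these tests exercise, as seen through the
public pool proxy (assuming SQLAlchemy 1.4 with this change applied and the
bundled pysqlite driver)::

    from sqlalchemy import create_engine

    engine = create_engine("sqlite://")

    with engine.connect() as conn:
        # conn.connection is the pool's _ConnectionFairy proxy
        fairy = conn.connection

        # pep-249 compatible object; an AdaptedConnection for asyncio dialects
        dbapi_conn = fairy.dbapi_connection

        # the connection object maintained by the driver itself
        driver_conn = fairy.driver_connection

        # for a plain pep-249 driver such as pysqlite these are the same object
        assert dbapi_conn is driver_conn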