# [ODBC Microsoft Access Driver] Optional feature not implemented.
return
+AsyncIO dialects
+----------------
+
+As of version 1.4 SQLAlchemy supports also dialects that use
+asyncio drivers to interface with the database backend.
+
+SQLAlchemy's approach to asyncio drivers is that the connection and cursor
+objects of the driver (if any) are adapted into a pep-249 compliant interface,
+using the ``AdaptedConnection`` interface class. Refer to the internal asyncio
+driver implementations such as that of ``asyncpg``, ``asyncmy`` and
+``aiosqlite`` for examples.
+
Going Forward
==============
--- /dev/null
+.. change::
+ :tags: engine, asyncio
+ :tickets: 6832
+
+ Improve the interface used by adapted drivers, like the asyncio ones,
+ to access the actual connection object returned by the driver.
+
+ The :class:`._ConnectionFairy` object has two new attributes:
+
+ * :attr:`._ConnectionFairy.dbapi_connection` always represents a DBAPI
+ compatible object. For pep-249 drivers, this is the DBAPI connection as
+ it always has been, previously accessed under the ``.connection``
+ attribute. For asyncio drivers that SQLAlchemy adapts into a pep-249
+ interface, the returned object will normally be a SQLAlchemy adaption
+ object called :class:`_engine.AdaptedConnection`.
+ * :attr:`._ConnectionFairy.driver_connection` always represents the actual
+ connection object maintained by the third party pep-249 DBAPI or async
+ driver in use. For standard pep-249 DBAPIs, this will always be the same
+ object as that of the ``dbapi_connection``. For an asyncio driver, it
+ will be the underlying asyncio-only connection object.
+
+ The ``.connection`` attribute remains available and is now a legacy alias
+ of ``.dbapi_connection``.
+
+ .. seealso::
+
+ :ref:`faq_dbapi_connection`
+
so in any case the direct DBAPI calling pattern is always there for those
cases where it is needed.
+.. seealso::
+
+ :ref:`faq_dbapi_connection` - includes additional details about how
+ the DBAPI connection is accessed as well as the "driver" connection
+ when using asyncio drivers.
+
Some recipes for DBAPI connection use follow.
.. _stored_procedures:
:members:
+.. autoclass:: sqlalchemy.engine.AdaptedConnection
+ :members:
* If the pool itself doesn't maintain a connection after it's checked in,
such as when using :class:`.NullPool`, the behavior can be disabled.
* Otherwise, it must be ensured that:
+
* the application ensures that all :class:`_engine.Connection`
objects are explicitly closed out using a context manager (i.e. ``with``
block) or a ``try/finally`` style block
def checkout(dbapi_connection, connection_record, connection_proxy):
pid = os.getpid()
if connection_record.info['pid'] != pid:
- connection_record.connection = connection_proxy.connection = None
+ connection_record.dbapi_connection = connection_proxy.dbapi_connection = None
raise exc.DisconnectionError(
"Connection record belongs to pid %s, "
"attempting to check out in pid %s" %
:ref:`pysqlite_threading_pooling` - info on PySQLite's behavior.
+.. _faq_dbapi_connection:
+
How do I get at the raw DBAPI connection when using an Engine?
--------------------------------------------------------------
With a regular SA engine-level Connection, you can get at a pool-proxied
version of the DBAPI connection via the :attr:`_engine.Connection.connection` attribute on
:class:`_engine.Connection`, and for the really-real DBAPI connection you can call the
-:attr:`.ConnectionFairy.connection` attribute on that - but there should never be any need to access
-the non-pool-proxied DBAPI connection, as all methods are proxied through::
+:attr:`._ConnectionFairy.dbapi_connection` attribute on that. On regular sync drivers
+there is usually no need to access the non-pool-proxied DBAPI connection,
+as all methods are proxied through::
engine = create_engine(...)
conn = engine.connect()
- conn.connection.<do DBAPI things>
- cursor_obj = conn.connection.cursor(<DBAPI specific arguments..>)
+
+ # pep-249 style ConnectionFairy connection pool proxy object
+ connection_fairy = conn.connection
+
+ # typically to run statements one would get a cursor() from this
+ # object
+ cursor_obj = connection_fairy.cursor()
+ # ... work with cursor_obj
+
+ # to bypass "connection_fairy", such as to set attributes on the
+ # unproxied pep-249 DBAPI connection, use .dbapi_connection
+ raw_dbapi_connection = connection_fairy.dbapi_connection
+
+ # the same thing is available as .driver_connection (more on this
+ # in the next section)
+ also_raw_dbapi_connection = connection_fairy.driver_connection
+
+.. versionchanged:: 1.4.24 Added the
+ :attr:`._ConnectionFairy.dbapi_connection` attribute,
+ which supersedes the previous
+ :attr:`._ConnectionFairy.connection` attribute which still remains
+ available; this attribute always provides a pep-249 synchronous style
+ connection object. The :attr:`._ConnectionFairy.driver_connection`
+ attribute is also added which will always refer to the real driver-level
+ connection regardless of what API it presents.
+
+Accessing the underlying connnection for an asyncio driver
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When an asyncio driver is in use, there are two changes to the above
+scheme. The first is that when using an :class:`_asyncio.AsyncConnection`,
+the :class:`._ConnectionFairy` must be accessed using the awaitable method
+:meth:`_asyncio.AsyncConnection.get_raw_connection`. The
+returned :class:`._ConnectionFairy` in this case retains a sync-style
+pep-249 usage pattern, and the :attr:`._ConnectionFairy.dbapi_connection`
+attribute refers to a
+a SQLAlchemy-adapted connection object which adapts the asyncio
+connection to a sync style pep-249 API, in other words there are *two* levels
+of proxying going on when using an asyncio driver. The actual asyncio connection
+is available from the :class:`._ConnectionFairy.driver_connection` attribute.
+To restate the previous example in terms of asyncio looks like::
+
+ async def main():
+ engine = create_async_engine(...)
+ conn = await engine.connect()
+
+ # pep-249 style ConnectionFairy connection pool proxy object
+ # presents a sync interface
+ connection_fairy = await conn.get_raw_connection()
+
+ # beneath that proxy is a second proxy which adapts the
+ # asyncio driver into a pep-249 connection object, accessible
+ # via .dbapi_connection as is the same with a sync API
+ sqla_sync_conn = connection_fairy.dbapi_connection
+
+ # the really-real innermost driver connection is available
+ # from the .driver_connection attribute
+ raw_asyncio_connection = connection_fairy.driver_connection
+
+ # work with raw asyncio connection
+ result = await raw_asyncio_connection.execute(...)
+
+.. versionchanged:: 1.4.24 Added the
+ :attr:`._ConnectionFairy.dbapi_connection`
+ and :attr:`._ConnectionFairy.driver_connection` attributes to allow access
+ to pep-249 connections, pep-249 adaption layers, and underlying driver
+ connections using a consistent interface.
+
+When using asyncio drivers, the above "DBAPI" connection is actually a
+SQLAlchemy-adapted form of connection which presents a synchronous-style
+pep-249 style API. To access the actual
+asyncio driver connection, which will present the original asyncio API
+of the driver in use, this can be accessed via the
+:attr:`._ConnectionFairy.driver_connection` attribute of
+:class:`._ConnectionFairy`.
+For a standard pep-249 driver, :attr:`._ConnectionFairy.dbapi_connection`
+and :attr:`._ConnectionFairy.driver_connection` are synonymous.
You must ensure that you revert any isolation level settings or other
operation-specific settings on the connection back to normal before returning
it to the pool.
-As an alternative to reverting settings, you can call the :meth:`_engine.Connection.detach` method on
-either :class:`_engine.Connection` or the proxied connection, which will de-associate
-the connection from the pool such that it will be closed and discarded
-when :meth:`_engine.Connection.close` is called::
+As an alternative to reverting settings, you can call the
+:meth:`_engine.Connection.detach` method on either :class:`_engine.Connection`
+or the proxied connection, which will de-associate the connection from the pool
+such that it will be closed and discarded when :meth:`_engine.Connection.close`
+is called::
conn = engine.connect()
conn.detach() # detaches the DBAPI connection from the connection pool
# adjust for ConnectionFairy being present
# allows attribute set e.g. "connection.autocommit = True"
# to work properly
- if hasattr(connection, "connection"):
- connection = connection.connection
+ if hasattr(connection, "dbapi_connection"):
+ connection = connection.dbapi_connection
if level == "AUTOCOMMIT":
connection.autocommit = True
.. warning:: The aiomysql dialect as of September, 2021 appears to be unmaintained
and no longer functions for Python version 3.10. Please refer to the
- :ref:`asyncmy` dialect for current MySQL asyncio functionality.
+ :ref:`asyncmy` dialect for current MySQL/MariaDD asyncio functionality.
The aiomysql dialect is SQLAlchemy's second Python asyncio dialect.
from .pymysql import MySQLDialect_pymysql
from ... import pool
from ... import util
+from ...engine import AdaptedConnection
from ...util.concurrency import asyncio
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
return self.await_(self._cursor.fetchall())
-class AsyncAdapt_aiomysql_connection:
+class AsyncAdapt_aiomysql_connection(AdaptedConnection):
await_ = staticmethod(await_only)
__slots__ = ("dbapi", "_connection", "_execute_mutex")
return CLIENT.FOUND_ROWS
+ def get_driver_connection(self, connection):
+ return connection._connection
+
dialect = MySQLDialect_aiomysql
from .pymysql import MySQLDialect_pymysql
from ... import pool
from ... import util
+from ...engine import AdaptedConnection
from ...util.concurrency import asynccontextmanager
from ...util.concurrency import asyncio
from ...util.concurrency import await_fallback
return self.await_(self._cursor.fetchall())
-class AsyncAdapt_asyncmy_connection:
+class AsyncAdapt_asyncmy_connection(AdaptedConnection):
await_ = staticmethod(await_only)
__slots__ = ("dbapi", "_connection", "_execute_mutex", "_ss_cursors")
return CLIENT.FOUND_ROWS
+ def get_driver_connection(self, connection):
+ return connection._connection
+
dialect = MySQLDialect_asyncmy
# adjust for ConnectionFairy being present
# allows attribute set e.g. "connection.autocommit = True"
# to work properly
- if hasattr(connection, "connection"):
- connection = connection.connection
+ if hasattr(connection, "dbapi_connection"):
+ connection = connection.dbapi_connection
self._set_isolation_level(connection, level)
return result
def set_isolation_level(self, connection, level):
- if hasattr(connection, "connection"):
- dbapi_connection = connection.connection
+ if hasattr(connection, "dbapi_connection"):
+ dbapi_connection = connection.dbapi_connection
else:
dbapi_connection = connection
if level == "AUTOCOMMIT":
from ... import pool
from ... import processors
from ... import util
+from ...engine import AdaptedConnection
from ...sql import sqltypes
from ...util.concurrency import asyncio
from ...util.concurrency import await_fallback
)
-class AsyncAdapt_asyncpg_connection:
+class AsyncAdapt_asyncpg_connection(AdaptedConnection):
__slots__ = (
"dbapi",
"_connection",
return connect
+ def get_driver_connection(self, connection):
+ return connection._connection
+
dialect = PGDialect_asyncpg
level = level.replace("_", " ")
# adjust for ConnectionFairy possibly being present
- if hasattr(connection, "connection"):
- connection = connection.connection
+ if hasattr(connection, "dbapi_connection"):
+ connection = connection.dbapi_connection
if level == "AUTOCOMMIT":
connection.autocommit = True
def set_client_encoding(self, connection, client_encoding):
# adjust for ConnectionFairy possibly being present
- if hasattr(connection, "connection"):
- connection = connection.connection
+ if hasattr(connection, "dbapi_connection"):
+ connection = connection.dbapi_connection
cursor = connection.cursor()
cursor.execute("SET CLIENT_ENCODING TO '" + client_encoding + "'")
@util.memoized_instancemethod
def _hstore_oids(self, conn):
extras = self._psycopg2_extras()
- if hasattr(conn, "connection"):
- conn = conn.connection
+ if hasattr(conn, "dbapi_connection"):
+ conn = conn.dbapi_connection
oids = extras.HstoreAdapter.get_oids(conn)
if oids is not None and oids[0]:
return oids[0:2]
from .pysqlite import SQLiteDialect_pysqlite
from ... import pool
from ... import util
+from ...engine import AdaptedConnection
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
return self.await_(self._cursor.fetchall())
-class AsyncAdapt_aiosqlite_connection:
+class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
await_ = staticmethod(await_only)
__slots__ = ("dbapi", "_connection")
return super().is_disconnect(e, connection, cursor)
+ def get_driver_connection(self, connection):
+ return connection._connection
+
dialect = SQLiteDialect_aiosqlite
)
def set_isolation_level(self, connection, level):
- if hasattr(connection, "connection"):
- dbapi_connection = connection.connection
+ if hasattr(connection, "dbapi_connection"):
+ dbapi_connection = connection.dbapi_connection
else:
dbapi_connection = connection
return re.search(a, b) is not None
def set_regexp(connection):
- if hasattr(connection, "connection"):
- dbapi_connection = connection.connection
+ if hasattr(connection, "dbapi_connection"):
+ dbapi_connection = connection.dbapi_connection
else:
dbapi_connection = connection
dbapi_connection.create_function(
from .cursor import FullyBufferedResultProxy
from .cursor import LegacyCursorResult
from .cursor import ResultProxy
+from .interfaces import AdaptedConnection
from .interfaces import Compiled
from .interfaces import Connectable
from .interfaces import CreateEnginePlugin
def connection(self):
"""The underlying DB-API connection managed by this Connection.
+ This is a SQLAlchemy connection-pool proxied connection
+ which then has the attribute
+ :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
+ actual driver connection.
+
.. seealso::
% (", ".join(name for name, obj in trans_objs))
)
- dbapi_connection = connection.connection.connection
+ dbapi_connection = connection.connection.dbapi_connection
for name, characteristic, value in characteristic_values:
characteristic.set_characteristic(self, dbapi_connection, value)
connection.connection._connection_record.finalize_callback.append(
name = unicode(name) # noqa
return name
+ def get_driver_connection(self, connection):
+ return connection
+
class _RendersLiteral(object):
def literal_processor(self, dialect):
.. versionadded:: 1.0.3
"""
- pass
+
+ def get_driver_connection(self, connection):
+ """Returns the connection object as returned by the external driver
+ package.
+
+ For normal dialects that use a DBAPI compliant driver this call
+ will just return the ``connection`` passed as argument.
+ For dialects that instead adapt a non DBAPI compliant driver, like
+ when adapting an asyncio driver, this call will return the
+ connection-like object as returned by the driver.
+
+ .. versionadded:: 1.4.24
+
+ """
+ raise NotImplementedError()
class CreateEnginePlugin(object):
.. versionadded:: 1.0.3
"""
+
+
+class AdaptedConnection(object):
+ """Interface of an adapted connection object to support the DBAPI protocol.
+
+ Used by asyncio dialects to provide a sync-style pep-249 facade on top
+ of the asyncio connection/cursor API provided by the driver.
+
+ .. versionadded:: 1.4.24
+
+ """
+
+ __slots__ = ("_connection",)
+
+ @property
+ def driver_connection(self):
+ """The connection object as returned by the driver after a connect."""
+ return self._connection
+
+ def __repr__(self):
+ return "<AdaptedConnection %s>" % self._connection
def connection(self):
"""Not implemented for async; call
:meth:`_asyncio.AsyncConnection.get_raw_connection`.
-
"""
raise exc.InvalidRequestError(
"AsyncConnection.connection accessor is not implemented as the "
"""Return the pooled DBAPI-level connection in use by this
:class:`_asyncio.AsyncConnection`.
- This is typically the SQLAlchemy connection-pool proxied connection
- which then has an attribute .connection that refers to the actual
- DBAPI-level connection.
+ This is a SQLAlchemy connection-pool proxied connection
+ which then has the attribute
+ :attr:`_pool._ConnectionFairy.driver_connection` that refers to the
+ actual driver connection. Its
+ :attr:`_pool._ConnectionFairy.dbapi_connection` refers instead
+ to an :class:`_engine.AdaptedConnection` instance that
+ adapts the driver connection to the DBAPI protocol.
+
"""
conn = self._sync_connection()
"passed to the connection pool."
)
+ def get_driver_connection(self, connection):
+ return connection
+
class _AsyncConnDialect(_ConnDialect):
is_async = True
starttime = None
- connection = None
+ dbapi_connection = None
"""A reference to the actual DBAPI connection being tracked.
May be ``None`` if this :class:`._ConnectionRecord` has been marked
as invalidated; a new DBAPI connection may replace it if the owning
pool calls upon this :class:`._ConnectionRecord` to reconnect.
+ For adapted drivers, like the Asyncio implementations, this is a
+ :class:`.AdaptedConnection` that adapts the driver connection
+ to the DBAPI protocol.
+ Use :attr:`._ConnectionRecord.driver_connection` to obtain the
+ connection objected returned by the driver.
+
+ .. versionadded:: 1.4.24
+
"""
+ @property
+ def driver_connection(self):
+ """The connection object as returned by the driver after a connect.
+
+ For normal sync drivers that support the DBAPI protocol, this object
+ is the same as the one referenced by
+ :attr:`._ConnectionRecord.dbapi_connection`.
+
+ For adapted drivers, like the Asyncio ones, this is the actual object
+ that was returned by the driver ``connect`` call.
+
+ As :attr:`._ConnectionRecord.dbapi_connection` it may be ``None``
+ if this :class:`._ConnectionRecord` has been marked as invalidated.
+
+ .. versionadded:: 1.4.24
+
+ """
+
+ if self.dbapi_connection is None:
+ return None
+ else:
+ return self.__pool._dialect.get_driver_connection(
+ self.dbapi_connection
+ )
+
+ @property
+ def connection(self):
+ """An alias to :attr:`._ConnectionRecord.dbapi_connection`.
+
+ This alias is deprecated, please use the new name.
+
+ .. deprecated:: 1.4.24
+
+ """
+ return self.dbapi_connection
+
+ @connection.setter
+ def connection(self, value):
+ self.dbapi_connection = value
+
_soft_invalidate_time = 0
@util.memoized_property
util.warn("Double checkin attempted on %s" % self)
return
self.fairy_ref = None
- connection = self.connection
+ connection = self.dbapi_connection
pool = self.__pool
while self.finalize_callback:
finalizer = self.finalize_callback.pop()
return self.starttime
def close(self):
- if self.connection is not None:
+ if self.dbapi_connection is not None:
self.__close()
def invalidate(self, e=None, soft=False):
- """Invalidate the DBAPI connection held by this :class:`._ConnectionRecord`.
+ """Invalidate the DBAPI connection held by this
+ :class:`._ConnectionRecord`.
This method is called for all connection invalidations, including
when the :meth:`._ConnectionFairy.invalidate` or
as well as when any
so-called "automatic invalidation" condition occurs.
- :param e: an exception object indicating a reason for the invalidation.
+ :param e: an exception object indicating a reason for the
+ invalidation.
:param soft: if True, the connection isn't closed; instead, this
- connection will be recycled on next checkout.
+ connection will be recycled on next checkout.
.. versionadded:: 1.0.3
"""
# already invalidated
- if self.connection is None:
+ if self.dbapi_connection is None:
return
if soft:
- self.__pool.dispatch.soft_invalidate(self.connection, self, e)
+ self.__pool.dispatch.soft_invalidate(
+ self.dbapi_connection, self, e
+ )
else:
- self.__pool.dispatch.invalidate(self.connection, self, e)
+ self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
if e is not None:
self.__pool.logger.info(
"%sInvalidate connection %r (reason: %s:%s)",
"Soft " if soft else "",
- self.connection,
+ self.dbapi_connection,
e.__class__.__name__,
e,
)
self.__pool.logger.info(
"%sInvalidate connection %r",
"Soft " if soft else "",
- self.connection,
+ self.dbapi_connection,
)
if soft:
self._soft_invalidate_time = time.time()
else:
self.__close()
- self.connection = None
+ self.dbapi_connection = None
def get_connection(self):
recycle = False
# within 16 milliseconds accuracy, so unit tests for connection
# invalidation need a sleep of at least this long between initial start
# time and invalidation for the logic below to work reliably.
- if self.connection is None:
+ if self.dbapi_connection is None:
self.info.clear()
self.__connect()
elif (
and time.time() - self.starttime > self.__pool._recycle
):
self.__pool.logger.info(
- "Connection %r exceeded timeout; recycling", self.connection
+ "Connection %r exceeded timeout; recycling",
+ self.dbapi_connection,
)
recycle = True
elif self.__pool._invalidate_time > self.starttime:
self.__pool.logger.info(
"Connection %r invalidated due to pool invalidation; "
+ "recycling",
- self.connection,
+ self.dbapi_connection,
)
recycle = True
elif self._soft_invalidate_time > self.starttime:
self.__pool.logger.info(
"Connection %r invalidated due to local soft invalidation; "
+ "recycling",
- self.connection,
+ self.dbapi_connection,
)
recycle = True
self.info.clear()
self.__connect()
- return self.connection
+ return self.dbapi_connection
def _is_hard_or_soft_invalidated(self):
return (
- self.connection is None
+ self.dbapi_connection is None
or self.__pool._invalidate_time > self.starttime
or (self._soft_invalidate_time > self.starttime)
)
def __close(self):
self.finalize_callback.clear()
if self.__pool.dispatch.close:
- self.__pool.dispatch.close(self.connection, self)
- self.__pool._close_connection(self.connection)
- self.connection = None
+ self.__pool.dispatch.close(self.dbapi_connection, self)
+ self.__pool._close_connection(self.dbapi_connection)
+ self.dbapi_connection = None
def __connect(self):
pool = self.__pool
# ensure any existing connection is removed, so that if
# creator fails, this attribute stays None
- self.connection = None
+ self.dbapi_connection = None
try:
self.starttime = time.time()
- connection = pool._invoke_creator(self)
+ self.dbapi_connection = connection = pool._invoke_creator(self)
pool.logger.debug("Created new connection %r", connection)
- self.connection = connection
self.fresh = True
except Exception as e:
with util.safe_reraise():
if pool.dispatch.first_connect:
pool.dispatch.first_connect.for_modify(
pool.dispatch
- ).exec_once_unless_exception(self.connection, self)
+ ).exec_once_unless_exception(self.dbapi_connection, self)
# init of the dialect now takes place within the connect
# event, so ensure a mutex is used on the first run
pool.dispatch.connect.for_modify(
pool.dispatch
- )._exec_w_sync_on_first_run(self.connection, self)
+ )._exec_w_sync_on_first_run(self.dbapi_connection, self)
def _finalize_fairy(
- connection,
+ dbapi_connection,
connection_record,
pool,
ref, # this is None when called directly, not by the gc
if ref is not None:
if connection_record.fairy_ref is not ref:
return
- assert connection is None
- connection = connection_record.connection
+ assert dbapi_connection is None
+ dbapi_connection = connection_record.dbapi_connection
# null pool is not _is_asyncio but can be used also with async dialects
dont_restore_gced = pool._dialect.is_async
detach = not connection_record
can_manipulate_connection = True
- if connection is not None:
+ if dbapi_connection is not None:
if connection_record and echo:
pool.logger.debug(
"Connection %r being returned to pool%s",
- connection,
+ dbapi_connection,
", transaction state was already reset by caller"
if not reset
else "",
try:
fairy = fairy or _ConnectionFairy(
- connection, connection_record, echo
+ dbapi_connection,
+ connection_record,
+ echo,
)
- assert fairy.connection is connection
+ assert fairy.dbapi_connection is dbapi_connection
if reset and can_manipulate_connection:
fairy._reset(pool)
if can_manipulate_connection:
if pool.dispatch.close_detached:
- pool.dispatch.close_detached(connection)
+ pool.dispatch.close_detached(dbapi_connection)
- pool._close_connection(connection)
+ pool._close_connection(dbapi_connection)
else:
message = (
"The garbage collector is trying to clean up "
"connections when they are no longer used, calling "
"``close()`` or using a context manager to "
"manage their lifetime."
- ) % connection
+ ) % dbapi_connection
pool.logger.error(message)
util.warn(message)
"""
def __init__(self, dbapi_connection, connection_record, echo):
- self.connection = dbapi_connection
+ self.dbapi_connection = dbapi_connection
self._connection_record = connection_record
self._echo = echo
- connection = None
- """A reference to the actual DBAPI connection being tracked."""
+ dbapi_connection = None
+ """A reference to the actual DBAPI connection being tracked.
+
+ .. versionadded:: 1.4.24
+
+ .. seealso::
+
+ :attr:`._ConnectionFairy.driver_connection`
+
+ :attr:`._ConnectionRecord.dbapi_connection`
+
+ :ref:`faq_dbapi_connection`
+
+ """
_connection_record = None
"""A reference to the :class:`._ConnectionRecord` object associated
"""
+ @property
+ def driver_connection(self):
+ """The connection object as returned by the driver after a connect.
+
+ .. versionadded:: 1.4.24
+
+ .. seealso::
+
+ :attr:`._ConnectionFairy.dbapi_connection`
+
+ :attr:`._ConnectionRecord.driver_connection`
+
+ :ref:`faq_dbapi_connection`
+
+ """
+ return self._connection_record.driver_connection
+
+ @property
+ def connection(self):
+ """An alias to :attr:`._ConnectionFairy.dbapi_connection`.
+
+ This alias is deprecated, please use the new name.
+
+ .. deprecated:: 1.4.24
+
+ """
+ return self.dbapi_connection
+
+ @connection.setter
+ def connection(self, value):
+ self.dbapi_connection = value
+
@classmethod
def _checkout(cls, pool, threadconns=None, fairy=None):
if not fairy:
if threadconns is not None:
threadconns.current = weakref.ref(fairy)
- if fairy.connection is None:
+ if fairy.dbapi_connection is None:
raise exc.InvalidRequestError("This connection is closed")
fairy._counter += 1
if (
if fairy._echo:
pool.logger.debug(
"Pool pre-ping on connection %s",
- fairy.connection,
+ fairy.dbapi_connection,
)
- result = pool._dialect.do_ping(fairy.connection)
+ result = pool._dialect.do_ping(fairy.dbapi_connection)
if not result:
if fairy._echo:
pool.logger.debug(
"Pool pre-ping on connection %s failed, "
"will invalidate pool",
- fairy.connection,
+ fairy.dbapi_connection,
)
raise exc.InvalidatePoolError()
elif fairy._echo:
pool.logger.debug(
"Connection %s is fresh, skipping pre-ping",
- fairy.connection,
+ fairy.dbapi_connection,
)
pool.dispatch.checkout(
- fairy.connection, fairy._connection_record, fairy
+ fairy.dbapi_connection, fairy._connection_record, fairy
)
return fairy
except exc.DisconnectionError as e:
pool.logger.info(
"Disconnection detected on checkout, "
"invalidating individual connection %s (reason: %r)",
- fairy.connection,
+ fairy.dbapi_connection,
e,
)
fairy._connection_record.invalidate(e)
try:
- fairy.connection = (
+ fairy.dbapi_connection = (
fairy._connection_record.get_connection()
)
except Exception as err:
def _checkin(self, reset=True):
_finalize_fairy(
- self.connection,
+ self.dbapi_connection,
self._connection_record,
self._pool,
None,
reset=reset,
fairy=self,
)
- self.connection = None
+ self.dbapi_connection = None
self._connection_record = None
_close = _checkin
if pool._reset_on_return is reset_rollback:
if self._echo:
pool.logger.debug(
- "Connection %s rollback-on-return", self.connection
+ "Connection %s rollback-on-return", self.dbapi_connection
)
pool._dialect.do_rollback(self)
elif pool._reset_on_return is reset_commit:
if self._echo:
pool.logger.debug(
"Connection %s commit-on-return",
- self.connection,
+ self.dbapi_connection,
)
pool._dialect.do_commit(self)
"""Return True if this :class:`._ConnectionFairy` still refers
to an active DBAPI connection."""
- return self.connection is not None
+ return self.dbapi_connection is not None
@util.memoized_property
def info(self):
"""
- if self.connection is None:
+ if self.dbapi_connection is None:
util.warn("Can't invalidate an already-closed connection.")
return
if self._connection_record:
self._connection_record.invalidate(e=e, soft=soft)
if not soft:
- self.connection = None
+ self.dbapi_connection = None
self._checkin()
def cursor(self, *args, **kwargs):
method.
"""
- return self.connection.cursor(*args, **kwargs)
+ return self.dbapi_connection.cursor(*args, **kwargs)
def __getattr__(self, key):
- return getattr(self.connection, key)
+ return getattr(self.dbapi_connection, key)
def detach(self):
"""Separate this connection from its Pool.
if self._connection_record is not None:
rec = self._connection_record
rec.fairy_ref = None
- rec.connection = None
+ rec.dbapi_connection = None
# TODO: should this be _return_conn?
self._pool._do_return_conn(self._connection_record)
self.info = self.info.copy()
self._connection_record = None
if self._pool.dispatch.detach:
- self._pool.dispatch.detach(self.connection, rec)
+ self._pool.dispatch.detach(self.dbapi_connection, rec)
def close(self):
self._counter -= 1
to produce a new DBAPI connection.
:param dbapi_connection: a DBAPI connection.
+ The :attr:`._ConnectionRecord.dbapi_connection` attribute.
:param connection_record: the :class:`._ConnectionRecord` managing the
DBAPI connection.
encoding settings, collation settings, and many others.
:param dbapi_connection: a DBAPI connection.
+ The :attr:`._ConnectionRecord.dbapi_connection` attribute.
:param connection_record: the :class:`._ConnectionRecord` managing the
DBAPI connection.
"""Called when a connection is retrieved from the Pool.
:param dbapi_connection: a DBAPI connection.
+ The :attr:`._ConnectionRecord.dbapi_connection` attribute.
:param connection_record: the :class:`._ConnectionRecord` managing the
DBAPI connection.
for detached connections. (They do not return to the pool.)
:param dbapi_connection: a DBAPI connection.
+ The :attr:`._ConnectionRecord.dbapi_connection` attribute.
:param connection_record: the :class:`._ConnectionRecord` managing the
DBAPI connection.
cases where the connection is discarded immediately after reset.
:param dbapi_connection: a DBAPI connection.
+ The :attr:`._ConnectionRecord.dbapi_connection` attribute.
:param connection_record: the :class:`._ConnectionRecord` managing the
DBAPI connection.
connection occurs.
:param dbapi_connection: a DBAPI connection.
+ The :attr:`._ConnectionRecord.dbapi_connection` attribute.
:param connection_record: the :class:`._ConnectionRecord` managing the
DBAPI connection.
.. versionadded:: 1.0.3
+ :param dbapi_connection: a DBAPI connection.
+ The :attr:`._ConnectionRecord.dbapi_connection` attribute.
+
+ :param connection_record: the :class:`._ConnectionRecord` managing the
+ DBAPI connection.
+
+ :param exception: the exception object corresponding to the reason
+ for this invalidation, if any. May be ``None``.
+
"""
def close(self, dbapi_connection, connection_record):
.. versionadded:: 1.1
+ :param dbapi_connection: a DBAPI connection.
+ The :attr:`._ConnectionRecord.dbapi_connection` attribute.
+
+ :param connection_record: the :class:`._ConnectionRecord` managing the
+ DBAPI connection.
+
"""
def detach(self, dbapi_connection, connection_record):
.. versionadded:: 1.1
+ :param dbapi_connection: a DBAPI connection.
+ The :attr:`._ConnectionRecord.dbapi_connection` attribute.
+
+ :param connection_record: the :class:`._ConnectionRecord` managing the
+ DBAPI connection.
+
"""
def close_detached(self, dbapi_connection):
.. versionadded:: 1.1
+ :param dbapi_connection: a DBAPI connection.
+ The :attr:`._ConnectionRecord.dbapi_connection` attribute.
+
"""
def dispose(self):
if (
"connection" in self.__dict__
- and self.connection.connection is not None
+ and self.connection.dbapi_connection is not None
):
self.connection.close()
del self.__dict__["connection"]
# used by the test suite to make a new engine / pool without
# losing the state of an existing SQLite :memory: connection
self._invoke_creator = (
- lambda crec: other_static_pool.connection.connection
+ lambda crec: other_static_pool.connection.dbapi_connection
)
def _create_connection(self):
for rec in list(self.proxy_refs):
if rec is not None and rec.is_valid:
- self.dbapi_connections.discard(rec.connection)
+ self.dbapi_connections.discard(rec.dbapi_connection)
self._safe(rec._checkin)
# for fairy refs that were GCed and could not close the connection,
def pytest_runtest_setup(item):
from sqlalchemy.testing import asyncio
+ from sqlalchemy.util import string_types
if not isinstance(item, pytest.Function):
return
_current_class = item.parent.parent
def finalize():
- global _current_class
+ global _current_class, _current_report
_current_class = None
- asyncio._maybe_async_provisioning(
- plugin_base.stop_test_class_outside_fixtures,
- item.parent.parent.cls,
- )
+ try:
+ asyncio._maybe_async_provisioning(
+ plugin_base.stop_test_class_outside_fixtures,
+ item.parent.parent.cls,
+ )
+ except Exception as e:
+ # in case of an exception during teardown attach the original
+ # error to the exception message, otherwise it will get lost
+ if _current_report.failed:
+ if not e.args:
+ e.args = (
+ "__Original test failure__:\n"
+ + _current_report.longreprtext,
+ )
+ elif e.args[-1] and isinstance(e.args[-1], string_types):
+ args = list(e.args)
+ args[-1] += (
+ "\n__Original test failure__:\n"
+ + _current_report.longreprtext
+ )
+ e.args = tuple(args)
+ else:
+ e.args += (
+ "__Original test failure__",
+ _current_report.longreprtext,
+ )
+ raise
+ finally:
+ _current_report = None
item.parent.parent.addfinalizer(finalize)
)
+_current_report = None
+
+
+def pytest_runtest_logreport(report):
+ global _current_report
+ if report.when == "call":
+ _current_report = report
+
+
def pytest_runtest_teardown(item, nextitem):
# runs inside of pytest function fixture scope
# after test function runs
"""Decorate a function as cls- or instance- level."""
def __init__(self, func):
- self.func = func
+ self.func = self.__func__ = func
self.clslevel = func
def __get__(self, instance, owner):
with testing.db.connect().execution_options(
isolation_level="SERIALIZABLE"
) as conn:
- dbapi_conn = conn.connection.connection
+ dbapi_conn = conn.connection.dbapi_connection
is_false(dbapi_conn.autocommit)
conn.execute(text("select 1")).scalar()
- dbapi_conn = conn.connection.connection
+ dbapi_conn = conn.connection.dbapi_connection
cursor = dbapi_conn.cursor()
cursor.execute("show transaction_read_only")
conn.execute(text("Select 1")).scalar()
- dbapi_conn = conn.connection.connection
+ dbapi_conn = conn.connection.dbapi_connection
cursor = dbapi_conn.cursor()
cursor.execute("show transaction_deferrable")
with engine.connect() as conn:
conn.execute(text("select 1")).scalar()
- dbapi_conn = conn.connection.connection
+ dbapi_conn = conn.connection.dbapi_connection
cursor = dbapi_conn.cursor()
cursor.execute("show transaction_read_only")
with engine.connect() as conn:
# asyncpg but not for deferrable? which the PG docs actually
# state. weird
- dbapi_conn = conn.connection.connection
+ dbapi_conn = conn.connection.dbapi_connection
cursor = dbapi_conn.cursor()
cursor.execute("show transaction_deferrable")
# but sqlite file backend will also have independent
# connections here.
its_the_same_connection = (
- c1.connection.connection is c2.connection.connection
+ c1.connection.dbapi_connection
+ is c2.connection.dbapi_connection
)
c1.close()
c2.close()
for crec in crecs:
if crec is target_crec or not set_to_false:
- is_not(crec.connection, crec.get_connection())
+ is_not(crec.dbapi_connection, crec.get_connection())
else:
- is_(crec.connection, crec.get_connection())
+ is_(crec.dbapi_connection, crec.get_connection())
def test_alter_invalidate_pool_to_false(self):
self._test_alter_invalidate_pool_to_false(True)
),
)
eng = create_engine(u1, poolclass=QueuePool)
- eq_(
- eng.name, "sqlite"
- ) # make sure other dialects aren't getting pulled in here
+ # make sure other dialects aren't getting pulled in here
+ eq_(eng.name, "sqlite")
c = eng.connect()
- dbapi_conn_one = c.connection.connection
+ dbapi_conn_one = c.connection.dbapi_connection
c.close()
eq_(
)
c2 = eng.connect()
- dbapi_conn_two = c2.connection.connection
+ dbapi_conn_two = c2.connection.dbapi_connection
is_not(dbapi_conn_one, dbapi_conn_two)
eq_(conn.info["bap"], "two")
# returned our mock connection
- is_(conn.connection.connection, m1.our_connect())
+ is_(conn.connection.dbapi_connection, m1.our_connect())
def test_connect_do_connect_info_there_after_recycle(self):
# test that info is maintained after the do_connect()
self.assert_("foo2" in c.info)
c2 = p.connect()
- is_not(c.connection, c2.connection)
+ is_not(c.dbapi_connection, c2.dbapi_connection)
assert not c2.info
assert "foo2" in c.info
r1 = pool._ConnectionRecord(p1, connect=False)
- assert not r1.connection
+ assert not r1.dbapi_connection
c1 = r1.get_connection()
+ is_(c1, r1.dbapi_connection)
is_(c1, r1.connection)
+ is_(c1, r1.driver_connection)
def test_rec_close_reopen(self):
# test that _ConnectionRecord.close() allows
r1 = pool._ConnectionRecord(p1)
- c1 = r1.connection
+ c1 = r1.dbapi_connection
c2 = r1.get_connection()
is_(c1, c2)
r1.close()
- assert not r1.connection
+ assert not r1.dbapi_connection
eq_(c1.mock_calls, [call.close()])
c2 = r1.get_connection()
is_not(c1, c2)
- is_(c2, r1.connection)
+ is_(c2, r1.dbapi_connection)
eq_(c2.mock_calls, [])
(pool.StaticPool, None),
(pool.AssertionPool, None),
)
- def test_is_asyncio_from_dialect(self, pool_cls, is_async_king):
+ def test_is_asyncio_from_dialect(self, pool_cls, is_async_kind):
p = pool_cls(creator=object())
for is_async in (True, False):
if is_async:
p._dialect = _AsyncConnDialect()
else:
- p._dialect = _ConnDialect
- if is_async_king is None:
+ p._dialect = _ConnDialect()
+ if is_async_kind is None:
eq_(p._is_asyncio, is_async)
else:
- eq_(p._is_asyncio, is_async_king)
+ eq_(p._is_asyncio, is_async_kind)
@testing.combinations(
(pool.QueuePool, False),
def test_is_asyncio_from_dialect_cls(self, pool_cls, is_async):
eq_(pool_cls._is_asyncio, is_async)
+ def test_rec_fairy_default_dialect(self):
+ dbapi = MockDBAPI()
+ p1 = pool.Pool(creator=lambda: dbapi.connect("foo.db"))
+
+ rec = pool._ConnectionRecord(p1)
+
+ is_not_none(rec.dbapi_connection)
+
+ is_(rec.connection, rec.dbapi_connection)
+ is_(rec.driver_connection, rec.dbapi_connection)
+
+ fairy = pool._ConnectionFairy(rec.dbapi_connection, rec, False)
+
+ is_not_none(fairy.dbapi_connection)
+ is_(fairy.connection, fairy.dbapi_connection)
+ is_(fairy.driver_connection, fairy.dbapi_connection)
+
+ is_(fairy.dbapi_connection, rec.dbapi_connection)
+ is_(fairy.driver_connection, rec.driver_connection)
+
+ def test_rec_fairy_adapted_dialect(self):
+ dbapi = MockDBAPI()
+
+ mock_dc = object()
+
+ class _AdaptedDialect(_ConnDialect):
+ def get_driver_connection(self, connection):
+ return mock_dc
+
+ p1 = pool.Pool(
+ creator=lambda: dbapi.connect("foo.db"), dialect=_AdaptedDialect()
+ )
+
+ rec = pool._ConnectionRecord(p1)
+
+ is_not_none(rec.dbapi_connection)
+
+ is_(rec.connection, rec.dbapi_connection)
+ is_(rec.driver_connection, mock_dc)
+
+ fairy = pool._ConnectionFairy(rec.dbapi_connection, rec, False)
+
+ is_not_none(fairy.dbapi_connection)
+ is_(fairy.connection, fairy.dbapi_connection)
+ is_(fairy.driver_connection, mock_dc)
+
+ is_(fairy.dbapi_connection, rec.dbapi_connection)
+ is_(fairy.driver_connection, mock_dc)
+
+ def test_connection_setter(self):
+ dbapi = MockDBAPI()
+ p1 = pool.Pool(creator=lambda: dbapi.connect("foo.db"))
+
+ rec = pool._ConnectionRecord(p1)
+
+ is_not_none(rec.dbapi_connection)
+
+ is_(rec.connection, rec.dbapi_connection)
+ rec.connection = 42
+ is_(rec.connection, rec.dbapi_connection)
+ rec.dbapi_connection = 99
+ is_(rec.connection, rec.dbapi_connection)
+
class PoolDialectTest(PoolTestBase):
def _dialect(self):
canary.append("CL")
dbapi_connection.close()
+ def get_driver_connection(self, connection):
+ return connection
+
return PoolDialect(), canary
def _do_test(self, pool_cls, assertion):
c1 = p.connect()
- connection = c1.connection
+ connection = c1.dbapi_connection
rec = c1._connection_record
c1.close()
c1 = p.connect()
- connection = c1.connection
+ connection = c1.dbapi_connection
rec = c1._connection_record
c1.detach()
c1 = p.connect()
- connection = c1.connection
+ connection = c1.dbapi_connection
c1.detach()
c1.close()
assert not canary.called
c1 = p.connect()
- dbapi_con = c1.connection
+ dbapi_con = c1.dbapi_connection
c1.invalidate(soft=True)
assert canary.call_args_list[0][0][0] is dbapi_con
assert canary.call_args_list[0][0][2] is None
c1.close()
assert not canary.called
c1 = p.connect()
- dbapi_con = c1.connection
+ dbapi_con = c1.dbapi_connection
exc = Exception("hi")
c1.invalidate(exc, soft=True)
assert canary.call_args_list[0][0][0] is dbapi_con
c1.close()
assert not canary.called
c1 = p.connect()
- dbapi_con = c1.connection
+ dbapi_con = c1.dbapi_connection
c1.invalidate()
assert canary.call_args_list[0][0][0] is dbapi_con
assert canary.call_args_list[0][0][2] is None
c1.close()
assert not canary.called
c1 = p.connect()
- dbapi_con = c1.connection
+ dbapi_con = c1.dbapi_connection
exc = Exception("hi")
c1.invalidate(exc)
assert canary.call_args_list[0][0][0] is dbapi_con
c1 = p.connect()
- dbapi_connection = weakref.ref(c1.connection)
+ dbapi_connection = weakref.ref(c1.dbapi_connection)
eq_(canary, [])
del c1
"""Test that a concurrent ConnectionRecord.invalidate() which
occurs after the ConnectionFairy has called
_ConnectionRecord.checkout()
- but before the ConnectionFairy tests "fairy.connection is None"
+ but before the ConnectionFairy tests "fairy.dbapi_connection is None"
will not result in an InvalidRequestError.
This use case assumes that a listener on the checkout() event
_decorate_existing_checkout,
):
conn = p.connect()
- is_(conn._connection_record.connection, None)
+ is_(conn._connection_record.dbapi_connection, None)
conn.close()
@testing.requires.threading_with_mock
)
c1 = p.connect()
c2 = p.connect()
- c1_con = c1.connection
- c2_con = c2.connection
+ c1_con = c1.dbapi_connection
+ c2_con = c2.dbapi_connection
c1.close()
# ...and that's the one we'll get back next.
c3 = p.connect()
- assert c3.connection is c2_con
+ assert c3.dbapi_connection is c2_con
@testing.requires.threading_with_mock
@testing.requires.timing_intensive
def _conn():
c = p.connect()
- strong_refs.add(c.connection)
+ strong_refs.add(c.dbapi_connection)
return c
for j in range(5):
pool_size=1, max_overflow=0, recycle=30
)
c1 = p.connect()
- c_ref = weakref.ref(c1.connection)
+ c_ref = weakref.ref(c1.dbapi_connection)
c1.close()
mock.return_value = 10001
c2 = p.connect()
- is_(c2.connection, c_ref())
+ is_(c2.dbapi_connection, c_ref())
c2.close()
mock.return_value = 10035
c3 = p.connect()
- is_not(c3.connection, c_ref())
+ is_not(c3.dbapi_connection, c_ref())
@testing.requires.timing_intensive
def test_recycle_on_invalidate(self):
p = self._queuepool_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
- c_ref = weakref.ref(c1.connection)
+ c_ref = weakref.ref(c1.dbapi_connection)
c1.close()
c2 = p.connect()
- is_(c2.connection, c_ref())
+ is_(c2.dbapi_connection, c_ref())
c2_rec = c2._connection_record
p._invalidate(c2)
- assert c2_rec.connection is None
+ assert c2_rec.dbapi_connection is None
c2.close()
time.sleep(0.5)
c3 = p.connect()
- is_not(c3.connection, c_ref())
+ is_not(c3.dbapi_connection, c_ref())
@testing.requires.timing_intensive
def test_recycle_on_soft_invalidate(self):
p = self._queuepool_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
- c_ref = weakref.ref(c1.connection)
+ c_ref = weakref.ref(c1.dbapi_connection)
c1.close()
c2 = p.connect()
- is_(c2.connection, c_ref())
+ is_(c2.dbapi_connection, c_ref())
c2_rec = c2._connection_record
time.sleep(0.1)
c2.invalidate(soft=True)
- is_(c2_rec.connection, c2.connection)
+ is_(c2_rec.dbapi_connection, c2.dbapi_connection)
c2.close()
c3 = p.connect()
- is_not(c3.connection, c_ref())
+ is_not(c3.dbapi_connection, c_ref())
is_(c3._connection_record, c2_rec)
- is_(c2_rec.connection, c3.connection)
+ is_(c2_rec.dbapi_connection, c3.dbapi_connection)
def _no_wr_finalize(self):
finalize_fairy = pool._finalize_fairy
assert_raises(Exception, p.connect)
p._pool.queue = collections.deque(
- [c for c in p._pool.queue if c.connection is not None]
+ [c for c in p._pool.queue if c.dbapi_connection is not None]
)
dbapi.shutdown(False)
assert_raises(Exception, p.connect)
p._pool.queue = collections.deque(
- [c for c in p._pool.queue if c.connection is not None]
+ [c for c in p._pool.queue if c.dbapi_connection is not None]
)
dbapi.shutdown(False)
raise tsa.exc.DisconnectionError()
conn = pool.connect()
- old_dbapi_conn = conn.connection
+ old_dbapi_conn = conn.dbapi_connection
conn.close()
eq_(old_dbapi_conn.mock_calls, [call.rollback()])
old_dbapi_conn.boom = "yes"
conn = pool.connect()
- dbapi_conn = conn.connection
+ dbapi_conn = conn.dbapi_connection
del conn
gc_collect()
def test_invalidate(self):
p = self._queuepool_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
- c_id = c1.connection.id
+ c_id = c1.dbapi_connection.id
c1.close()
c1 = None
c1 = p.connect()
- assert c1.connection.id == c_id
+ assert c1.dbapi_connection.id == c_id
c1.invalidate()
c1 = None
c1 = p.connect()
- assert c1.connection.id != c_id
+ assert c1.dbapi_connection.id != c_id
def test_recreate(self):
p = self._queuepool_fixture(
dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
- c_id = c1.connection.id
+ c_id = c1.dbapi_connection.id
c1.close()
c1 = None
c1 = p.connect()
- assert c1.connection.id == c_id
+ assert c1.dbapi_connection.id == c_id
dbapi.raise_error = True
c1.invalidate()
c1 = None
c1 = p.connect()
- assert c1.connection.id != c_id
+ assert c1.dbapi_connection.id != c_id
def test_detach(self):
dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
c2 = p.connect() # noqa
eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")])
- c1_con = c1.connection
+ c1_con = c1.dbapi_connection
assert c1_con is not None
eq_(c1_con.close.call_count, 0)
c1.close()
dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
- c1_con = c1.connection
+ c1_con = c1.dbapi_connection
c1.invalidate()
- assert c1.connection is None
+ assert c1.dbapi_connection is None
eq_(c1_con.close.call_count, 1)
c2 = p.connect()
- assert c2.connection is not c1_con
- c2_con = c2.connection
+ assert c2.dbapi_connection is not c1_con
+ c2_con = c2.dbapi_connection
c2.close()
eq_(c2_con.close.call_count, 0)
for i in range(5):
pc1 = p.connect()
- is_(pc1.connection, c3)
+ is_(pc1.dbapi_connection, c3)
pc1.close()
pc1 = p.connect()
- is_(pc1.connection, c3)
+ is_(pc1.dbapi_connection, c3)
pc2 = p.connect()
- is_(pc2.connection, c2)
+ is_(pc2.dbapi_connection, c2)
pc2.close()
pc3 = p.connect()
- is_(pc3.connection, c2)
+ is_(pc3.dbapi_connection, c2)
pc2 = p.connect()
- is_(pc2.connection, c1)
+ is_(pc2.dbapi_connection, c1)
pc2.close()
pc3.close()
pc3.close()
pc1 = p.connect()
- is_(pc1.connection, c1)
+ is_(pc1.dbapi_connection, c1)
pc1.close()
pc1 = p.connect()
- is_(pc1.connection, c2)
+ is_(pc1.dbapi_connection, c2)
pc2 = p.connect()
- is_(pc2.connection, c3)
+ is_(pc2.dbapi_connection, c3)
pc2.close()
pc3 = p.connect()
- is_(pc3.connection, c1)
+ is_(pc3.dbapi_connection, c1)
pc2 = p.connect()
- is_(pc2.connection, c3)
+ is_(pc2.dbapi_connection, c3)
pc2.close()
pc3.close()
def _conn():
c = p.connect()
- sr.add(c.connection)
+ sr.add(c.dbapi_connection)
return c
else:
p = pool.SingletonThreadPool(creator=creator, pool_size=3)
c1 = p.connect()
- mock_conn = c1.connection
+ mock_conn = c1.dbapi_connection
c2 = p.connect()
is_(c1, c2)
p = pool.StaticPool(creator)
c1 = p.connect()
- conn = c1.connection
+ conn = c1.dbapi_connection
c1.close()
c2 = p.connect()
- is_(conn, c2.connection)
+ is_(conn, c2.dbapi_connection)
class CreatorCompatibilityTest(PoolTestBase):
)
conn = pool.connect()
- dbapi_conn = conn.connection
+ dbapi_conn = conn.dbapi_connection
eq_(dbapi_conn.mock_calls, [])
conn.close()
eq_(dbapi_conn.mock_calls, [call.rollback()])
conn = pool.connect()
- is_(conn.connection, dbapi_conn)
+ is_(conn.dbapi_connection, dbapi_conn)
# ping, so cursor() call.
eq_(dbapi_conn.mock_calls, [call.rollback(), call.cursor()])
conn.close()
conn = pool.connect()
- is_(conn.connection, dbapi_conn)
+ is_(conn.dbapi_connection, dbapi_conn)
# ping, so cursor() call.
eq_(
)
conn = pool.connect()
- dbapi_conn = conn.connection
+ dbapi_conn = conn.dbapi_connection
conn_rec = conn._connection_record
eq_(dbapi_conn.mock_calls, [])
conn.close()
conn = pool.connect()
- is_(conn.connection, dbapi_conn)
+ is_(conn.dbapi_connection, dbapi_conn)
# ping, so cursor() call.
eq_(dbapi_conn.mock_calls, [call.rollback(), call.cursor()])
conn.invalidate()
- is_(conn.connection, None)
+ is_(conn.dbapi_connection, None)
# connect again, make sure we're on the same connection record
conn = pool.connect()
is_(conn._connection_record, conn_rec)
# no ping
- dbapi_conn = conn.connection
+ dbapi_conn = conn.dbapi_connection
eq_(dbapi_conn.mock_calls, [])
def test_connect_across_restart(self):
pool = self._pool_fixture(pre_ping=True)
conn = pool.connect()
- stale_connection = conn.connection
+ stale_connection = conn.dbapi_connection
conn.close()
self.dbapi.shutdown("execute")
pool = self._pool_fixture(pre_ping=True)
conn = pool.connect()
- old_dbapi_conn = conn.connection
+ old_dbapi_conn = conn.dbapi_connection
conn.close()
# no cursor() because no pre ping
self.dbapi.restart()
conn = pool.connect()
- dbapi_conn = conn.connection
+ dbapi_conn = conn.dbapi_connection
del conn
gc_collect()
def test_ensure_is_disconnect_gets_connection(self):
def is_disconnect(e, conn, cursor):
# connection is still present
- assert conn.connection is not None
+ assert conn.dbapi_connection is not None
# the error usually occurs on connection.cursor(),
# though MySQLdb we get a non-working cursor.
# assert cursor is None
conn = engine.connect()
eq_(conn.execute(select(1)).scalar(), 1)
- stale_connection = conn.connection.connection
+ stale_connection = conn.connection.dbapi_connection
conn.close()
engine.test_shutdown()
connection_fairy = await conn.get_raw_connection()
is_(connection_fairy.is_valid, True)
- dbapi_connection = connection_fairy.connection
+ dbapi_connection = connection_fairy.dbapi_connection
await conn.invalidate()
assert dbapi_connection._connection.is_closed()
new_fairy = await conn.get_raw_connection()
- is_not(new_fairy.connection, dbapi_connection)
+ is_not(new_fairy.dbapi_connection, dbapi_connection)
is_not(new_fairy, connection_fairy)
is_(new_fairy.is_valid, True)
is_(connection_fairy.is_valid, False)