--- /dev/null
+.. change::
+ :tags: bug, mysql, postgresql
+ :tickets: 5648
+
+ The support for pool ping listeners to receive exception events via the
+ :meth:`.ConnectionEvents.handle_error` event added in 2.0.0b1 for
+ :ticket:`5648` failed to take into account dialect-specific ping routines
+ such as that of MySQL and PostgreSQL. The dialect feature has been reworked
+ so that all dialects participate within event handling. Additionally,
+ a new boolean element :attr:`.ExceptionContext.is_pre_ping` is added
+ which identifies if this operation is occurring within the pre-ping
+ operation.
+
+ For this release, third party dialects which implement a custom
+ :meth:`_engine.Dialect.do_ping` method can opt in to the newly improved
+ behavior by having their method no longer catch exceptions or check
+ exceptions for "is_disconnect", instead just propagating all exceptions
+ outwards. Checking the exception for "is_disconnect" is now done by an
+ enclosing method on the default dialect, which ensures that the event hook
+ is invoked for all exception scenarios before testing the exception as a
+ "disconnect" exception. If an existing ``do_ping()`` method continues to
+ catch exceptions and check "is_disconnect", it will continue to work as it
+ did previously, but ``handle_error`` hooks will not have access to the
+ exception if it isn't propagated outwards.
return connector
def do_ping(self, dbapi_connection):
- try:
- dbapi_connection.ping(False)
- except self.dbapi.Error as err:
- if self.is_disconnect(err, dbapi_connection, None):
- return False
- else:
- raise
- else:
- return True
+ dbapi_connection.ping(False)
+ return True
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
return on_connect
def do_ping(self, dbapi_connection):
- try:
- dbapi_connection.ping(False)
- except self.dbapi.Error as err:
- if self.is_disconnect(err, dbapi_connection, None):
- return False
- else:
- raise
- else:
- return True
+ dbapi_connection.ping(False)
+ return True
def do_executemany(self, cursor, statement, parameters, context=None):
rowcount = cursor.executemany(statement, parameters)
def do_ping(self, dbapi_connection):
cursor = None
before_autocommit = dbapi_connection.autocommit
+
+ if not before_autocommit:
+ dbapi_connection.autocommit = True
+ cursor = dbapi_connection.cursor()
try:
- if not before_autocommit:
- self._do_autocommit(dbapi_connection, True)
- cursor = dbapi_connection.cursor()
- try:
- cursor.execute(self._dialect_specific_select_one)
- finally:
- cursor.close()
- if not before_autocommit and not dbapi_connection.closed:
- self._do_autocommit(dbapi_connection, before_autocommit)
- except self.dbapi.Error as err:
- if self.is_disconnect(err, dbapi_connection, cursor):
- return False
- else:
- raise
- else:
- return True
+ cursor.execute(self._dialect_specific_select_one)
+ finally:
+ cursor.close()
+ if not before_autocommit and not dbapi_connection.closed:
+ dbapi_connection.autocommit = before_autocommit
+
+ return True
return ([], opts)
def do_ping(self, dbapi_connection):
- try:
- dbapi_connection.ping()
- except self.dbapi.Error as err:
- if self.is_disconnect(err, dbapi_connection, None):
- return False
- else:
- raise
- else:
- return True
+ dbapi_connection.ping()
+ return True
@classmethod
def get_pool_class(cls, url):
context,
self._is_disconnect,
invalidate_pool_on_disconnect,
+ False,
)
for fn in self.dialect.dispatch.handle_error:
engine: Optional[Engine] = None,
is_disconnect: Optional[bool] = None,
invalidate_pool_on_disconnect: bool = True,
+ is_pre_ping: bool = False,
) -> NoReturn:
exc_info = sys.exc_info()
None,
is_disconnect,
invalidate_pool_on_disconnect,
+ is_pre_ping,
)
for fn in dialect.dispatch.handle_error:
try:
"execution_context",
"is_disconnect",
"invalidate_pool_on_disconnect",
+ "is_pre_ping",
)
def __init__(
context: Optional[ExecutionContext],
is_disconnect: bool,
invalidate_pool_on_disconnect: bool,
+ is_pre_ping: bool,
):
self.engine = engine
self.dialect = dialect
self.parameters = parameters
self.is_disconnect = is_disconnect
self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
+ self.is_pre_ping = is_pre_ping
class Transaction(TransactionalContext):
def _dialect_specific_select_one(self):
return str(expression.select(1).compile(dialect=self))
- def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
- cursor = None
+ def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
try:
- cursor = dbapi_connection.cursor()
- try:
- cursor.execute(self._dialect_specific_select_one)
- finally:
- cursor.close()
+ return self.do_ping(dbapi_connection)
except self.loaded_dbapi.Error as err:
- is_disconnect = self.is_disconnect(err, dbapi_connection, cursor)
+ is_disconnect = self.is_disconnect(err, dbapi_connection, None)
if self._has_events:
try:
self,
is_disconnect=is_disconnect,
invalidate_pool_on_disconnect=False,
+ is_pre_ping=True,
)
except exc.StatementError as new_err:
is_disconnect = new_err.connection_invalidated
- # other exceptions modified by the event handler will be
- # thrown
-
if is_disconnect:
return False
else:
raise
- else:
- return True
+
+ def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
+ cursor = None
+
+ cursor = dbapi_connection.cursor()
+ try:
+ cursor.execute(self._dialect_specific_select_one)
+ finally:
+ cursor.close()
+ return True
def create_xid(self):
"""Create a random two-phase transaction ID.
raise NotImplementedError()
+ def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
+ raise NotImplementedError()
+
def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
"""ping the DBAPI connection and return True if the connection is
usable."""
"""
+ is_pre_ping: bool
+ """Indicates if this error is occurring within the "pre-ping" step
+ performed when :paramref:`_sa.create_engine.pool_pre_ping` is set to
+ ``True``. In this mode, the :attr:`.ExceptionContext.engine` attribute
+ will be ``None``. The dialect in use is accessible via the
+ :attr:`.ExceptionContext.dialect` attribute.
+
+ .. versionadded:: 2.0.5
+
+ """
+
class AdaptedConnection:
"""Interface of an adapted connection object to support the DBAPI protocol.
def do_close(self, dbapi_connection: DBAPIConnection) -> None:
dbapi_connection.close()
- def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
+ def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
raise NotImplementedError(
"The ping feature requires that a dialect is "
"passed to the connection pool."
threadconns: Optional[threading.local] = None,
fairy: Optional[_ConnectionFairy] = None,
) -> _ConnectionFairy:
+
if not fairy:
fairy = _ConnectionRecord.checkout(pool)
"Pool pre-ping on connection %s",
fairy.dbapi_connection,
)
- result = pool._dialect.do_ping(fairy.dbapi_connection)
+ result = pool._dialect._do_ping_w_event(
+ fairy.dbapi_connection
+ )
if not result:
if fairy._echo:
pool.logger.debug(
"""
- def __init__(self, engine, cursor_cls):
- self.conn = engine.pool._creator()
+ def __init__(self, engine, conn, cursor_cls):
+ self.conn = conn
self.engine = engine
self.cursor_cls = cursor_cls
+import itertools
import time
from unittest.mock import call
from unittest.mock import Mock
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing import ne_
+from sqlalchemy.testing.engines import DBAPIProxyConnection
+from sqlalchemy.testing.engines import DBAPIProxyCursor
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
raise
+class RealPrePingEventHandlerTest(fixtures.TestBase):
+ """real test for issue #5648, which had to be revisited for 2.0 as the
+ initial version was not adequately tested and non-implementation for
+ mysql, postgresql was not caught
+
+ """
+
+ __backend__ = True
+ __requires__ = "graceful_disconnects", "ad_hoc_engines"
+
+ @testing.fixture
+ def ping_fixture(self, testing_engine):
+ engine = testing_engine(
+ options={"pool_pre_ping": True, "_initialize": False}
+ )
+
+ existing_connect = engine.dialect.dbapi.connect
+
+ fail = False
+ fail_count = itertools.count()
+ DBAPIError = engine.dialect.dbapi.Error
+
+ class ExplodeConnection(DBAPIProxyConnection):
+ def ping(self, *arg, **kw):
+ if fail and next(fail_count) < 1:
+ raise DBAPIError("unhandled disconnect situation")
+ else:
+ return True
+
+ class ExplodeCursor(DBAPIProxyCursor):
+ def execute(self, stmt, parameters=None, **kw):
+ if fail and next(fail_count) < 1:
+ raise DBAPIError("unhandled disconnect situation")
+ else:
+ return super().execute(stmt, parameters=parameters, **kw)
+
+ def mock_connect(*arg, **kw):
+ real_connection = existing_connect(*arg, **kw)
+ return ExplodeConnection(engine, real_connection, ExplodeCursor)
+
+ with mock.patch.object(
+ engine.dialect.loaded_dbapi, "connect", mock_connect
+ ):
+
+ # set up initial connection. pre_ping works on subsequent connects
+ engine.connect().close()
+
+ # ping / exec will fail
+ fail = True
+
+ yield engine
+
+ @testing.fixture
+ def ping_fixture_all_errs_disconnect(self, ping_fixture):
+ engine = ping_fixture
+
+ with mock.patch.object(
+ engine.dialect, "is_disconnect", lambda *arg, **kw: True
+ ):
+ yield engine
+
+ def test_control(self, ping_fixture):
+ """test the fixture raises on connect"""
+ engine = ping_fixture
+
+ with expect_raises_message(
+ exc.DBAPIError, "unhandled disconnect situation"
+ ):
+ engine.connect()
+
+ def test_downgrade_control(self, ping_fixture_all_errs_disconnect):
+ """test the disconnect fixture doesn't raise, since it considers
+ all errors to be disconnect errors.
+
+ """
+
+ engine = ping_fixture_all_errs_disconnect
+
+ conn = engine.connect()
+ conn.close()
+
+ def test_event_handler_didnt_upgrade_disconnect(self, ping_fixture):
+ """test that having an event handler that doesn't do anything
+ keeps the behavior in place for a fatal error.
+
+ """
+ engine = ping_fixture
+
+ @event.listens_for(engine, "handle_error")
+ def setup_disconnect(ctx):
+ assert not ctx.is_disconnect
+
+ with expect_raises_message(
+ exc.DBAPIError, "unhandled disconnect situation"
+ ):
+ engine.connect()
+
+ def test_event_handler_didnt_downgrade_disconnect(
+ self, ping_fixture_all_errs_disconnect
+ ):
+ """test that having an event handler that doesn't do anything
+ keeps the behavior in place for a disconnect error.
+
+ """
+ engine = ping_fixture_all_errs_disconnect
+
+ @event.listens_for(engine, "handle_error")
+ def setup_disconnect(ctx):
+ assert ctx.is_pre_ping
+ assert ctx.is_disconnect
+
+ conn = engine.connect()
+ conn.close()
+
+ def test_event_handler_can_upgrade_disconnect(self, ping_fixture):
+ """test that an event hook can receive a fatal error and convert
+ it to be a disconnect error during pre-ping"""
+
+ engine = ping_fixture
+
+ @event.listens_for(engine, "handle_error")
+ def setup_disconnect(ctx):
+ assert ctx.is_pre_ping
+ ctx.is_disconnect = True
+
+ conn = engine.connect()
+ # no error
+ conn.close()
+
+ def test_event_handler_can_downgrade_disconnect(
+ self, ping_fixture_all_errs_disconnect
+ ):
+ """test that an event hook can receive a disconnect error and convert
+ it to be a fatal error during pre-ping"""
+
+ engine = ping_fixture_all_errs_disconnect
+
+ @event.listens_for(engine, "handle_error")
+ def setup_disconnect(ctx):
+ assert ctx.is_disconnect
+ if ctx.is_pre_ping:
+ ctx.is_disconnect = False
+
+ with expect_raises_message(
+ exc.DBAPIError, "unhandled disconnect situation"
+ ):
+ engine.connect()
+
+
class RealReconnectTest(fixtures.TestBase):
__backend__ = True
__requires__ = "graceful_disconnects", "ad_hoc_engines"