]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
ensure event handlers called for all do_ping
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 24 Feb 2023 21:15:21 +0000 (16:15 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 4 Mar 2023 16:00:48 +0000 (11:00 -0500)
The support for pool ping listeners to receive exception events via the
:meth:`.ConnectionEvents.handle_error` event added in 2.0.0b1 for
:ticket:`5648` failed to take into account dialect-specific ping routines
such as that of MySQL and PostgreSQL. The dialect feature has been reworked
so that all dialects participate within event handling.   Additionally,
a new boolean element :attr:`.ExceptionContext.is_pre_ping` is added
which identifies if this operation is occurring within the pre-ping
operation.

For this release, third party dialects which implement a custom
:meth:`_engine.Dialect.do_ping` method can opt in to the newly improved
behavior by having their method no longer catch exceptions or check
exceptions for "is_disconnect", instead just propagating all exceptions
outwards. Checking the exception for "is_disconnect" is now done by an
enclosing method on the default dialect, which ensures that the event hook
is invoked for all exception scenarios before testing the exception as a
"disconnect" exception. If an existing ``do_ping()`` method continues to
catch exceptions and check "is_disconnect", it will continue to work as it
did previously, but ``handle_error`` hooks will not have access to the
exception if it isn't propagated outwards.

Fixes: #5648
Change-Id: I6535d5cb389e1a761aad8c37cfeb332c548b876d

doc/build/changelog/unreleased_20/5648.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mysql/mysqlconnector.py
lib/sqlalchemy/dialects/mysql/mysqldb.py
lib/sqlalchemy/dialects/postgresql/_psycopg_common.py
lib/sqlalchemy/dialects/postgresql/asyncpg.py
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/engine/interfaces.py
lib/sqlalchemy/pool/base.py
lib/sqlalchemy/testing/engines.py
test/engine/test_reconnect.py

diff --git a/doc/build/changelog/unreleased_20/5648.rst b/doc/build/changelog/unreleased_20/5648.rst
new file mode 100644 (file)
index 0000000..acc1251
--- /dev/null
@@ -0,0 +1,24 @@
+.. change::
+    :tags: bug, mysql, postgresql
+    :tickets: 5648
+
+    The support for pool ping listeners to receive exception events via the
+    :meth:`.ConnectionEvents.handle_error` event added in 2.0.0b1 for
+    :ticket:`5648` failed to take into account dialect-specific ping routines
+    such as that of MySQL and PostgreSQL. The dialect feature has been reworked
+    so that all dialects participate within event handling.   Additionally,
+    a new boolean element :attr:`.ExceptionContext.is_pre_ping` is added
+    which identifies if this operation is occurring within the pre-ping
+    operation.
+
+    For this release, third party dialects which implement a custom
+    :meth:`_engine.Dialect.do_ping` method can opt in to the newly improved
+    behavior by having their method no longer catch exceptions or check
+    exceptions for "is_disconnect", instead just propagating all exceptions
+    outwards. Checking the exception for "is_disconnect" is now done by an
+    enclosing method on the default dialect, which ensures that the event hook
+    is invoked for all exception scenarios before testing the exception as a
+    "disconnect" exception. If an existing ``do_ping()`` method continues to
+    catch exceptions and check "is_disconnect", it will continue to work as it
+    did previously, but ``handle_error`` hooks will not have access to the
+    exception if it isn't propagated outwards.
index 26013ffce97477800c0c8640628611804041cc54..fc90c65d2ad4dc9c283116e2bbe68eae4aeb3cdd 100644 (file)
@@ -85,15 +85,8 @@ class MySQLDialect_mysqlconnector(MySQLDialect):
         return connector
 
     def do_ping(self, dbapi_connection):
-        try:
-            dbapi_connection.ping(False)
-        except self.dbapi.Error as err:
-            if self.is_disconnect(err, dbapi_connection, None):
-                return False
-            else:
-                raise
-        else:
-            return True
+        dbapi_connection.ping(False)
+        return True
 
     def create_connect_args(self, url):
         opts = url.translate_connect_args(username="user")
index 5c9d11a53dec6fd756c56b0967eaf36746b2fbf4..0868401d436b6a0c6dbc165b1358d016f6b734e9 100644 (file)
@@ -168,15 +168,8 @@ class MySQLDialect_mysqldb(MySQLDialect):
         return on_connect
 
     def do_ping(self, dbapi_connection):
-        try:
-            dbapi_connection.ping(False)
-        except self.dbapi.Error as err:
-            if self.is_disconnect(err, dbapi_connection, None):
-                return False
-            else:
-                raise
-        else:
-            return True
+        dbapi_connection.ping(False)
+        return True
 
     def do_executemany(self, cursor, statement, parameters, context=None):
         rowcount = cursor.executemany(statement, parameters)
index d9ddefd385b6b655b5b26257c6e80cb1c233b77a..739cbc5a9d0cf09b850cb32b79e944bd66ba4261 100644 (file)
@@ -178,20 +178,15 @@ class _PGDialect_common_psycopg(PGDialect):
     def do_ping(self, dbapi_connection):
         cursor = None
         before_autocommit = dbapi_connection.autocommit
+
+        if not before_autocommit:
+            dbapi_connection.autocommit = True
+        cursor = dbapi_connection.cursor()
         try:
-            if not before_autocommit:
-                self._do_autocommit(dbapi_connection, True)
-            cursor = dbapi_connection.cursor()
-            try:
-                cursor.execute(self._dialect_specific_select_one)
-            finally:
-                cursor.close()
-                if not before_autocommit and not dbapi_connection.closed:
-                    self._do_autocommit(dbapi_connection, before_autocommit)
-        except self.dbapi.Error as err:
-            if self.is_disconnect(err, dbapi_connection, cursor):
-                return False
-            else:
-                raise
-        else:
-            return True
+            cursor.execute(self._dialect_specific_select_one)
+        finally:
+            cursor.close()
+            if not before_autocommit and not dbapi_connection.closed:
+                dbapi_connection.autocommit = before_autocommit
+
+        return True
index e0058450331a6e22a0e7926bd8fdcfe8498254ad..2acc5fea300e0259c92d70e9080c8e70355a9fc2 100644 (file)
@@ -992,15 +992,8 @@ class PGDialect_asyncpg(PGDialect):
         return ([], opts)
 
     def do_ping(self, dbapi_connection):
-        try:
-            dbapi_connection.ping()
-        except self.dbapi.Error as err:
-            if self.is_disconnect(err, dbapi_connection, None):
-                return False
-            else:
-                raise
-        else:
-            return True
+        dbapi_connection.ping()
+        return True
 
     @classmethod
     def get_pool_class(cls, url):
index f6c637aa892714e1b4f5edbfb39a321fa3604142..09610b069125d62373eb0806dfee276c4da6929d 100644 (file)
@@ -2275,6 +2275,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
                     context,
                     self._is_disconnect,
                     invalidate_pool_on_disconnect,
+                    False,
                 )
 
                 for fn in self.dialect.dispatch.handle_error:
@@ -2345,6 +2346,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
         engine: Optional[Engine] = None,
         is_disconnect: Optional[bool] = None,
         invalidate_pool_on_disconnect: bool = True,
+        is_pre_ping: bool = False,
     ) -> NoReturn:
         exc_info = sys.exc_info()
 
@@ -2385,6 +2387,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
                 None,
                 is_disconnect,
                 invalidate_pool_on_disconnect,
+                is_pre_ping,
             )
             for fn in dialect.dispatch.handle_error:
                 try:
@@ -2443,6 +2446,7 @@ class ExceptionContextImpl(ExceptionContext):
         "execution_context",
         "is_disconnect",
         "invalidate_pool_on_disconnect",
+        "is_pre_ping",
     )
 
     def __init__(
@@ -2458,6 +2462,7 @@ class ExceptionContextImpl(ExceptionContext):
         context: Optional[ExecutionContext],
         is_disconnect: bool,
         invalidate_pool_on_disconnect: bool,
+        is_pre_ping: bool,
     ):
         self.engine = engine
         self.dialect = dialect
@@ -2469,6 +2474,7 @@ class ExceptionContextImpl(ExceptionContext):
         self.parameters = parameters
         self.is_disconnect = is_disconnect
         self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
+        self.is_pre_ping = is_pre_ping
 
 
 class Transaction(TransactionalContext):
index f8126fa30c3471146b30849bbb1b746fdf06ef69..3e4e6fb9ae65d6be9222e83e368a413f51fc2f7d 100644 (file)
@@ -669,16 +669,11 @@ class DefaultDialect(Dialect):
     def _dialect_specific_select_one(self):
         return str(expression.select(1).compile(dialect=self))
 
-    def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
-        cursor = None
+    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
         try:
-            cursor = dbapi_connection.cursor()
-            try:
-                cursor.execute(self._dialect_specific_select_one)
-            finally:
-                cursor.close()
+            return self.do_ping(dbapi_connection)
         except self.loaded_dbapi.Error as err:
-            is_disconnect = self.is_disconnect(err, dbapi_connection, cursor)
+            is_disconnect = self.is_disconnect(err, dbapi_connection, None)
 
             if self._has_events:
                 try:
@@ -687,19 +682,25 @@ class DefaultDialect(Dialect):
                         self,
                         is_disconnect=is_disconnect,
                         invalidate_pool_on_disconnect=False,
+                        is_pre_ping=True,
                     )
                 except exc.StatementError as new_err:
                     is_disconnect = new_err.connection_invalidated
 
-                # other exceptions modified by the event handler will be
-                # thrown
-
             if is_disconnect:
                 return False
             else:
                 raise
-        else:
-            return True
+
+    def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
+        cursor = None
+
+        cursor = dbapi_connection.cursor()
+        try:
+            cursor.execute(self._dialect_specific_select_one)
+        finally:
+            cursor.close()
+        return True
 
     def create_xid(self):
         """Create a random two-phase transaction ID.
index c1de13221b08d49c41da7fef4559035679ed2af8..9952a85e3a005754e9f7daf065d8a71326f6ee4b 100644 (file)
@@ -1967,6 +1967,9 @@ class Dialect(EventTarget):
 
         raise NotImplementedError()
 
+    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
+        raise NotImplementedError()
+
     def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
         """ping the DBAPI connection and return True if the connection is
         usable."""
@@ -3291,6 +3294,17 @@ class ExceptionContext:
 
     """
 
+    is_pre_ping: bool
+    """Indicates if this error is occurring within the "pre-ping" step
+    performed when :paramref:`_sa.create_engine.pool_pre_ping` is set to
+    ``True``.  In this mode, the :attr:`.ExceptionContext.engine` attribute
+    will be ``None``.  The dialect in use is accessible via the
+    :attr:`.ExceptionContext.dialect` attribute.
+
+    .. versionadded:: 2.0.5
+
+    """
+
 
 class AdaptedConnection:
     """Interface of an adapted connection object to support the DBAPI protocol.
index d67f32442674564c76256e1d748c36d29f9f7657..ac487452c4df56ef5b40beeaca87896ed951bd5d 100644 (file)
@@ -132,7 +132,7 @@ class _ConnDialect:
     def do_close(self, dbapi_connection: DBAPIConnection) -> None:
         dbapi_connection.close()
 
-    def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
+    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
         raise NotImplementedError(
             "The ping feature requires that a dialect is "
             "passed to the connection pool."
@@ -1266,6 +1266,7 @@ class _ConnectionFairy(PoolProxiedConnection):
         threadconns: Optional[threading.local] = None,
         fairy: Optional[_ConnectionFairy] = None,
     ) -> _ConnectionFairy:
+
         if not fairy:
             fairy = _ConnectionRecord.checkout(pool)
 
@@ -1304,7 +1305,9 @@ class _ConnectionFairy(PoolProxiedConnection):
                                 "Pool pre-ping on connection %s",
                                 fairy.dbapi_connection,
                             )
-                        result = pool._dialect.do_ping(fairy.dbapi_connection)
+                        result = pool._dialect._do_ping_w_event(
+                            fairy.dbapi_connection
+                        )
                         if not result:
                             if fairy._echo:
                                 pool.logger.debug(
index 546b2f16af032e9dcb197327d8a2d49ec68e4111..ece54cb5205cac689493d9115f7c6cf88b0de6fc 100644 (file)
@@ -454,8 +454,8 @@ class DBAPIProxyConnection:
 
     """
 
-    def __init__(self, engine, cursor_cls):
-        self.conn = engine.pool._creator()
+    def __init__(self, engine, conn, cursor_cls):
+        self.conn = conn
         self.engine = engine
         self.cursor_cls = cursor_cls
 
index 2a6b21e6bdeb08932fcedbbf1528f040235062ff..8ff34da9877791f79e81a9db1e7f332777d3d291 100644 (file)
@@ -1,3 +1,4 @@
+import itertools
 import time
 from unittest.mock import call
 from unittest.mock import Mock
@@ -28,6 +29,8 @@ from sqlalchemy.testing import is_false
 from sqlalchemy.testing import is_true
 from sqlalchemy.testing import mock
 from sqlalchemy.testing import ne_
+from sqlalchemy.testing.engines import DBAPIProxyConnection
+from sqlalchemy.testing.engines import DBAPIProxyCursor
 from sqlalchemy.testing.engines import testing_engine
 from sqlalchemy.testing.schema import Column
 from sqlalchemy.testing.schema import Table
@@ -1002,6 +1005,155 @@ def _assert_invalidated(fn, *args):
             raise
 
 
+class RealPrePingEventHandlerTest(fixtures.TestBase):
+    """real test for issue #5648, which had to be revisited for 2.0 as the
+    initial version was not adequately tested and non-implementation for
+    mysql, postgresql was not caught
+
+    """
+
+    __backend__ = True
+    __requires__ = "graceful_disconnects", "ad_hoc_engines"
+
+    @testing.fixture
+    def ping_fixture(self, testing_engine):
+        engine = testing_engine(
+            options={"pool_pre_ping": True, "_initialize": False}
+        )
+
+        existing_connect = engine.dialect.dbapi.connect
+
+        fail = False
+        fail_count = itertools.count()
+        DBAPIError = engine.dialect.dbapi.Error
+
+        class ExplodeConnection(DBAPIProxyConnection):
+            def ping(self, *arg, **kw):
+                if fail and next(fail_count) < 1:
+                    raise DBAPIError("unhandled disconnect situation")
+                else:
+                    return True
+
+        class ExplodeCursor(DBAPIProxyCursor):
+            def execute(self, stmt, parameters=None, **kw):
+                if fail and next(fail_count) < 1:
+                    raise DBAPIError("unhandled disconnect situation")
+                else:
+                    return super().execute(stmt, parameters=parameters, **kw)
+
+        def mock_connect(*arg, **kw):
+            real_connection = existing_connect(*arg, **kw)
+            return ExplodeConnection(engine, real_connection, ExplodeCursor)
+
+        with mock.patch.object(
+            engine.dialect.loaded_dbapi, "connect", mock_connect
+        ):
+
+            # set up initial connection.  pre_ping works on subsequent connects
+            engine.connect().close()
+
+            # ping / exec will fail
+            fail = True
+
+            yield engine
+
+    @testing.fixture
+    def ping_fixture_all_errs_disconnect(self, ping_fixture):
+        engine = ping_fixture
+
+        with mock.patch.object(
+            engine.dialect, "is_disconnect", lambda *arg, **kw: True
+        ):
+            yield engine
+
+    def test_control(self, ping_fixture):
+        """test the fixture raises on connect"""
+        engine = ping_fixture
+
+        with expect_raises_message(
+            exc.DBAPIError, "unhandled disconnect situation"
+        ):
+            engine.connect()
+
+    def test_downgrade_control(self, ping_fixture_all_errs_disconnect):
+        """test the disconnect fixture doesn't raise, since it considers
+        all errors to be disconnect errors.
+
+        """
+
+        engine = ping_fixture_all_errs_disconnect
+
+        conn = engine.connect()
+        conn.close()
+
+    def test_event_handler_didnt_upgrade_disconnect(self, ping_fixture):
+        """test that having an event handler that doesn't do anything
+        keeps the behavior in place for a fatal error.
+
+        """
+        engine = ping_fixture
+
+        @event.listens_for(engine, "handle_error")
+        def setup_disconnect(ctx):
+            assert not ctx.is_disconnect
+
+        with expect_raises_message(
+            exc.DBAPIError, "unhandled disconnect situation"
+        ):
+            engine.connect()
+
+    def test_event_handler_didnt_downgrade_disconnect(
+        self, ping_fixture_all_errs_disconnect
+    ):
+        """test that having an event handler that doesn't do anything
+        keeps the behavior in place for a disconnect error.
+
+        """
+        engine = ping_fixture_all_errs_disconnect
+
+        @event.listens_for(engine, "handle_error")
+        def setup_disconnect(ctx):
+            assert ctx.is_pre_ping
+            assert ctx.is_disconnect
+
+        conn = engine.connect()
+        conn.close()
+
+    def test_event_handler_can_upgrade_disconnect(self, ping_fixture):
+        """test that an event hook can receive a fatal error and convert
+        it to be a disconnect error during pre-ping"""
+
+        engine = ping_fixture
+
+        @event.listens_for(engine, "handle_error")
+        def setup_disconnect(ctx):
+            assert ctx.is_pre_ping
+            ctx.is_disconnect = True
+
+        conn = engine.connect()
+        # no error
+        conn.close()
+
+    def test_event_handler_can_downgrade_disconnect(
+        self, ping_fixture_all_errs_disconnect
+    ):
+        """test that an event hook can receive a disconnect error and convert
+        it to be a fatal error during pre-ping"""
+
+        engine = ping_fixture_all_errs_disconnect
+
+        @event.listens_for(engine, "handle_error")
+        def setup_disconnect(ctx):
+            assert ctx.is_disconnect
+            if ctx.is_pre_ping:
+                ctx.is_disconnect = False
+
+        with expect_raises_message(
+            exc.DBAPIError, "unhandled disconnect situation"
+        ):
+            engine.connect()
+
+
 class RealReconnectTest(fixtures.TestBase):
     __backend__ = True
     __requires__ = "graceful_disconnects", "ad_hoc_engines"