.. changelog::
:version: 1.1.0
+ .. change::
+ :tags: bug, sql, mysql
+ :tickets: 3803
+
+ The ``BaseException`` exception class is now intercepted by the
+ exception-handling routines of :class:`.Connection`, and includes
+ handling by the :meth:`~.ConnectionEvents.handle_error`
+ event. The :class:`.Connection` is now **invalidated** by default in
+ the case of a system level exception that is not a subclass of
+ ``Exception``, including ``KeyboardInterrupt`` and the greenlet
+ ``GreenletExit`` class, to prevent further operations from occurring
+ upon a database connection that is in an unknown and possibly
+ corrupted state. The MySQL drivers are most targeted by this change
+ however the change is across all DBAPIs.
+
+ .. seealso::
+
+ :ref:`change_3803`
+
.. change::
:tags: bug, sql
:tickets: 3799
New Features and Improvements - Core
====================================
+.. _change_3803:
+
+Engines now invalidate connections, run error handlers for BaseException
+------------------------------------------------------------------------
+
+.. versionadded:: 1.1 this change is a late add to the 1.1 series just
+ prior to 1.1 final, and is not present in the 1.1 beta releases.
+
+The Python ``BaseException`` class is below that of ``Exception`` but is the
+identifiable base for system-level exceptions such as ``KeyboardInterrupt``,
+``SystemExit``, and notably the ``GreenletExit`` exception that's used by
+eventlet and gevent. This exception class is now intercepted by the exception-
+handling routines of :class:`.Connection`, and includes handling by the
+:meth:`~.ConnectionEvents.handle_error` event. The :class:`.Connection` is now
+**invalidated** by default in the case of a system level exception that is not
+a subclass of ``Exception``, as it is assumed an operation was interrupted and
+the connection may be in an unusable state. The MySQL drivers are most
+targeted by this change however the change is across all DBAPIs.
+
+Note that upon invalidation, the immediate DBAPI connection used by
+:class:`.Connection` is disposed, and the :class:`.Connection`, if still
+being used subsequent to the exception raise, will use a new
+DBAPI connection for subsequent operations upon next use; however, the state of
+any transaction in progress is lost and the appropriate ``.rollback()`` method
+must be called if applicable before this re-use can proceed.
+
+In order to identify this change, it was straightforward to demonstrate a pymysql or
+mysqlclient / MySQL-Python connection moving into a corrupted state when
+these exceptions occur in the middle of the connection doing its work;
+the connection would then be returned to the connection pool where subsequent
+uses would fail, or even before returning to the pool would cause secondary
+failures in context managers that call ``.rollback()`` upon the exception
+catch. The behavior here is expected to reduce
+the incidence of the MySQL error "commands out of sync", as well as the
+``ResourceClosedError`` which can occur when the MySQL driver fails to
+report ``cursor.description`` correctly, when running under greenlet
+conditions where greenlets are killed, or where ``KeyboardInterrupt`` exceptions
+are handled without exiting the program entirely.
+
+The behavior is distinct from the usual auto-invalidation feature, in that it
+does not assume that the backend database itself has been shut down or
+restarted; it does not recycle the entire connection pool as is the case
+for usual DBAPI disconnect exceptions.
+
+This change should be a net improvement for all users with the exception
+of **any application that currently intercepts ``KeyboardInterrupt`` or
+``GreenletExit`` and wishes to continue working within the same transaction**.
+Such an operation is theoretically possible with other DBAPIs that do not appear to be
+impacted by ``KeyboardInterrupt`` such as psycopg2. For these DBAPIs,
+the following workaround will disable the connection from being recycled
+for specific exceptions::
+
+
+ engine = create_engine("postgresql+psycopg2://")
+
+ @event.listens_for(engine, "handle_error")
+ def cancel_disconnect(ctx):
+ if isinstance(ctx.original_exception, KeyboardInterrupt):
+ ctx.is_disconnect = False
+
+:ticket:`3803`
+
+
.. _change_2551:
CTE Support for INSERT, UPDATE, DELETE
except AttributeError:
try:
return self._revalidate_connection()
- except Exception as e:
+ except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
def get_isolation_level(self):
"""
try:
return self.dialect.get_isolation_level(self.connection)
- except Exception as e:
+ except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
@property
self.engine.dialect.do_begin(self.connection)
if self.connection._reset_agent is None:
self.connection._reset_agent = transaction
- except Exception as e:
+ except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
def _rollback_impl(self):
self.engine.logger.info("ROLLBACK")
try:
self.engine.dialect.do_rollback(self.connection)
- except Exception as e:
+ except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
finally:
if not self.__invalid and \
self.engine.logger.info("COMMIT")
try:
self.engine.dialect.do_commit(self.connection)
- except Exception as e:
+ except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
finally:
if not self.__invalid and \
dialect = self.dialect
ctx = dialect.execution_ctx_cls._init_default(
dialect, self, conn)
- except Exception as e:
+ except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
ret = ctx._exec_default(default, None)
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
- except Exception as e:
+ except BaseException as e:
self._handle_dbapi_exception(
e,
util.text_type(statement), parameters,
statement,
parameters,
context)
- except Exception as e:
+ except BaseException as e:
self._handle_dbapi_exception(
e,
statement,
statement,
parameters,
context)
- except Exception as e:
+ except BaseException as e:
self._handle_dbapi_exception(
e,
statement,
if context and context.exception is None:
context.exception = e
+ is_exit_exception = not isinstance(e, Exception)
+
if not self._is_disconnect:
- self._is_disconnect = \
- isinstance(e, self.dialect.dbapi.Error) and \
- not self.closed and \
+ self._is_disconnect = (
+ isinstance(e, self.dialect.dbapi.Error) and
+ not self.closed and
self.dialect.is_disconnect(
e,
self.__connection if not self.invalidated else None,
cursor)
+ ) or (
+ is_exit_exception and not self.closed
+ )
+
if context:
context.is_disconnect = self._is_disconnect
- invalidate_pool_on_disconnect = True
+ invalidate_pool_on_disconnect = not is_exit_exception
if self._reentrant_error:
util.raise_from_cause(
# non-DBAPI error - if we already got a context,
# or there's no string statement, don't wrap it
should_wrap = isinstance(e, self.dialect.dbapi.Error) or \
- (statement is not None and context is None)
+ (statement is not None
+ and context is None and not is_exit_exception)
if should_wrap:
sqlalchemy_exception = exc.DBAPIError.instance(
ctx = ExceptionContextImpl(
e, sqlalchemy_exception, self.engine,
self, cursor, statement,
- parameters, context, self._is_disconnect)
+ parameters, context, self._is_disconnect,
+ invalidate_pool_on_disconnect)
for fn in self.dispatch.handle_error:
try:
newraise = _raised
break
- if sqlalchemy_exception and \
- self._is_disconnect != ctx.is_disconnect:
- sqlalchemy_exception.connection_invalidated = \
- self._is_disconnect = ctx.is_disconnect
+ if self._is_disconnect != ctx.is_disconnect:
+ self._is_disconnect = ctx.is_disconnect
+ if sqlalchemy_exception:
+ sqlalchemy_exception.connection_invalidated = \
+ ctx.is_disconnect
# set up potentially user-defined value for
# invalidate pool.
@classmethod
def _handle_dbapi_exception_noconnection(cls, e, dialect, engine):
-
exc_info = sys.exc_info()
is_disconnect = dialect.is_disconnect(e, None, None)
if engine._has_events:
ctx = ExceptionContextImpl(
e, sqlalchemy_exception, engine, None, None, None,
- None, None, is_disconnect)
+ None, None, is_disconnect, True)
for fn in engine.dispatch.handle_error:
try:
# handler returns an exception;
def __init__(self, exception, sqlalchemy_exception,
engine, connection, cursor, statement, parameters,
- context, is_disconnect):
+ context, is_disconnect, invalidate_pool_on_disconnect):
self.engine = engine
self.connection = connection
self.sqlalchemy_exception = sqlalchemy_exception
self.statement = statement
self.parameters = parameters
self.is_disconnect = is_disconnect
+ self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
class Transaction(object):
inputsizes.append(dbtype)
try:
self.cursor.setinputsizes(*inputsizes)
- except Exception as e:
+ except BaseException as e:
self.root_connection._handle_dbapi_exception(
e, None, None, None, self)
else:
inputsizes[key] = dbtype
try:
self.cursor.setinputsizes(**inputsizes)
- except Exception as e:
+ except BaseException as e:
self.root_connection._handle_dbapi_exception(
e, None, None, None, self)
"""
try:
return self.context.rowcount
- except Exception as e:
+ except BaseException as e:
self.connection._handle_dbapi_exception(
e, None, None, self.cursor, self.context)
"""
try:
return self._saved_cursor.lastrowid
- except Exception as e:
+ except BaseException as e:
self.connection._handle_dbapi_exception(
e, None, None,
self._saved_cursor, self.context)
l = self.process_rows(self._fetchall_impl())
self._soft_close()
return l
- except Exception as e:
+ except BaseException as e:
self.connection._handle_dbapi_exception(
e, None, None,
self.cursor, self.context)
if len(l) == 0:
self._soft_close()
return l
- except Exception as e:
+ except BaseException as e:
self.connection._handle_dbapi_exception(
e, None, None,
self.cursor, self.context)
else:
self._soft_close()
return None
- except Exception as e:
+ except BaseException as e:
self.connection._handle_dbapi_exception(
e, None, None,
self.cursor, self.context)
try:
row = self._fetchone_impl()
- except Exception as e:
+ except BaseException as e:
self.connection._handle_dbapi_exception(
e, None, None,
self.cursor, self.context)
* read-only, low-level exception handling for logging and
debugging purposes
* exception re-writing
+ * Establishing or disabling whether a connection or the owning
+ connection pool is invalidated or expired in response to a
+ specific exception.
The hook is called while the cursor from the failed operation
(if any) is still open and accessible. Special cleanup operations
.. versionadded:: 0.9.7 Added the
:meth:`.ConnectionEvents.handle_error` hook.
+ .. versionchanged:: 1.1 The :meth:`.handle_error` event will now
+ receive all exceptions that inherit from ``BaseException``, including
+ ``SystemExit`` and ``KeyboardInterrupt``. The setting for
+ :attr:`.ExceptionContext.is_disconnect` is ``True`` in this case
+ and the default for :attr:`.ExceptionContext.invalidate_pool_on_disconnect`
+ is ``False``.
+
.. versionchanged:: 1.0.0 The :meth:`.handle_error` event is now
invoked when an :class:`.Engine` fails during the initial
call to :meth:`.Engine.connect`, as well as when a
Connections with a start time prior to this pool's invalidation
time will be recycled upon next checkout.
"""
+
rec = getattr(connection, "_connection_record", None)
if not rec or self._invalidate_time < rec.starttime:
self._invalidate_time = time.time()
-from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.testing import eq_, ne_, assert_raises, assert_raises_message
import time
from sqlalchemy import (
select, MetaData, Integer, String, create_engine, pool, exc, util)
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.mock import Mock, call, patch
+from sqlalchemy import event
class MockError(Exception):
pass
+class MockExitIsh(BaseException):
+ pass
+
+
def mock_connection():
def mock_cursor():
def execute(*args, **kwargs):
if conn.explode == 'execute':
raise MockDisconnect("Lost the DB connection on execute")
- elif conn.explode in ('execute_no_disconnect', ):
+ elif conn.explode == 'interrupt':
+ conn.explode = "explode_no_disconnect"
+ raise MockExitIsh("Keyboard / greenlet / etc interruption")
+ elif conn.explode == 'interrupt_dont_break':
+ conn.explode = None
+ raise MockExitIsh("Keyboard / greenlet / etc interruption")
+ elif conn.explode in ('execute_no_disconnect',
+ 'explode_no_disconnect'):
raise MockError(
"something broke on execute but we didn't lose the "
"connection")
- elif conn.explode in ('rollback', 'rollback_no_disconnect'):
+ elif conn.explode in ('rollback', 'rollback_no_disconnect',
+ 'explode_no_disconnect'):
raise MockError(
"something broke on execute but we didn't lose the "
"connection")
c2 = engine.connect()
eq_(Dialect.initialize.call_count, 1)
+ def test_invalidate_conn_w_contextmanager_interrupt(self):
+ # test [ticket:3803]
+ pool = self.db.pool
+
+ conn = self.db.connect()
+ self.dbapi.shutdown("interrupt")
+
+ def go():
+ with conn.begin():
+ conn.execute(select([1]))
+
+ assert_raises(
+ MockExitIsh,
+ go
+ )
+
+ assert conn.invalidated
+
+ eq_(pool._invalidate_time, 0) # pool not invalidated
+
+ conn.execute(select([1]))
+ assert not conn.invalidated
+
+ def test_invalidate_conn_interrupt_nodisconnect_workaround(self):
+ # test [ticket:3803] workaround for no disconnect on keyboard interrupt
+
+ @event.listens_for(self.db, "handle_error")
+ def cancel_disconnect(ctx):
+ ctx.is_disconnect = False
+
+ pool = self.db.pool
+
+ conn = self.db.connect()
+ self.dbapi.shutdown("interrupt_dont_break")
+
+ def go():
+ with conn.begin():
+ conn.execute(select([1]))
+
+ assert_raises(
+ MockExitIsh,
+ go
+ )
+
+ assert not conn.invalidated
+
+ eq_(pool._invalidate_time, 0) # pool not invalidated
+
+ conn.execute(select([1]))
+ assert not conn.invalidated
+
+ def test_invalidate_conn_w_contextmanager_disconnect(self):
+ # test [ticket:3803] change maintains old behavior
+
+ pool = self.db.pool
+
+ conn = self.db.connect()
+ self.dbapi.shutdown("execute")
+
+ def go():
+ with conn.begin():
+ conn.execute(select([1]))
+
+ assert_raises(
+ exc.DBAPIError, # wraps a MockDisconnect
+ go
+ )
+
+ assert conn.invalidated
+
+ ne_(pool._invalidate_time, 0) # pool is invalidated
+
+ conn.execute(select([1]))
+ assert not conn.invalidated
+
class CursorErrTest(fixtures.TestBase):
# this isn't really a "reconnect" test, it's more of