--- /dev/null
+.. change::
+ :tags: bug, engine
+ :tickets: 5326
+
+ Fixed fairly critical issue where the DBAPI connection could be returned to
+ the connection pool while still in an un-rolled-back state. The reset agent
+ responsible for rolling back the connection could be corrupted in the case
+ that the transaction was "closed" without being rolled back or committed,
+ which can occur in some scenarios when using ORM sessions and emitting
+ .close() in a certain pattern involving savepoints. The fix ensures that
+ the reset agent is always active.
+
self._autobegin()
else:
self._transaction = RootTransaction(self)
+ self.connection._reset_agent = self._transaction
return self._transaction
trans = NestedTransaction(self, self._transaction)
if trans._is_root:
assert trans._parent is trans
self._transaction = None
+
+ # test suite w/ SingletonThreadPool will have cases
+ # where _reset_agent is on a different Connection
+ # entirely so we can't assert this here.
+ # if (
+ # not self._is_future
+ # and self._still_open_and_connection_is_valid
+ # ):
+ # assert self.__connection._reset_agent is None
else:
assert trans._parent is not trans
self._transaction = trans._parent
+ # not doing this assertion for now, however this is how
+ # it would look:
+ # if self._still_open_and_connection_is_valid:
+ # trans = self._transaction
+ # while not trans._is_root:
+ # trans = trans._parent
+ # assert self.__connection._reset_agent is trans
+
def _rollback_to_savepoint_impl(
self, name, context, deactivate_only=False
):
an enclosing transaction.
"""
- if not self._parent.is_active:
- return
- if self._parent is self:
+
+ if self._parent.is_active and self._parent is self:
self.rollback()
+ self.connection._discard_transaction(self)
def rollback(self):
"""Roll back this :class:`.Transaction`.
", via agent" if self._reset_agent else "",
)
if self._reset_agent:
- self._reset_agent.rollback()
+ if not self._reset_agent.is_active:
+ util.warn(
+ "Reset agent is not active. "
+ "This should not occur unless there was already "
+ "a connectivity error in progress."
+ )
+ pool._dialect.do_rollback(self)
+ else:
+ self._reset_agent.rollback()
else:
pool._dialect.do_rollback(self)
elif pool._reset_on_return is reset_commit:
", via agent" if self._reset_agent else "",
)
if self._reset_agent:
- self._reset_agent.commit()
+ if not self._reset_agent.is_active:
+ util.warn(
+ "Reset agent is not active. "
+ "This should not occur unless there was already "
+ "a connectivity error in progress."
+ )
+ pool._dialect.do_commit(self)
+ else:
+ self._reset_agent.commit()
else:
pool._dialect.do_commit(self)
def __init__(self, conn):
self.conn = conn
+ is_active = True
+
def rollback(self):
self.conn.special_rollback()
def __init__(self, conn):
self.conn = conn
+ is_active = True
+
def rollback(self):
self.conn.special_rollback()
trans.commit()
assert connection.connection._reset_agent is None
+ def test_trans_close(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans.close()
+ assert connection.connection._reset_agent is None
+
+ def test_trans_reset_agent_broken_ensure(self):
+ eng = testing_engine()
+ conn = eng.connect()
+ trans = conn.begin()
+ assert conn.connection._reset_agent is trans
+ trans.is_active = False
+
+ with expect_warnings("Reset agent is not active"):
+ conn.close()
+
+ def test_trans_commit_reset_agent_broken_ensure(self):
+ eng = testing_engine(options={"pool_reset_on_return": "commit"})
+ conn = eng.connect()
+ trans = conn.begin()
+ assert conn.connection._reset_agent is trans
+ trans.is_active = False
+
+ with expect_warnings("Reset agent is not active"):
+ conn.close()
+
+ @testing.requires.savepoints
+ def test_begin_nested_trans_close(self):
+ with testing.db.connect() as connection:
+ t1 = connection.begin()
+ assert connection.connection._reset_agent is t1
+ t2 = connection.begin_nested()
+ assert connection.connection._reset_agent is t1
+ assert connection._transaction is t2
+ t2.close()
+ assert connection._transaction is t1
+ assert connection.connection._reset_agent is t1
+ t1.close()
+ assert connection.connection._reset_agent is None
+ assert not t1.is_active
+
+ @testing.requires.savepoints
+ def test_begin_nested_trans_rollback(self):
+ with testing.db.connect() as connection:
+ t1 = connection.begin()
+ assert connection.connection._reset_agent is t1
+ t2 = connection.begin_nested()
+ assert connection.connection._reset_agent is t1
+ assert connection._transaction is t2
+ t2.close()
+ assert connection._transaction is t1
+ assert connection.connection._reset_agent is t1
+ t1.rollback()
+ assert connection.connection._reset_agent is None
+ assert not t1.is_active
+
@testing.requires.savepoints
def test_begin_nested_close(self):
with testing.db.connect() as connection:
sess.rollback,
)
+ @testing.requires.independent_connections
@testing.emits_warning(".*previous exception")
def test_failed_rollback_deactivates_transaction(self):
# test #4050
# outermost is active
eq_(session.transaction._state, _session.ACTIVE)
+ @testing.requires.independent_connections
@testing.emits_warning(".*previous exception")
def test_failed_rollback_deactivates_transaction_ctx_integration(self):
# test #4050 in the same context as that of oslo.db