Fixed fairly critical issue where the DBAPI connection could be returned to
the connection pool while still in an un-rolled-back state. The reset agent
responsible for rolling back the connection could be corrupted in the case
that the transaction was "closed" without being rolled back or committed,
which can occur in some scenarios when using ORM sessions and emitting
.close() in a certain pattern involving savepoints. The fix ensures that
the reset agent is always active.
Note for 1.3 the cherry-pick modifies the approach from master as
transaction handling has diverged.
Fixes: #5326
Change-Id: If056870ea70a2d9a1749768988d5e023f3061b31
(cherry picked from commit
4d161ac9c28986e1e022dfb93785767f28d6bfc8)
--- /dev/null
+.. change::
+ :tags: bug, engine
+ :tickets: 5326
+
+ Fixed fairly critical issue where the DBAPI connection could be returned to
+ the connection pool while still in an un-rolled-back state. The reset agent
+ responsible for rolling back the connection could be corrupted in the case
+ that the transaction was "closed" without being rolled back or committed,
+ which can occur in some scenarios when using ORM sessions and emitting
+ .close() in a certain pattern involving savepoints. The fix ensures that
+ the reset agent is always active.
+
self.engine.dialect.do_savepoint(self, name)
return name
+ def _discard_transaction(self, trans):
+ if trans is self.__transaction:
+ if trans._parent is trans:
+ self.__transaction = None
+ if self._still_open_and_connection_is_valid:
+ assert self.__connection._reset_agent is None
+ else:
+ self.__transaction = trans._parent
+
+ # not doing this assertion for now, however this is how
+ # it would look:
+ # if self._still_open_and_connection_is_valid:
+ # trans = self._transaction
+ # while not trans._is_root:
+ # trans = trans._parent
+ # assert self.__connection._reset_agent is trans
+
def _rollback_to_savepoint_impl(self, name, context):
assert not self.__branch_from
an enclosing transaction.
"""
- if not self._parent.is_active:
- return
- if self._parent is self:
+
+ if self._parent.is_active and self._parent is self:
self.rollback()
+ self.connection._discard_transaction(self)
def rollback(self):
"""Roll back this :class:`.Transaction`.
"""
- if not self._parent.is_active:
- return
- self._do_rollback()
- self.is_active = False
+ if self._parent.is_active:
+ self._do_rollback()
+ self.is_active = False
def _do_rollback(self):
self._parent.rollback()
", via agent" if self._reset_agent else "",
)
if self._reset_agent:
- self._reset_agent.rollback()
+ if not self._reset_agent.is_active:
+ util.warn(
+ "Reset agent is not active. "
+ "This should not occur unless there was already "
+ "a connectivity error in progress."
+ )
+ pool._dialect.do_rollback(self)
+ else:
+ self._reset_agent.rollback()
else:
pool._dialect.do_rollback(self)
elif pool._reset_on_return is reset_commit:
", via agent" if self._reset_agent else "",
)
if self._reset_agent:
- self._reset_agent.commit()
+ if not self._reset_agent.is_active:
+ util.warn(
+ "Reset agent is not active. "
+ "This should not occur unless there was already "
+ "a connectivity error in progress."
+ )
+ pool._dialect.do_commit(self)
+ else:
+ self._reset_agent.commit()
else:
pool._dialect.do_commit(self)
def __init__(self, conn):
self.conn = conn
+ is_active = True
+
def rollback(self):
self.conn.special_rollback()
def __init__(self, conn):
self.conn = conn
+ is_active = True
+
def rollback(self):
self.conn.special_rollback()
trans.commit()
assert connection.connection._reset_agent is None
+ def test_trans_close(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans.close()
+ assert connection.connection._reset_agent is None
+
+ def test_trans_reset_agent_broken_ensure(self):
+ eng = testing_engine()
+ conn = eng.connect()
+ trans = conn.begin()
+ assert conn.connection._reset_agent is trans
+ trans.is_active = False
+
+ with expect_warnings("Reset agent is not active"):
+ conn.close()
+
+ def test_trans_commit_reset_agent_broken_ensure(self):
+ eng = testing_engine(options={"pool_reset_on_return": "commit"})
+ conn = eng.connect()
+ trans = conn.begin()
+ assert conn.connection._reset_agent is trans
+ trans.is_active = False
+
+ with expect_warnings("Reset agent is not active"):
+ conn.close()
+
+ @testing.requires.savepoints
+ def test_begin_nested_trans_close(self):
+ with testing.db.connect() as connection:
+ t1 = connection.begin()
+ assert connection.connection._reset_agent is t1
+ t2 = connection.begin_nested()
+ assert connection.connection._reset_agent is t1
+ assert connection._Connection__transaction is t2
+ t2.close()
+ assert connection._Connection__transaction is t1
+ assert connection.connection._reset_agent is t1
+ t1.close()
+ assert connection.connection._reset_agent is None
+ assert not t1.is_active
+
+ @testing.requires.savepoints
+ def test_begin_nested_trans_rollback(self):
+ with testing.db.connect() as connection:
+ t1 = connection.begin()
+ assert connection.connection._reset_agent is t1
+ t2 = connection.begin_nested()
+ assert connection.connection._reset_agent is t1
+ assert connection._Connection__transaction is t2
+ t2.close()
+ assert connection._Connection__transaction is t1
+ assert connection.connection._reset_agent is t1
+ t1.rollback()
+ assert connection.connection._reset_agent is None
+ assert not t1.is_active
+
@testing.requires.savepoints
def test_begin_nested_close(self):
with testing.db.connect() as connection:
sess.rollback,
)
+ @testing.requires.independent_connections
@testing.emits_warning(".*previous exception")
def test_failed_rollback_deactivates_transaction(self):
# test #4050
# outermost is active
eq_(session.transaction._state, _session.ACTIVE)
+ @testing.requires.independent_connections
@testing.emits_warning(".*previous exception")
def test_failed_rollback_deactivates_transaction_ctx_integration(self):
# test #4050 in the same context as that of oslo.db