]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- :class:`.Connection` now associates a new
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 13 Jan 2014 00:43:13 +0000 (19:43 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 13 Jan 2014 00:43:13 +0000 (19:43 -0500)
:class:`.RootTransaction` or :class:`.TwoPhaseTransaction`
with its immediate :class:`._ConnectionFairy` as a "reset handler"
for the span of that transaction, which takes over the task
of calling commit() or rollback() for the "reset on return" behavior
of :class:`.Pool` if the transaction was not otherwise completed.
This resolves the issue that a picky transaction
like that of MySQL two-phase will be
properly closed out when the connection is closed without an
explicit rollback or commit (e.g. no longer raises "XAER_RMFAIL"
in this case - note this only shows up in logging as the exception
is not propagated within pool reset).
This issue would arise e.g. when using an orm
:class:`.Session` with ``twophase`` set, and then
:meth:`.Session.close` is called without an explicit rollback or
commit.   The change also has the effect that you will now see
an explicit "ROLLBACK" in the logs when using a :class:`.Session`
object in non-autocommit mode regardless of how that session was
discarded.  Thanks to Jeff Dairiki and Laurence Rowe for isolating
the issue here. [ticket:2907]

doc/build/changelog/changelog_09.rst
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/pool.py
test/engine/test_pool.py
test/engine/test_transaction.py

index e6a77378a5edd1f7d7f1d8773dae2b07a82f5125..74f92da06aea9dd7e42ac7e9a0e13c91e2b35201 100644 (file)
 .. changelog::
     :version: 0.9.2
 
+    .. change::
+        :tags: bug, mysql, pool, engine
+        :tickets: 2907
+
+        :class:`.Connection` now associates a new
+        :class:`.RootTransaction` or :class:`.TwoPhaseTransaction`
+        with its immediate :class:`._ConnectionFairy` as a "reset handler"
+        for the span of that transaction, which takes over the task
+        of calling commit() or rollback() for the "reset on return" behavior
+        of :class:`.Pool` if the transaction was not otherwise completed.
+        This resolves the issue that a picky transaction
+        like that of MySQL two-phase will be
+        properly closed out when the connection is closed without an
+        explicit rollback or commit (e.g. no longer raises "XAER_RMFAIL"
+        in this case - note this only shows up in logging as the exception
+        is not propagated within pool reset).
+        This issue would arise e.g. when using an orm
+        :class:`.Session` with ``twophase`` set, and then
+        :meth:`.Session.close` is called without an explicit rollback or
+        commit.   The change also has the effect that you will now see
+        an explicit "ROLLBACK" in the logs when using a :class:`.Session`
+        object in non-autocommit mode regardless of how that session was
+        discarded.  Thanks to Jeff Dairiki and Laurence Rowe for isolating
+        the issue here.
+
     .. change::
         :tags: feature, pool, engine
 
index ff2e6e282678cc8d2b8d1ef9fdef9dff167baea8..5c66f4806e9e17b9fb9f52985f5e82f7b56c306b 100644 (file)
@@ -404,7 +404,7 @@ class Connection(Connectable):
         """
 
         if self.__transaction is None:
-            self.__transaction = RootTransaction(self)
+            self.__transaction = self.connection._reset_agent = RootTransaction(self)
             return self.__transaction
         else:
             return Transaction(self, self.__transaction)
@@ -425,7 +425,7 @@ class Connection(Connectable):
         """
 
         if self.__transaction is None:
-            self.__transaction = RootTransaction(self)
+            self.__transaction = self.connection._reset_agent = RootTransaction(self)
         else:
             self.__transaction = NestedTransaction(self, self.__transaction)
         return self.__transaction
@@ -453,7 +453,7 @@ class Connection(Connectable):
                 "is already in progress.")
         if xid is None:
             xid = self.engine.dialect.create_xid()
-        self.__transaction = TwoPhaseTransaction(self, xid)
+        self.__transaction = self.connection._reset_agent = TwoPhaseTransaction(self, xid)
         return self.__transaction
 
     def recover_twophase(self):
@@ -491,11 +491,11 @@ class Connection(Connectable):
                 self.engine.logger.info("ROLLBACK")
             try:
                 self.engine.dialect.do_rollback(self.connection)
-                self.__transaction = None
+                self.__transaction = self.connection._reset_agent = None
             except Exception as e:
                 self._handle_dbapi_exception(e, None, None, None, None)
         else:
-            self.__transaction = None
+            self.__transaction = self.connection._reset_agent = None
 
     def _commit_impl(self, autocommit=False):
         if self._has_events:
@@ -505,7 +505,7 @@ class Connection(Connectable):
             self.engine.logger.info("COMMIT")
         try:
             self.engine.dialect.do_commit(self.connection)
-            self.__transaction = None
+            self.__transaction = self.connection._reset_agent = None
         except Exception as e:
             self._handle_dbapi_exception(e, None, None, None, None)
 
@@ -560,7 +560,7 @@ class Connection(Connectable):
         if self._still_open_and_connection_is_valid:
             assert isinstance(self.__transaction, TwoPhaseTransaction)
             self.engine.dialect.do_rollback_twophase(self, xid, is_prepared)
-        self.__transaction = None
+        self.__transaction = self.connection._reset_agent = None
 
     def _commit_twophase_impl(self, xid, is_prepared):
         if self._has_events:
@@ -569,7 +569,7 @@ class Connection(Connectable):
         if self._still_open_and_connection_is_valid:
             assert isinstance(self.__transaction, TwoPhaseTransaction)
             self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
-        self.__transaction = None
+        self.__transaction = self.connection._reset_agent = None
 
     def _autorollback(self):
         if not self.in_transaction():
index 0f0a2ac1084e5dd7049720ca74121de7708000af..f84f331d5f8824fb2bfd06aae4c980690806b82d 100644 (file)
@@ -479,18 +479,8 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None):
 
         try:
             fairy = fairy or _ConnectionFairy(connection, connection_record)
-            if pool.dispatch.reset:
-                pool.dispatch.reset(fairy, connection_record)
-            if pool._reset_on_return is reset_rollback:
-                if echo:
-                    pool.logger.debug("Connection %s rollback-on-return",
-                                                    connection)
-                pool._dialect.do_rollback(fairy)
-            elif pool._reset_on_return is reset_commit:
-                if echo:
-                    pool.logger.debug("Connection %s commit-on-return",
-                                                    connection)
-                pool._dialect.do_commit(fairy)
+            assert fairy.connection is connection
+            fairy._reset(pool, echo)
 
             # Immediately close detached instances
             if not connection_record:
@@ -542,6 +532,23 @@ class _ConnectionFairy(object):
 
     """
 
+    _reset_agent = None
+    """Refer to an object with a ``.commit()`` and ``.rollback()`` method;
+    if non-None, the "reset-on-return" feature will call upon this object
+    rather than directly against the dialect-level do_rollback() and do_commit()
+    methods.
+
+    In practice, a :class:`.Connection` assigns a :class:`.Transaction` object
+    to this variable when one is in scope so that the :class:`.Transaction`
+    takes the job of committing or rolling back on return if
+    :meth:`.Connection.close` is called while the :class:`.Transaction`
+    still exists.
+
+    This is essentially an "event handler" of sorts but is simplified as an
+    instance variable both for performance/simplicity as well as that there
+    can only be one "reset agent" at a time.
+    """
+
     @classmethod
     def _checkout(cls, pool, threadconns=None, fairy=None):
         if not fairy:
@@ -591,6 +598,30 @@ class _ConnectionFairy(object):
 
     _close = _checkin
 
+    def _reset(self, pool, echo):
+        if pool.dispatch.reset:
+            pool.dispatch.reset(self, self._connection_record)
+        if pool._reset_on_return is reset_rollback:
+            if echo:
+                pool.logger.debug("Connection %s rollback-on-return%s",
+                                                self.connection,
+                                                ", via agent"
+                                                if self._reset_agent else "")
+            if self._reset_agent:
+                self._reset_agent.rollback()
+            else:
+                pool._dialect.do_rollback(self)
+        elif pool._reset_on_return is reset_commit:
+            if echo:
+                pool.logger.debug("Connection %s commit-on-return%s",
+                                                self.connection,
+                                                ", via agent"
+                                                if self._reset_agent else "")
+            if self._reset_agent:
+                self._reset_agent.commit()
+            else:
+                pool._dialect.do_commit(self)
+
     @property
     def _logger(self):
         return self._pool.logger
index 10f490b48927bf41cf0f5e6ff60b64c983035e02..2e4c2dc486359c1cb0428b101c5bf87ef5268f45 100644 (file)
@@ -1328,6 +1328,96 @@ class QueuePoolTest(PoolTestBase):
         c2 = p.connect()
         assert c2.connection is not None
 
+class ResetOnReturnTest(PoolTestBase):
+    def _fixture(self, **kw):
+        dbapi = Mock()
+        return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
+
+    def test_plain_rollback(self):
+        dbapi, p = self._fixture(reset_on_return='rollback')
+
+        c1 = p.connect()
+        c1.close()
+        assert dbapi.connect().rollback.called
+        assert not dbapi.connect().commit.called
+
+    def test_plain_commit(self):
+        dbapi, p = self._fixture(reset_on_return='commit')
+
+        c1 = p.connect()
+        c1.close()
+        assert not dbapi.connect().rollback.called
+        assert dbapi.connect().commit.called
+
+    def test_plain_none(self):
+        dbapi, p = self._fixture(reset_on_return=None)
+
+        c1 = p.connect()
+        c1.close()
+        assert not dbapi.connect().rollback.called
+        assert not dbapi.connect().commit.called
+
+    def test_agent_rollback(self):
+        dbapi, p = self._fixture(reset_on_return='rollback')
+
+        class Agent(object):
+            def __init__(self, conn):
+                self.conn = conn
+
+            def rollback(self):
+                self.conn.special_rollback()
+
+            def commit(self):
+                self.conn.special_commit()
+
+        c1 = p.connect()
+        c1._reset_agent = Agent(c1)
+        c1.close()
+
+        assert dbapi.connect().special_rollback.called
+        assert not dbapi.connect().special_commit.called
+
+        assert not dbapi.connect().rollback.called
+        assert not dbapi.connect().commit.called
+
+        c1 = p.connect()
+        c1.close()
+        eq_(dbapi.connect().special_rollback.call_count, 1)
+        eq_(dbapi.connect().special_commit.call_count, 0)
+
+        assert dbapi.connect().rollback.called
+        assert not dbapi.connect().commit.called
+
+    def test_agent_commit(self):
+        dbapi, p = self._fixture(reset_on_return='commit')
+
+        class Agent(object):
+            def __init__(self, conn):
+                self.conn = conn
+
+            def rollback(self):
+                self.conn.special_rollback()
+
+            def commit(self):
+                self.conn.special_commit()
+
+        c1 = p.connect()
+        c1._reset_agent = Agent(c1)
+        c1.close()
+        assert not dbapi.connect().special_rollback.called
+        assert dbapi.connect().special_commit.called
+
+        assert not dbapi.connect().rollback.called
+        assert not dbapi.connect().commit.called
+
+        c1 = p.connect()
+        c1.close()
+
+        eq_(dbapi.connect().special_rollback.call_count, 0)
+        eq_(dbapi.connect().special_commit.call_count, 1)
+        assert not dbapi.connect().rollback.called
+        assert dbapi.connect().commit.called
+
 class SingletonThreadPoolTest(PoolTestBase):
 
     @testing.requires.threading_with_mock
index 9c45cfddcf51ac68da39c05049c197c823c6b51e..e3f5fc25211be5589231a6c4d8d371ac7acb6c5b 100644 (file)
@@ -3,6 +3,7 @@ from sqlalchemy.testing import eq_, assert_raises, \
 import sys
 import time
 import threading
+from sqlalchemy import event
 from sqlalchemy.testing.engines import testing_engine
 from sqlalchemy import create_engine, MetaData, INT, VARCHAR, Sequence, \
     select, Integer, String, func, text, exc
@@ -378,6 +379,138 @@ class TransactionTest(fixtures.TestBase):
         eq_(result.fetchall(), [('user1', ), ('user4', )])
         conn.close()
 
+    @testing.requires.two_phase_transactions
+    def test_reset_rollback_two_phase_no_rollback(self):
+        # test [ticket:2907], essentially that the
+        # TwoPhaseTransaction is given the job of "reset on return"
+        # so that picky backends like MySQL correctly clear out
+        # their state when a connection is closed without handling
+        # the transaction explicitly.
+
+        eng = testing_engine()
+
+        # MySQL raises if you call straight rollback() on
+        # a connection with an XID present
+        @event.listens_for(eng, "invalidate")
+        def conn_invalidated(dbapi_con, con_record, exception):
+            dbapi_con.close()
+            raise exception
+
+        with eng.connect() as conn:
+            rec = conn.connection._connection_record
+            raw_dbapi_con = rec.connection
+            xa = conn.begin_twophase()
+            conn.execute(users.insert(), user_id=1, user_name='user1')
+
+        assert rec.connection is raw_dbapi_con
+
+        with eng.connect() as conn:
+            result = \
+                conn.execute(select([users.c.user_name]).
+                    order_by(users.c.user_id))
+            eq_(result.fetchall(), [])
+
+class ResetAgentTest(fixtures.TestBase):
+    def test_begin_close(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin()
+            assert connection.connection._reset_agent is trans
+        assert not trans.is_active
+
+    def test_begin_rollback(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin()
+            assert connection.connection._reset_agent is trans
+            trans.rollback()
+            assert connection.connection._reset_agent is None
+
+    def test_begin_commit(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin()
+            assert connection.connection._reset_agent is trans
+            trans.commit()
+            assert connection.connection._reset_agent is None
+
+    @testing.requires.savepoints
+    def test_begin_nested_close(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin_nested()
+            assert connection.connection._reset_agent is trans
+        assert not trans.is_active
+
+    @testing.requires.savepoints
+    def test_begin_begin_nested_close(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin()
+            trans2 = connection.begin_nested()
+            assert connection.connection._reset_agent is trans
+        assert trans2.is_active  # was never closed
+        assert not trans.is_active
+
+    @testing.requires.savepoints
+    def test_begin_begin_nested_rollback_commit(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin()
+            trans2 = connection.begin_nested()
+            assert connection.connection._reset_agent is trans
+            trans2.rollback()
+            assert connection.connection._reset_agent is trans
+            trans.commit()
+            assert connection.connection._reset_agent is None
+
+    @testing.requires.savepoints
+    def test_begin_begin_nested_rollback_rollback(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin()
+            trans2 = connection.begin_nested()
+            assert connection.connection._reset_agent is trans
+            trans2.rollback()
+            assert connection.connection._reset_agent is trans
+            trans.rollback()
+            assert connection.connection._reset_agent is None
+
+    def test_begin_begin_rollback_rollback(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin()
+            trans2 = connection.begin()
+            assert connection.connection._reset_agent is trans
+            trans2.rollback()
+            assert connection.connection._reset_agent is None
+            trans.rollback()
+            assert connection.connection._reset_agent is None
+
+    def test_begin_begin_commit_commit(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin()
+            trans2 = connection.begin()
+            assert connection.connection._reset_agent is trans
+            trans2.commit()
+            assert connection.connection._reset_agent is trans
+            trans.commit()
+            assert connection.connection._reset_agent is None
+
+    @testing.requires.two_phase_transactions
+    def test_reset_via_agent_begin_twophase(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin_twophase()
+            assert connection.connection._reset_agent is trans
+
+    @testing.requires.two_phase_transactions
+    def test_reset_via_agent_begin_twophase_commit(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin_twophase()
+            assert connection.connection._reset_agent is trans
+            trans.commit()
+            assert connection.connection._reset_agent is None
+
+    @testing.requires.two_phase_transactions
+    def test_reset_via_agent_begin_twophase_rollback(self):
+        with testing.db.connect() as connection:
+            trans = connection.begin_twophase()
+            assert connection.connection._reset_agent is trans
+            trans.rollback()
+            assert connection.connection._reset_agent is None
+
 class AutoRollbackTest(fixtures.TestBase):
 
     @classmethod