]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Replace reset_agent with direct call from connection
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 5 Mar 2021 22:34:10 +0000 (17:34 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 7 Mar 2021 04:15:17 +0000 (23:15 -0500)
Fixed a regression where the "reset agent" of the connection pool wasn't
really being utilized by the :class:`_engine.Connection` when it were
closed, and also leading to a double-rollback scenario that was somewhat
wasteful.   The newer architecture of the engine has been updated so that
the connection pool "reset-on-return" logic will be skipped when the
:class:`_engine.Connection` explicitly closes out the transaction before
returning the pool to the connection.

Fixes: #6004
Change-Id: I5d2ac16cac71aa45a00b4b7481d7268bd828a168

doc/build/changelog/unreleased_14/6004.rst [new file with mode: 0644]
doc/build/core/pooling.rst
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/pool/base.py
test/engine/test_deprecations.py
test/engine/test_logging.py
test/engine/test_pool.py
test/engine/test_transaction.py

diff --git a/doc/build/changelog/unreleased_14/6004.rst b/doc/build/changelog/unreleased_14/6004.rst
new file mode 100644 (file)
index 0000000..11afbbd
--- /dev/null
@@ -0,0 +1,11 @@
+.. change::
+    :tags: bug, regression, engine
+    :tickets: 6004
+
+    Fixed a regression where the "reset agent" of the connection pool wasn't
+    really being utilized by the :class:`_engine.Connection` when it were
+    closed, and also leading to a double-rollback scenario that was somewhat
+    wasteful.   The newer architecture of the engine has been updated so that
+    the connection pool "reset-on-return" logic will be skipped when the
+    :class:`_engine.Connection` explicitly closes out the transaction before
+    returning the pool to the connection.
index 4c852de3210e715aeb30c8afa25c979d8bd179c3..796f09e2e0108ec6f31af6c8add7209fc0631d5f 100644 (file)
@@ -101,9 +101,9 @@ by any additional options::
 
     mypool = pool.QueuePool(getconn, max_overflow=10, pool_size=5)
 
-DBAPI connections can then be procured from the pool using the :meth:`_pool.Pool.connect`
-function.  The return value of this method is a DBAPI connection that's contained
-within a transparent proxy::
+DBAPI connections can then be procured from the pool using the
+:meth:`_pool.Pool.connect` function. The return value of this method is a DBAPI
+connection that's contained within a transparent proxy::
 
     # get a connection
     conn = mypool.connect()
@@ -120,23 +120,52 @@ pool::
     # it to the pool.
     conn.close()
 
-The proxy also returns its contained DBAPI connection to the pool
-when it is garbage collected,
-though it's not deterministic in Python that this occurs immediately (though
-it is typical with cPython).
-
-The ``close()`` step also performs the important step of calling the
-``rollback()`` method of the DBAPI connection.   This is so that any
-existing transaction on the connection is removed, not only ensuring
-that no existing state remains on next usage, but also so that table
-and row locks are released as well as that any isolated data snapshots
-are removed.   This behavior can be disabled using the ``reset_on_return``
-option of :class:`_pool.Pool`.
-
-A particular pre-created :class:`_pool.Pool` can be shared with one or more
-engines by passing it to the ``pool`` argument of :func:`_sa.create_engine`::
-
-    e = create_engine('postgresql://', pool=mypool)
+The proxy also returns its contained DBAPI connection to the pool when it is
+garbage collected, though it's not deterministic in Python that this occurs
+immediately (though it is typical with cPython). This usage is not recommended
+however and in particular is not supported with asyncio DBAPI drivers.
+
+.. _pool_reset_on_return:
+
+Reset On Return
+---------------
+
+The pool also includes the a "reset on return" feature which will call the
+``rollback()`` method of the DBAPI connection when the connection is returned
+to the pool. This is so that any existing
+transaction on the connection is removed, not only ensuring that no existing
+state remains on next usage, but also so that table and row locks are released
+as well as that any isolated data snapshots are removed.   This ``rollback()``
+occurs in most cases even when using an :class:`_engine.Engine` object,
+except in the case when the :class:`_engine.Connection` can guarantee
+that a ``rollback()`` has been called immediately before the connection
+is returned to the pool.
+
+For most DBAPIs, the call to ``rollback()`` is very inexpensive and if the
+DBAPI has already completed a transaction, the method should be a no-op.
+However, for DBAPIs that incur performance issues with ``rollback()`` even if
+there's no state on the connection, this behavior can be disabled using the
+``reset_on_return`` option of :class:`_pool.Pool`.   The behavior is safe
+to disable under the following conditions:
+
+* If the database does not support transactions at all, such as using
+  MySQL with the MyISAM engine, or the DBAPI is used in autocommit
+  mode only, the behavior can be disabled.
+* If the pool itself doesn't maintain a connection after it's checked in,
+  such as when using :class:`.NullPool`, the behavior can be disabled.
+* Otherwise, it must be ensured that:
+  * the application ensures that all :class:`_engine.Connection`
+    objects are explicitly closed out using a context manager (i.e. ``with``
+    block) or a ``try/finally`` style block
+  * connections are never allowed to be garbage collected before being explicitly
+    closed.
+  * the DBAPI connection itself, e.g. ``connection.connection``, is not used
+    directly, or the application ensures that ``.rollback()`` is called
+    on this connection before releasing it back to the connection pool.
+
+The "reset on return" step may be logged using the ``logging.DEBUG``
+log level along with the ``sqlalchemy.pool`` logger, or by setting
+``echo_pool='debug'`` with :func:`_sa.create_engine`.
 
 Pool Events
 -----------
index b5a37f67e368eae15858edf1c1e5a0d70c46049c..362c811eeb60766fb62f8b1faf5fc3907ae32da9 100644 (file)
@@ -1067,20 +1067,19 @@ class Connection(Connectable):
 
         if self._transaction:
             self._transaction.close()
+            skip_reset = True
+        else:
+            skip_reset = False
 
         if self._dbapi_connection is not None:
             conn = self._dbapi_connection
 
-            # this will do a reset-on-return every time, even if we
-            # called rollback() already. it might be worth optimizing
-            # this for the case that we are able to close without issue
-            conn.close()
-
-            # this is in fact never true outside of a bunch of
-            # artificial scenarios created by the test suite and its
-            # fixtures.  the reset_agent should no longer be necessary.
-            if conn._reset_agent is self._transaction:
-                conn._reset_agent = None
+            # as we just closed the transaction, close the connection
+            # pool connection without doing an additional reset
+            if skip_reset:
+                conn._close_no_reset()
+            else:
+                conn.close()
 
             # There is a slight chance that conn.close() may have
             # triggered an invalidation here in which case
@@ -2309,36 +2308,14 @@ class RootTransaction(Transaction):
 
         self.is_active = True
 
-        # the SingletonThreadPool used with sqlite memory can share the same
-        # DBAPI connection / fairy among multiple Connection objects.  while
-        # this is not ideal, it is a still-supported use case which at the
-        # moment occurs in the test suite due to how some of pytest fixtures
-        # work out
-        if connection._dbapi_connection._reset_agent is None:
-            connection._dbapi_connection._reset_agent = self
-
     def _deactivate_from_connection(self):
         if self.is_active:
             assert self.connection._transaction is self
             self.is_active = False
 
-            if (
-                self.connection._dbapi_connection is not None
-                and self.connection._dbapi_connection._reset_agent is self
-            ):
-                self.connection._dbapi_connection._reset_agent = None
-
         elif self.connection._transaction is not self:
             util.warn("transaction already deassociated from connection")
 
-        # we have tests that want to make sure the pool handles this
-        # correctly.  TODO: how to disable internal assertions cleanly?
-        # else:
-        #    if self.connection._dbapi_connection is not None:
-        #        assert (
-        #            self.connection._dbapi_connection._reset_agent is not self
-        #        )
-
     def _do_deactivate(self):
         # called from a MarkerTransaction to cancel this root transaction.
         # the transaction stays in place as connection._transaction, but
index 9b4e61fc3fd336688b267017e92789e6cf8e8771..6ec4896046d6c5061ce3b6b4887e4be483177401 100644 (file)
@@ -106,7 +106,9 @@ class Pool(log.Identified):
              logging.
 
         :param reset_on_return: Determine steps to take on
-          connections as they are returned to the pool.
+          connections as they are returned to the pool, which were
+          not otherwise handled by a :class:`_engine.Connection`.
+
           reset_on_return can have any of these values:
 
           * ``"rollback"`` - call rollback() on the connection,
@@ -124,22 +126,19 @@ class Pool(log.Identified):
             any data changes present on the transaction
             are committed unconditionally.
           * ``None`` - don't do anything on the connection.
-            This setting should generally only be made on a database
-            that has no transaction support at all,
-            namely MySQL MyISAM; when used on this backend, performance
-            can be improved as the "rollback" call is still expensive on
-            MySQL.   It is **strongly recommended** that this setting not be
-            used for transaction-supporting databases in conjunction with
-            a persistent pool such as :class:`.QueuePool`, as it opens
-            the possibility for connections still in a transaction to be
-            idle in the pool.   The setting may be appropriate in the
-            case of :class:`.NullPool` or special circumstances where
-            the connection pool in use is not being used to maintain connection
-            lifecycle.
+            This setting is only appropriate if the database / DBAPI
+            works in pure "autocommit" mode at all times, or if the
+            application uses the :class:`_engine.Engine` with consistent
+            connectivity patterns.   See the section
+            :ref:`pool_reset_on_return` for more details.
 
           * ``False`` - same as None, this is here for
             backwards compatibility.
 
+         .. seealso::
+
+            :ref:`pool_reset_on_return`
+
         :param events: a list of 2-tuples, each of the form
          ``(callable, target)`` which will be passed to :func:`.event.listen`
          upon construction.   Provided here so that event listeners
@@ -429,7 +428,7 @@ class _ConnectionRecord(object):
         rec.fairy_ref = ref = weakref.ref(
             fairy,
             lambda ref: _finalize_fairy
-            and _finalize_fairy(None, rec, pool, ref, echo),
+            and _finalize_fairy(None, rec, pool, ref, echo, True),
         )
         _strong_ref_connection_records[ref] = rec
         if echo:
@@ -612,6 +611,7 @@ def _finalize_fairy(
     pool,
     ref,  # this is None when called directly, not by the gc
     echo,
+    reset=True,
     fairy=None,
 ):
     """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
@@ -647,7 +647,11 @@ def _finalize_fairy(
     if connection is not None:
         if connection_record and echo:
             pool.logger.debug(
-                "Connection %r being returned to pool", connection
+                "Connection %r being returned to pool%s",
+                connection,
+                ", transaction state was already reset by caller"
+                if not reset
+                else "",
             )
 
         try:
@@ -655,7 +659,7 @@ def _finalize_fairy(
                 connection, connection_record, echo
             )
             assert fairy.connection is connection
-            if can_manipulate_connection:
+            if reset and can_manipulate_connection:
                 fairy._reset(pool)
 
             if detach:
@@ -738,24 +742,6 @@ class _ConnectionFairy(object):
 
     """
 
-    _reset_agent = None
-    """Refer to an object with a ``.commit()`` and ``.rollback()`` method;
-    if non-None, the "reset-on-return" feature will call upon this object
-    rather than directly against the dialect-level do_rollback() and
-    do_commit() methods.
-
-    In practice, a :class:`_engine.Connection` assigns a :class:`.Transaction`
-    object
-    to this variable when one is in scope so that the :class:`.Transaction`
-    takes the job of committing or rolling back on return if
-    :meth:`_engine.Connection.close` is called while the :class:`.Transaction`
-    still exists.
-
-    This is essentially an "event handler" of sorts but is simplified as an
-    instance variable both for performance/simplicity as well as that there
-    can only be one "reset agent" at a time.
-    """
-
     @classmethod
     def _checkout(cls, pool, threadconns=None, fairy=None):
         if not fairy:
@@ -856,13 +842,14 @@ class _ConnectionFairy(object):
     def _checkout_existing(self):
         return _ConnectionFairy._checkout(self._pool, fairy=self)
 
-    def _checkin(self):
+    def _checkin(self, reset=True):
         _finalize_fairy(
             self.connection,
             self._connection_record,
             self._pool,
             None,
             self._echo,
+            reset=reset,
             fairy=self,
         )
         self.connection = None
@@ -876,41 +863,16 @@ class _ConnectionFairy(object):
         if pool._reset_on_return is reset_rollback:
             if self._echo:
                 pool.logger.debug(
-                    "Connection %s rollback-on-return%s",
-                    self.connection,
-                    ", via agent" if self._reset_agent else "",
+                    "Connection %s rollback-on-return", self.connection
                 )
-            if self._reset_agent:
-                if not self._reset_agent.is_active:
-                    util.warn(
-                        "Reset agent is not active.  "
-                        "This should not occur unless there was already "
-                        "a connectivity error in progress."
-                    )
-                    pool._dialect.do_rollback(self)
-                else:
-                    self._reset_agent.rollback()
-            else:
-                pool._dialect.do_rollback(self)
+            pool._dialect.do_rollback(self)
         elif pool._reset_on_return is reset_commit:
             if self._echo:
                 pool.logger.debug(
-                    "Connection %s commit-on-return%s",
+                    "Connection %s commit-on-return",
                     self.connection,
-                    ", via agent" if self._reset_agent else "",
                 )
-            if self._reset_agent:
-                if not self._reset_agent.is_active:
-                    util.warn(
-                        "Reset agent is not active.  "
-                        "This should not occur unless there was already "
-                        "a connectivity error in progress."
-                    )
-                    pool._dialect.do_commit(self)
-                else:
-                    self._reset_agent.commit()
-            else:
-                pool._dialect.do_commit(self)
+            pool._dialect.do_commit(self)
 
     @property
     def _logger(self):
@@ -1032,3 +994,8 @@ class _ConnectionFairy(object):
         self._counter -= 1
         if self._counter == 0:
             self._checkin()
+
+    def _close_no_reset(self):
+        self._counter -= 1
+        if self._counter == 0:
+            self._checkin(reset=False)
index 4758d49eb6c7a0f8d0be47bb9ab8411d720f7854..a08725921a415d909953f9deb15d0add32fe39c5 100644 (file)
@@ -40,6 +40,7 @@ from sqlalchemy.testing.engines import testing_engine
 from sqlalchemy.testing.mock import Mock
 from sqlalchemy.testing.schema import Column
 from sqlalchemy.testing.schema import Table
+from .test_transaction import ResetFixture
 
 
 def _string_deprecation_expect():
@@ -424,7 +425,7 @@ class CreateEngineTest(fixtures.TestBase):
             )
 
 
-class TransactionTest(fixtures.TablesTest):
+class TransactionTest(ResetFixture, fixtures.TablesTest):
     __backend__ = True
 
     @classmethod
@@ -475,33 +476,43 @@ class TransactionTest(fixtures.TablesTest):
         with testing.db.connect() as conn:
             eq_(conn.execute(users.select()).fetchall(), [(1, "user1")])
 
-    def test_begin_begin_rollback_rollback(self):
-        with testing.db.connect() as connection:
+    def test_begin_begin_rollback_rollback(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin()
             with testing.expect_deprecated_20(
                 r"Calling .begin\(\) when a transaction is already "
                 "begun, creating a 'sub' transaction"
             ):
                 trans2 = connection.begin()
-            assert connection.connection._reset_agent is trans
             trans2.rollback()
-            assert connection.connection._reset_agent is None
             trans.rollback()
-            assert connection.connection._reset_agent is None
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
 
-    def test_begin_begin_commit_commit(self):
-        with testing.db.connect() as connection:
+    def test_begin_begin_commit_commit(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin()
             with testing.expect_deprecated_20(
                 r"Calling .begin\(\) when a transaction is already "
                 "begun, creating a 'sub' transaction"
             ):
                 trans2 = connection.begin()
-            assert connection.connection._reset_agent is trans
             trans2.commit()
-            assert connection.connection._reset_agent is trans
             trans.commit()
-            assert connection.connection._reset_agent is None
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.commit(connection),
+                mock.call.do_commit(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
 
     def test_branch_nested_rollback(self, local_connection):
         connection = local_connection
index 2c90b9bf11442ea27dc9d716692938fe7430051f..c5f8b69b64d38f6274f4c9f9854b280df012aed6 100644 (file)
@@ -460,14 +460,14 @@ class PoolLoggingTest(fixtures.TestBase):
             [
                 "Created new connection %r",
                 "Connection %r checked out from pool",
-                "Connection %r being returned to pool",
-                "Connection %s rollback-on-return%s",
+                "Connection %r being returned to pool%s",
+                "Connection %s rollback-on-return",
                 "Connection %r checked out from pool",
-                "Connection %r being returned to pool",
-                "Connection %s rollback-on-return%s",
+                "Connection %r being returned to pool%s",
+                "Connection %s rollback-on-return",
                 "Connection %r checked out from pool",
-                "Connection %r being returned to pool",
-                "Connection %s rollback-on-return%s",
+                "Connection %r being returned to pool%s",
+                "Connection %s rollback-on-return",
                 "Closing connection %r",
             ]
             + (["Pool disposed. %s"] if dispose else []),
index f29373e955dcf7f026f86a2aad65cf812c85fda3..3c2257331daf325355f292080b94de2851695ef4 100644 (file)
@@ -1817,92 +1817,6 @@ class ResetOnReturnTest(PoolTestBase):
         assert not dbapi.connect().rollback.called
         assert not dbapi.connect().commit.called
 
-    def test_agent_rollback(self):
-        dbapi, p = self._fixture(reset_on_return="rollback")
-
-        class Agent(object):
-            def __init__(self, conn):
-                self.conn = conn
-
-            is_active = True
-
-            def rollback(self):
-                self.conn.special_rollback()
-
-            def commit(self):
-                self.conn.special_commit()
-
-        c1 = p.connect()
-        c1._reset_agent = Agent(c1)
-        c1.close()
-
-        assert dbapi.connect().special_rollback.called
-        assert not dbapi.connect().special_commit.called
-
-        assert not dbapi.connect().rollback.called
-        assert not dbapi.connect().commit.called
-
-        c1 = p.connect()
-        c1.close()
-        eq_(dbapi.connect().special_rollback.call_count, 1)
-        eq_(dbapi.connect().special_commit.call_count, 0)
-
-        assert dbapi.connect().rollback.called
-        assert not dbapi.connect().commit.called
-
-    def test_agent_commit(self):
-        dbapi, p = self._fixture(reset_on_return="commit")
-
-        class Agent(object):
-            def __init__(self, conn):
-                self.conn = conn
-
-            is_active = True
-
-            def rollback(self):
-                self.conn.special_rollback()
-
-            def commit(self):
-                self.conn.special_commit()
-
-        c1 = p.connect()
-        c1._reset_agent = Agent(c1)
-        c1.close()
-        assert not dbapi.connect().special_rollback.called
-        assert dbapi.connect().special_commit.called
-
-        assert not dbapi.connect().rollback.called
-        assert not dbapi.connect().commit.called
-
-        c1 = p.connect()
-        c1.close()
-
-        eq_(dbapi.connect().special_rollback.call_count, 0)
-        eq_(dbapi.connect().special_commit.call_count, 1)
-        assert not dbapi.connect().rollback.called
-        assert dbapi.connect().commit.called
-
-    def test_reset_agent_disconnect(self):
-        dbapi, p = self._fixture(reset_on_return="rollback")
-
-        class Agent(object):
-            def __init__(self, conn):
-                self.conn = conn
-
-            def rollback(self):
-                p._invalidate(self.conn)
-                raise Exception("hi")
-
-            def commit(self):
-                self.conn.commit()
-
-        c1 = p.connect()
-        c1._reset_agent = Agent(c1)
-        c1.close()
-
-        # no warning raised.  We know it would warn due to
-        # QueuePoolTest.test_no_double_checkin
-
 
 class SingletonThreadPoolTest(PoolTestBase):
     @testing.requires.threading_with_mock
index 7ba189d3d6dc80b9469199d4e9743573fad75257..07408e386eb5540ba26af85bb093e02344cc22cb 100644 (file)
@@ -477,174 +477,6 @@ class TransactionTest(fixtures.TablesTest):
             eq_(result.fetchall(), [])
 
 
-class ResetAgentTest(fixtures.TestBase):
-    __backend__ = True
-
-    def test_begin_close(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin()
-            assert connection.connection._reset_agent is trans
-        assert not trans.is_active
-
-    def test_begin_rollback(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin()
-            assert connection.connection._reset_agent is trans
-            trans.rollback()
-            assert connection.connection._reset_agent is None
-
-    def test_begin_commit(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin()
-            assert connection.connection._reset_agent is trans
-            trans.commit()
-            assert connection.connection._reset_agent is None
-
-    def test_trans_close(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin()
-            assert connection.connection._reset_agent is trans
-            trans.close()
-            assert connection.connection._reset_agent is None
-
-    def test_trans_reset_agent_broken_ensure(self):
-        eng = testing_engine()
-        conn = eng.connect()
-        trans = conn.begin()
-        assert conn.connection._reset_agent is trans
-        trans.is_active = False
-
-        with expect_warnings("Reset agent is not active"):
-            conn.close()
-
-    def test_trans_commit_reset_agent_broken_ensure_pool(self):
-        eng = testing_engine(options={"pool_reset_on_return": "commit"})
-        conn = eng.connect()
-        trans = conn.begin()
-        assert conn.connection._reset_agent is trans
-        trans.is_active = False
-
-        with expect_warnings("Reset agent is not active"):
-            conn.close()
-
-    @testing.requires.savepoints
-    def test_begin_nested_trans_close_one(self):
-        with testing.db.connect() as connection:
-            t1 = connection.begin()
-            assert connection.connection._reset_agent is t1
-            t2 = connection.begin_nested()
-            assert connection.connection._reset_agent is t1
-            assert connection._nested_transaction is t2
-            assert connection._transaction is t1
-            t2.close()
-            assert connection._nested_transaction is None
-            assert connection._transaction is t1
-            assert connection.connection._reset_agent is t1
-            t1.close()
-            assert connection.connection._reset_agent is None
-        assert not t1.is_active
-
-    @testing.requires.savepoints
-    def test_begin_nested_trans_close_two(self):
-        with testing.db.connect() as connection:
-            t1 = connection.begin()
-            assert connection.connection._reset_agent is t1
-            t2 = connection.begin_nested()
-            assert connection.connection._reset_agent is t1
-            assert connection._nested_transaction is t2
-            assert connection._transaction is t1
-
-            assert connection.connection._reset_agent is t1
-            t1.close()
-
-            assert connection._nested_transaction is None
-            assert connection._transaction is None
-
-            assert connection.connection._reset_agent is None
-        assert not t1.is_active
-
-    @testing.requires.savepoints
-    def test_begin_nested_trans_rollback(self):
-        with testing.db.connect() as connection:
-            t1 = connection.begin()
-            assert connection.connection._reset_agent is t1
-            t2 = connection.begin_nested()
-            assert connection.connection._reset_agent is t1
-            assert connection._nested_transaction is t2
-            assert connection._transaction is t1
-            t2.close()
-            assert connection._nested_transaction is None
-            assert connection._transaction is t1
-            assert connection.connection._reset_agent is t1
-            t1.rollback()
-            assert connection._transaction is None
-            assert connection.connection._reset_agent is None
-        assert not t2.is_active
-        assert not t1.is_active
-
-    @testing.requires.savepoints
-    def test_begin_nested_close(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin_nested()
-            assert (
-                connection.connection._reset_agent is connection._transaction
-            )
-        assert not trans.is_active
-
-    @testing.requires.savepoints
-    def test_begin_begin_nested_close(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin()
-            trans2 = connection.begin_nested()
-            assert connection.connection._reset_agent is trans
-        assert not trans2.is_active
-        assert not trans.is_active
-
-    @testing.requires.savepoints
-    def test_begin_begin_nested_rollback_commit(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin()
-            trans2 = connection.begin_nested()
-            assert connection.connection._reset_agent is trans
-            trans2.rollback()
-            assert connection.connection._reset_agent is trans
-            trans.commit()
-            assert connection.connection._reset_agent is None
-
-    @testing.requires.savepoints
-    def test_begin_begin_nested_rollback_rollback(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin()
-            trans2 = connection.begin_nested()
-            assert connection.connection._reset_agent is trans
-            trans2.rollback()
-            assert connection.connection._reset_agent is trans
-            trans.rollback()
-            assert connection.connection._reset_agent is None
-
-    @testing.requires.two_phase_transactions
-    def test_reset_via_agent_begin_twophase(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin_twophase()
-            assert connection.connection._reset_agent is trans
-
-    @testing.requires.two_phase_transactions
-    def test_reset_via_agent_begin_twophase_commit(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin_twophase()
-            assert connection.connection._reset_agent is trans
-            trans.commit()
-            assert connection.connection._reset_agent is None
-
-    @testing.requires.two_phase_transactions
-    def test_reset_via_agent_begin_twophase_rollback(self):
-        with testing.db.connect() as connection:
-            trans = connection.begin_twophase()
-            assert connection.connection._reset_agent is trans
-            trans.rollback()
-            assert connection.connection._reset_agent is None
-
-
 class AutoRollbackTest(fixtures.TestBase):
     __backend__ = True
 
@@ -1071,164 +903,414 @@ class ConnectionCharacteristicTest(fixtures.TestBase):
         )
 
 
-class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
-    """Still some debate over if the "reset agent" should apply to the
-    future connection or not.
+class ResetFixture(object):
+    @testing.fixture()
+    def reset_agent(self, testing_engine):
+        engine = testing_engine()
+        engine.connect().close()
 
+        harness = mock.Mock(
+            do_rollback=mock.Mock(side_effect=testing.db.dialect.do_rollback),
+            do_commit=mock.Mock(side_effect=testing.db.dialect.do_commit),
+            engine=engine,
+        )
+        event.listen(engine, "rollback", harness.rollback)
+        event.listen(engine, "commit", harness.commit)
+        event.listen(engine, "rollback_savepoint", harness.rollback_savepoint)
+        event.listen(engine, "rollback_twophase", harness.rollback_twophase)
+        event.listen(engine, "commit_twophase", harness.commit_twophase)
 
-    """
+        with mock.patch.object(
+            engine.dialect, "do_rollback", harness.do_rollback
+        ), mock.patch.object(engine.dialect, "do_commit", harness.do_commit):
+            yield harness
+
+        event.remove(engine, "rollback", harness.rollback)
+        event.remove(engine, "commit", harness.commit)
+        event.remove(engine, "rollback_savepoint", harness.rollback_savepoint)
+        event.remove(engine, "rollback_twophase", harness.rollback_twophase)
+        event.remove(engine, "commit_twophase", harness.commit_twophase)
+
+
+class ResetAgentTest(ResetFixture, fixtures.TestBase):
+    # many of these tests illustate rollback-on-return being redundant
+    # vs. what the transaction just did, however this is to ensure
+    # even if statements were invoked on the DBAPI connection directly,
+    # the state is cleared.    options to optimize this with clear
+    # docs etc. should be added.
 
     __backend__ = True
 
-    def test_begin_close(self):
-        canary = mock.Mock()
-        with testing.db.connect() as connection:
-            event.listen(connection, "rollback", canary)
+    def test_begin_close(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin()
+        assert not trans.is_active
+        eq_(
+            reset_agent.mock_calls,
+            [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)],
+        )
+
+    def test_begin_rollback(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin()
+            trans.rollback()
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
+
+    def test_begin_commit(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin()
+            trans.commit()
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.commit(connection),
+                mock.call.do_commit(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
+
+    def test_trans_close(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin()
-            assert connection.connection._reset_agent is trans
+            trans.close()
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
 
+    @testing.requires.savepoints
+    def test_begin_nested_trans_close_one(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            t1 = connection.begin()
+            t2 = connection.begin_nested()
+            assert connection._nested_transaction is t2
+            assert connection._transaction is t1
+            t2.close()
+            assert connection._nested_transaction is None
+            assert connection._transaction is t1
+            t1.close()
+        assert not t1.is_active
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY),
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
+
+    @testing.requires.savepoints
+    def test_begin_nested_trans_close_two(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            t1 = connection.begin()
+            t2 = connection.begin_nested()
+            assert connection._nested_transaction is t2
+            assert connection._transaction is t1
+
+            t1.close()
+
+            assert connection._nested_transaction is None
+            assert connection._transaction is None
+
+        assert not t1.is_active
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
+
+    @testing.requires.savepoints
+    def test_begin_nested_trans_rollback(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            t1 = connection.begin()
+            t2 = connection.begin_nested()
+            assert connection._nested_transaction is t2
+            assert connection._transaction is t1
+            t2.close()
+            assert connection._nested_transaction is None
+            assert connection._transaction is t1
+            t1.rollback()
+            assert connection._transaction is None
+        assert not t2.is_active
+        assert not t1.is_active
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY),
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
+
+    @testing.requires.savepoints
+    def test_begin_nested_close(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin_nested()
         assert not trans.is_active
-        eq_(canary.mock_calls, [mock.call(connection)])
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
 
-    def test_begin_rollback(self):
-        canary = mock.Mock()
-        with testing.db.connect() as connection:
-            event.listen(connection, "rollback", canary)
+    @testing.requires.savepoints
+    def test_begin_begin_nested_close(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin()
+            trans2 = connection.begin_nested()
+        assert not trans2.is_active
+        assert not trans.is_active
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
+
+    @testing.requires.savepoints
+    def test_begin_begin_nested_rollback_commit(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin()
+            trans2 = connection.begin_nested()
+            trans2.rollback()
+            trans.commit()
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY),
+                mock.call.commit(connection),
+                mock.call.do_commit(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
+
+    @testing.requires.savepoints
+    def test_begin_begin_nested_rollback_rollback(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin()
+            trans2 = connection.begin_nested()
+            trans2.rollback()
+            trans.rollback()
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY),
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
+
+    @testing.requires.two_phase_transactions
+    def test_reset_via_agent_begin_twophase(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin_twophase()  # noqa
+
+        # pg8000 rolls back via the rollback_twophase
+        eq_(
+            reset_agent.mock_calls[0],
+            mock.call.rollback_twophase(connection, mock.ANY, mock.ANY),
+        )
+
+    @testing.requires.two_phase_transactions
+    def test_reset_via_agent_begin_twophase_commit(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin_twophase()
+            trans.commit()
+        eq_(
+            reset_agent.mock_calls[0],
+            mock.call.commit_twophase(connection, mock.ANY, mock.ANY),
+        )
+
+        eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY))
+
+    @testing.requires.two_phase_transactions
+    def test_reset_via_agent_begin_twophase_rollback(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin_twophase()
+            trans.rollback()
+        eq_(
+            reset_agent.mock_calls[0:2],
+            [
+                mock.call.rollback_twophase(connection, mock.ANY, mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
+
+        eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY))
+
+
+class FutureResetAgentTest(
+    ResetFixture, fixtures.FutureEngineMixin, fixtures.TestBase
+):
+
+    __backend__ = True
+
+    def test_reset_agent_no_conn_transaction(self, reset_agent):
+        with reset_agent.engine.connect():
+            pass
+
+        eq_(reset_agent.mock_calls, [mock.call.do_rollback(mock.ANY)])
+
+    def test_begin_close(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
+            trans = connection.begin()
+
+        assert not trans.is_active
+        eq_(
+            reset_agent.mock_calls,
+            [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)],
+        )
+
+    def test_begin_rollback(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin()
-            assert connection.connection._reset_agent is trans
             trans.rollback()
-            assert connection.connection._reset_agent is None
         assert not trans.is_active
-        eq_(canary.mock_calls, [mock.call(connection)])
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
 
-    def test_begin_commit(self):
-        canary = mock.Mock()
-        with testing.db.connect() as connection:
-            event.listen(connection, "rollback", canary.rollback)
-            event.listen(connection, "commit", canary.commit)
+    def test_begin_commit(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin()
-            assert connection.connection._reset_agent is trans
             trans.commit()
-            assert connection.connection._reset_agent is None
         assert not trans.is_active
-        eq_(canary.mock_calls, [mock.call.commit(connection)])
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.commit(connection),
+                mock.call.do_commit(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
+            ],
+        )
 
     @testing.requires.savepoints
-    def test_begin_nested_close(self):
-        canary = mock.Mock()
-        with testing.db.connect() as connection:
-            event.listen(connection, "rollback", canary.rollback)
-            event.listen(connection, "commit", canary.commit)
+    def test_begin_nested_close(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin_nested()
-            assert (
-                connection.connection._reset_agent is connection._transaction
-            )
         # it's a savepoint, but root made sure it closed
         assert not trans.is_active
-        eq_(canary.mock_calls, [mock.call.rollback(connection)])
+        eq_(
+            reset_agent.mock_calls,
+            [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)],
+        )
 
     @testing.requires.savepoints
-    def test_begin_begin_nested_close(self):
-        canary = mock.Mock()
-        with testing.db.connect() as connection:
-            event.listen(connection, "rollback", canary.rollback)
-            event.listen(connection, "commit", canary.commit)
+    def test_begin_begin_nested_close(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin()
             trans2 = connection.begin_nested()
-            assert connection.connection._reset_agent is trans
         assert not trans2.is_active
         assert not trans.is_active
-        eq_(canary.mock_calls, [mock.call.rollback(connection)])
+        eq_(
+            reset_agent.mock_calls,
+            [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)],
+        )
 
     @testing.requires.savepoints
-    def test_begin_begin_nested_rollback_commit(self):
-        canary = mock.Mock()
-        with testing.db.connect() as connection:
-            event.listen(
-                connection, "rollback_savepoint", canary.rollback_savepoint
-            )
-            event.listen(connection, "rollback", canary.rollback)
-            event.listen(connection, "commit", canary.commit)
+    def test_begin_begin_nested_rollback_commit(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin()
             trans2 = connection.begin_nested()
-            assert connection.connection._reset_agent is trans
             trans2.rollback()  # this is not a connection level event
-            assert connection.connection._reset_agent is trans
             trans.commit()
-            assert connection.connection._reset_agent is None
         eq_(
-            canary.mock_calls,
+            reset_agent.mock_calls,
             [
                 mock.call.rollback_savepoint(connection, mock.ANY, None),
                 mock.call.commit(connection),
+                mock.call.do_commit(mock.ANY),
+                mock.call.do_rollback(mock.ANY),
             ],
         )
 
     @testing.requires.savepoints
-    def test_begin_begin_nested_rollback_rollback(self):
-        canary = mock.Mock()
-        with testing.db.connect() as connection:
-            event.listen(connection, "rollback", canary.rollback)
-            event.listen(connection, "commit", canary.commit)
+    def test_begin_begin_nested_rollback_rollback(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin()
             trans2 = connection.begin_nested()
-            assert connection.connection._reset_agent is trans
             trans2.rollback()
-            assert connection.connection._reset_agent is trans
             trans.rollback()
-            assert connection.connection._reset_agent is None
-        eq_(canary.mock_calls, [mock.call.rollback(connection)])
+        eq_(
+            reset_agent.mock_calls,
+            [
+                mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY),
+                mock.call.rollback(connection),
+                mock.call.do_rollback(mock.ANY),
+                mock.call.do_rollback(mock.ANY),  # this is the reset on return
+            ],
+        )
 
     @testing.requires.two_phase_transactions
-    def test_reset_via_agent_begin_twophase(self):
-        canary = mock.Mock()
-        with testing.db.connect() as connection:
-            event.listen(connection, "rollback", canary.rollback)
-            event.listen(
-                connection, "rollback_twophase", canary.rollback_twophase
-            )
-            event.listen(connection, "commit", canary.commit)
+    def test_reset_via_agent_begin_twophase(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin_twophase()
-            assert connection.connection._reset_agent is trans
         assert not trans.is_active
+        # pg8000 uses the rollback_twophase as the full rollback.
         eq_(
-            canary.mock_calls,
-            [mock.call.rollback_twophase(connection, mock.ANY, False)],
+            reset_agent.mock_calls[0],
+            mock.call.rollback_twophase(connection, mock.ANY, False),
         )
 
     @testing.requires.two_phase_transactions
-    def test_reset_via_agent_begin_twophase_commit(self):
-        canary = mock.Mock()
-        with testing.db.connect() as connection:
-            event.listen(connection, "rollback", canary.rollback)
-            event.listen(connection, "commit", canary.commit)
-            event.listen(connection, "commit_twophase", canary.commit_twophase)
+    def test_reset_via_agent_begin_twophase_commit(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin_twophase()
-            assert connection.connection._reset_agent is trans
             trans.commit()
-            assert connection.connection._reset_agent is None
+
+        # again pg8000 vs. other PG drivers have different API
         eq_(
-            canary.mock_calls,
-            [mock.call.commit_twophase(connection, mock.ANY, False)],
+            reset_agent.mock_calls[0],
+            mock.call.commit_twophase(connection, mock.ANY, False),
         )
 
+        eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY))
+
     @testing.requires.two_phase_transactions
-    def test_reset_via_agent_begin_twophase_rollback(self):
-        canary = mock.Mock()
-        with testing.db.connect() as connection:
-            event.listen(connection, "rollback", canary.rollback)
-            event.listen(
-                connection, "rollback_twophase", canary.rollback_twophase
-            )
-            event.listen(connection, "commit", canary.commit)
+    def test_reset_via_agent_begin_twophase_rollback(self, reset_agent):
+        with reset_agent.engine.connect() as connection:
             trans = connection.begin_twophase()
-            assert connection.connection._reset_agent is trans
             trans.rollback()
-            assert connection.connection._reset_agent is None
+
+        # pg8000 vs. the other postgresql drivers have different
+        # twophase implementations.  the base postgresql driver emits
+        # "ROLLBACK PREPARED" explicitly then calls do_rollback().
+        # pg8000 has a dedicated API method.  so we get either one or
+        # two do_rollback() at the end, just need at least one.
         eq_(
-            canary.mock_calls,
-            [mock.call.rollback_twophase(connection, mock.ANY, False)],
+            reset_agent.mock_calls[0:2],
+            [
+                mock.call.rollback_twophase(connection, mock.ANY, False),
+                mock.call.do_rollback(mock.ANY),
+                # mock.call.do_rollback(mock.ANY),
+            ],
         )
+        eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY))
 
 
 class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):