From: Mike Bayer Date: Fri, 5 Mar 2021 22:34:10 +0000 (-0500) Subject: Replace reset_agent with direct call from connection X-Git-Tag: rel_1_4_0~15^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3a378f0b22e1745509d88b923123dc38d8014274;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Replace reset_agent with direct call from connection Fixed a regression where the "reset agent" of the connection pool wasn't really being utilized by the :class:`_engine.Connection` when it were closed, and also leading to a double-rollback scenario that was somewhat wasteful. The newer architecture of the engine has been updated so that the connection pool "reset-on-return" logic will be skipped when the :class:`_engine.Connection` explicitly closes out the transaction before returning the pool to the connection. Fixes: #6004 Change-Id: I5d2ac16cac71aa45a00b4b7481d7268bd828a168 --- diff --git a/doc/build/changelog/unreleased_14/6004.rst b/doc/build/changelog/unreleased_14/6004.rst new file mode 100644 index 0000000000..11afbbd5ba --- /dev/null +++ b/doc/build/changelog/unreleased_14/6004.rst @@ -0,0 +1,11 @@ +.. change:: + :tags: bug, regression, engine + :tickets: 6004 + + Fixed a regression where the "reset agent" of the connection pool wasn't + really being utilized by the :class:`_engine.Connection` when it were + closed, and also leading to a double-rollback scenario that was somewhat + wasteful. The newer architecture of the engine has been updated so that + the connection pool "reset-on-return" logic will be skipped when the + :class:`_engine.Connection` explicitly closes out the transaction before + returning the pool to the connection. diff --git a/doc/build/core/pooling.rst b/doc/build/core/pooling.rst index 4c852de321..796f09e2e0 100644 --- a/doc/build/core/pooling.rst +++ b/doc/build/core/pooling.rst @@ -101,9 +101,9 @@ by any additional options:: mypool = pool.QueuePool(getconn, max_overflow=10, pool_size=5) -DBAPI connections can then be procured from the pool using the :meth:`_pool.Pool.connect` -function. The return value of this method is a DBAPI connection that's contained -within a transparent proxy:: +DBAPI connections can then be procured from the pool using the +:meth:`_pool.Pool.connect` function. The return value of this method is a DBAPI +connection that's contained within a transparent proxy:: # get a connection conn = mypool.connect() @@ -120,23 +120,52 @@ pool:: # it to the pool. conn.close() -The proxy also returns its contained DBAPI connection to the pool -when it is garbage collected, -though it's not deterministic in Python that this occurs immediately (though -it is typical with cPython). - -The ``close()`` step also performs the important step of calling the -``rollback()`` method of the DBAPI connection. This is so that any -existing transaction on the connection is removed, not only ensuring -that no existing state remains on next usage, but also so that table -and row locks are released as well as that any isolated data snapshots -are removed. This behavior can be disabled using the ``reset_on_return`` -option of :class:`_pool.Pool`. - -A particular pre-created :class:`_pool.Pool` can be shared with one or more -engines by passing it to the ``pool`` argument of :func:`_sa.create_engine`:: - - e = create_engine('postgresql://', pool=mypool) +The proxy also returns its contained DBAPI connection to the pool when it is +garbage collected, though it's not deterministic in Python that this occurs +immediately (though it is typical with cPython). This usage is not recommended +however and in particular is not supported with asyncio DBAPI drivers. + +.. _pool_reset_on_return: + +Reset On Return +--------------- + +The pool also includes the a "reset on return" feature which will call the +``rollback()`` method of the DBAPI connection when the connection is returned +to the pool. This is so that any existing +transaction on the connection is removed, not only ensuring that no existing +state remains on next usage, but also so that table and row locks are released +as well as that any isolated data snapshots are removed. This ``rollback()`` +occurs in most cases even when using an :class:`_engine.Engine` object, +except in the case when the :class:`_engine.Connection` can guarantee +that a ``rollback()`` has been called immediately before the connection +is returned to the pool. + +For most DBAPIs, the call to ``rollback()`` is very inexpensive and if the +DBAPI has already completed a transaction, the method should be a no-op. +However, for DBAPIs that incur performance issues with ``rollback()`` even if +there's no state on the connection, this behavior can be disabled using the +``reset_on_return`` option of :class:`_pool.Pool`. The behavior is safe +to disable under the following conditions: + +* If the database does not support transactions at all, such as using + MySQL with the MyISAM engine, or the DBAPI is used in autocommit + mode only, the behavior can be disabled. +* If the pool itself doesn't maintain a connection after it's checked in, + such as when using :class:`.NullPool`, the behavior can be disabled. +* Otherwise, it must be ensured that: + * the application ensures that all :class:`_engine.Connection` + objects are explicitly closed out using a context manager (i.e. ``with`` + block) or a ``try/finally`` style block + * connections are never allowed to be garbage collected before being explicitly + closed. + * the DBAPI connection itself, e.g. ``connection.connection``, is not used + directly, or the application ensures that ``.rollback()`` is called + on this connection before releasing it back to the connection pool. + +The "reset on return" step may be logged using the ``logging.DEBUG`` +log level along with the ``sqlalchemy.pool`` logger, or by setting +``echo_pool='debug'`` with :func:`_sa.create_engine`. Pool Events ----------- diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index b5a37f67e3..362c811eeb 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -1067,20 +1067,19 @@ class Connection(Connectable): if self._transaction: self._transaction.close() + skip_reset = True + else: + skip_reset = False if self._dbapi_connection is not None: conn = self._dbapi_connection - # this will do a reset-on-return every time, even if we - # called rollback() already. it might be worth optimizing - # this for the case that we are able to close without issue - conn.close() - - # this is in fact never true outside of a bunch of - # artificial scenarios created by the test suite and its - # fixtures. the reset_agent should no longer be necessary. - if conn._reset_agent is self._transaction: - conn._reset_agent = None + # as we just closed the transaction, close the connection + # pool connection without doing an additional reset + if skip_reset: + conn._close_no_reset() + else: + conn.close() # There is a slight chance that conn.close() may have # triggered an invalidation here in which case @@ -2309,36 +2308,14 @@ class RootTransaction(Transaction): self.is_active = True - # the SingletonThreadPool used with sqlite memory can share the same - # DBAPI connection / fairy among multiple Connection objects. while - # this is not ideal, it is a still-supported use case which at the - # moment occurs in the test suite due to how some of pytest fixtures - # work out - if connection._dbapi_connection._reset_agent is None: - connection._dbapi_connection._reset_agent = self - def _deactivate_from_connection(self): if self.is_active: assert self.connection._transaction is self self.is_active = False - if ( - self.connection._dbapi_connection is not None - and self.connection._dbapi_connection._reset_agent is self - ): - self.connection._dbapi_connection._reset_agent = None - elif self.connection._transaction is not self: util.warn("transaction already deassociated from connection") - # we have tests that want to make sure the pool handles this - # correctly. TODO: how to disable internal assertions cleanly? - # else: - # if self.connection._dbapi_connection is not None: - # assert ( - # self.connection._dbapi_connection._reset_agent is not self - # ) - def _do_deactivate(self): # called from a MarkerTransaction to cancel this root transaction. # the transaction stays in place as connection._transaction, but diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index 9b4e61fc3f..6ec4896046 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -106,7 +106,9 @@ class Pool(log.Identified): logging. :param reset_on_return: Determine steps to take on - connections as they are returned to the pool. + connections as they are returned to the pool, which were + not otherwise handled by a :class:`_engine.Connection`. + reset_on_return can have any of these values: * ``"rollback"`` - call rollback() on the connection, @@ -124,22 +126,19 @@ class Pool(log.Identified): any data changes present on the transaction are committed unconditionally. * ``None`` - don't do anything on the connection. - This setting should generally only be made on a database - that has no transaction support at all, - namely MySQL MyISAM; when used on this backend, performance - can be improved as the "rollback" call is still expensive on - MySQL. It is **strongly recommended** that this setting not be - used for transaction-supporting databases in conjunction with - a persistent pool such as :class:`.QueuePool`, as it opens - the possibility for connections still in a transaction to be - idle in the pool. The setting may be appropriate in the - case of :class:`.NullPool` or special circumstances where - the connection pool in use is not being used to maintain connection - lifecycle. + This setting is only appropriate if the database / DBAPI + works in pure "autocommit" mode at all times, or if the + application uses the :class:`_engine.Engine` with consistent + connectivity patterns. See the section + :ref:`pool_reset_on_return` for more details. * ``False`` - same as None, this is here for backwards compatibility. + .. seealso:: + + :ref:`pool_reset_on_return` + :param events: a list of 2-tuples, each of the form ``(callable, target)`` which will be passed to :func:`.event.listen` upon construction. Provided here so that event listeners @@ -429,7 +428,7 @@ class _ConnectionRecord(object): rec.fairy_ref = ref = weakref.ref( fairy, lambda ref: _finalize_fairy - and _finalize_fairy(None, rec, pool, ref, echo), + and _finalize_fairy(None, rec, pool, ref, echo, True), ) _strong_ref_connection_records[ref] = rec if echo: @@ -612,6 +611,7 @@ def _finalize_fairy( pool, ref, # this is None when called directly, not by the gc echo, + reset=True, fairy=None, ): """Cleanup for a :class:`._ConnectionFairy` whether or not it's already @@ -647,7 +647,11 @@ def _finalize_fairy( if connection is not None: if connection_record and echo: pool.logger.debug( - "Connection %r being returned to pool", connection + "Connection %r being returned to pool%s", + connection, + ", transaction state was already reset by caller" + if not reset + else "", ) try: @@ -655,7 +659,7 @@ def _finalize_fairy( connection, connection_record, echo ) assert fairy.connection is connection - if can_manipulate_connection: + if reset and can_manipulate_connection: fairy._reset(pool) if detach: @@ -738,24 +742,6 @@ class _ConnectionFairy(object): """ - _reset_agent = None - """Refer to an object with a ``.commit()`` and ``.rollback()`` method; - if non-None, the "reset-on-return" feature will call upon this object - rather than directly against the dialect-level do_rollback() and - do_commit() methods. - - In practice, a :class:`_engine.Connection` assigns a :class:`.Transaction` - object - to this variable when one is in scope so that the :class:`.Transaction` - takes the job of committing or rolling back on return if - :meth:`_engine.Connection.close` is called while the :class:`.Transaction` - still exists. - - This is essentially an "event handler" of sorts but is simplified as an - instance variable both for performance/simplicity as well as that there - can only be one "reset agent" at a time. - """ - @classmethod def _checkout(cls, pool, threadconns=None, fairy=None): if not fairy: @@ -856,13 +842,14 @@ class _ConnectionFairy(object): def _checkout_existing(self): return _ConnectionFairy._checkout(self._pool, fairy=self) - def _checkin(self): + def _checkin(self, reset=True): _finalize_fairy( self.connection, self._connection_record, self._pool, None, self._echo, + reset=reset, fairy=self, ) self.connection = None @@ -876,41 +863,16 @@ class _ConnectionFairy(object): if pool._reset_on_return is reset_rollback: if self._echo: pool.logger.debug( - "Connection %s rollback-on-return%s", - self.connection, - ", via agent" if self._reset_agent else "", + "Connection %s rollback-on-return", self.connection ) - if self._reset_agent: - if not self._reset_agent.is_active: - util.warn( - "Reset agent is not active. " - "This should not occur unless there was already " - "a connectivity error in progress." - ) - pool._dialect.do_rollback(self) - else: - self._reset_agent.rollback() - else: - pool._dialect.do_rollback(self) + pool._dialect.do_rollback(self) elif pool._reset_on_return is reset_commit: if self._echo: pool.logger.debug( - "Connection %s commit-on-return%s", + "Connection %s commit-on-return", self.connection, - ", via agent" if self._reset_agent else "", ) - if self._reset_agent: - if not self._reset_agent.is_active: - util.warn( - "Reset agent is not active. " - "This should not occur unless there was already " - "a connectivity error in progress." - ) - pool._dialect.do_commit(self) - else: - self._reset_agent.commit() - else: - pool._dialect.do_commit(self) + pool._dialect.do_commit(self) @property def _logger(self): @@ -1032,3 +994,8 @@ class _ConnectionFairy(object): self._counter -= 1 if self._counter == 0: self._checkin() + + def _close_no_reset(self): + self._counter -= 1 + if self._counter == 0: + self._checkin(reset=False) diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py index 4758d49eb6..a08725921a 100644 --- a/test/engine/test_deprecations.py +++ b/test/engine/test_deprecations.py @@ -40,6 +40,7 @@ from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.mock import Mock from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table +from .test_transaction import ResetFixture def _string_deprecation_expect(): @@ -424,7 +425,7 @@ class CreateEngineTest(fixtures.TestBase): ) -class TransactionTest(fixtures.TablesTest): +class TransactionTest(ResetFixture, fixtures.TablesTest): __backend__ = True @classmethod @@ -475,33 +476,43 @@ class TransactionTest(fixtures.TablesTest): with testing.db.connect() as conn: eq_(conn.execute(users.select()).fetchall(), [(1, "user1")]) - def test_begin_begin_rollback_rollback(self): - with testing.db.connect() as connection: + def test_begin_begin_rollback_rollback(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin() with testing.expect_deprecated_20( r"Calling .begin\(\) when a transaction is already " "begun, creating a 'sub' transaction" ): trans2 = connection.begin() - assert connection.connection._reset_agent is trans trans2.rollback() - assert connection.connection._reset_agent is None trans.rollback() - assert connection.connection._reset_agent is None + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) - def test_begin_begin_commit_commit(self): - with testing.db.connect() as connection: + def test_begin_begin_commit_commit(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin() with testing.expect_deprecated_20( r"Calling .begin\(\) when a transaction is already " "begun, creating a 'sub' transaction" ): trans2 = connection.begin() - assert connection.connection._reset_agent is trans trans2.commit() - assert connection.connection._reset_agent is trans trans.commit() - assert connection.connection._reset_agent is None + eq_( + reset_agent.mock_calls, + [ + mock.call.commit(connection), + mock.call.do_commit(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) def test_branch_nested_rollback(self, local_connection): connection = local_connection diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py index 2c90b9bf11..c5f8b69b64 100644 --- a/test/engine/test_logging.py +++ b/test/engine/test_logging.py @@ -460,14 +460,14 @@ class PoolLoggingTest(fixtures.TestBase): [ "Created new connection %r", "Connection %r checked out from pool", - "Connection %r being returned to pool", - "Connection %s rollback-on-return%s", + "Connection %r being returned to pool%s", + "Connection %s rollback-on-return", "Connection %r checked out from pool", - "Connection %r being returned to pool", - "Connection %s rollback-on-return%s", + "Connection %r being returned to pool%s", + "Connection %s rollback-on-return", "Connection %r checked out from pool", - "Connection %r being returned to pool", - "Connection %s rollback-on-return%s", + "Connection %r being returned to pool%s", + "Connection %s rollback-on-return", "Closing connection %r", ] + (["Pool disposed. %s"] if dispose else []), diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index f29373e955..3c2257331d 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -1817,92 +1817,6 @@ class ResetOnReturnTest(PoolTestBase): assert not dbapi.connect().rollback.called assert not dbapi.connect().commit.called - def test_agent_rollback(self): - dbapi, p = self._fixture(reset_on_return="rollback") - - class Agent(object): - def __init__(self, conn): - self.conn = conn - - is_active = True - - def rollback(self): - self.conn.special_rollback() - - def commit(self): - self.conn.special_commit() - - c1 = p.connect() - c1._reset_agent = Agent(c1) - c1.close() - - assert dbapi.connect().special_rollback.called - assert not dbapi.connect().special_commit.called - - assert not dbapi.connect().rollback.called - assert not dbapi.connect().commit.called - - c1 = p.connect() - c1.close() - eq_(dbapi.connect().special_rollback.call_count, 1) - eq_(dbapi.connect().special_commit.call_count, 0) - - assert dbapi.connect().rollback.called - assert not dbapi.connect().commit.called - - def test_agent_commit(self): - dbapi, p = self._fixture(reset_on_return="commit") - - class Agent(object): - def __init__(self, conn): - self.conn = conn - - is_active = True - - def rollback(self): - self.conn.special_rollback() - - def commit(self): - self.conn.special_commit() - - c1 = p.connect() - c1._reset_agent = Agent(c1) - c1.close() - assert not dbapi.connect().special_rollback.called - assert dbapi.connect().special_commit.called - - assert not dbapi.connect().rollback.called - assert not dbapi.connect().commit.called - - c1 = p.connect() - c1.close() - - eq_(dbapi.connect().special_rollback.call_count, 0) - eq_(dbapi.connect().special_commit.call_count, 1) - assert not dbapi.connect().rollback.called - assert dbapi.connect().commit.called - - def test_reset_agent_disconnect(self): - dbapi, p = self._fixture(reset_on_return="rollback") - - class Agent(object): - def __init__(self, conn): - self.conn = conn - - def rollback(self): - p._invalidate(self.conn) - raise Exception("hi") - - def commit(self): - self.conn.commit() - - c1 = p.connect() - c1._reset_agent = Agent(c1) - c1.close() - - # no warning raised. We know it would warn due to - # QueuePoolTest.test_no_double_checkin - class SingletonThreadPoolTest(PoolTestBase): @testing.requires.threading_with_mock diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index 7ba189d3d6..07408e386e 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -477,174 +477,6 @@ class TransactionTest(fixtures.TablesTest): eq_(result.fetchall(), []) -class ResetAgentTest(fixtures.TestBase): - __backend__ = True - - def test_begin_close(self): - with testing.db.connect() as connection: - trans = connection.begin() - assert connection.connection._reset_agent is trans - assert not trans.is_active - - def test_begin_rollback(self): - with testing.db.connect() as connection: - trans = connection.begin() - assert connection.connection._reset_agent is trans - trans.rollback() - assert connection.connection._reset_agent is None - - def test_begin_commit(self): - with testing.db.connect() as connection: - trans = connection.begin() - assert connection.connection._reset_agent is trans - trans.commit() - assert connection.connection._reset_agent is None - - def test_trans_close(self): - with testing.db.connect() as connection: - trans = connection.begin() - assert connection.connection._reset_agent is trans - trans.close() - assert connection.connection._reset_agent is None - - def test_trans_reset_agent_broken_ensure(self): - eng = testing_engine() - conn = eng.connect() - trans = conn.begin() - assert conn.connection._reset_agent is trans - trans.is_active = False - - with expect_warnings("Reset agent is not active"): - conn.close() - - def test_trans_commit_reset_agent_broken_ensure_pool(self): - eng = testing_engine(options={"pool_reset_on_return": "commit"}) - conn = eng.connect() - trans = conn.begin() - assert conn.connection._reset_agent is trans - trans.is_active = False - - with expect_warnings("Reset agent is not active"): - conn.close() - - @testing.requires.savepoints - def test_begin_nested_trans_close_one(self): - with testing.db.connect() as connection: - t1 = connection.begin() - assert connection.connection._reset_agent is t1 - t2 = connection.begin_nested() - assert connection.connection._reset_agent is t1 - assert connection._nested_transaction is t2 - assert connection._transaction is t1 - t2.close() - assert connection._nested_transaction is None - assert connection._transaction is t1 - assert connection.connection._reset_agent is t1 - t1.close() - assert connection.connection._reset_agent is None - assert not t1.is_active - - @testing.requires.savepoints - def test_begin_nested_trans_close_two(self): - with testing.db.connect() as connection: - t1 = connection.begin() - assert connection.connection._reset_agent is t1 - t2 = connection.begin_nested() - assert connection.connection._reset_agent is t1 - assert connection._nested_transaction is t2 - assert connection._transaction is t1 - - assert connection.connection._reset_agent is t1 - t1.close() - - assert connection._nested_transaction is None - assert connection._transaction is None - - assert connection.connection._reset_agent is None - assert not t1.is_active - - @testing.requires.savepoints - def test_begin_nested_trans_rollback(self): - with testing.db.connect() as connection: - t1 = connection.begin() - assert connection.connection._reset_agent is t1 - t2 = connection.begin_nested() - assert connection.connection._reset_agent is t1 - assert connection._nested_transaction is t2 - assert connection._transaction is t1 - t2.close() - assert connection._nested_transaction is None - assert connection._transaction is t1 - assert connection.connection._reset_agent is t1 - t1.rollback() - assert connection._transaction is None - assert connection.connection._reset_agent is None - assert not t2.is_active - assert not t1.is_active - - @testing.requires.savepoints - def test_begin_nested_close(self): - with testing.db.connect() as connection: - trans = connection.begin_nested() - assert ( - connection.connection._reset_agent is connection._transaction - ) - assert not trans.is_active - - @testing.requires.savepoints - def test_begin_begin_nested_close(self): - with testing.db.connect() as connection: - trans = connection.begin() - trans2 = connection.begin_nested() - assert connection.connection._reset_agent is trans - assert not trans2.is_active - assert not trans.is_active - - @testing.requires.savepoints - def test_begin_begin_nested_rollback_commit(self): - with testing.db.connect() as connection: - trans = connection.begin() - trans2 = connection.begin_nested() - assert connection.connection._reset_agent is trans - trans2.rollback() - assert connection.connection._reset_agent is trans - trans.commit() - assert connection.connection._reset_agent is None - - @testing.requires.savepoints - def test_begin_begin_nested_rollback_rollback(self): - with testing.db.connect() as connection: - trans = connection.begin() - trans2 = connection.begin_nested() - assert connection.connection._reset_agent is trans - trans2.rollback() - assert connection.connection._reset_agent is trans - trans.rollback() - assert connection.connection._reset_agent is None - - @testing.requires.two_phase_transactions - def test_reset_via_agent_begin_twophase(self): - with testing.db.connect() as connection: - trans = connection.begin_twophase() - assert connection.connection._reset_agent is trans - - @testing.requires.two_phase_transactions - def test_reset_via_agent_begin_twophase_commit(self): - with testing.db.connect() as connection: - trans = connection.begin_twophase() - assert connection.connection._reset_agent is trans - trans.commit() - assert connection.connection._reset_agent is None - - @testing.requires.two_phase_transactions - def test_reset_via_agent_begin_twophase_rollback(self): - with testing.db.connect() as connection: - trans = connection.begin_twophase() - assert connection.connection._reset_agent is trans - trans.rollback() - assert connection.connection._reset_agent is None - - class AutoRollbackTest(fixtures.TestBase): __backend__ = True @@ -1071,164 +903,414 @@ class ConnectionCharacteristicTest(fixtures.TestBase): ) -class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): - """Still some debate over if the "reset agent" should apply to the - future connection or not. +class ResetFixture(object): + @testing.fixture() + def reset_agent(self, testing_engine): + engine = testing_engine() + engine.connect().close() + harness = mock.Mock( + do_rollback=mock.Mock(side_effect=testing.db.dialect.do_rollback), + do_commit=mock.Mock(side_effect=testing.db.dialect.do_commit), + engine=engine, + ) + event.listen(engine, "rollback", harness.rollback) + event.listen(engine, "commit", harness.commit) + event.listen(engine, "rollback_savepoint", harness.rollback_savepoint) + event.listen(engine, "rollback_twophase", harness.rollback_twophase) + event.listen(engine, "commit_twophase", harness.commit_twophase) - """ + with mock.patch.object( + engine.dialect, "do_rollback", harness.do_rollback + ), mock.patch.object(engine.dialect, "do_commit", harness.do_commit): + yield harness + + event.remove(engine, "rollback", harness.rollback) + event.remove(engine, "commit", harness.commit) + event.remove(engine, "rollback_savepoint", harness.rollback_savepoint) + event.remove(engine, "rollback_twophase", harness.rollback_twophase) + event.remove(engine, "commit_twophase", harness.commit_twophase) + + +class ResetAgentTest(ResetFixture, fixtures.TestBase): + # many of these tests illustate rollback-on-return being redundant + # vs. what the transaction just did, however this is to ensure + # even if statements were invoked on the DBAPI connection directly, + # the state is cleared. options to optimize this with clear + # docs etc. should be added. __backend__ = True - def test_begin_close(self): - canary = mock.Mock() - with testing.db.connect() as connection: - event.listen(connection, "rollback", canary) + def test_begin_close(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin() + assert not trans.is_active + eq_( + reset_agent.mock_calls, + [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)], + ) + + def test_begin_rollback(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin() + trans.rollback() + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) + + def test_begin_commit(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin() + trans.commit() + eq_( + reset_agent.mock_calls, + [ + mock.call.commit(connection), + mock.call.do_commit(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) + + def test_trans_close(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin() - assert connection.connection._reset_agent is trans + trans.close() + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) + @testing.requires.savepoints + def test_begin_nested_trans_close_one(self, reset_agent): + with reset_agent.engine.connect() as connection: + t1 = connection.begin() + t2 = connection.begin_nested() + assert connection._nested_transaction is t2 + assert connection._transaction is t1 + t2.close() + assert connection._nested_transaction is None + assert connection._transaction is t1 + t1.close() + assert not t1.is_active + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY), + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) + + @testing.requires.savepoints + def test_begin_nested_trans_close_two(self, reset_agent): + with reset_agent.engine.connect() as connection: + t1 = connection.begin() + t2 = connection.begin_nested() + assert connection._nested_transaction is t2 + assert connection._transaction is t1 + + t1.close() + + assert connection._nested_transaction is None + assert connection._transaction is None + + assert not t1.is_active + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) + + @testing.requires.savepoints + def test_begin_nested_trans_rollback(self, reset_agent): + with reset_agent.engine.connect() as connection: + t1 = connection.begin() + t2 = connection.begin_nested() + assert connection._nested_transaction is t2 + assert connection._transaction is t1 + t2.close() + assert connection._nested_transaction is None + assert connection._transaction is t1 + t1.rollback() + assert connection._transaction is None + assert not t2.is_active + assert not t1.is_active + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY), + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) + + @testing.requires.savepoints + def test_begin_nested_close(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin_nested() assert not trans.is_active - eq_(canary.mock_calls, [mock.call(connection)]) + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + ], + ) - def test_begin_rollback(self): - canary = mock.Mock() - with testing.db.connect() as connection: - event.listen(connection, "rollback", canary) + @testing.requires.savepoints + def test_begin_begin_nested_close(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin() + trans2 = connection.begin_nested() + assert not trans2.is_active + assert not trans.is_active + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + ], + ) + + @testing.requires.savepoints + def test_begin_begin_nested_rollback_commit(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin() + trans2 = connection.begin_nested() + trans2.rollback() + trans.commit() + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY), + mock.call.commit(connection), + mock.call.do_commit(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) + + @testing.requires.savepoints + def test_begin_begin_nested_rollback_rollback(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin() + trans2 = connection.begin_nested() + trans2.rollback() + trans.rollback() + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY), + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin_twophase() # noqa + + # pg8000 rolls back via the rollback_twophase + eq_( + reset_agent.mock_calls[0], + mock.call.rollback_twophase(connection, mock.ANY, mock.ANY), + ) + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase_commit(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin_twophase() + trans.commit() + eq_( + reset_agent.mock_calls[0], + mock.call.commit_twophase(connection, mock.ANY, mock.ANY), + ) + + eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY)) + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase_rollback(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin_twophase() + trans.rollback() + eq_( + reset_agent.mock_calls[0:2], + [ + mock.call.rollback_twophase(connection, mock.ANY, mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) + + eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY)) + + +class FutureResetAgentTest( + ResetFixture, fixtures.FutureEngineMixin, fixtures.TestBase +): + + __backend__ = True + + def test_reset_agent_no_conn_transaction(self, reset_agent): + with reset_agent.engine.connect(): + pass + + eq_(reset_agent.mock_calls, [mock.call.do_rollback(mock.ANY)]) + + def test_begin_close(self, reset_agent): + with reset_agent.engine.connect() as connection: + trans = connection.begin() + + assert not trans.is_active + eq_( + reset_agent.mock_calls, + [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)], + ) + + def test_begin_rollback(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin() - assert connection.connection._reset_agent is trans trans.rollback() - assert connection.connection._reset_agent is None assert not trans.is_active - eq_(canary.mock_calls, [mock.call(connection)]) + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) - def test_begin_commit(self): - canary = mock.Mock() - with testing.db.connect() as connection: - event.listen(connection, "rollback", canary.rollback) - event.listen(connection, "commit", canary.commit) + def test_begin_commit(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin() - assert connection.connection._reset_agent is trans trans.commit() - assert connection.connection._reset_agent is None assert not trans.is_active - eq_(canary.mock_calls, [mock.call.commit(connection)]) + eq_( + reset_agent.mock_calls, + [ + mock.call.commit(connection), + mock.call.do_commit(mock.ANY), + mock.call.do_rollback(mock.ANY), + ], + ) @testing.requires.savepoints - def test_begin_nested_close(self): - canary = mock.Mock() - with testing.db.connect() as connection: - event.listen(connection, "rollback", canary.rollback) - event.listen(connection, "commit", canary.commit) + def test_begin_nested_close(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin_nested() - assert ( - connection.connection._reset_agent is connection._transaction - ) # it's a savepoint, but root made sure it closed assert not trans.is_active - eq_(canary.mock_calls, [mock.call.rollback(connection)]) + eq_( + reset_agent.mock_calls, + [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)], + ) @testing.requires.savepoints - def test_begin_begin_nested_close(self): - canary = mock.Mock() - with testing.db.connect() as connection: - event.listen(connection, "rollback", canary.rollback) - event.listen(connection, "commit", canary.commit) + def test_begin_begin_nested_close(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is trans assert not trans2.is_active assert not trans.is_active - eq_(canary.mock_calls, [mock.call.rollback(connection)]) + eq_( + reset_agent.mock_calls, + [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)], + ) @testing.requires.savepoints - def test_begin_begin_nested_rollback_commit(self): - canary = mock.Mock() - with testing.db.connect() as connection: - event.listen( - connection, "rollback_savepoint", canary.rollback_savepoint - ) - event.listen(connection, "rollback", canary.rollback) - event.listen(connection, "commit", canary.commit) + def test_begin_begin_nested_rollback_commit(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is trans trans2.rollback() # this is not a connection level event - assert connection.connection._reset_agent is trans trans.commit() - assert connection.connection._reset_agent is None eq_( - canary.mock_calls, + reset_agent.mock_calls, [ mock.call.rollback_savepoint(connection, mock.ANY, None), mock.call.commit(connection), + mock.call.do_commit(mock.ANY), + mock.call.do_rollback(mock.ANY), ], ) @testing.requires.savepoints - def test_begin_begin_nested_rollback_rollback(self): - canary = mock.Mock() - with testing.db.connect() as connection: - event.listen(connection, "rollback", canary.rollback) - event.listen(connection, "commit", canary.commit) + def test_begin_begin_nested_rollback_rollback(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is trans trans2.rollback() - assert connection.connection._reset_agent is trans trans.rollback() - assert connection.connection._reset_agent is None - eq_(canary.mock_calls, [mock.call.rollback(connection)]) + eq_( + reset_agent.mock_calls, + [ + mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY), + mock.call.rollback(connection), + mock.call.do_rollback(mock.ANY), + mock.call.do_rollback(mock.ANY), # this is the reset on return + ], + ) @testing.requires.two_phase_transactions - def test_reset_via_agent_begin_twophase(self): - canary = mock.Mock() - with testing.db.connect() as connection: - event.listen(connection, "rollback", canary.rollback) - event.listen( - connection, "rollback_twophase", canary.rollback_twophase - ) - event.listen(connection, "commit", canary.commit) + def test_reset_via_agent_begin_twophase(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin_twophase() - assert connection.connection._reset_agent is trans assert not trans.is_active + # pg8000 uses the rollback_twophase as the full rollback. eq_( - canary.mock_calls, - [mock.call.rollback_twophase(connection, mock.ANY, False)], + reset_agent.mock_calls[0], + mock.call.rollback_twophase(connection, mock.ANY, False), ) @testing.requires.two_phase_transactions - def test_reset_via_agent_begin_twophase_commit(self): - canary = mock.Mock() - with testing.db.connect() as connection: - event.listen(connection, "rollback", canary.rollback) - event.listen(connection, "commit", canary.commit) - event.listen(connection, "commit_twophase", canary.commit_twophase) + def test_reset_via_agent_begin_twophase_commit(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin_twophase() - assert connection.connection._reset_agent is trans trans.commit() - assert connection.connection._reset_agent is None + + # again pg8000 vs. other PG drivers have different API eq_( - canary.mock_calls, - [mock.call.commit_twophase(connection, mock.ANY, False)], + reset_agent.mock_calls[0], + mock.call.commit_twophase(connection, mock.ANY, False), ) + eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY)) + @testing.requires.two_phase_transactions - def test_reset_via_agent_begin_twophase_rollback(self): - canary = mock.Mock() - with testing.db.connect() as connection: - event.listen(connection, "rollback", canary.rollback) - event.listen( - connection, "rollback_twophase", canary.rollback_twophase - ) - event.listen(connection, "commit", canary.commit) + def test_reset_via_agent_begin_twophase_rollback(self, reset_agent): + with reset_agent.engine.connect() as connection: trans = connection.begin_twophase() - assert connection.connection._reset_agent is trans trans.rollback() - assert connection.connection._reset_agent is None + + # pg8000 vs. the other postgresql drivers have different + # twophase implementations. the base postgresql driver emits + # "ROLLBACK PREPARED" explicitly then calls do_rollback(). + # pg8000 has a dedicated API method. so we get either one or + # two do_rollback() at the end, just need at least one. eq_( - canary.mock_calls, - [mock.call.rollback_twophase(connection, mock.ANY, False)], + reset_agent.mock_calls[0:2], + [ + mock.call.rollback_twophase(connection, mock.ANY, False), + mock.call.do_rollback(mock.ANY), + # mock.call.do_rollback(mock.ANY), + ], ) + eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY)) class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):