with testing.db.connect() as conn:
eq_(conn.execute(users.select()).fetchall(), [(1, "user1")])
+ def test_begin_begin_rollback_rollback(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans2.rollback()
+ assert connection.connection._reset_agent is None
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+
+ def test_begin_begin_commit_commit(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans2.commit()
+ assert connection.connection._reset_agent is trans
+ trans.commit()
+ assert connection.connection._reset_agent is None
+
+ def test_branch_nested_rollback(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ connection.begin()
+ branched = connection.connect()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name="user1")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ nested = branched.begin()
+ branched.execute(users.insert(), user_id=2, user_name="user2")
+ nested.rollback()
+ assert not connection.in_transaction()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive transaction. Please",
+ connection.exec_driver_sql,
+ "select 1",
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_cancelled_by_toplevel_marker(self, local_connection):
+ conn = local_connection
+ users = self.tables.users
+ trans = conn.begin()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ mk1 = conn.begin()
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ mk1.rollback()
+
+ assert not sp1.is_active
+ assert not trans.is_active
+ assert conn._transaction is trans
+ assert conn._nested_transaction is None
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ @testing.requires.savepoints
+ def test_rollback_to_subtransaction(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ trans2 = connection.begin_nested()
+ connection.execute(users.insert(), user_id=2, user_name="user2")
+
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans3 = connection.begin()
+ connection.execute(users.insert(), user_id=3, user_name="user3")
+ trans3.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive savepoint transaction.",
+ connection.exec_driver_sql,
+ "select 1",
+ )
+ trans2.rollback()
+ assert connection._nested_transaction is None
+
+ connection.execute(users.insert(), user_id=4, user_name="user4")
+ transaction.commit()
+ eq_(
+ connection.execute(
+ select(users.c.user_id).order_by(users.c.user_id)
+ ).fetchall(),
+ [(1,), (4,)],
+ )
+
+ # PG emergency shutdown:
+ # select * from pg_prepared_xacts
+ # ROLLBACK PREPARED '<xid>'
+ # MySQL emergency shutdown:
+ # for arg in `mysql -u root -e "xa recover" | cut -c 8-100 |
+ # grep sa`; do mysql -u root -e "xa rollback '$arg'"; done
+ @testing.requires.skip_mysql_on_windows
+ @testing.requires.two_phase_transactions
+ @testing.requires.savepoints
+ def test_mixed_two_phase_transaction(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ transaction = connection.begin_twophase()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ transaction2 = connection.begin()
+ connection.execute(users.insert(), user_id=2, user_name="user2")
+ transaction3 = connection.begin_nested()
+ connection.execute(users.insert(), user_id=3, user_name="user3")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ transaction4 = connection.begin()
+ connection.execute(users.insert(), user_id=4, user_name="user4")
+ transaction4.commit()
+ transaction3.rollback()
+ connection.execute(users.insert(), user_id=5, user_name="user5")
+ transaction2.commit()
+ transaction.prepare()
+ transaction.commit()
+ eq_(
+ connection.execute(
+ select(users.c.user_id).order_by(users.c.user_id)
+ ).fetchall(),
+ [(1,), (2,), (5,)],
+ )
+
+ @testing.requires.savepoints
+ def test_inactive_due_to_subtransaction_on_nested_no_commit(
+ self, local_connection
+ ):
+ connection = local_connection
+ trans = connection.begin()
+
+ nested = connection.begin_nested()
+
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ trans2.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive savepoint transaction. "
+ "Please rollback",
+ nested.commit,
+ )
+ trans.commit()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This nested transaction is inactive",
+ nested.commit,
+ )
+
+ def test_close(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), user_id=3, user_name="user3")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ connection.execute(users.insert(), user_id=4, user_name="user4")
+ connection.execute(users.insert(), user_id=5, user_name="user5")
+ assert connection.in_transaction()
+ trans2.close()
+ assert connection.in_transaction()
+ transaction.commit()
+ assert not connection.in_transaction()
+ self.assert_(
+ connection.exec_driver_sql(
+ "select count(*) from " "users"
+ ).scalar()
+ == 5
+ )
+ result = connection.exec_driver_sql("select * from users")
+ assert len(result.fetchall()) == 5
+
+ def test_close2(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), user_id=3, user_name="user3")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ connection.execute(users.insert(), user_id=4, user_name="user4")
+ connection.execute(users.insert(), user_id=5, user_name="user5")
+ assert connection.in_transaction()
+ trans2.close()
+ assert connection.in_transaction()
+ transaction.close()
+ assert not connection.in_transaction()
+ self.assert_(
+ connection.exec_driver_sql(
+ "select count(*) from " "users"
+ ).scalar()
+ == 0
+ )
+ result = connection.exec_driver_sql("select * from users")
+ assert len(result.fetchall()) == 0
+
+ def test_inactive_due_to_subtransaction_no_commit(self, local_connection):
+ connection = local_connection
+ trans = connection.begin()
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ trans2.rollback()
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive transaction. Please rollback",
+ trans.commit,
+ )
+
+ trans.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This transaction is inactive",
+ trans.commit,
+ )
+
+ def test_nested_rollback(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ try:
+ transaction = connection.begin()
+ try:
+ connection.execute(
+ users.insert(), user_id=1, user_name="user1"
+ )
+ connection.execute(
+ users.insert(), user_id=2, user_name="user2"
+ )
+ connection.execute(
+ users.insert(), user_id=3, user_name="user3"
+ )
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ try:
+ connection.execute(
+ users.insert(), user_id=4, user_name="user4"
+ )
+ connection.execute(
+ users.insert(), user_id=5, user_name="user5"
+ )
+ raise Exception("uh oh")
+ trans2.commit()
+ except Exception:
+ trans2.rollback()
+ raise
+ transaction.rollback()
+ except Exception:
+ transaction.rollback()
+ raise
+ except Exception as e:
+ # and not "This transaction is inactive"
+ # comment moved here to fix pep8
+ assert str(e) == "uh oh"
+ else:
+ assert False
+
+ def test_nesting(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), user_id=3, user_name="user3")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ connection.execute(users.insert(), user_id=4, user_name="user4")
+ connection.execute(users.insert(), user_id=5, user_name="user5")
+ trans2.commit()
+ transaction.rollback()
+ self.assert_(
+ connection.exec_driver_sql(
+ "select count(*) from " "users"
+ ).scalar()
+ == 0
+ )
+ result = connection.exec_driver_sql("select * from users")
+ assert len(result.fetchall()) == 0
+
+ def test_no_marker_on_inactive_trans(self, local_connection):
+ conn = local_connection
+ conn.begin()
+
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ mk1 = conn.begin()
+
+ mk1.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "the current transaction on this connection is inactive.",
+ conn.begin,
+ )
+
def test_implicit_autocommit_compiled(self):
users = self.tables.users
result = connection.exec_driver_sql("select * from users")
assert len(result.fetchall()) == 0
- def test_nested_rollback(self, local_connection):
- connection = local_connection
- users = self.tables.users
- try:
- transaction = connection.begin()
- try:
- connection.execute(
- users.insert(), user_id=1, user_name="user1"
- )
- connection.execute(
- users.insert(), user_id=2, user_name="user2"
- )
- connection.execute(
- users.insert(), user_id=3, user_name="user3"
- )
- trans2 = connection.begin()
- try:
- connection.execute(
- users.insert(), user_id=4, user_name="user4"
- )
- connection.execute(
- users.insert(), user_id=5, user_name="user5"
- )
- raise Exception("uh oh")
- trans2.commit()
- except Exception:
- trans2.rollback()
- raise
- transaction.rollback()
- except Exception:
- transaction.rollback()
- raise
- except Exception as e:
- # and not "This transaction is inactive"
- # comment moved here to fix pep8
- assert str(e) == "uh oh"
- else:
- assert False
-
- def test_branch_nested_rollback(self, local_connection):
- connection = local_connection
- users = self.tables.users
- connection.begin()
- branched = connection.connect()
- assert branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- assert not connection.in_transaction()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "This connection is on an inactive transaction. Please",
- connection.exec_driver_sql,
- "select 1",
- )
-
- def test_no_marker_on_inactive_trans(self, local_connection):
- conn = local_connection
- conn.begin()
-
- mk1 = conn.begin()
-
- mk1.rollback()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "the current transaction on this connection is inactive.",
- conn.begin,
- )
-
- @testing.requires.savepoints
- def test_savepoint_cancelled_by_toplevel_marker(self, local_connection):
- conn = local_connection
- users = self.tables.users
- trans = conn.begin()
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- mk1 = conn.begin()
-
- sp1 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- mk1.rollback()
-
- assert not sp1.is_active
- assert not trans.is_active
- assert conn._transaction is trans
- assert conn._nested_transaction is None
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 0,
- )
-
- def test_inactive_due_to_subtransaction_no_commit(self, local_connection):
- connection = local_connection
- trans = connection.begin()
- trans2 = connection.begin()
- trans2.rollback()
- assert_raises_message(
- exc.InvalidRequestError,
- "This connection is on an inactive transaction. Please rollback",
- trans.commit,
- )
-
- trans.rollback()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "This transaction is inactive",
- trans.commit,
- )
-
- @testing.requires.savepoints
- def test_inactive_due_to_subtransaction_on_nested_no_commit(
- self, local_connection
- ):
- connection = local_connection
- trans = connection.begin()
-
- nested = connection.begin_nested()
-
- trans2 = connection.begin()
- trans2.rollback()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "This connection is on an inactive savepoint transaction. "
- "Please rollback",
- nested.commit,
- )
- trans.commit()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "This nested transaction is inactive",
- nested.commit,
- )
-
def test_deactivated_warning_ctxmanager(self, local_connection):
with expect_warnings(
"transaction already deassociated from connection"
0,
)
- def test_nesting(self, local_connection):
- connection = local_connection
- users = self.tables.users
- transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
- trans2 = connection.begin()
- connection.execute(users.insert(), user_id=4, user_name="user4")
- connection.execute(users.insert(), user_id=5, user_name="user5")
- trans2.commit()
- transaction.rollback()
- self.assert_(
- connection.exec_driver_sql(
- "select count(*) from " "users"
- ).scalar()
- == 0
- )
- result = connection.exec_driver_sql("select * from users")
- assert len(result.fetchall()) == 0
-
def test_with_interface(self, local_connection):
connection = local_connection
users = self.tables.users
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
connection.execute(users.insert(), user_id=3, user_name="user3")
- trans2 = connection.begin()
- connection.execute(users.insert(), user_id=4, user_name="user4")
- connection.execute(users.insert(), user_id=5, user_name="user5")
- assert connection.in_transaction()
- trans2.close()
assert connection.in_transaction()
transaction.commit()
assert not connection.in_transaction()
- self.assert_(
- connection.exec_driver_sql(
- "select count(*) from " "users"
- ).scalar()
- == 5
- )
result = connection.exec_driver_sql("select * from users")
- assert len(result.fetchall()) == 5
+ eq_(len(result.fetchall()), 3)
def test_close2(self, local_connection):
connection = local_connection
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
connection.execute(users.insert(), user_id=3, user_name="user3")
- trans2 = connection.begin()
- connection.execute(users.insert(), user_id=4, user_name="user4")
- connection.execute(users.insert(), user_id=5, user_name="user5")
- assert connection.in_transaction()
- trans2.close()
assert connection.in_transaction()
transaction.close()
assert not connection.in_transaction()
- self.assert_(
- connection.exec_driver_sql(
- "select count(*) from " "users"
- ).scalar()
- == 0
- )
result = connection.exec_driver_sql("select * from users")
assert len(result.fetchall()) == 0
[(1,), (2,), (3,)],
)
- @testing.requires.savepoints
- def test_rollback_to_subtransaction(self, local_connection):
- connection = local_connection
- users = self.tables.users
- transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- trans2 = connection.begin_nested()
- connection.execute(users.insert(), user_id=2, user_name="user2")
-
- trans3 = connection.begin()
- connection.execute(users.insert(), user_id=3, user_name="user3")
- trans3.rollback()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "This connection is on an inactive savepoint transaction.",
- connection.exec_driver_sql,
- "select 1",
- )
- trans2.rollback()
- assert connection._nested_transaction is None
-
- connection.execute(users.insert(), user_id=4, user_name="user4")
- transaction.commit()
- eq_(
- connection.execute(
- select(users.c.user_id).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (4,)],
- )
-
@testing.requires.two_phase_transactions
def test_two_phase_transaction(self, local_connection):
connection = local_connection
[(1,), (2,)],
)
- # PG emergency shutdown:
- # select * from pg_prepared_xacts
- # ROLLBACK PREPARED '<xid>'
- # MySQL emergency shutdown:
- # for arg in `mysql -u root -e "xa recover" | cut -c 8-100 |
- # grep sa`; do mysql -u root -e "xa rollback '$arg'"; done
- @testing.requires.skip_mysql_on_windows
- @testing.requires.two_phase_transactions
- @testing.requires.savepoints
- def test_mixed_two_phase_transaction(self, local_connection):
- connection = local_connection
- users = self.tables.users
- transaction = connection.begin_twophase()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- transaction2 = connection.begin()
- connection.execute(users.insert(), user_id=2, user_name="user2")
- transaction3 = connection.begin_nested()
- connection.execute(users.insert(), user_id=3, user_name="user3")
- transaction4 = connection.begin()
- connection.execute(users.insert(), user_id=4, user_name="user4")
- transaction4.commit()
- transaction3.rollback()
- connection.execute(users.insert(), user_id=5, user_name="user5")
- transaction2.commit()
- transaction.prepare()
- transaction.commit()
- eq_(
- connection.execute(
- select(users.c.user_id).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (2,), (5,)],
- )
-
@testing.requires.two_phase_transactions
@testing.requires.two_phase_recovery
def test_two_phase_recover(self):
trans.rollback()
assert connection.connection._reset_agent is None
- def test_begin_begin_rollback_rollback(self):
- with testing.db.connect() as connection:
- trans = connection.begin()
- trans2 = connection.begin()
- assert connection.connection._reset_agent is trans
- trans2.rollback()
- assert connection.connection._reset_agent is None
- trans.rollback()
- assert connection.connection._reset_agent is None
-
- def test_begin_begin_commit_commit(self):
- with testing.db.connect() as connection:
- trans = connection.begin()
- trans2 = connection.begin()
- assert connection.connection._reset_agent is trans
- trans2.commit()
- assert connection.connection._reset_agent is trans
- trans.commit()
- assert connection.connection._reset_agent is None
-
@testing.requires.two_phase_transactions
def test_reset_via_agent_begin_twophase(self):
with testing.db.connect() as connection: