--- /dev/null
+.. change::
+ :tags: usecase, engine, orm
+ :tickets: 6288
+
+ Applied consistent behavior to the use case of
+ calling ``.commit()`` or ``.rollback()`` inside of an existing
+ ``.begin()`` context manager, with the addition of potentially
+ emitting SQL within the block subsequent to the commit or rollback.
+ This change continues upon the change first added in
+ :ticket:`6155` where the use case of calling "rollback" inside of
+ a ``.begin()`` contextmanager block was proposed:
+
+ * calling ``.commit()`` or ``.rollback()`` will now be allowed
+ without error or warning within all scopes, including
+ that of legacy and future :class:`_engine.Engine`, ORM
+ :class:`_orm.Session`, asyncio :class:`.AsyncEngine`. Previously,
+ the :class:`_orm.Session` disallowed this.
+
+ * The remaining scope of the context manager is then closed;
+ when the block ends, a check is emitted to see if the transaction
+ was already ended, and if so the block returns without action.
+
+ * It will now raise **an error** if subsequent SQL of any kind
+ is emitted within the block, **after** ``.commit()`` or
+ ``.rollback()`` is called. The block should be closed as
+ the state of the executable object would otherwise be undefined
+ in this state.
from .interfaces import ExceptionContext
from .util import _distill_params
from .util import _distill_params_20
+from .util import TransactionalContext
from .. import exc
from .. import inspection
from .. import log
_is_future = False
_sqla_logger_namespace = "sqlalchemy.engine.Connection"
+ # used by sqlalchemy.engine.util.TransactionalContext
+ _trans_context_manager = None
+
def __init__(
self,
engine,
):
self._invalid_transaction()
+ elif self._trans_context_manager:
+ TransactionalContext._trans_ctx_check(self)
+
if self._is_future and self._transaction is None:
self._autobegin()
self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
-class Transaction(object):
+class Transaction(TransactionalContext):
"""Represent a database transaction in progress.
The :class:`.Transaction` object is procured by
finally:
assert not self.is_active
- def __enter__(self):
- return self
+ def _get_subject(self):
+ return self.connection
- def __exit__(self, type_, value, traceback):
- if type_ is None and self.is_active:
- try:
- self.commit()
- except:
- with util.safe_reraise():
- self.rollback()
- else:
- if self._deactivated_from_connection:
- self.close()
- else:
- self.rollback()
+ def _transaction_is_active(self):
+ return self.is_active
+
+ def _transaction_is_closed(self):
+ return not self._deactivated_from_connection
class MarkerTransaction(Transaction):
)
self.connection = connection
+
+ if connection._trans_context_manager:
+ TransactionalContext._trans_ctx_check(connection)
+
if connection._nested_transaction is not None:
self._transaction = connection._nested_transaction
else:
def __init__(self, connection):
assert connection._transaction is None
+ if connection._trans_context_manager:
+ TransactionalContext._trans_ctx_check(connection)
self.connection = connection
self._connection_begin_impl()
connection._transaction = self
def __init__(self, connection):
assert connection._transaction is not None
+ if connection._trans_context_manager:
+ TransactionalContext._trans_ctx_check(connection)
self.connection = connection
self._savepoint = self.connection._savepoint_impl()
self.is_active = True
self.close_with_result = close_with_result
def __enter__(self):
+ self.transaction.__enter__()
return self.conn
def __exit__(self, type_, value, traceback):
try:
- if type_ is not None:
- if self.transaction.is_active:
- self.transaction.rollback()
- else:
- if self.transaction.is_active:
- self.transaction.commit()
+ self.transaction.__exit__(type_, value, traceback)
finally:
if not self.close_with_result:
self.conn.close()
return (params,), _no_kw
else:
raise exc.ArgumentError("mapping or sequence expected for parameters")
+
+
+class TransactionalContext(object):
+ """Apply Python context manager behavior to transaction objects.
+
+ Performs validation to ensure the subject of the transaction is not
+ used if the transaction were ended prematurely.
+
+ """
+
+ _trans_subject = None
+
+ def _transaction_is_active(self):
+ raise NotImplementedError()
+
+ def _transaction_is_closed(self):
+ raise NotImplementedError()
+
+ def _get_subject(self):
+ raise NotImplementedError()
+
+ @classmethod
+ def _trans_ctx_check(cls, subject):
+ trans_context = subject._trans_context_manager
+ if trans_context:
+ if not trans_context._transaction_is_active():
+ raise exc.InvalidRequestError(
+ "Can't operate on closed transaction inside context "
+ "manager. Please complete the context manager "
+ "before emitting further commands."
+ )
+
+ def __enter__(self):
+ subject = self._get_subject()
+
+ # none for outer transaction, may be non-None for nested
+ # savepoint, legacy nesting cases
+ trans_context = subject._trans_context_manager
+ self._outer_trans_ctx = trans_context
+
+ self._trans_subject = subject
+ subject._trans_context_manager = self
+ return self
+
+ def __exit__(self, type_, value, traceback):
+ subject = self._trans_subject
+
+ # simplistically we could assume that
+ # "subject._trans_context_manager is self". However, any calling
+ # code that is manipulating __exit__ directly would break this
+ # assumption. alembic context manager
+ # is an example of partial use that just calls __exit__ and
+ # not __enter__ at the moment. it's safe to assume this is being done
+ # in the wild also
+ out_of_band_exit = (
+ subject is None or subject._trans_context_manager is not self
+ )
+
+ if type_ is None and self._transaction_is_active():
+ try:
+ self.commit()
+ except:
+ with util.safe_reraise():
+ self.rollback()
+ finally:
+ if not out_of_band_exit:
+ subject._trans_context_manager = self._outer_trans_ctx
+ self._trans_subject = self._outer_trans_ctx = None
+ else:
+ try:
+ if not self._transaction_is_active():
+ if not self._transaction_is_closed():
+ self.close()
+ else:
+ self.rollback()
+ finally:
+ if not out_of_band_exit:
+ subject._trans_context_manager = self._outer_trans_ctx
+ self._trans_subject = self._outer_trans_ctx = None
class StartableContext(abc.ABC):
@abc.abstractmethod
- async def start(self) -> "StartableContext":
+ async def start(self, is_ctxmanager=False) -> "StartableContext":
pass
def __await__(self):
return self.start().__await__()
async def __aenter__(self):
- return await self.start()
+ return await self.start(is_ctxmanager=True)
@abc.abstractmethod
async def __aexit__(self, type_, value, traceback):
self.sync_engine = async_engine.sync_engine
self.sync_connection = sync_connection
- async def start(self):
+ async def start(self, is_ctxmanager=False):
"""Start this :class:`_asyncio.AsyncConnection` object's context
outside of using a Python ``with:`` block.
def __init__(self, conn):
self.conn = conn
- async def start(self):
- await self.conn.start()
+ async def start(self, is_ctxmanager=False):
+ await self.conn.start(is_ctxmanager=is_ctxmanager)
self.transaction = self.conn.begin()
await self.transaction.__aenter__()
return self.conn
async def __aexit__(self, type_, value, traceback):
- if type_ is not None:
- await self.transaction.rollback()
- else:
- if self.transaction.is_active:
- await self.transaction.commit()
+ await self.transaction.__aexit__(type_, value, traceback)
await self.conn.close()
def __init__(self, sync_engine: Engine):
await greenlet_spawn(self._sync_transaction().commit)
- async def start(self):
+ async def start(self, is_ctxmanager=False):
"""Start this :class:`_asyncio.AsyncTransaction` object's context
outside of using a Python ``with:`` block.
if self.nested
else self.connection._sync_connection().begin
)
+ if is_ctxmanager:
+ self.sync_transaction.__enter__()
return self
async def __aexit__(self, type_, value, traceback):
- if type_ is None and self.is_active:
- try:
- await self.commit()
- except:
- with util.safe_reraise():
- await self.rollback()
- else:
- await self.rollback()
+ await greenlet_spawn(
+ self._sync_transaction().__exit__, type_, value, traceback
+ )
def _get_sync_engine_or_connection(async_engine):
await greenlet_spawn(self._sync_transaction().commit)
- async def start(self):
+ async def start(self, is_ctxmanager=False):
self.sync_transaction = await greenlet_spawn(
self.session.sync_session.begin_nested
if self.nested
else self.session.sync_session.begin
)
+ if is_ctxmanager:
+ self.sync_transaction.__enter__()
return self
async def __aexit__(self, type_, value, traceback):
- return await greenlet_spawn(
+ await greenlet_spawn(
self._sync_transaction().__exit__, type_, value, traceback
)
def __enter__(self):
self.transaction = self.conn.begin()
+ self.transaction.__enter__()
return self.conn
def __exit__(self, type_, value, traceback):
try:
- if type_ is not None:
- if self.transaction.is_active:
- self.transaction.rollback()
- else:
- if self.transaction.is_active:
- self.transaction.commit()
+ self.transaction.__exit__(type_, value, traceback)
finally:
self.conn.close()
from .. import exc as sa_exc
from .. import sql
from .. import util
+from ..engine.util import TransactionalContext
from ..inspection import inspect
from ..sql import coercions
from ..sql import dml
]
-class SessionTransaction(object):
+class SessionTransaction(TransactionalContext):
"""A :class:`.Session`-level transaction.
:class:`.SessionTransaction` is produced from the
nested=False,
autobegin=False,
):
+ TransactionalContext._trans_ctx_check(session)
+
self.session = session
self._connections = {}
self._parent = parent
self.session = None
self._connections = None
- def __enter__(self):
- return self
+ def _get_subject(self):
+ return self.session
- def __exit__(self, type_, value, traceback):
- self._assert_active(deactive_ok=True, prepared_ok=True)
- if self.session._transaction is None:
- return
- if type_ is None:
- try:
- self.commit()
- except:
- with util.safe_reraise():
- self.rollback()
- else:
- self.rollback()
+ def _transaction_is_active(self):
+ return self._state is ACTIVE
+
+ def _transaction_is_closed(self):
+ return self._state is CLOSED
class Session(_SessionClassMethods):
_sessions[self.hash_key] = self
+ # used by sqlalchemy.engine.util.TransactionalContext
+ _trans_context_manager = None
+
connection_callable = None
def __enter__(self):
def _autobegin(self):
if not self.autocommit and self._transaction is None:
+
trans = SessionTransaction(self, autobegin=True)
assert self._transaction is trans
return True
)
def _connection_for_bind(self, engine, execution_options=None, **kw):
+ TransactionalContext._trans_ctx_check(self)
+
if self._transaction is not None or self._autobegin():
return self._transaction._connection_for_bind(
engine, execution_options
else:
drop_all_tables_from_metadata(metadata, config.db)
+ @config.fixture(
+ params=[
+ (rollback, second_operation, begin_nested)
+ for rollback in (True, False)
+ for second_operation in ("none", "execute", "begin")
+ for begin_nested in (
+ True,
+ False,
+ )
+ ]
+ )
+ def trans_ctx_manager_fixture(self, request, metadata):
+ rollback, second_operation, begin_nested = request.param
+
+ from sqlalchemy import Table, Column, Integer, func, select
+ from . import eq_
+
+ t = Table("test", metadata, Column("data", Integer))
+ eng = getattr(self, "bind", None) or config.db
+
+ t.create(eng)
+
+ def run_test(subject, trans_on_subject, execute_on_subject):
+ with subject.begin() as trans:
+
+ if begin_nested:
+ if not config.requirements.savepoints.enabled:
+ config.skip_test("savepoints not enabled")
+ if execute_on_subject:
+ nested_trans = subject.begin_nested()
+ else:
+ nested_trans = trans.begin_nested()
+
+ with nested_trans:
+ if execute_on_subject:
+ subject.execute(t.insert(), {"data": 10})
+ else:
+ trans.execute(t.insert(), {"data": 10})
+
+ # for nested trans, we always commit/rollback on the
+ # "nested trans" object itself.
+ # only Session(future=False) will affect savepoint
+ # transaction for session.commit/rollback
+
+ if rollback:
+ nested_trans.rollback()
+ else:
+ nested_trans.commit()
+
+ if second_operation != "none":
+ with assertions.expect_raises_message(
+ sa.exc.InvalidRequestError,
+ "Can't operate on closed transaction "
+ "inside context "
+ "manager. Please complete the context "
+ "manager "
+ "before emitting further commands.",
+ ):
+ if second_operation == "execute":
+ if execute_on_subject:
+ subject.execute(
+ t.insert(), {"data": 12}
+ )
+ else:
+ trans.execute(t.insert(), {"data": 12})
+ elif second_operation == "begin":
+ if execute_on_subject:
+ subject.begin_nested()
+ else:
+ trans.begin_nested()
+
+ # outside the nested trans block, but still inside the
+ # transaction block, we can run SQL, and it will be
+ # committed
+ if execute_on_subject:
+ subject.execute(t.insert(), {"data": 14})
+ else:
+ trans.execute(t.insert(), {"data": 14})
+
+ else:
+ if execute_on_subject:
+ subject.execute(t.insert(), {"data": 10})
+ else:
+ trans.execute(t.insert(), {"data": 10})
+
+ if trans_on_subject:
+ if rollback:
+ subject.rollback()
+ else:
+ subject.commit()
+ else:
+ if rollback:
+ trans.rollback()
+ else:
+ trans.commit()
+
+ if second_operation != "none":
+ with assertions.expect_raises_message(
+ sa.exc.InvalidRequestError,
+ "Can't operate on closed transaction inside "
+ "context "
+ "manager. Please complete the context manager "
+ "before emitting further commands.",
+ ):
+ if second_operation == "execute":
+ if execute_on_subject:
+ subject.execute(t.insert(), {"data": 12})
+ else:
+ trans.execute(t.insert(), {"data": 12})
+ elif second_operation == "begin":
+ if hasattr(trans, "begin"):
+ trans.begin()
+ else:
+ subject.begin()
+ elif second_operation == "begin_nested":
+ if execute_on_subject:
+ subject.begin_nested()
+ else:
+ trans.begin_nested()
+
+ expected_committed = 0
+ if begin_nested:
+ # begin_nested variant, we inserted a row after the nested
+ # block
+ expected_committed += 1
+ if not rollback:
+ # not rollback variant, our row inserted in the target
+ # block itself would be committed
+ expected_committed += 1
+
+ if execute_on_subject:
+ eq_(
+ subject.scalar(select(func.count()).select_from(t)),
+ expected_committed,
+ )
+ else:
+ with subject.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count()).select_from(t)),
+ expected_committed,
+ )
+
+ return run_test
+
_connection_fixture_connection = None
asyncio = None # noqa F811
def _not_implemented():
+ # this conditional is to prevent pylance from considering
+ # greenlet_spawn() etc as "no return" and dimming out code below it
+ if have_greenlet:
+ return None
+
if not compat.py3k:
raise ValueError("Cannot use this function in py2.")
else:
with testing.db.connect() as conn:
yield conn
+ def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture):
+ fn = trans_ctx_manager_fixture
+
+ # add commit/rollback to the legacy Connection object so that
+ # we can test this less-likely case in use with the legacy
+ # Engine.begin() context manager
+ class ConnWCommitRollback(testing.db._connection_cls):
+ def commit(self):
+ self.get_transaction().commit()
+
+ def rollback(self):
+ self.get_transaction().rollback()
+
+ with mock.patch.object(
+ testing.db, "_connection_cls", ConnWCommitRollback
+ ):
+ fn(testing.db, trans_on_subject=False, execute_on_subject=False)
+
+ def test_interrupt_ctxmanager_connection(self, trans_ctx_manager_fixture):
+ fn = trans_ctx_manager_fixture
+
+ with testing.db.connect() as conn:
+ fn(conn, trans_on_subject=False, execute_on_subject=True)
+
def test_commits(self, local_connection):
users = self.tables.users
connection = local_connection
trans.rollback()
assert not local_connection.in_transaction()
- # would be subject to autocommit
- local_connection.execute(select(1))
+ # previously, would be subject to autocommit.
+ # now it raises
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ "Can't operate on closed transaction inside context manager. "
+ "Please complete the context manager before emitting "
+ "further commands.",
+ ):
+ local_connection.execute(select(1))
assert not local_connection.in_transaction()
connection = local_connection
users = self.tables.users
trans = connection.begin()
+ trans.__enter__()
connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
try:
)
trans = connection.begin()
+ trans.__enter__()
connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
trans.__exit__(None, None, None)
assert not trans.is_active
with testing.db.connect() as conn:
yield conn
+ def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture):
+ fn = trans_ctx_manager_fixture
+
+ fn(testing.db, trans_on_subject=False, execute_on_subject=False)
+
+ @testing.combinations((True,), (False,), argnames="trans_on_subject")
+ def test_interrupt_ctxmanager_connection(
+ self, trans_ctx_manager_fixture, trans_on_subject
+ ):
+ fn = trans_ctx_manager_fixture
+
+ with testing.db.connect() as conn:
+ fn(
+ conn,
+ trans_on_subject=trans_on_subject,
+ execute_on_subject=True,
+ )
+
def test_autobegin_rollback(self):
users = self.tables.users
with testing.db.connect() as conn:
trans.rollback()
assert not local_connection.in_transaction()
- # autobegin
- local_connection.execute(select(1))
+ # previously, would be subject to autocommit.
+ # now it raises
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ "Can't operate on closed transaction inside context manager. "
+ "Please complete the context manager before emitting "
+ "further commands.",
+ ):
+ local_connection.execute(select(1))
- assert local_connection.in_transaction()
+ assert not local_connection.in_transaction()
@testing.combinations((True,), (False,), argnames="roll_back_in_block")
def test_ctxmanager_rolls_back(self, local_connection, roll_back_in_block):
from sqlalchemy.ext.asyncio import engine as _async_engine
from sqlalchemy.ext.asyncio import exc as asyncio_exc
from sqlalchemy.pool import AsyncAdaptedQueuePool
+from sqlalchemy.testing import assertions
from sqlalchemy.testing import async_test
from sqlalchemy.testing import combinations
+from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
from sqlalchemy.util.concurrency import greenlet_spawn
-class EngineFixture(fixtures.TablesTest):
+class AsyncFixture:
+ @config.fixture(
+ params=[
+ (rollback, run_second_execute, begin_nested)
+ for rollback in (True, False)
+ for run_second_execute in (True, False)
+ for begin_nested in (True, False)
+ ]
+ )
+ def async_trans_ctx_manager_fixture(self, request, metadata):
+ rollback, run_second_execute, begin_nested = request.param
+
+ from sqlalchemy import Table, Column, Integer, func, select
+
+ t = Table("test", metadata, Column("data", Integer))
+ eng = getattr(self, "bind", None) or config.db
+
+ t.create(eng)
+
+ async def run_test(subject, trans_on_subject, execute_on_subject):
+ async with subject.begin() as trans:
+
+ if begin_nested:
+ if not config.requirements.savepoints.enabled:
+ config.skip_test("savepoints not enabled")
+ if execute_on_subject:
+ nested_trans = subject.begin_nested()
+ else:
+ nested_trans = trans.begin_nested()
+
+ async with nested_trans:
+ if execute_on_subject:
+ await subject.execute(t.insert(), {"data": 10})
+ else:
+ await trans.execute(t.insert(), {"data": 10})
+
+ # for nested trans, we always commit/rollback on the
+ # "nested trans" object itself.
+ # only Session(future=False) will affect savepoint
+ # transaction for session.commit/rollback
+
+ if rollback:
+ await nested_trans.rollback()
+ else:
+ await nested_trans.commit()
+
+ if run_second_execute:
+ with assertions.expect_raises_message(
+ exc.InvalidRequestError,
+ "Can't operate on closed transaction "
+ "inside context manager. Please complete the "
+ "context manager "
+ "before emitting further commands.",
+ ):
+ if execute_on_subject:
+ await subject.execute(
+ t.insert(), {"data": 12}
+ )
+ else:
+ await trans.execute(
+ t.insert(), {"data": 12}
+ )
+
+ # outside the nested trans block, but still inside the
+ # transaction block, we can run SQL, and it will be
+ # committed
+ if execute_on_subject:
+ await subject.execute(t.insert(), {"data": 14})
+ else:
+ await trans.execute(t.insert(), {"data": 14})
+
+ else:
+ if execute_on_subject:
+ await subject.execute(t.insert(), {"data": 10})
+ else:
+ await trans.execute(t.insert(), {"data": 10})
+
+ if trans_on_subject:
+ if rollback:
+ await subject.rollback()
+ else:
+ await subject.commit()
+ else:
+ if rollback:
+ await trans.rollback()
+ else:
+ await trans.commit()
+
+ if run_second_execute:
+ with assertions.expect_raises_message(
+ exc.InvalidRequestError,
+ "Can't operate on closed transaction inside "
+ "context "
+ "manager. Please complete the context manager "
+ "before emitting further commands.",
+ ):
+ if execute_on_subject:
+ await subject.execute(t.insert(), {"data": 12})
+ else:
+ await trans.execute(t.insert(), {"data": 12})
+
+ expected_committed = 0
+ if begin_nested:
+ # begin_nested variant, we inserted a row after the nested
+ # block
+ expected_committed += 1
+ if not rollback:
+ # not rollback variant, our row inserted in the target
+ # block itself would be committed
+ expected_committed += 1
+
+ if execute_on_subject:
+ eq_(
+ await subject.scalar(select(func.count()).select_from(t)),
+ expected_committed,
+ )
+ else:
+ with subject.connect() as conn:
+ eq_(
+ await conn.scalar(select(func.count()).select_from(t)),
+ expected_committed,
+ )
+
+ return run_test
+
+
+class EngineFixture(AsyncFixture, fixtures.TablesTest):
__requires__ = ("async_dialect",)
@testing.fixture
async with async_engine.connect() as conn:
eq_(await conn.scalar(text("select 1")), 2)
+ @async_test
+ async def test_interrupt_ctxmanager_connection(
+ self, async_engine, async_trans_ctx_manager_fixture
+ ):
+ fn = async_trans_ctx_manager_fixture
+
+ async with async_engine.connect() as conn:
+ await fn(conn, trans_on_subject=False, execute_on_subject=True)
+
def test_proxied_attrs_engine(self, async_engine):
sync_engine = async_engine.sync_engine
from sqlalchemy.testing import eq_
from sqlalchemy.testing import is_
from sqlalchemy.testing import mock
+from .test_engine_py3k import AsyncFixture as _AsyncFixture
from ...orm import _fixtures
-class AsyncFixture(_fixtures.FixtureTest):
+class AsyncFixture(_AsyncFixture, _fixtures.FixtureTest):
__requires__ = ("async_dialect",)
@classmethod
class AsyncSessionTransactionTest(AsyncFixture):
run_inserts = None
+ @async_test
+ async def test_interrupt_ctxmanager_connection(
+ self, async_trans_ctx_manager_fixture, async_session
+ ):
+ fn = async_trans_ctx_manager_fixture
+
+ await fn(async_session, trans_on_subject=True, execute_on_subject=True)
+
@async_test
async def test_sessionmaker_block_one(self, async_engine):
from sqlalchemy.testing import assert_warnings
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
eq_(sess.connection().execute(users.select()).all(), [(1, "user1")])
sess.close()
+ @testing.combinations((True,), (False,), argnames="future")
+ def test_interrupt_ctxmanager(self, trans_ctx_manager_fixture, future):
+ fn = trans_ctx_manager_fixture
+
+ session = fixture_session(future=future)
+
+ fn(session, trans_on_subject=True, execute_on_subject=True)
+
+ @testing.combinations((True,), (False,), argnames="future")
+ @testing.combinations((True,), (False,), argnames="rollback")
+ @testing.combinations((True,), (False,), argnames="expire_on_commit")
+ @testing.combinations(
+ ("add",),
+ ("modify",),
+ ("delete",),
+ ("begin",),
+ argnames="check_operation",
+ )
+ def test_interrupt_ctxmanager_ops(
+ self, future, rollback, expire_on_commit, check_operation
+ ):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ session = fixture_session(
+ future=future, expire_on_commit=expire_on_commit
+ )
+
+ with session.begin():
+ u1 = User(id=7, name="u1")
+ session.add(u1)
+
+ with session.begin():
+ u1.name # unexpire
+ u2 = User(id=8, name="u1")
+ session.add(u2)
+
+ session.flush()
+
+ if rollback:
+ session.rollback()
+ else:
+ session.commit()
+
+ with expect_raises_message(
+ sa_exc.InvalidRequestError,
+ "Can't operate on closed transaction "
+ "inside context manager. Please complete the context "
+ "manager before emitting further commands.",
+ ):
+ if check_operation == "add":
+ u3 = User(id=9, name="u2")
+ session.add(u3)
+ elif check_operation == "begin":
+ session.begin()
+ elif check_operation == "modify":
+ u1.name = "newname"
+ elif check_operation == "delete":
+ session.delete(u1)
+
class TransactionFlagsTest(fixtures.TestBase):
def test_in_transaction(self):
self._assert_count(1)
- @testing.requires.savepoints
- def test_something_with_rollback(self):
- A = self.A
-
- a1 = A()
- self.session.add(a1)
- self.session.flush()
-
- self._assert_count(1)
- self.session.rollback()
- self._assert_count(0)
-
- a1 = A()
- self.session.add(a1)
- self.session.commit()
- self._assert_count(1)
-
- a2 = A()
-
- self.session.add(a2)
- self.session.flush()
- self._assert_count(2)
-
- self.session.rollback()
- self._assert_count(1)
-
def _assert_count(self, count):
result = self.connection.scalar(
select(func.count()).select_from(self.table)
if self.trans.is_active:
self.trans.rollback()
+ @testing.requires.savepoints
+ def test_something_with_context_managers(self):
+ A = self.A
+
+ a1 = A()
+
+ with self.session.begin():
+ self.session.add(a1)
+ self.session.flush()
+
+ self._assert_count(1)
+ self.session.rollback()
+
+ self._assert_count(0)
+
+ a1 = A()
+ with self.session.begin():
+ self.session.add(a1)
+
+ self._assert_count(1)
+
+ a2 = A()
+
+ with self.session.begin():
+ self.session.add(a2)
+ self.session.flush()
+ self._assert_count(2)
+
+ self.session.rollback()
+ self._assert_count(1)
+
class FutureJoinIntoAnExternalTransactionTest(
NewStyleJoinIntoAnExternalTransactionTest,