False, ``autocommit`` is True. In this sense the session acts
more like the "classic" SQLAlchemy 0.3 session with these.
+ .. deprecated:: 1.4 The "autocommit" parameter will be removed in
+ SQLAlchemy 2.0. :func:`_orm.create_session` will return a
+ :class:`_orm.Session` that does not include "autocommit' behavior
+ in release 2.0.
+
Usage::
>>> from sqlalchemy.orm import create_session
#
# ORM Session
#
- r"The Session.autocommit parameter is deprecated ",
r"The merge_result\(\) method is superseded by the "
r"merge_frozen_result\(\)",
- r"The Session.begin.subtransactions flag is deprecated",
]:
warnings.filterwarnings(
"ignore",
from sqlalchemy.orm import aliased
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import configure_mappers
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import join as orm_join
from sqlalchemy.orm import joinedload
)
self.mapper_registry.map_imperatively(B, table2)
- sess = create_session(self.engine)
+ sess = Session(self.engine, autoflush=False)
a1 = A(col2="a1")
a2 = A(col2="a2")
a3 = A(col2="a3")
B, table2, inherits=A, polymorphic_identity="b"
)
- sess = create_session(self.engine)
+ sess = Session(self.engine, autoflush=False)
a1 = A()
a2 = A()
b1 = B(col3="b1")
)
self.mapper_registry.map_imperatively(B, table2)
- sess = create_session(self.engine)
+ sess = Session(self.engine, autoflush=False)
a1 = A(col2="a1")
a2 = A(col2="a2")
b1 = B(col2="b1")
bq += fn2
- sess = fixture_session(autocommit=True, enable_baked_queries=False)
+ sess = fixture_session(enable_baked_queries=False)
eq_(bq.add_criteria(fn3)(sess).params(id=7).all(), [(7, "jack")])
eq_(
t = bq(sess).get(tokyo.id)
return t
- Sess = sessionmaker(
- class_=Session, bind=db2, autoflush=True, autocommit=False
- )
+ Sess = sessionmaker(class_=Session, bind=db2, autoflush=True)
sess2 = Sess()
t = get_tokyo(sess)
session.expire(a1)
eq_(a1.data, "d1")
- def test_autocommit_session(self):
- A = self.classes.A
- session = self._session_fixture(autocommit=True)
- a1 = session.query(A).set_shard("main").first()
-
- eq_(a1.data, "d1")
-
class LazyLoadIdentityKeyTest(fixtures.DeclarativeMappedTest):
def _init_dbs(self):
self.mapper_registry.map_imperatively(User, users)
with testing.db.connect() as c:
- sess = Session(bind=c, autocommit=False)
+ sess = Session(bind=c)
u = User(name="u1")
sess.add(u)
sess.flush()
c.exec_driver_sql("select count(1) from users").scalar() == 0
)
- sess = Session(bind=c, autocommit=False)
+ sess = Session(bind=c)
u = User(name="u2")
sess.add(u)
sess.flush()
c.exec_driver_sql("select count(1) from users").scalar() == 0
)
- with testing.db.connect() as c:
- trans = c.begin()
- sess = Session(bind=c, autocommit=True)
- u = User(name="u3")
- sess.add(u)
- sess.flush()
- assert c.in_transaction()
- trans.commit()
- assert not c.in_transaction()
- assert (
- c.exec_driver_sql("select count(1) from users").scalar() == 1
- )
-
class SessionBindTest(fixtures.MappedTest):
@classmethod
from sqlalchemy import true
from sqlalchemy import util
from sqlalchemy.engine import default
+from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import aliased
from sqlalchemy.orm import as_declarative
from sqlalchemy.orm import attributes
"arguments in SQLAlchemy 2.0."
)
+autocommit_dep = (
+ "The Session.autocommit parameter is deprecated "
+ "and will be removed in SQLAlchemy version 2.0."
+)
+
+subtransactions_dep = (
+ "The Session.begin.subtransactions flag is deprecated "
+ "and will be removed in SQLAlchemy version 2.0."
+)
opt_strings_dep = (
"Using strings to indicate column or relationship "
"paths in loader options"
s1 = Session(testing.db)
s1.begin()
- with testing.expect_deprecated_20(
- "The Session.begin.subtransactions flag is deprecated "
- "and will be removed in SQLAlchemy version 2.0."
- ):
+ with testing.expect_deprecated_20(subtransactions_dep):
s1.begin(subtransactions=True)
s1.close()
def test_autocommit_deprecated(Self):
- with testing.expect_deprecated_20(
- "The Session.autocommit parameter is deprecated "
- "and will be removed in SQLAlchemy version 2.0."
- ):
+ with testing.expect_deprecated_20(autocommit_dep):
Session(autocommit=True)
@testing.combinations(
eq_(sess.query(User).count(), 1)
+class TransScopingTest(_fixtures.FixtureTest):
+ run_inserts = None
+ __prefer_requires__ = ("independent_connections",)
+
+ @testing.combinations((True,), (False,), argnames="begin")
+ @testing.combinations((True,), (False,), argnames="expire_on_commit")
+ @testing.combinations((True,), (False,), argnames="modify_unconditional")
+ @testing.combinations(
+ ("nothing",), ("modify",), ("add",), ("delete",), argnames="case_"
+ )
+ def test_autobegin_attr_change(
+ self, case_, begin, modify_unconditional, expire_on_commit
+ ):
+ """test :ticket:`6360`"""
+
+ autocommit = True
+ User, users = self.classes.User, self.tables.users
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ with testing.expect_deprecated_20(autocommit_dep):
+ s = Session(
+ testing.db,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit,
+ )
+
+ u = User(name="x")
+ u2 = User(name="d")
+ u3 = User(name="e")
+ s.add_all([u, u2, u3])
+
+ if autocommit:
+ s.flush()
+ else:
+ s.commit()
+
+ if begin:
+ s.begin()
+
+ if case_ == "add":
+ # this autobegins
+ s.add(User(name="q"))
+ elif case_ == "delete":
+ # this autobegins
+ s.delete(u2)
+ elif case_ == "modify":
+ # this autobegins
+ u3.name = "m"
+
+ if case_ == "nothing" and not begin:
+ assert not s._transaction
+ expect_expire = expire_on_commit
+ elif autocommit and not begin:
+ assert not s._transaction
+ expect_expire = expire_on_commit
+ else:
+ assert s._transaction
+ expect_expire = True
+
+ if modify_unconditional:
+ # this autobegins
+ u.name = "y"
+ expect_expire = True
+
+ if not expect_expire:
+ assert not s._transaction
+
+ # test is that state is consistent after rollback()
+ s.rollback()
+
+ if autocommit and not begin and modify_unconditional:
+ eq_(u.name, "y")
+ else:
+ if not expect_expire:
+ assert "name" in u.__dict__
+ else:
+ assert "name" not in u.__dict__
+ eq_(u.name, "x")
+
+ def test_no_autoflush_or_commit_in_expire_w_autocommit(self):
+ """test second part of :ticket:`6233`.
+
+ Here we test that the "autoflush on unexpire" feature added
+ in :ticket:`5226` is turned off for a legacy autocommit session.
+
+ """
+
+ with testing.expect_deprecated_20(autocommit_dep):
+ s = Session(
+ testing.db,
+ autocommit=True,
+ expire_on_commit=True,
+ autoflush=True,
+ )
+
+ User, users = self.classes.User, self.tables.users
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ u1 = User(name="u1")
+ s.add(u1)
+ s.flush() # this commits
+
+ u1.name = "u2" # this does not commit
+
+ assert "id" not in u1.__dict__
+ u1.id # this unexpires
+
+ # never expired
+ eq_(u1.__dict__["name"], "u2")
+
+ eq_(u1.name, "u2")
+
+ # still in dirty collection
+ assert u1 in s.dirty
+
+
class AutocommitClosesOnFailTest(fixtures.MappedTest):
__requires__ = ("deferrable_fks",)
def test_close_transaction_on_commit_fail(self):
T2 = self.classes.T2
- session = Session(testing.db, autocommit=True)
+ with testing.expect_deprecated_20(autocommit_dep):
+ session = Session(testing.db, autocommit=True)
# with a deferred constraint, this fails at COMMIT time instead
# of at INSERT time.
)
+class AutoCommitTest(_LocalFixture):
+ __backend__ = True
+
+ def test_begin_nested_requires_trans(self):
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+ assert_raises(sa_exc.InvalidRequestError, sess.begin_nested)
+
+ def test_begin_preflush(self):
+ User = self.classes.User
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+
+ u1 = User(name="ed")
+ sess.add(u1)
+
+ sess.begin()
+ u2 = User(name="some other user")
+ sess.add(u2)
+ sess.rollback()
+ assert u2 not in sess
+ assert u1 in sess
+ assert sess.query(User).filter_by(name="ed").one() is u1
+
+ def test_accounting_commit_fails_add(self):
+ User = self.classes.User
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+
+ fail = False
+
+ def fail_fn(*arg, **kw):
+ if fail:
+ raise Exception("commit fails")
+
+ event.listen(sess, "after_flush_postexec", fail_fn)
+ u1 = User(name="ed")
+ sess.add(u1)
+
+ fail = True
+ assert_raises(Exception, sess.flush)
+ fail = False
+
+ assert u1 not in sess
+ u1new = User(id=2, name="fred")
+ sess.add(u1new)
+ sess.add(u1)
+ sess.flush()
+ assert u1 in sess
+ eq_(
+ sess.query(User.name).order_by(User.name).all(),
+ [("ed",), ("fred",)],
+ )
+
+ def test_accounting_commit_fails_delete(self):
+ User = self.classes.User
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+
+ fail = False
+
+ def fail_fn(*arg, **kw):
+ if fail:
+ raise Exception("commit fails")
+
+ event.listen(sess, "after_flush_postexec", fail_fn)
+ u1 = User(name="ed")
+ sess.add(u1)
+ sess.flush()
+
+ sess.delete(u1)
+ fail = True
+ assert_raises(Exception, sess.flush)
+ fail = False
+
+ assert u1 in sess
+ assert u1 not in sess.deleted
+ sess.delete(u1)
+ sess.flush()
+ assert u1 not in sess
+ eq_(sess.query(User.name).order_by(User.name).all(), [])
+
+ @testing.requires.updateable_autoincrement_pks
+ def test_accounting_no_select_needed(self):
+ """test that flush accounting works on non-expired instances
+ when autocommit=True/expire_on_commit=True."""
+
+ User = self.classes.User
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True, expire_on_commit=True)
+
+ u1 = User(id=1, name="ed")
+ sess.add(u1)
+ sess.flush()
+
+ u1.id = 3
+ u1.name = "fred"
+ self.assert_sql_count(testing.db, sess.flush, 1)
+ assert "id" not in u1.__dict__
+ eq_(u1.id, 3)
+
+
+class SessionStateTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ __prefer_requires__ = ("independent_connections",)
+
+ def test_autocommit_doesnt_raise_on_pending(self):
+ User, users = self.classes.User, self.tables.users
+
+ self.mapper_registry.map_imperatively(User, users)
+ with assertions.expect_deprecated_20(autocommit_dep):
+ session = Session(testing.db, autocommit=True)
+
+ session.add(User(name="ed"))
+
+ session.begin()
+ session.flush()
+ session.commit()
+
+
+class SessionTransactionTest(fixtures.RemovesEvents, _fixtures.FixtureTest):
+ run_inserts = None
+ __backend__ = True
+
+ @testing.fixture
+ def conn(self):
+ with testing.db.connect() as conn:
+ yield conn
+
+ @testing.fixture
+ def future_conn(self):
+
+ engine = Engine._future_facade(testing.db)
+ with engine.connect() as conn:
+ yield conn
+
+ def test_deactive_status_check(self):
+ sess = fixture_session()
+ trans = sess.begin()
+
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ trans2 = sess.begin(subtransactions=True)
+ trans2.rollback()
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ "This session is in 'inactive' state, due to the SQL transaction "
+ "being rolled back; no further SQL can be emitted within this "
+ "transaction.",
+ trans.commit,
+ )
+
+ def test_deactive_status_check_w_exception(self):
+ sess = fixture_session()
+ trans = sess.begin()
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ trans2 = sess.begin(subtransactions=True)
+ try:
+ raise Exception("test")
+ except Exception:
+ trans2.rollback(_capture_exception=True)
+ assert_raises_message(
+ sa_exc.PendingRollbackError,
+ r"This Session's transaction has been rolled back due to a "
+ r"previous exception during flush. To begin a new transaction "
+ r"with this Session, first issue Session.rollback\(\). "
+ r"Original exception was: test",
+ trans.commit,
+ )
+
+ def test_error_on_using_inactive_session_commands(self):
+ users, User = self.tables.users, self.classes.User
+
+ self.mapper_registry.map_imperatively(User, users)
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+ sess.begin()
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ sess.begin(subtransactions=True)
+ sess.add(User(name="u1"))
+ sess.flush()
+ sess.rollback()
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ "This session is in 'inactive' state, due to the SQL "
+ "transaction "
+ "being rolled back; no further SQL can be emitted within this "
+ "transaction.",
+ sess.begin,
+ subtransactions=True,
+ )
+ sess.close()
+
+ def test_subtransaction_on_external_subtrans(self, conn):
+ users, User = self.tables.users, self.classes.User
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ trans = conn.begin()
+ sess = Session(bind=conn, autocommit=False, autoflush=True)
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ sess.begin(subtransactions=True)
+ u = User(name="ed")
+ sess.add(u)
+ sess.flush()
+ sess.commit() # commit does nothing
+ trans.rollback() # rolls back
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
+ def test_subtransaction_on_noautocommit(self):
+ User, users = self.classes.User, self.tables.users
+
+ self.mapper_registry.map_imperatively(User, users)
+ sess = fixture_session(autocommit=False, autoflush=True)
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ sess.begin(subtransactions=True)
+ u = User(name="u1")
+ sess.add(u)
+ sess.flush()
+ sess.commit() # commit does nothing
+ sess.rollback() # rolls back
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
+ @testing.requires.savepoints
+ def test_heavy_nesting(self):
+ users = self.tables.users
+
+ session = fixture_session()
+ session.begin()
+ session.connection().execute(users.insert().values(name="user1"))
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ session.begin(subtransactions=True)
+ session.begin_nested()
+ session.connection().execute(users.insert().values(name="user2"))
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+ session.rollback()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 1
+ )
+ session.connection().execute(users.insert().values(name="user3"))
+ session.commit()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+
+ @testing.requires.savepoints
+ def test_heavy_nesting_future(self):
+ users = self.tables.users
+
+ from sqlalchemy.future import Engine
+
+ engine = Engine._future_facade(testing.db)
+ with Session(engine, autocommit=False) as session:
+ session.begin()
+ session.connection().execute(users.insert().values(name="user1"))
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ session.begin(subtransactions=True)
+ session.begin_nested()
+ session.connection().execute(users.insert().values(name="user2"))
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+ session.rollback()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 1
+ )
+ session.connection().execute(users.insert().values(name="user3"))
+ session.commit()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+
+ @testing.requires.savepoints
+ def test_mixed_transaction_control(self):
+ users, User = self.tables.users, self.classes.User
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+
+ sess.begin()
+ sess.begin_nested()
+ with assertions.expect_deprecated_20(subtransactions_dep):
+ transaction = sess.begin(subtransactions=True)
+
+ sess.add(User(name="u1"))
+
+ transaction.commit()
+ sess.commit()
+ sess.commit()
+
+ sess.close()
+
+ eq_(len(sess.query(User).all()), 1)
+
+ t1 = sess.begin()
+ t2 = sess.begin_nested()
+
+ sess.add(User(name="u2"))
+
+ t2.commit()
+ assert sess._legacy_transaction() is t1
+
+ sess.close()
+
+ @testing.requires.savepoints
+ def test_nested_transaction_connection_add_autocommit(self):
+ users, User = self.tables.users, self.classes.User
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = fixture_session(autocommit=True)
+
+ sess.begin()
+ sess.begin_nested()
+
+ u1 = User(name="u1")
+ sess.add(u1)
+ sess.flush()
+
+ sess.rollback()
+
+ u2 = User(name="u2")
+ sess.add(u2)
+
+ sess.commit()
+
+ eq_(set(sess.query(User).all()), set([u2]))
+
+ sess.begin()
+ sess.begin_nested()
+
+ u3 = User(name="u3")
+ sess.add(u3)
+ sess.commit() # commit the nested transaction
+ sess.rollback()
+
+ eq_(set(sess.query(User).all()), set([u2]))
+
+ sess.close()
+
+ def test_active_flag_autocommit(self):
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = Session(bind=testing.db, autocommit=True)
+ assert not sess.is_active
+ sess.begin()
+ assert sess.is_active
+ sess.rollback()
+ assert not sess.is_active
+
+
class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest):
run_inserts = None
+ def _listener_fixture(self, **kw):
+ canary = []
+
+ def listener(name):
+ def go(*arg, **kw):
+ canary.append(name)
+
+ return go
+
+ sess = fixture_session(**kw)
+
+ for evt in [
+ "after_transaction_create",
+ "after_transaction_end",
+ "before_commit",
+ "after_commit",
+ "after_rollback",
+ "after_soft_rollback",
+ "before_flush",
+ "after_flush",
+ "after_flush_postexec",
+ "after_begin",
+ "before_attach",
+ "after_attach",
+ "after_bulk_update",
+ "after_bulk_delete",
+ ]:
+ event.listen(sess, evt, listener(evt))
+
+ return sess, canary
+
+ def test_flush_autocommit_hook(self):
+ User, users = self.classes.User, self.tables.users
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess, canary = self._listener_fixture(
+ autoflush=False, autocommit=True, expire_on_commit=False
+ )
+
+ u = User(name="u1")
+ sess.add(u)
+ sess.flush()
+ eq_(
+ canary,
+ [
+ "before_attach",
+ "after_attach",
+ "before_flush",
+ "after_transaction_create",
+ "after_begin",
+ "after_flush",
+ "after_flush_postexec",
+ "before_commit",
+ "after_commit",
+ "after_transaction_end",
+ ],
+ )
+
def test_on_bulk_update_hook(self):
User, users = self.classes.User, self.tables.users
assert ad2 in u1.addresses
self.assert_sql_count(testing.db, go, 1)
+
+
+class BindIntegrationTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ def test_bound_connection_transactional(self):
+ User, users = self.classes.User, self.tables.users
+
+ self.mapper_registry.map_imperatively(User, users)
+
+ with testing.db.connect() as c:
+ trans = c.begin()
+
+ with assertions.expect_deprecated_20(autocommit_dep):
+ sess = Session(bind=c, autocommit=True)
+ u = User(name="u3")
+ sess.add(u)
+ sess.flush()
+ assert c.in_transaction()
+ trans.commit()
+ assert not c.in_transaction()
+ assert (
+ c.exec_driver_sql("select count(1) from users").scalar() == 1
+ )
return sess, canary
- def test_flush_autocommit_hook(self):
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
-
- sess, canary = self._listener_fixture(
- autoflush=False, autocommit=True, expire_on_commit=False
- )
-
- u = User(name="u1")
- sess.add(u)
- sess.flush()
- eq_(
- canary,
- [
- "before_attach",
- "after_attach",
- "before_flush",
- "after_transaction_create",
- "after_begin",
- "after_flush",
- "after_flush_postexec",
- "before_commit",
- "after_commit",
- "after_transaction_end",
- ],
- )
-
def test_rollback_hook(self):
User, users = self.classes.User, self.tables.users
sess, canary = self._listener_fixture()
Session = scoped_session(sa.orm.sessionmaker(), mock_scope_func)
s0 = SessionMaker()
- assert s0.autocommit == False
+ assert s0.autoflush == True
mock_scope_func.return_value = 0
s1 = Session()
- assert s1.autocommit == False
+ assert s1.autoflush == True
assert_raises_message(
sa.exc.InvalidRequestError,
"Scoped session is already present",
Session,
- autocommit=True,
+ autoflush=False,
)
mock_scope_func.return_value = 1
- s2 = Session(autocommit=True)
- assert s2.autocommit == True
+ s2 = Session(autoflush=False)
+ assert s2.autoflush == False
def test_methods_etc(self):
mock_session = Mock()
assert not s.in_transaction()
eq_(s.connection().scalar(select(User.name)), "u1")
- def test_no_autoflush_or_commit_in_expire_w_autocommit(self):
- """test second part of :ticket:`6233`.
-
- Here we test that the "autoflush on unexpire" feature added
- in :ticket:`5226` is turned off for a legacy autocommit session.
-
- """
-
- s = Session(
- testing.db, autocommit=True, expire_on_commit=True, autoflush=True
- )
-
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
-
- u1 = User(name="u1")
- s.add(u1)
- s.flush() # this commits
-
- u1.name = "u2" # this does not commit
-
- assert "id" not in u1.__dict__
- u1.id # this unexpires
-
- # never expired
- eq_(u1.__dict__["name"], "u2")
-
- eq_(u1.name, "u2")
-
- # still in dirty collection
- assert u1 in s.dirty
-
def test_autobegin_begin_method(self):
s = Session(testing.db)
s.begin,
)
- @testing.combinations((True,), (False,), argnames="autocommit")
@testing.combinations((True,), (False,), argnames="begin")
@testing.combinations((True,), (False,), argnames="expire_on_commit")
@testing.combinations((True,), (False,), argnames="modify_unconditional")
("nothing",), ("modify",), ("add",), ("delete",), argnames="case_"
)
def test_autobegin_attr_change(
- self, case_, autocommit, begin, modify_unconditional, expire_on_commit
+ self, case_, begin, modify_unconditional, expire_on_commit
):
"""test :ticket:`6360`"""
s = Session(
testing.db,
- autocommit=autocommit,
expire_on_commit=expire_on_commit,
)
u3 = User(name="e")
s.add_all([u, u2, u3])
- if autocommit:
- s.flush()
- else:
- s.commit()
+ s.commit()
if begin:
s.begin()
if case_ == "nothing" and not begin:
assert not s._transaction
expect_expire = expire_on_commit
- elif autocommit and not begin:
- assert not s._transaction
- expect_expire = expire_on_commit
else:
assert s._transaction
expect_expire = True
# test is that state is consistent after rollback()
s.rollback()
- if autocommit and not begin and modify_unconditional:
- eq_(u.name, "y")
+ if not expect_expire:
+ assert "name" in u.__dict__
else:
- if not expect_expire:
- assert "name" in u.__dict__
- else:
- assert "name" not in u.__dict__
- eq_(u.name, "x")
+ assert "name" not in u.__dict__
+ eq_(u.name, "x")
@testing.requires.independent_connections
@engines.close_open_connections
)
sess.commit()
- def test_autocommit_doesnt_raise_on_pending(self):
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
- session = Session(testing.db, autocommit=True)
-
- session.add(User(name="ed"))
-
- session.begin()
- session.flush()
- session.commit()
-
- def test_active_flag_autocommit(self):
- sess = Session(bind=config.db, autocommit=True)
- assert not sess.is_active
- sess.begin()
- assert sess.is_active
- sess.rollback()
- assert not sess.is_active
-
def test_active_flag_autobegin(self):
sess = Session(bind=config.db, autocommit=False)
assert sess.is_active
assert s._legacy_transaction() is tran
tran.close()
- def test_subtransaction_on_external_subtrans(self, conn):
- users, User = self.tables.users, self.classes.User
-
- self.mapper_registry.map_imperatively(User, users)
-
- trans = conn.begin()
- sess = Session(bind=conn, autocommit=False, autoflush=True)
- sess.begin(subtransactions=True)
- u = User(name="ed")
- sess.add(u)
- sess.flush()
- sess.commit() # commit does nothing
- trans.rollback() # rolls back
- assert len(sess.query(User).all()) == 0
- sess.close()
-
def test_subtransaction_on_external_no_begin(self, conn):
users, User = self.tables.users, self.classes.User
session.rollback()
assert u1 in session
- @testing.requires.savepoints
- def test_heavy_nesting(self):
- users = self.tables.users
-
- session = fixture_session()
- session.begin()
- session.connection().execute(users.insert().values(name="user1"))
- session.begin(subtransactions=True)
- session.begin_nested()
- session.connection().execute(users.insert().values(name="user2"))
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 2
- )
- session.rollback()
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 1
- )
- session.connection().execute(users.insert().values(name="user3"))
- session.commit()
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 2
- )
-
- @testing.requires.savepoints
- def test_heavy_nesting_future(self):
- users = self.tables.users
-
- engine = Engine._future_facade(testing.db)
- with Session(engine, autocommit=False) as session:
- session.begin()
- session.connection().execute(users.insert().values(name="user1"))
- session.begin(subtransactions=True)
- session.begin_nested()
- session.connection().execute(users.insert().values(name="user2"))
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 2
- )
- session.rollback()
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 1
- )
- session.connection().execute(users.insert().values(name="user3"))
- session.commit()
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 2
- )
-
@testing.requires.savepoints
def test_dirty_state_transferred_deep_nesting(self):
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(Address, addresses)
engine2 = engines.testing_engine()
- sess = fixture_session(autocommit=True, autoflush=False, twophase=True)
+ sess = fixture_session(autoflush=False, twophase=True)
sess.bind_mapper(User, testing.db)
sess.bind_mapper(Address, engine2)
sess.begin()
assert not c2.invalidated
assert c2.connection.is_valid
- def test_subtransaction_on_noautocommit(self):
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
- sess = fixture_session(autocommit=False, autoflush=True)
- sess.begin(subtransactions=True)
- u = User(name="u1")
- sess.add(u)
- sess.flush()
- sess.commit() # commit does nothing
- sess.rollback() # rolls back
- assert len(sess.query(User).all()) == 0
- sess.close()
-
@testing.requires.savepoints
def test_nested_transaction(self):
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(User, users)
- sess = fixture_session(autocommit=True)
+ sess = fixture_session()
sess.begin()
sess.begin_nested()
sess.commit()
eq_(set(sess.query(User).all()), set([u2]))
+ sess.rollback()
sess.begin()
sess.begin_nested()
sess.close()
- @testing.requires.savepoints
- def test_mixed_transaction_control(self):
- users, User = self.tables.users, self.classes.User
-
- self.mapper_registry.map_imperatively(User, users)
-
- sess = fixture_session(autocommit=True)
-
- sess.begin()
- sess.begin_nested()
- transaction = sess.begin(subtransactions=True)
-
- sess.add(User(name="u1"))
-
- transaction.commit()
- sess.commit()
- sess.commit()
-
- sess.close()
-
- eq_(len(sess.query(User).all()), 1)
-
- t1 = sess.begin()
- t2 = sess.begin_nested()
-
- sess.add(User(name="u2"))
-
- t2.commit()
- assert sess._legacy_transaction() is t1
-
- sess.close()
-
@testing.requires.savepoints
def test_mixed_transaction_close(self):
users, User = self.tables.users, self.classes.User
sess.commit,
)
- def test_error_on_using_inactive_session_commands(self):
- users, User = self.tables.users, self.classes.User
-
- self.mapper_registry.map_imperatively(User, users)
- sess = fixture_session(autocommit=True)
- sess.begin()
- sess.begin(subtransactions=True)
- sess.add(User(name="u1"))
- sess.flush()
- sess.rollback()
- assert_raises_message(
- sa_exc.InvalidRequestError,
- "This session is in 'inactive' state, due to the SQL transaction "
- "being rolled back; no further SQL can be emitted within this "
- "transaction.",
- sess.begin,
- subtransactions=True,
- )
- sess.close()
-
def test_no_sql_during_commit(self):
sess = fixture_session(autocommit=False)
trans.commit,
)
- def test_deactive_status_check(self):
- sess = fixture_session()
- trans = sess.begin()
- trans2 = sess.begin(subtransactions=True)
- trans2.rollback()
- assert_raises_message(
- sa_exc.InvalidRequestError,
- "This session is in 'inactive' state, due to the SQL transaction "
- "being rolled back; no further SQL can be emitted within this "
- "transaction.",
- trans.commit,
- )
-
- def test_deactive_status_check_w_exception(self):
- sess = fixture_session()
- trans = sess.begin()
- trans2 = sess.begin(subtransactions=True)
- try:
- raise Exception("test")
- except Exception:
- trans2.rollback(_capture_exception=True)
- assert_raises_message(
- sa_exc.PendingRollbackError,
- r"This Session's transaction has been rolled back due to a "
- r"previous exception during flush. To begin a new transaction "
- r"with this Session, first issue Session.rollback\(\). "
- r"Original exception was: test",
- trans.commit,
- )
-
def _inactive_flushed_session_fixture(self):
users, User = self.tables.users, self.classes.User
assert u1.name == "edward"
-class AutoCommitTest(_LocalFixture):
- __backend__ = True
-
- def test_begin_nested_requires_trans(self):
- sess = fixture_session(autocommit=True)
- assert_raises(sa_exc.InvalidRequestError, sess.begin_nested)
-
- def test_begin_preflush(self):
- User = self.classes.User
- sess = fixture_session(autocommit=True)
-
- u1 = User(name="ed")
- sess.add(u1)
-
- sess.begin()
- u2 = User(name="some other user")
- sess.add(u2)
- sess.rollback()
- assert u2 not in sess
- assert u1 in sess
- assert sess.query(User).filter_by(name="ed").one() is u1
-
- def test_accounting_commit_fails_add(self):
- User = self.classes.User
- sess = fixture_session(autocommit=True)
-
- fail = False
-
- def fail_fn(*arg, **kw):
- if fail:
- raise Exception("commit fails")
-
- event.listen(sess, "after_flush_postexec", fail_fn)
- u1 = User(name="ed")
- sess.add(u1)
-
- fail = True
- assert_raises(Exception, sess.flush)
- fail = False
-
- assert u1 not in sess
- u1new = User(id=2, name="fred")
- sess.add(u1new)
- sess.add(u1)
- sess.flush()
- assert u1 in sess
- eq_(
- sess.query(User.name).order_by(User.name).all(),
- [("ed",), ("fred",)],
- )
-
- def test_accounting_commit_fails_delete(self):
- User = self.classes.User
- sess = fixture_session(autocommit=True)
-
- fail = False
-
- def fail_fn(*arg, **kw):
- if fail:
- raise Exception("commit fails")
-
- event.listen(sess, "after_flush_postexec", fail_fn)
- u1 = User(name="ed")
- sess.add(u1)
- sess.flush()
-
- sess.delete(u1)
- fail = True
- assert_raises(Exception, sess.flush)
- fail = False
-
- assert u1 in sess
- assert u1 not in sess.deleted
- sess.delete(u1)
- sess.flush()
- assert u1 not in sess
- eq_(sess.query(User.name).order_by(User.name).all(), [])
-
- @testing.requires.updateable_autoincrement_pks
- def test_accounting_no_select_needed(self):
- """test that flush accounting works on non-expired instances
- when autocommit=True/expire_on_commit=True."""
-
- User = self.classes.User
- sess = fixture_session(autocommit=True, expire_on_commit=True)
-
- u1 = User(id=1, name="ed")
- sess.add(u1)
- sess.flush()
-
- u1.id = 3
- u1.name = "fred"
- self.assert_sql_count(testing.db, sess.flush, 1)
- assert "id" not in u1.__dict__
- eq_(u1.id, 3)
-
-
class ContextManagerPlusFutureTest(FixtureTest):
run_inserts = None
__backend__ = True