mysession.add(some_object)
mysession.flush()
-The flush process *always* occurs within a transaction, even if the
-:class:`~sqlalchemy.orm.session.Session` has been configured with
-``autocommit=True``, a setting that disables the session's persistent
-transactional state. If no transaction is present,
-:meth:`~.Session.flush` creates its own transaction and
-commits it. Any failures during flush will always result in a rollback of
-whatever transaction is present. If the Session is not in ``autocommit=True``
-mode, an explicit call to :meth:`~.Session.rollback` is
+The flush process *always* occurs within a transaction (subject
+to the :ref:`isolation level <session_transaction_isolation>`_ of the
+database transaction), which is
+*never* committed automatically; the :meth:`_orm.Session.commit` method
+must be called, or an appropriate context manager which does the same
+thing must be used, in order for the database changes to be committed.
+
+Any failures during flush will always result in a rollback of
+whatever transaction is present. In order to continue using that
+same :class:`_orm.Session`, an explicit call to :meth:`~.Session.rollback` is
required after a flush fails, even though the underlying transaction will have
been rolled back already - this is so that the overall nesting pattern of
so-called "subtransactions" is consistently maintained.
+.. seealso::
+
+ :ref:`faq_session_rollback` - further background on why
+ :meth:`_orm.Session.rollback` must be called when a flush fails.
+
Expiring / Refreshing
---------------------
-.. _session_autocommit:
-
.. _session_explicit_begin:
Explicit Begin
---------------
-.. versionchanged:: 1.4
- SQLAlchemy 1.4 deprecates "autocommit mode", which is historically enabled
- by using the :paramref:`_orm.Session.autocommit` flag. Going forward,
- a new approach to allowing usage of the :meth:`_orm.Session.begin` method
- is new "autobegin" behavior so that the method may now be called when
- a :class:`_orm.Session` is first constructed, or after the previous
- transaction has ended and before it begins a new one.
-
- For background on migrating away from the "subtransaction" pattern for
- frameworks that rely upon nesting of begin()/commit() pairs, see the
- next section :ref:`session_subtransactions`.
-
The :class:`_orm.Session` features "autobegin" behavior, meaning that as soon
as operations begin to take place, it ensures a :class:`_orm.SessionTransaction`
is present to track ongoing operations. This transaction is completed
own transactional processes with that of the ORM :class:`_orm.Session`.
-.. _session_subtransactions:
-
-Migrating from the "subtransaction" pattern
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-.. deprecated:: 1.4 The :paramref:`_orm.Session.begin.subtransactions`
- flag is deprecated. While the :class:`_orm.Session` still uses the
- "subtransactions" pattern internally, it is not suitable for end-user
- use as it leads to confusion, and additionally it may be removed from
- the :class:`_orm.Session` itself in version 2.0 once "autocommit"
- mode is removed.
-
-The "subtransaction" pattern that was often used with autocommit mode is
-also deprecated in 1.4. This pattern allowed the use of the
-:meth:`_orm.Session.begin` method when a transaction were already begun,
-resulting in a construct called a "subtransaction", which was essentially
-a block that would prevent the :meth:`_orm.Session.commit` method from actually
-committing.
-
-This pattern has been shown to be confusing in real world applications, and
-it is preferable for an application to ensure that the top-most level of database
-operations are performed with a single begin/commit pair.
-
-To provide backwards compatibility for applications that make use of this
-pattern, the following context manager or a similar implementation based on
-a decorator may be used::
-
-
- import contextlib
-
- @contextlib.contextmanager
- def transaction(session):
- if not session.in_transaction():
- with session.begin():
- yield
- else:
- yield
-
-
-The above context manager may be used in the same way the
-"subtransaction" flag works, such as in the following example::
-
-
- # method_a starts a transaction and calls method_b
- def method_a(session):
- with transaction(session):
- method_b(session)
-
- # method_b also starts a transaction, but when
- # called from method_a participates in the ongoing
- # transaction.
- def method_b(session):
- with transaction(session):
- session.add(SomeObject('bat', 'lala'))
-
- Session = sessionmaker(engine)
-
- # create a Session and call method_a
- with Session() as session:
- method_a(session)
-
-To compare towards the preferred idiomatic pattern, the begin block should
-be at the outermost level. This removes the need for individual functions
-or methods to be concerned with the details of transaction demarcation::
-
- def method_a(session):
- method_b(session)
-
- def method_b(session):
- session.add(SomeObject('bat', 'lala'))
-
- Session = sessionmaker(engine)
-
- # create a Session and call method_a
- with Session() as session:
- with session.begin():
- method_a(session)
-
-.. seealso::
-
- :ref:`connections_subtransactions` - similar pattern based on Core only
.. _session_twophase:
The defaults of create_session() are the opposite of that of
:func:`sessionmaker`; ``autoflush`` and ``expire_on_commit`` are
- False, ``autocommit`` is True. In this sense the session acts
- more like the "classic" SQLAlchemy 0.3 session with these.
-
- .. deprecated:: 1.4 The "autocommit" parameter will be removed in
- SQLAlchemy 2.0. :func:`_orm.create_session` will return a
- :class:`_orm.Session` that does not include "autocommit' behavior
- in release 2.0.
+ False.
Usage::
"""
- if kwargs.get("future", False):
- kwargs.setdefault("autocommit", False)
- else:
- kwargs.setdefault("autocommit", True)
-
kwargs.setdefault("autoflush", False)
kwargs.setdefault("expire_on_commit", False)
return Session(bind=bind, **kwargs)
result = False
- no_autoflush = (
- bool(passive & attributes.NO_AUTOFLUSH) or state.session.autocommit
- )
+ no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
# in the case of inheritance, particularly concrete and abstract
# concrete inheritance, the class manager might have some keys
:class:`.SessionTransaction` is at the top of the stack, and
corresponds to a real "COMMIT"/"ROLLBACK"
block. If non-``None``, then this is either a "subtransaction"
- or a "nested" / SAVEPOINT transaction. If the
+ (an internal marker object used by the flush process) or a
+ "nested" / SAVEPOINT transaction. If the
:attr:`.SessionTransaction.nested` attribute is ``True``, then
this is a SAVEPOINT, and if ``False``, indicates this a subtransaction.
_is_asyncio = False
- @util.deprecated_params(
- autocommit=(
- "2.0",
- "The :paramref:`.Session.autocommit` parameter is deprecated "
- "and will be removed in SQLAlchemy version 2.0. The "
- ':class:`_orm.Session` now features "autobegin" behavior '
- "such that the :meth:`.Session.begin` method may be called "
- "if a transaction has not yet been started yet. See the section "
- ":ref:`session_explicit_begin` for background.",
- ),
- )
def __init__(
self,
bind=None,
autoflush=True,
future=False,
expire_on_commit=True,
- autocommit=False,
twophase=False,
binds=None,
enable_baked_queries=True,
info=None,
query_cls=None,
+ autocommit=False,
):
r"""Construct a new Session.
generate a :class:`.Session`-producing callable with a given
set of arguments.
- :param autocommit:
- Defaults to ``False``. When ``True``, the
- :class:`.Session` does not automatically begin transactions for
- individual statement executions, will acquire connections from the
- engine on an as-needed basis, releasing to the connection pool
- after each statement. Flushes will begin and commit (or possibly
- rollback) their own transaction if no transaction is present.
- When using this mode, the
- :meth:`.Session.begin` method may be used to explicitly start
- transactions, but the usual "autobegin" behavior is not present.
-
:param autoflush: When ``True``, all query operations will issue a
:meth:`~.Session.flush` call to this ``Session`` before proceeding.
This is a convenience feature so that :meth:`~.Session.flush` need
not be called repeatedly in order for database queries to retrieve
- results. It's typical that ``autoflush`` is used in conjunction
- with ``autocommit=False``. In this scenario, explicit calls to
- :meth:`~.Session.flush` are rarely needed; you usually only need to
- call :meth:`~.Session.commit` (which flushes) to finalize changes.
+ results.
:param bind: An optional :class:`_engine.Engine` or
:class:`_engine.Connection` to
otherwise be configured against the :class:`_orm.sessionmaker`
in use
- * The "subtransactions" feature of :meth:`_orm.Session.begin` is
- removed in version 2.0 and is disabled when the future flag is
- set.
-
* The behavior of the :paramref:`_orm.relationship.cascade_backrefs`
flag on a :func:`_orm.relationship` will always assume
"False" behavior.
called. This allows each database to roll back the entire
transaction, before each transaction is committed.
+ :param autocommit: the "autocommit" keyword is present for backwards
+ compatibility but must remain at its default value of ``False``.
+
"""
+
+ # considering allowing the "autocommit" keyword to still be accepted
+ # as long as it's False, so that external test suites, oslo.db etc
+ # continue to function as the argument appears to be passed in lots
+ # of cases including in our own test suite
+ if autocommit:
+ raise sa_exc.ArgumentError(
+ "autocommit=True is no longer supported"
+ )
self.identity_map = identity.WeakInstanceDict()
self._new = {} # InstanceState->object, strong refs object
self.expire_on_commit = expire_on_commit
self.enable_baked_queries = enable_baked_queries
- if autocommit:
- if future:
- raise sa_exc.ArgumentError(
- "Cannot use autocommit mode with future=True."
- )
- self.autocommit = True
- else:
- self.autocommit = False
-
self.twophase = twophase
self._query_cls = query_cls if query_cls else query.Query
if info:
return {}
def _autobegin(self):
- if not self.autocommit and self._transaction is None:
+ if self._transaction is None:
trans = SessionTransaction(self, autobegin=True)
assert self._transaction is trans
return False
- @util.deprecated_params(
- subtransactions=(
- "2.0",
- "The :paramref:`_orm.Session.begin.subtransactions` flag is "
- "deprecated and "
- "will be removed in SQLAlchemy version 2.0. See "
- "the documentation at :ref:`session_subtransactions` for "
- "background on a compatible alternative pattern.",
- )
- )
- def begin(self, subtransactions=False, nested=False, _subtrans=False):
+ def begin(self, nested=False, _subtrans=False):
"""Begin a transaction, or nested transaction,
on this :class:`.Session`, if one is not already begun.
documentation on SAVEPOINT transactions, please see
:ref:`session_begin_nested`.
- :param subtransactions: if True, indicates that this
- :meth:`~.Session.begin` can create a "subtransaction".
-
:return: the :class:`.SessionTransaction` object. Note that
:class:`.SessionTransaction`
acts as a Python context manager, allowing :meth:`.Session.begin`
- to be used in a "with" block. See :ref:`session_autocommit` for
+ to be used in a "with" block. See :ref:`session_explicit_begin` for
an example.
.. seealso::
"""
- if subtransactions and self.future:
- raise NotImplementedError(
- "subtransactions are not implemented in future "
- "Session objects."
- )
-
if self._autobegin():
- if not subtransactions and not nested and not _subtrans:
+ if not nested and not _subtrans:
return self._transaction
if self._transaction is not None:
- if subtransactions or _subtrans or nested:
+ if _subtrans or nested:
trans = self._transaction._begin(nested=nested)
assert self._transaction is trans
if nested:
raise sa_exc.InvalidRequestError(
"A transaction is already begun on this Session."
)
- elif not self.autocommit:
+ else:
# outermost transaction. must be a not nested and not
# a subtransaction
- assert not nested and not _subtrans and not subtransactions
+ assert not nested and not _subtrans
trans = SessionTransaction(self)
assert self._transaction is trans
- else:
- # legacy autocommit mode
- assert not self.future
- trans = SessionTransaction(self, nested=nested)
- assert self._transaction is trans
return self._transaction # needed for __enter__/__exit__ hook
transaction is committed unconditionally, automatically releasing any
SAVEPOINTs in effect.
- When using legacy "autocommit" mode, this method is only
- valid to call if a transaction is actually in progress, else
- an error is raised. Similarly, when using legacy "subtransactions",
- the method will instead close out the current "subtransaction",
- rather than the actual database transaction, if a transaction
- is in progress.
-
.. seealso::
:ref:`session_committing`
self._transaction.prepare()
- def connection(
- self,
- bind_arguments=None,
- close_with_result=False,
- execution_options=None,
- **kw
- ):
+ def connection(self, bind_arguments=None, execution_options=None, **kw):
r"""Return a :class:`_engine.Connection` object corresponding to this
:class:`.Session` object's transactional state.
- If this :class:`.Session` is configured with ``autocommit=False``,
- either the :class:`_engine.Connection` corresponding to the current
+ Either the :class:`_engine.Connection` corresponding to the current
transaction is returned, or if no transaction is in progress, a new
one is begun and the :class:`_engine.Connection`
returned (note that no
transactional state is established with the DBAPI until the first
SQL statement is emitted).
- Alternatively, if this :class:`.Session` is configured with
- ``autocommit=True``, an ad-hoc :class:`_engine.Connection` is returned
- using :meth:`_engine.Engine.connect` on the underlying
- :class:`_engine.Engine`.
-
Ambiguity in multi-bind or unbound :class:`.Session` objects can be
resolved through any of the optional keyword arguments. This
ultimately makes usage of the :meth:`.get_bind` method for resolution.
:param clause:
deprecated; use bind_arguments
- :param close_with_result: Passed to :meth:`_engine.Engine.connect`,
- indicating the :class:`_engine.Connection` should be considered
- "single use", automatically closing when the first result set is
- closed. This flag only has an effect if this :class:`.Session` is
- configured with ``autocommit=True`` and does not already have a
- transaction in progress.
-
- .. deprecated:: 1.4 this parameter is deprecated and will be removed
- in SQLAlchemy 2.0
-
:param execution_options: a dictionary of execution options that will
be passed to :meth:`_engine.Connection.execution_options`, **when the
connection is first procured only**. If the connection is already
return self._connection_for_bind(
bind,
- close_with_result=close_with_result,
execution_options=execution_options,
)
def _connection_for_bind(self, engine, execution_options=None, **kw):
TransactionalContext._trans_ctx_check(self)
- if self._transaction is not None or self._autobegin():
- return self._transaction._connection_for_bind(
- engine, execution_options
- )
-
- assert self._transaction is None
- assert self.autocommit
- conn = engine.connect(**kw)
- if execution_options:
- conn = conn.execution_options(**execution_options)
- return conn
+ if self._transaction is None:
+ assert self._autobegin()
+ return self._transaction._connection_for_bind(
+ engine, execution_options
+ )
def execute(
self,
bind = self.get_bind(**bind_arguments)
- if self.autocommit:
- # legacy stuff, we can't use future_result w/ autocommit because
- # we rely upon close_with_result, also legacy. it's all
- # interrelated
- conn = self._connection_for_bind(bind, close_with_result=True)
- execution_options = execution_options.union(
- dict(future_result=False)
- )
- else:
- conn = self._connection_for_bind(bind)
+ conn = self._connection_for_bind(bind)
result = conn._execute_20(statement, params or {}, execution_options)
if compile_state_cls:
expire all state whenever the :meth:`Session.rollback`
or :meth:`Session.commit` methods are called, so that new
state can be loaded for the new transaction. For this reason,
- calling :meth:`Session.expire_all` should not be needed when
- autocommit is ``False``, assuming the transaction is isolated.
+ calling :meth:`Session.expire_all` is not usually needed,
+ assuming the transaction is isolated.
.. seealso::
You may flush() as often as you like within a transaction to move
changes from Python to the database's transaction buffer.
- For ``autocommit`` Sessions with no active manual transaction, flush()
- will create a transaction on the fly that surrounds the entire set of
- operations into the flush.
-
:param objects: Optional; restricts the flush operation to operate
only on elements that are in the given collection.
:meth:`_orm.Session.in_transaction`
"""
- if self.autocommit:
- return (
- self._transaction is not None and self._transaction.is_active
- )
- else:
- return self._transaction is None or self._transaction.is_active
+ return self._transaction is None or self._transaction.is_active
identity_map = None
"""A mapping of object identities to objects themselves.
bind=None,
class_=Session,
autoflush=True,
- autocommit=False,
expire_on_commit=True,
info=None,
**kw
objects. Defaults to :class:`.Session`.
:param autoflush: The autoflush setting to use with newly created
:class:`.Session` objects.
- :param autocommit: The autocommit setting to use with newly created
- :class:`.Session` objects.
:param expire_on_commit=True: the
:paramref:`_orm.Session.expire_on_commit` setting to use
with newly created :class:`.Session` objects.
"""
kw["bind"] = bind
kw["autoflush"] = autoflush
- kw["autocommit"] = autocommit
kw["expire_on_commit"] = expire_on_commit
if info is not None:
kw["info"] = info
else:
return ids
- sharded_session = sessionmaker(
- class_=ShardedSession, autoflush=True, autocommit=False
- )
+ sharded_session = sessionmaker(class_=ShardedSession, autoflush=True)
sharded_session.configure(
shards={
"north_america": db1,
)
self.mapper_registry.map_imperatively(Person, person)
- sess = fixture_session(autocommit=False, expire_on_commit=True)
+ sess = fixture_session(expire_on_commit=True)
p1 = Person()
sess.add(Ball(person=p1))
sess.commit()
from sqlalchemy import util
from sqlalchemy.engine import default
from sqlalchemy.engine import result_tuple
-from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import aliased
from sqlalchemy.orm import as_declarative
from sqlalchemy.orm import attributes
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.collections import collection
from sqlalchemy.orm.util import polymorphic_union
-from sqlalchemy.sql import elements
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import assertions
"arguments in SQLAlchemy 2.0."
)
-autocommit_dep = (
- "The Session.autocommit parameter is deprecated "
- "and will be removed in SQLAlchemy version 2.0."
-)
-
-subtransactions_dep = (
- "The Session.begin.subtransactions flag is deprecated "
- "and will be removed in SQLAlchemy version 2.0."
-)
opt_strings_dep = (
"Using strings to indicate column or relationship "
"paths in loader options"
s1 = Session(testing.db)
str(s1)
- def test_subtransactions_deprecated(self):
- s1 = Session(testing.db)
- s1.begin()
-
- with testing.expect_deprecated_20(subtransactions_dep):
- s1.begin(subtransactions=True)
-
- s1.close()
-
- def test_autocommit_deprecated(Self):
- with testing.expect_deprecated_20(autocommit_dep):
- Session(autocommit=True)
-
@testing.combinations(
{"mapper": None},
{"clause": None},
kw["clause"] = expr
eq_(m1.mock_calls, [call(**kw)])
- @testing.requires.independent_connections
- @testing.emits_warning(".*previous exception")
- def test_failed_rollback_deactivates_transaction_ctx_integration(self):
- # test #4050 in the same context as that of oslo.db
-
- User = self.classes.User
-
- with testing.expect_deprecated_20(
- "The Session.autocommit parameter is deprecated"
- ):
- session = Session(bind=testing.db, autocommit=True)
-
- evented_exceptions = []
- caught_exceptions = []
-
- def canary(context):
- evented_exceptions.append(context.original_exception)
-
- rollback_error = testing.db.dialect.dbapi.InterfaceError(
- "Can't roll back to savepoint"
- )
-
- def prevent_savepoint_rollback(
- cursor, statement, parameters, context=None
- ):
- if (
- context is not None
- and context.compiled
- and isinstance(
- context.compiled.statement,
- elements.RollbackToSavepointClause,
- )
- ):
- raise rollback_error
-
- self.event_listen(testing.db, "handle_error", canary, retval=True)
- self.event_listen(
- testing.db.dialect, "do_execute", prevent_savepoint_rollback
- )
-
- with session.begin():
- session.add(User(id=1, name="x"))
-
- try:
- with session.begin():
- try:
- with session.begin_nested():
- # raises IntegrityError on flush
- session.add(User(id=1, name="x"))
-
- # outermost is the failed SAVEPOINT rollback
- # from the "with session.begin_nested()"
- except sa_exc.DBAPIError as dbe_inner:
- caught_exceptions.append(dbe_inner.orig)
- raise
- except sa_exc.DBAPIError as dbe_outer:
- caught_exceptions.append(dbe_outer.orig)
-
- is_true(
- isinstance(
- evented_exceptions[0], testing.db.dialect.dbapi.IntegrityError
- )
- )
- eq_(evented_exceptions[1], rollback_error)
- eq_(len(evented_exceptions), 2)
- eq_(caught_exceptions, [rollback_error, rollback_error])
-
- def test_contextmanager_commit(self):
- User = self.classes.User
-
- with testing.expect_deprecated_20(
- "The Session.autocommit parameter is deprecated"
- ):
- sess = Session(testing.db, autocommit=True)
- with sess.begin():
- sess.add(User(name="u1"))
-
- sess.rollback()
- eq_(sess.query(User).count(), 1)
-
- def test_contextmanager_rollback(self):
- User = self.classes.User
-
- with testing.expect_deprecated_20(
- "The Session.autocommit parameter is deprecated"
- ):
- sess = Session(testing.db, autocommit=True)
-
- def go():
- with sess.begin():
- sess.add(User()) # name can't be null
-
- assert_raises(sa_exc.DBAPIError, go)
-
- eq_(sess.query(User).count(), 0)
-
- with sess.begin():
- sess.add(User(name="u1"))
- eq_(sess.query(User).count(), 1)
-
-
-class TransScopingTest(_fixtures.FixtureTest):
- run_inserts = None
- __prefer_requires__ = ("independent_connections",)
-
- @testing.combinations((True,), (False,), argnames="begin")
- @testing.combinations((True,), (False,), argnames="expire_on_commit")
- @testing.combinations((True,), (False,), argnames="modify_unconditional")
- @testing.combinations(
- ("nothing",), ("modify",), ("add",), ("delete",), argnames="case_"
- )
- def test_autobegin_attr_change(
- self, case_, begin, modify_unconditional, expire_on_commit
- ):
- """test :ticket:`6360`"""
-
- autocommit = True
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
-
- with testing.expect_deprecated_20(autocommit_dep):
- s = Session(
- testing.db,
- autocommit=autocommit,
- expire_on_commit=expire_on_commit,
- )
-
- u = User(name="x")
- u2 = User(name="d")
- u3 = User(name="e")
- s.add_all([u, u2, u3])
-
- if autocommit:
- s.flush()
- else:
- s.commit()
-
- if begin:
- s.begin()
-
- if case_ == "add":
- # this autobegins
- s.add(User(name="q"))
- elif case_ == "delete":
- # this autobegins
- s.delete(u2)
- elif case_ == "modify":
- # this autobegins
- u3.name = "m"
-
- if case_ == "nothing" and not begin:
- assert not s._transaction
- expect_expire = expire_on_commit
- elif autocommit and not begin:
- assert not s._transaction
- expect_expire = expire_on_commit
- else:
- assert s._transaction
- expect_expire = True
-
- if modify_unconditional:
- # this autobegins
- u.name = "y"
- expect_expire = True
-
- if not expect_expire:
- assert not s._transaction
-
- # test is that state is consistent after rollback()
- s.rollback()
-
- if autocommit and not begin and modify_unconditional:
- eq_(u.name, "y")
- else:
- if not expect_expire:
- assert "name" in u.__dict__
- else:
- assert "name" not in u.__dict__
- eq_(u.name, "x")
-
- def test_no_autoflush_or_commit_in_expire_w_autocommit(self):
- """test second part of :ticket:`6233`.
-
- Here we test that the "autoflush on unexpire" feature added
- in :ticket:`5226` is turned off for a legacy autocommit session.
-
- """
-
- with testing.expect_deprecated_20(autocommit_dep):
- s = Session(
- testing.db,
- autocommit=True,
- expire_on_commit=True,
- autoflush=True,
- )
-
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
-
- u1 = User(name="u1")
- s.add(u1)
- s.flush() # this commits
-
- u1.name = "u2" # this does not commit
-
- assert "id" not in u1.__dict__
- u1.id # this unexpires
-
- # never expired
- eq_(u1.__dict__["name"], "u2")
-
- eq_(u1.name, "u2")
-
- # still in dirty collection
- assert u1 in s.dirty
-
-
-class AutocommitClosesOnFailTest(fixtures.MappedTest):
- __requires__ = ("deferrable_fks",)
-
- @classmethod
- def define_tables(cls, metadata):
- Table("t1", metadata, Column("id", Integer, primary_key=True))
-
- Table(
- "t2",
- metadata,
- Column("id", Integer, primary_key=True),
- Column(
- "t1_id",
- Integer,
- ForeignKey("t1.id", deferrable=True, initially="deferred"),
- ),
- )
-
- @classmethod
- def setup_classes(cls):
- class T1(cls.Comparable):
- pass
-
- class T2(cls.Comparable):
- pass
-
- @classmethod
- def setup_mappers(cls):
- T2, T1, t2, t1 = (
- cls.classes.T2,
- cls.classes.T1,
- cls.tables.t2,
- cls.tables.t1,
- )
-
- cls.mapper_registry.map_imperatively(T1, t1)
- cls.mapper_registry.map_imperatively(T2, t2)
-
- def test_close_transaction_on_commit_fail(self):
- T2 = self.classes.T2
-
- with testing.expect_deprecated_20(autocommit_dep):
- session = Session(testing.db, autocommit=True)
-
- # with a deferred constraint, this fails at COMMIT time instead
- # of at INSERT time.
- session.add(T2(id=1, t1_id=123))
-
- assert_raises(
- (sa.exc.IntegrityError, sa.exc.DatabaseError), session.flush
- )
-
- assert session._legacy_transaction() is None
-
class DeprecatedInhTest(_poly_fixtures._Polymorphic):
def test_with_polymorphic(self):
m.add_property("_name", deferred(users.c.name))
m.add_property("name", synonym("_name"))
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
assert sess.get(User, 7)
u = sess.query(User).filter_by(name="jack").one()
)
-class AutoCommitTest(_LocalFixture):
- __backend__ = True
-
- def test_begin_nested_requires_trans(self):
- with assertions.expect_deprecated_20(autocommit_dep):
- sess = fixture_session(autocommit=True)
- assert_raises(sa_exc.InvalidRequestError, sess.begin_nested)
-
- def test_begin_preflush(self):
- User = self.classes.User
- with assertions.expect_deprecated_20(autocommit_dep):
- sess = fixture_session(autocommit=True)
-
- u1 = User(name="ed")
- sess.add(u1)
-
- sess.begin()
- u2 = User(name="some other user")
- sess.add(u2)
- sess.rollback()
- assert u2 not in sess
- assert u1 in sess
- assert sess.query(User).filter_by(name="ed").one() is u1
-
- def test_accounting_commit_fails_add(self):
- User = self.classes.User
- with assertions.expect_deprecated_20(autocommit_dep):
- sess = fixture_session(autocommit=True)
-
- fail = False
-
- def fail_fn(*arg, **kw):
- if fail:
- raise Exception("commit fails")
-
- event.listen(sess, "after_flush_postexec", fail_fn)
- u1 = User(name="ed")
- sess.add(u1)
-
- fail = True
- assert_raises(Exception, sess.flush)
- fail = False
-
- assert u1 not in sess
- u1new = User(id=2, name="fred")
- sess.add(u1new)
- sess.add(u1)
- sess.flush()
- assert u1 in sess
- eq_(
- sess.query(User.name).order_by(User.name).all(),
- [("ed",), ("fred",)],
- )
-
- def test_accounting_commit_fails_delete(self):
- User = self.classes.User
- with assertions.expect_deprecated_20(autocommit_dep):
- sess = fixture_session(autocommit=True)
-
- fail = False
-
- def fail_fn(*arg, **kw):
- if fail:
- raise Exception("commit fails")
-
- event.listen(sess, "after_flush_postexec", fail_fn)
- u1 = User(name="ed")
- sess.add(u1)
- sess.flush()
-
- sess.delete(u1)
- fail = True
- assert_raises(Exception, sess.flush)
- fail = False
-
- assert u1 in sess
- assert u1 not in sess.deleted
- sess.delete(u1)
- sess.flush()
- assert u1 not in sess
- eq_(sess.query(User.name).order_by(User.name).all(), [])
-
- @testing.requires.updateable_autoincrement_pks
- def test_accounting_no_select_needed(self):
- """test that flush accounting works on non-expired instances
- when autocommit=True/expire_on_commit=True."""
-
- User = self.classes.User
- with assertions.expect_deprecated_20(autocommit_dep):
- sess = fixture_session(autocommit=True, expire_on_commit=True)
-
- u1 = User(id=1, name="ed")
- sess.add(u1)
- sess.flush()
-
- u1.id = 3
- u1.name = "fred"
- self.assert_sql_count(testing.db, sess.flush, 1)
- assert "id" not in u1.__dict__
- eq_(u1.id, 3)
-
-
-class SessionStateTest(_fixtures.FixtureTest):
- run_inserts = None
-
- __prefer_requires__ = ("independent_connections",)
-
- def test_autocommit_doesnt_raise_on_pending(self):
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
- with assertions.expect_deprecated_20(autocommit_dep):
- session = Session(testing.db, autocommit=True)
-
- session.add(User(name="ed"))
-
- session.begin()
- session.flush()
- session.commit()
-
-
-class SessionTransactionTest(fixtures.RemovesEvents, _fixtures.FixtureTest):
- run_inserts = None
- __backend__ = True
-
- @testing.fixture
- def conn(self):
- with testing.db.connect() as conn:
- yield conn
-
- @testing.fixture
- def future_conn(self):
-
- engine = Engine._future_facade(testing.db)
- with engine.connect() as conn:
- yield conn
-
- def test_deactive_status_check(self):
- sess = fixture_session()
- trans = sess.begin()
-
- with assertions.expect_deprecated_20(subtransactions_dep):
- trans2 = sess.begin(subtransactions=True)
- trans2.rollback()
- assert_raises_message(
- sa_exc.InvalidRequestError,
- "This session is in 'inactive' state, due to the SQL transaction "
- "being rolled back; no further SQL can be emitted within this "
- "transaction.",
- trans.commit,
- )
-
- def test_deactive_status_check_w_exception(self):
- sess = fixture_session()
- trans = sess.begin()
- with assertions.expect_deprecated_20(subtransactions_dep):
- trans2 = sess.begin(subtransactions=True)
- try:
- raise Exception("test")
- except Exception:
- trans2.rollback(_capture_exception=True)
- assert_raises_message(
- sa_exc.PendingRollbackError,
- r"This Session's transaction has been rolled back due to a "
- r"previous exception during flush. To begin a new transaction "
- r"with this Session, first issue Session.rollback\(\). "
- r"Original exception was: test",
- trans.commit,
- )
-
- def test_error_on_using_inactive_session_commands(self):
- users, User = self.tables.users, self.classes.User
-
- self.mapper_registry.map_imperatively(User, users)
- with assertions.expect_deprecated_20(autocommit_dep):
- sess = fixture_session(autocommit=True)
- sess.begin()
- with assertions.expect_deprecated_20(subtransactions_dep):
- sess.begin(subtransactions=True)
- sess.add(User(name="u1"))
- sess.flush()
- sess.rollback()
- with assertions.expect_deprecated_20(subtransactions_dep):
- assert_raises_message(
- sa_exc.InvalidRequestError,
- "This session is in 'inactive' state, due to the SQL "
- "transaction "
- "being rolled back; no further SQL can be emitted within this "
- "transaction.",
- sess.begin,
- subtransactions=True,
- )
- sess.close()
-
- def test_subtransaction_on_external_subtrans(self, conn):
- users, User = self.tables.users, self.classes.User
-
- self.mapper_registry.map_imperatively(User, users)
-
- trans = conn.begin()
- sess = Session(bind=conn, autocommit=False, autoflush=True)
- with assertions.expect_deprecated_20(subtransactions_dep):
- sess.begin(subtransactions=True)
- u = User(name="ed")
- sess.add(u)
- sess.flush()
- sess.commit() # commit does nothing
- trans.rollback() # rolls back
- assert len(sess.query(User).all()) == 0
- sess.close()
-
- def test_subtransaction_on_noautocommit(self):
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
- sess = fixture_session(autocommit=False, autoflush=True)
- with assertions.expect_deprecated_20(subtransactions_dep):
- sess.begin(subtransactions=True)
- u = User(name="u1")
- sess.add(u)
- sess.flush()
- sess.commit() # commit does nothing
- sess.rollback() # rolls back
- assert len(sess.query(User).all()) == 0
- sess.close()
-
- @testing.requires.savepoints
- def test_heavy_nesting(self):
- users = self.tables.users
-
- session = fixture_session()
- session.begin()
- session.connection().execute(users.insert().values(name="user1"))
- with assertions.expect_deprecated_20(subtransactions_dep):
- session.begin(subtransactions=True)
- session.begin_nested()
- session.connection().execute(users.insert().values(name="user2"))
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 2
- )
- session.rollback()
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 1
- )
- session.connection().execute(users.insert().values(name="user3"))
- session.commit()
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 2
- )
-
- @testing.requires.savepoints
- def test_heavy_nesting_future(self):
- users = self.tables.users
-
- from sqlalchemy.future import Engine
-
- engine = Engine._future_facade(testing.db)
- with Session(engine, autocommit=False) as session:
- session.begin()
- session.connection().execute(users.insert().values(name="user1"))
- with assertions.expect_deprecated_20(subtransactions_dep):
- session.begin(subtransactions=True)
- session.begin_nested()
- session.connection().execute(users.insert().values(name="user2"))
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 2
- )
- session.rollback()
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 1
- )
- session.connection().execute(users.insert().values(name="user3"))
- session.commit()
- assert (
- session.connection()
- .exec_driver_sql("select count(1) from users")
- .scalar()
- == 2
- )
-
- @testing.requires.savepoints
- def test_mixed_transaction_control(self):
- users, User = self.tables.users, self.classes.User
-
- self.mapper_registry.map_imperatively(User, users)
-
- with assertions.expect_deprecated_20(autocommit_dep):
- sess = fixture_session(autocommit=True)
-
- sess.begin()
- sess.begin_nested()
- with assertions.expect_deprecated_20(subtransactions_dep):
- transaction = sess.begin(subtransactions=True)
-
- sess.add(User(name="u1"))
-
- transaction.commit()
- sess.commit()
- sess.commit()
-
- sess.close()
-
- eq_(len(sess.query(User).all()), 1)
-
- t1 = sess.begin()
- t2 = sess.begin_nested()
-
- sess.add(User(name="u2"))
-
- t2.commit()
- assert sess._legacy_transaction() is t1
-
- sess.close()
-
- @testing.requires.savepoints
- def test_nested_transaction_connection_add_autocommit(self):
- users, User = self.tables.users, self.classes.User
-
- self.mapper_registry.map_imperatively(User, users)
-
- with assertions.expect_deprecated_20(autocommit_dep):
- sess = fixture_session(autocommit=True)
-
- sess.begin()
- sess.begin_nested()
-
- u1 = User(name="u1")
- sess.add(u1)
- sess.flush()
-
- sess.rollback()
-
- u2 = User(name="u2")
- sess.add(u2)
-
- sess.commit()
-
- eq_(set(sess.query(User).all()), set([u2]))
-
- sess.begin()
- sess.begin_nested()
-
- u3 = User(name="u3")
- sess.add(u3)
- sess.commit() # commit the nested transaction
- sess.rollback()
-
- eq_(set(sess.query(User).all()), set([u2]))
-
- sess.close()
-
- def test_active_flag_autocommit(self):
- with assertions.expect_deprecated_20(autocommit_dep):
- sess = Session(bind=testing.db, autocommit=True)
- assert not sess.is_active
- sess.begin()
- assert sess.is_active
- sess.rollback()
- assert not sess.is_active
-
-
class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest):
run_inserts = None
- def _listener_fixture(self, **kw):
- canary = []
-
- def listener(name):
- def go(*arg, **kw):
- canary.append(name)
-
- return go
-
- sess = fixture_session(**kw)
-
- for evt in [
- "after_transaction_create",
- "after_transaction_end",
- "before_commit",
- "after_commit",
- "after_rollback",
- "after_soft_rollback",
- "before_flush",
- "after_flush",
- "after_flush_postexec",
- "after_begin",
- "before_attach",
- "after_attach",
- "after_bulk_update",
- "after_bulk_delete",
- ]:
- event.listen(sess, evt, listener(evt))
-
- return sess, canary
-
- def test_flush_autocommit_hook(self):
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
-
- with assertions.expect_deprecated_20(autocommit_dep):
- sess, canary = self._listener_fixture(
- autoflush=False, autocommit=True, expire_on_commit=False
- )
-
- u = User(name="u1")
- sess.add(u)
- sess.flush()
- eq_(
- canary,
- [
- "before_attach",
- "after_attach",
- "before_flush",
- "after_transaction_create",
- "after_begin",
- "after_flush",
- "after_flush_postexec",
- "before_commit",
- "after_commit",
- "after_transaction_end",
- ],
- )
-
def test_on_bulk_update_hook(self):
User, users = self.classes.User, self.tables.users
self.assert_sql_count(testing.db, go, 1)
-class BindIntegrationTest(_fixtures.FixtureTest):
- run_inserts = None
-
- def test_bound_connection_transactional(self):
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
-
- with testing.db.connect() as c:
- trans = c.begin()
-
- with assertions.expect_deprecated_20(autocommit_dep):
- sess = Session(bind=c, autocommit=True)
- u = User(name="u3")
- sess.add(u)
- sess.flush()
- assert c.in_transaction()
- trans.commit()
- assert not c.in_transaction()
- assert (
- c.exec_driver_sql("select count(1) from users").scalar() == 1
- )
-
-
class MergeResultTest(_fixtures.FixtureTest):
run_setup_mappers = "once"
run_inserts = "once"
User, Address = self._user_address_fixture(
addresses_args={"order_by": addresses.c.email_address}
)
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
u1 = User(name="jack")
a1 = Address(email_address="a1")
a2 = Address(email_address="a2")
def test_rollback(self):
User, Address = self._user_address_fixture()
- sess = fixture_session(
- expire_on_commit=False, autocommit=False, autoflush=True
- )
+ sess = fixture_session(expire_on_commit=False, autoflush=True)
u1 = User(name="jack")
u1.addresses.append(Address(email_address="lala@hoho.com"))
sess.add(u1)
}
)
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
u = User(name="ed")
u.addresses.extend(
[Address(email_address=letter) for letter in "abcdef"]
}
)
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
u = User(name="ed")
u.addresses.extend(
[Address(email_address=letter) for letter in "abcdef"]
User, Address = self._user_address_fixture(
addresses_args={"backref": "user"}
)
- sess = fixture_session(autoflush=autoflush, autocommit=False)
+ sess = fixture_session(
+ autoflush=autoflush,
+ )
u = User(name="buffy")
):
opts.update(context.execution_options)
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
sess.query(User).first()
eq_(opts["my_option"], True)
users, User = self.tables.users, self.classes.User
self.mapper_registry.map_imperatively(User, users)
- s = fixture_session(autocommit=False)
+ s = fixture_session()
u = s.get(User, 10)
s.expire_all()
users, User = self.tables.users, self.classes.User
self.mapper_registry.map_imperatively(User, users)
- s = fixture_session(autocommit=False)
+ s = fixture_session()
u = s.get(User, 10)
s.expire_all()
users, User = self.tables.users, self.classes.User
self.mapper_registry.map_imperatively(User, users)
- s = fixture_session(autocommit=False)
+ s = fixture_session()
u = s.get(User, 10)
s.expire_all()
users, User = self.tables.users, self.classes.User
self.mapper_registry.map_imperatively(User, users)
- s = fixture_session(autocommit=False)
+ s = fixture_session()
u = s.get(User, 10)
s.expire_all()
s.execute(users.delete().where(User.id == 10))
self.mapper_registry.map_imperatively(
User, users, properties={"name": deferred(users.c.name)}
)
- s = fixture_session(autocommit=False)
+ s = fixture_session()
u = s.get(User, 10)
assert "name" not in u.__dict__
},
)
self.mapper_registry.map_imperatively(Address, addresses)
- s = fixture_session(autoflush=True, autocommit=False)
+ s = fixture_session(
+ autoflush=True,
+ )
u = s.get(User, 8)
adlist = u.addresses
eq_(
},
)
self.mapper_registry.map_imperatively(Address, addresses)
- s = fixture_session(autoflush=True, autocommit=False)
+ s = fixture_session(
+ autoflush=True,
+ )
u = s.get(User, 8)
assert_raises_message(
sa_exc.InvalidRequestError,
m.add_property("name", synonym("_name"))
m.add_property("addresses", relationship(Address))
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
assert sess.get(User, 7)
u = sess.query(User).filter_by(name="jack").one()
self.tables.users,
)
- s = fixture_session(autoflush=True, autocommit=False, future=True)
+ s = fixture_session(autoflush=True, future=True)
self.mapper_registry.map_imperatively(
User,
users,
self.tables.users,
)
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
self.mapper_registry.map_imperatively(
User,
users,
u = User(
id=7, name="fred", addresses=[Address(id=1, email_address="fred1")]
)
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
sess.add(u)
sess.commit()
self.mapper_registry.map_imperatively(User, users)
u = User(id=7)
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
u = sess.merge(u)
assert not bool(attributes.instance_state(u).expired_attributes)
def test_option_building(self):
User = self.classes.User
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
q1 = sess.query(User)
eq_(q1._execution_options, dict())
def test_get_options(self):
User = self.classes.User
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
q = sess.query(User).execution_options(foo="bar", stream_results=True)
eq_(q.get_execution_options(), dict(foo="bar", stream_results=True))
result.close()
return iter([])
- sess = fixture_session(autocommit=False, query_cls=TQuery)
+ sess = fixture_session(query_cls=TQuery)
q1 = sess.query(User).execution_options(**execution_options)
q1.all()
conn1 = testing.db.connect()
conn2 = testing.db.connect()
- sess = Session(autocommit=False, bind=conn1)
+ sess = Session(bind=conn1)
u = User(name="x")
sess.add(u)
sess.flush()
s4 = maker2(info={"s4": 8})
eq_(s4.info, {"s4": 8})
+ def test_autocommit_kw_accepted_but_must_be_false(self):
+ Session(autocommit=False)
+
+ with expect_raises_message(
+ sa.exc.ArgumentError, "autocommit=True is no longer supported"
+ ):
+ Session(autocommit=True)
+
@testing.requires.independent_connections
@engines.close_open_connections
def test_autoflush(self):
conn1 = bind.connect()
conn2 = bind.connect()
- sess = Session(bind=conn1, autocommit=False, autoflush=True)
+ sess = Session(bind=conn1, autoflush=True)
u = User()
u.name = "ed"
sess.add(u)
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(User, users)
- with fixture_session(autocommit=False, autoflush=True) as sess:
+ with fixture_session(autoflush=True) as sess:
u = User()
u.name = "ed"
sess.add(u)
self.mapper_registry.map_imperatively(User, users)
conn1 = testing.db.connect()
- sess = Session(bind=conn1, autocommit=False, autoflush=True)
+ sess = Session(bind=conn1, autoflush=True)
u = User()
u.name = "ed"
sess.add(u)
sess.commit()
def test_active_flag_autobegin(self):
- sess = Session(bind=config.db, autocommit=False)
+ sess = Session(
+ bind=config.db,
+ )
assert sess.is_active
assert not sess.in_transaction()
sess.begin()
assert sess.is_active
def test_active_flag_partial_rollback(self):
- sess = Session(bind=config.db, autocommit=False)
+ sess = Session(
+ bind=config.db,
+ )
assert sess.is_active
assert not sess.in_transaction()
sess.begin()
"""
User, Address = self.classes("User", "Address")
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
u = User(name="ed", addresses=[Address(email_address="foo")])
sess.add(u)
eq_(
def test_deferred_expression_obj_was_gced(self):
User, Address = self.classes("User", "Address")
- sess = fixture_session(
- autoflush=True, autocommit=False, expire_on_commit=False
- )
+ sess = fixture_session(autoflush=True, expire_on_commit=False)
u = User(name="ed", addresses=[Address(email_address="foo")])
sess.add(u)
User, Address = self.classes("User", "Address")
- sess = fixture_session(
- autoflush=True, autocommit=False, expire_on_commit=False
- )
+ sess = fixture_session(autoflush=True, expire_on_commit=False)
u = User(name="ed", addresses=[Address(email_address="foo")])
sess.add(u)
sess.commit()
def test_deferred_expression_obj_was_never_flushed(self):
User, Address = self.classes("User", "Address")
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
u = User(name="ed", addresses=[Address(email_address="foo")])
assert_raises_message(
def test_deferred_expression_unflushed_obj_became_detached_unexpired(self):
User, Address = self.classes("User", "Address")
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
u = User(name="ed", addresses=[Address(email_address="foo")])
q = sess.query(Address).filter(Address.user == u)
def test_deferred_expression_unflushed_obj_became_detached_expired(self):
User, Address = self.classes("User", "Address")
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
u = User(name="ed", addresses=[Address(email_address="foo")])
q = sess.query(Address).filter(Address.user == u)
def test_deferred_expr_unflushed_obj_became_detached_expired_by_key(self):
User, Address = self.classes("User", "Address")
- sess = fixture_session(autoflush=True, autocommit=False)
+ sess = fixture_session(
+ autoflush=True,
+ )
u = User(name="ed", addresses=[Address(email_address="foo")])
q = sess.query(Address).filter(Address.user == u)
def test_deferred_expression_expired_obj_became_detached_expired(self):
User, Address = self.classes("User", "Address")
- sess = fixture_session(
- autoflush=True, autocommit=False, expire_on_commit=True
- )
+ sess = fixture_session(autoflush=True, expire_on_commit=True)
u = User(name="ed", addresses=[Address(email_address="foo")])
sess.add(u)
User, users, properties={"addresses": relationship(Address)}
)
- sess = fixture_session(autocommit=False, autoflush=True)
+ sess = fixture_session(autoflush=True)
u = sess.get(User, 8)
newad = Address(email_address="a new address")
u.addresses.append(newad)
self._test_session().expire_all()
def test_rollback(self):
- sess = self._test_session(autocommit=False, expire_on_commit=True)
+ sess = self._test_session(expire_on_commit=True)
sess.commit()
sess.rollback()
self.mapper_registry.map_imperatively(User, users)
trans = conn.begin()
- sess = Session(bind=conn, autocommit=False, autoflush=True)
+ sess = Session(bind=conn, autoflush=True)
u = User(name="ed")
sess.add(u)
sess.flush()
self.mapper_registry.map_imperatively(User, users)
trans = conn.begin()
- sess = Session(bind=conn, autocommit=False, autoflush=True)
+ sess = Session(bind=conn, autoflush=True)
u1 = User(name="u1")
sess.add(u1)
sess.flush()
conn = future_conn
conn.begin()
- sess = Session(bind=conn, autocommit=False, autoflush=True)
+ sess = Session(bind=conn, autoflush=True)
u = User(name="ed")
sess.add(u)
sess.flush()
conn = future_conn
conn.begin()
- sess = Session(bind=conn, autocommit=False, autoflush=True)
+ sess = Session(bind=conn, autoflush=True)
u = User(name="ed")
sess.add(u)
sess.flush()
conn = future_conn
conn.begin()
- sess = Session(bind=conn, autocommit=False, autoflush=True)
+ sess = Session(bind=conn, autoflush=True)
u1 = User(name="u1")
sess.add(u1)
sess.flush()
self.mapper_registry.map_imperatively(User, users)
- s1 = fixture_session(autocommit=False)
- s2 = fixture_session(autocommit=False)
+ s1 = fixture_session()
+ s2 = fixture_session()
u1 = User(name="u1")
s1.add(u1)
s1.flush()
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(User, users)
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
u = User(name="u1")
sess.add(u)
sess.flush()
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(User, users)
- sess = fixture_session(autocommit=False, future=True)
+ sess = fixture_session(future=True)
u = User(name="u1")
sess.add(u)
sess.flush()
self.mapper_registry.map_imperatively(User, users)
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
sess.begin_nested()
)
def test_no_sql_during_commit(self):
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
@event.listens_for(sess, "after_commit")
def go(session):
)
def test_no_sql_during_prepare(self):
- sess = fixture_session(autocommit=False, twophase=True)
+ sess = fixture_session(twophase=True)
sess.prepare()
)
def test_no_sql_during_rollback(self):
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
sess.connection()
eq_(session.is_active, True)
def test_no_prepare_wo_twophase(self):
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
assert_raises_message(
sa_exc.InvalidRequestError,
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(User, users)
- session = fixture_session(autocommit=False)
+ session = fixture_session()
session.add(User(name="ed"))
session._legacy_transaction().commit()
User, users = self.classes.User, self.tables.users
self.mapper_registry.map_imperatively(User, users)
- session = fixture_session(autocommit=False, future=True)
+ session = fixture_session(future=True)
session.add(User(name="ed"))
session._legacy_transaction().commit()
),
)
- session = fixture_session(autocommit=False)
+ session = fixture_session()
u = User(name="u1")
a = Address(email_address="u1@e")
t1 = Test(id=1, txt=txt)
self.assert_(t1.txt == txt)
- session = fixture_session(autocommit=False)
+ session = fixture_session()
session.add(t1)
session.commit()
t1 = Test(txt=txt)
t1.t2s.append(Test2())
t1.t2s.append(Test2())
- session = fixture_session(autocommit=False, expire_on_commit=False)
+ session = fixture_session(expire_on_commit=False)
session.add(t1)
session.commit()
session.close()
h4 = Hoho()
h5 = Hoho(foober="im the new foober")
- session = fixture_session(autocommit=False, expire_on_commit=False)
+ session = fixture_session(expire_on_commit=False)
session.add_all((h1, h2, h3, h4, h5))
session.commit()
# don't set deferred attribute, commit session
o = Order(id=42)
- session = fixture_session(autocommit=False)
+ session = fixture_session()
session.add(o)
session.commit()
def test_switch_on_update(self):
Parent, Child = self._fixture()
- sess = fixture_session(autocommit=False)
+ sess = fixture_session()
p1 = Parent(id=1, child=Child())
sess.add(p1)
):
s1.commit()
- s2 = fixture_session(autocommit=False)
+ s2 = fixture_session()
f1_s = s2.get(Foo, f1.id)
f1_s.value = "f1rev3"
with conditional_sane_rowcount_warnings(
s1.add(f1s1)
s1.commit()
- s2 = fixture_session(autocommit=False)
+ s2 = fixture_session()
f1s2 = s2.get(Foo, f1s1.id)
f1s2.value = "f1 new value"
with conditional_sane_rowcount_warnings(
s1.add(f1s1)
s1.commit()
- s2 = fixture_session(autocommit=False)
+ s2 = fixture_session()
f1s2 = s2.get(Foo, f1s1.id)
# not sure if I like this API
s2.refresh(f1s2, with_for_update=True)
Foo, version_table = self.classes.Foo, self.tables.version_table
- s1 = fixture_session(autocommit=False)
+ s1 = fixture_session()
self.mapper_registry.map_imperatively(Foo, version_table)
f1s1 = Foo(value="foo", version_id=0)
s1.add(f1s1)
s1.commit()
- s2 = fixture_session(autocommit=False)
+ s2 = fixture_session()
f1s2 = (
s2.query(Foo)
.with_for_update(read=True)