--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 5984
+
+ The unit of work process now turns off all "lazy='raise'" behavior
+ altogether when a flush is proceeding. While there are areas where the UOW
+ is sometimes loading things that aren't ultimately needed, the lazy="raise"
+ strategy is not helpful here as the user often does not have much control
+ or visibility into the flush process.
+
column-oriented attributes, the :func:`.defer` option supports the
:paramref:`.orm.defer.raiseload` option which works in the same way.
+.. versionchanged:: 1.4.0 The "raiseload" strategies **do not take place**
+ within the unit of work flush process, as of SQLAlchemy 1.4.0. This means
+ that if the unit of work needs to load a particular attribute in order to
+ complete its work, it will perform the load. It's not always easy to prevent
+ a particular relationship load from occurring within the UOW process
+ particularly with less common kinds of relationships. The lazy="raise" case
+ is more intended for explicit attribute access within the application space.
+
.. seealso::
:ref:`wildcard_loader_strategies`
if not isdelete or self.passive_deletes:
passive = attributes.PASSIVE_NO_INITIALIZE
elif self.direction is MANYTOONE:
+ # here, we were hoping to optimize having to fetch many-to-one
+ # for history and ignore it, if there's no further cascades
+ # to take place. however there are too many less common conditions
+ # that still take place and tests in test_relationships /
+ # test_cascade etc. will still fail.
passive = attributes.PASSIVE_NO_FETCH_RELATED
else:
passive = attributes.PASSIVE_OFF
history = impl.get_history(
state,
state.dict,
- attributes.PASSIVE_OFF | attributes.LOAD_AGAINST_COMMITTED,
+ attributes.PASSIVE_OFF
+ | attributes.LOAD_AGAINST_COMMITTED
+ | attributes.NO_RAISE,
)
if history and impl.uses_objects:
state_history = history.as_state()
# TODO: store the history as (state, object) tuples
# so we don't have to keep converting here
history = impl.get_history(
- state, state.dict, passive | attributes.LOAD_AGAINST_COMMITTED
+ state,
+ state.dict,
+ passive
+ | attributes.LOAD_AGAINST_COMMITTED
+ | attributes.NO_RAISE,
)
if history and impl.uses_objects:
state_history = history.as_state()
from sqlalchemy.orm import Session
from sqlalchemy.orm import util as orm_util
from sqlalchemy.orm.attributes import instance_state
+from sqlalchemy.orm.decl_api import declarative_base
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
from sqlalchemy.testing import not_in
+from sqlalchemy.testing.assertsql import CompiledSQL
from sqlalchemy.testing.fixtures import fixture_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
u1.address = a2
+class M2OwNoUseGetCascadeTest(
+ testing.AssertsExecutionResults, fixtures.TestBase
+):
+ @testing.fixture
+ def fixture(self, metadata):
+ Base = declarative_base(metadata=metadata)
+
+ def go(lazy="select", cascade="save-update"):
+ class A(Base):
+ __tablename__ = "a"
+
+ id = Column(Integer, primary_key=True)
+ email = Column(String(50), unique=True)
+
+ bs = relationship(
+ "B",
+ back_populates="user",
+ primaryjoin="A.email == B.email",
+ )
+
+ class B(Base):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ email = Column(String(50), ForeignKey("a.email"))
+
+ user = relationship(
+ "A",
+ lazy=lazy,
+ cascade=cascade,
+ single_parent=True,
+ back_populates="bs",
+ primaryjoin="A.email == B.email",
+ )
+
+ Base.metadata.create_all(testing.db)
+ return A, B
+
+ yield go
+ Base.registry.dispose()
+
+ def test_cascade_deletes_user(self, fixture):
+ A, B = fixture(cascade="all, delete-orphan")
+
+ sess = fixture_session()
+
+ a1 = A(email="x")
+ b1 = B(user=a1)
+ sess.add_all([a1, b1])
+ sess.commit()
+
+ b1 = sess.execute(select(B)).scalars().first()
+
+ sess.delete(b1)
+
+ self.assert_sql_execution(
+ testing.db,
+ sess.flush,
+ # looking for other bs'
+ CompiledSQL(
+ "SELECT b.id AS b_id, b.email AS b_email "
+ "FROM b WHERE :param_1 = b.email",
+ lambda ctx: [{"param_1": "x"}],
+ ),
+ CompiledSQL(
+ "DELETE FROM b WHERE b.id = :id", lambda ctx: [{"id": 1}]
+ ),
+ CompiledSQL(
+ "DELETE FROM a WHERE a.id = :id", lambda ctx: [{"id": 1}]
+ ),
+ )
+
+ @testing.combinations(("select",), ("raise",), argnames="lazy")
+ def test_ignores_user(self, fixture, lazy):
+ A, B = fixture()
+
+ sess = fixture_session()
+
+ a1 = A(email="x")
+ b1 = B(user=a1)
+ sess.add_all([a1, b1])
+ sess.commit()
+
+ b1 = sess.execute(select(B)).scalars().first()
+
+ sess.delete(b1)
+
+ self.assert_sql_execution(
+ testing.db,
+ sess.flush,
+ # we would like it to be able to skip this SELECT but this is not
+ # implemented right now
+ CompiledSQL(
+ "SELECT a.id AS a_id, a.email AS a_email FROM a "
+ "WHERE a.email = :param_1",
+ [{"param_1": "x"}],
+ ),
+ CompiledSQL(
+ "DELETE FROM b WHERE b.id = :id", lambda ctx: [{"id": 1}]
+ ),
+ )
+
+
class NoSaveCascadeFlushTest(_fixtures.FixtureTest):
"""Test related item not present in session, commit proceeds."""
self._assert_uow_size(sess, 6)
+class RaiseLoadIgnoredTest(
+ fixtures.DeclarativeMappedTest,
+ testing.AssertsExecutionResults,
+):
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(Base):
+ __tablename__ = "a"
+
+ id = Column(Integer, primary_key=True)
+
+ bs = relationship("B", back_populates="user", lazy="raise")
+
+ class B(Base):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ a_id = Column(ForeignKey("a.id"))
+ user = relationship("A", back_populates="bs", lazy="raise")
+
+ def test_delete_head(self):
+ A, B = self.classes("A", "B")
+
+ sess = fixture_session()
+
+ sess.add(A(bs=[B(), B()]))
+ sess.commit()
+
+ a1 = sess.execute(select(A)).scalars().first()
+
+ sess.delete(a1)
+
+ self.assert_sql_execution(
+ testing.db,
+ sess.flush,
+ # for the flush process, lazy="raise" is ignored
+ CompiledSQL(
+ "SELECT b.id AS b_id, b.a_id AS b_a_id FROM b "
+ "WHERE :param_1 = b.a_id",
+ [{"param_1": 1}],
+ ),
+ CompiledSQL(
+ "UPDATE b SET a_id=:a_id WHERE b.id = :b_id",
+ [{"a_id": None, "b_id": 1}, {"a_id": None, "b_id": 2}],
+ ),
+ CompiledSQL("DELETE FROM a WHERE a.id = :id", [{"id": 1}]),
+ )
+
+
class SingleCycleTest(UOWTest):
def teardown_test(self):
engines.testing_reaper.rollback_all()