Key Behavioral Changes - ORM
=============================
+.. _change_4246:
+
+FOR UPDATE clause is rendered within the joined eager load subquery as well as outside
+--------------------------------------------------------------------------------------
+
+This change applies specifically to the use of the :func:`.joinedload` loading
+strategy in conjunction with a row limited query, e.g. using :meth:`.Query.first`
+or :meth:`.Query.limit`, as well as with use of the :class:`.Query.with_for_update` method.
+
+Given a query as::
+
+ session.query(A).options(joinedload(A.b)).limit(5)
+
+The :class:`.Query` object renders a SELECT of the following form when joined
+eager loading is combined with LIMIT::
+
+ SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM (
+ SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5
+ ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id
+
+This is so that the limit of rows takes place for the primary entity without
+affecting the joined eager load of related items. When the above query is
+combined with "SELECT..FOR UPDATE", the behavior has been this::
+
+ SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM (
+ SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5
+ ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id FOR UPDATE
+
+However, MySQL due to https://bugs.mysql.com/bug.php?id=90693 does not lock
+the rows inside the subquery, unlike that of Postgresql and other databases.
+So the above query now renders as::
+
+ SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM (
+ SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5 FOR UPDATE
+ ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id FOR UPDATE
+
+On the Oracle dialect, the inner "FOR UPDATE" is not rendered as Oracle does
+not support this syntax and the dialect skips any "FOR UPDATE" that is against
+a subquery; it isn't necessary in any case since Oracle, like Postgresql,
+correctly locks all elements of the returned row.
+
+When using the :paramref:`.Query.with_for_update.of` modifier, typically on
+Postgresql, the outer "FOR UPDATE" is omitted, and the OF is now rendered
+on the inside; previously, the OF target would not be converted to accommodate
+for the subquery correctly. So
+given::
+
+ session.query(A).options(joinedload(A.b)).with_for_update(of=A).limit(5)
+
+The query would now render as::
+
+ SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM (
+ SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5 FOR UPDATE OF a
+ ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id
+
+The above form should be helpful on Postgresql additionally since Postgresql
+will not allow the FOR UPDATE clause to be rendered after the LEFT OUTER JOIN
+target.
+
+Overall, FOR UPDATE remains highly specific to the target database in use
+and can't easily be generalized for more complex queries.
+
+:ticket:`4246`
+
New Features and Improvements - Core
====================================
--- /dev/null
+.. change::
+ :tags: bug, orm, mysql, postgresql
+ :tickets: 4246
+
+ The ORM now doubles the "FOR UPDATE" clause within the subquery that
+ renders in conjunction with joined eager loading in some cases, as it has
+ been observed that MySQL does not lock the rows from a subquery. This
+ means the query renders with two FOR UPDATE clauses; note that on some
+ backends such as Oracle, FOR UPDATE clauses on subqueries are silently
+ ignored since they are unnecessary. Additionally, in the case of the "OF"
+ clause used primarily with Postgresql, the FOR UPDATE is rendered only on
+ the inner subquery when this is used so that the selectable can be targeted
+ to the table within the SELECT statement.
+
+ .. seealso::
+
+ :ref:`change_4246`
\ No newline at end of file
order_by=context.order_by,
**self._select_args
)
+ # put FOR UPDATE on the inner query, where MySQL will honor it,
+ # as well as if it has an OF so Postgresql can use it.
+ inner._for_update_arg = context._for_update_arg
for hint in self._with_hints:
inner = inner.with_hint(*hint)
[inner] + context.secondary_columns,
use_labels=context.labels)
- statement._for_update_arg = context._for_update_arg
+ # Oracle however does not allow FOR UPDATE on the subquery,
+ # and the Oracle dialect ignores it, plus for Postgresql, MySQL
+ # we expect that all elements of the row are locked, so also put it
+ # on the outside (except in the case of PG when OF is used)
+ if context._for_update_arg is not None and \
+ context._for_update_arg.of is None:
+ statement._for_update_arg = context._for_update_arg
from_clause = inner
for eager_join in context.eager_joins.values():
--- /dev/null
+"""Test MySQL FOR UPDATE behavior.
+
+See #4246
+
+"""
+import contextlib
+
+from sqlalchemy import Column, Integer, ForeignKey, update
+from sqlalchemy.orm import relationship, Session, joinedload
+from sqlalchemy import exc
+
+from sqlalchemy.testing import fixtures
+from sqlalchemy import testing
+
+
+class MySQLForUpdateLockingTest(fixtures.DeclarativeMappedTest):
+ __backend__ = True
+ __only_on__ = 'mysql'
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(Base):
+ __tablename__ = 'a'
+ id = Column(Integer, primary_key=True)
+ x = Column(Integer)
+ y = Column(Integer)
+ bs = relationship("B")
+ __table_args__ = {"mysql_engine": "InnoDB"}
+
+ class B(Base):
+ __tablename__ = 'b'
+ id = Column(Integer, primary_key=True)
+ a_id = Column(ForeignKey('a.id'))
+ x = Column(Integer)
+ y = Column(Integer)
+ __table_args__ = {"mysql_engine": "InnoDB"}
+
+ @classmethod
+ def insert_data(cls):
+ A = cls.classes.A
+ B = cls.classes.B
+
+ # all the x/y are < 10
+ s = Session()
+ s.add_all(
+ [
+ A(x=5, y=5, bs=[B(x=4, y=4), B(x=2, y=8), B(x=7, y=1)]),
+ A(x=7, y=5, bs=[B(x=4, y=4), B(x=5, y=8)])
+ ]
+ )
+ s.commit()
+
+ @contextlib.contextmanager
+ def run_test(self):
+ connection = testing.db.connect()
+ connection.execute("set innodb_lock_wait_timeout=1")
+ main_trans = connection.begin()
+ try:
+ yield Session(bind=connection)
+ finally:
+ main_trans.rollback()
+ connection.close()
+
+ def _assert_a_is_locked(self, should_be_locked):
+ A = self.classes.A
+ with testing.db.begin() as alt_trans:
+ alt_trans.execute("set innodb_lock_wait_timeout=1")
+ # set x/y > 10
+ try:
+ alt_trans.execute(
+ update(A).values(x=15, y=19)
+ )
+ except (exc.InternalError, exc.OperationalError) as err:
+ assert "Lock wait timeout exceeded" in str(err)
+ assert should_be_locked
+ else:
+ assert not should_be_locked
+
+ def _assert_b_is_locked(self, should_be_locked):
+ B = self.classes.B
+ with testing.db.begin() as alt_trans:
+ alt_trans.execute("set innodb_lock_wait_timeout=1")
+ # set x/y > 10
+ try:
+ alt_trans.execute(
+ update(B).values(x=15, y=19)
+ )
+ except (exc.InternalError, exc.OperationalError) as err:
+ assert "Lock wait timeout exceeded" in str(err)
+ assert should_be_locked
+ else:
+ assert not should_be_locked
+
+ def test_basic_lock(self):
+ A = self.classes.A
+ with self.run_test() as s:
+ s.query(A).with_for_update().all()
+ # test our fixture
+ self._assert_a_is_locked(True)
+
+ def test_basic_not_lock(self):
+ A = self.classes.A
+ with self.run_test() as s:
+ s.query(A).all()
+ # test our fixture
+ self._assert_a_is_locked(False)
+
+ def test_joined_lock_subquery(self):
+ A = self.classes.A
+ with self.run_test() as s:
+ s.query(A).options(joinedload(A.bs)).with_for_update().first()
+
+ # test for issue #4246, should be locked
+ self._assert_a_is_locked(True)
+ self._assert_b_is_locked(True)
+
+ def test_joined_lock_subquery_inner_for_update(self):
+ A = self.classes.A
+ B = self.classes.B
+ with self.run_test() as s:
+ q = s.query(A).with_for_update().subquery()
+ s.query(q).join(B).all()
+
+ # FOR UPDATE is inside the subquery, should be locked
+ self._assert_a_is_locked(True)
+
+ # FOR UPDATE is inside the subquery, B is not locked
+ self._assert_b_is_locked(False)
+
+ def test_joined_lock_subquery_inner_for_update_outer(self):
+ A = self.classes.A
+ B = self.classes.B
+ with self.run_test() as s:
+ q = s.query(A).with_for_update().subquery()
+ s.query(q).join(B).with_for_update().all()
+
+ # FOR UPDATE is inside the subquery, should be locked
+ self._assert_a_is_locked(True)
+
+ # FOR UPDATE is also outside the subquery, B is locked
+ self._assert_b_is_locked(True)
+
+ def test_joined_lock_subquery_order_for_update_outer(self):
+ A = self.classes.A
+ B = self.classes.B
+ with self.run_test() as s:
+ q = s.query(A).order_by(A.id).subquery()
+ s.query(q).join(B).with_for_update().all()
+ # FOR UPDATE is inside the subquery, should not be locked
+ self._assert_a_is_locked(False)
+ self._assert_b_is_locked(True)
+
+ def test_joined_lock_no_subquery(self):
+ A = self.classes.A
+ with self.run_test() as s:
+ s.query(A).options(joinedload(A.bs)).with_for_update().all()
+ # no subquery, should be locked
+ self._assert_a_is_locked(True)
+ self._assert_b_is_locked(True)
from sqlalchemy.databases import *
from sqlalchemy.orm import mapper
from sqlalchemy.orm import Session
+from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import relationship
from sqlalchemy.testing import AssertsCompiledSQL, eq_
from sqlalchemy.testing import assert_raises_message
from sqlalchemy import exc
from test.orm import _fixtures
+from sqlalchemy import testing
class LegacyLockModeTest(_fixtures.FixtureTest):
)
+class BackendTest(_fixtures.FixtureTest):
+ __backend__ = True
+
+ # test against the major backends. We are naming specific databases
+ # here rather than using requirements rules since the behavior of
+ # "FOR UPDATE" as well as "OF" is very specific to each DB, and we need
+ # to run the query differently based on backend.
+
+ @classmethod
+ def setup_mappers(cls):
+ User, users = cls.classes.User, cls.tables.users
+ Address, addresses = cls.classes.Address, cls.tables.addresses
+ mapper(User, users, properties={
+ "addresses": relationship(Address)
+ })
+ mapper(Address, addresses)
+
+ def test_inner_joinedload_w_limit(self):
+ User = self.classes.User
+ sess = Session()
+ q = sess.query(User).options(
+ joinedload(User.addresses, innerjoin=True)
+ ).with_for_update().limit(1)
+
+ if testing.against("oracle"):
+ assert_raises_message(
+ exc.DatabaseError,
+ "ORA-02014",
+ q.all
+ )
+ else:
+ q.all()
+ sess.close()
+
+ def test_inner_joinedload_wo_limit(self):
+ User = self.classes.User
+ sess = Session()
+ sess.query(User).options(
+ joinedload(User.addresses, innerjoin=True)
+ ).with_for_update().all()
+ sess.close()
+
+ def test_outer_joinedload_w_limit(self):
+ User = self.classes.User
+ sess = Session()
+ q = sess.query(User).options(
+ joinedload(User.addresses, innerjoin=False)
+ )
+
+ if testing.against("postgresql"):
+ q = q.with_for_update(of=User)
+ else:
+ q = q.with_for_update()
+
+ q = q.limit(1)
+
+ if testing.against("oracle"):
+ assert_raises_message(
+ exc.DatabaseError,
+ "ORA-02014",
+ q.all
+ )
+ else:
+ q.all()
+ sess.close()
+
+ def test_outer_joinedload_wo_limit(self):
+ User = self.classes.User
+ sess = Session()
+ q = sess.query(User).options(
+ joinedload(User.addresses, innerjoin=False)
+ )
+
+ if testing.against("postgresql"):
+ q = q.with_for_update(of=User)
+ else:
+ q = q.with_for_update()
+
+ q.all()
+ sess.close()
+
+ def test_join_w_subquery(self):
+ User = self.classes.User
+ Address = self.classes.Address
+ sess = Session()
+ q1 = sess.query(User).with_for_update().subquery()
+ sess.query(q1).join(Address).all()
+ sess.close()
+
+ def test_plain(self):
+ User = self.classes.User
+ sess = Session()
+ sess.query(User).with_for_update().all()
+ sess.close()
+
+
class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL):
"""run some compile tests, even though these are redundant."""
run_inserts = None
def setup_mappers(cls):
User, users = cls.classes.User, cls.tables.users
Address, addresses = cls.classes.Address, cls.tables.addresses
- mapper(User, users)
+ mapper(User, users, properties={
+ "addresses": relationship(Address)
+ })
mapper(Address, addresses)
def test_default_update(self):
.with_for_update(of=[User.id, User.id, User.id]),
"SELECT users.id AS users_id FROM users FOR UPDATE OF users",
dialect="postgresql"
- )
+ )
def test_postgres_update_skip_locked(self):
User = self.classes.User
"SELECT users.id AS users_id FROM users LOCK IN SHARE MODE",
dialect="mysql"
)
+
+ def test_for_update_on_inner_w_joinedload(self):
+ User = self.classes.User
+ sess = Session()
+ self.assert_compile(
+ sess.query(User).options(
+ joinedload(User.addresses)).with_for_update().limit(1),
+ "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name "
+ "AS anon_1_users_name, addresses_1.id AS addresses_1_id, "
+ "addresses_1.user_id AS addresses_1_user_id, "
+ "addresses_1.email_address AS addresses_1_email_address "
+ "FROM (SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users LIMIT %s FOR UPDATE) AS anon_1 "
+ "LEFT OUTER JOIN addresses AS addresses_1 "
+ "ON anon_1.users_id = addresses_1.user_id FOR UPDATE",
+ dialect="mysql"
+ )
+
+ def test_for_update_on_inner_w_joinedload_no_render_oracle(self):
+ User = self.classes.User
+ sess = Session()
+ self.assert_compile(
+ sess.query(User).options(
+ joinedload(User.addresses)).with_for_update().limit(1),
+ "SELECT anon_1.users_id AS anon_1_users_id, "
+ "anon_1.users_name AS anon_1_users_name, "
+ "addresses_1.id AS addresses_1_id, "
+ "addresses_1.user_id AS addresses_1_user_id, "
+ "addresses_1.email_address AS addresses_1_email_addres_1 "
+ "FROM (SELECT users_id, users_name FROM "
+ "(SELECT users.id AS users_id, users.name AS users_name "
+ "FROM users) WHERE ROWNUM <= :param_1) anon_1 "
+ "LEFT OUTER JOIN addresses addresses_1 "
+ "ON anon_1.users_id = addresses_1.user_id FOR UPDATE",
+ dialect="oracle"
+ )