From 453cdfd75688e869637f0bbd171594e1fe7b6a39 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Mon, 30 Apr 2018 11:31:48 -0400 Subject: [PATCH] Render FOR UPDATE on the inner subquery as well as the outer The ORM now doubles the "FOR UPDATE" clause within the subquery that renders in conjunction with joined eager loading in some cases, as it has been observed that MySQL does not lock the rows from a subquery. This means the query renders with two FOR UPDATE clauses; note that on some backends such as Oracle, FOR UPDATE clauses on subqueries are silently ignored since they are unnecessary. Additionally, in the case of the "OF" clause used primarily with Postgresql, the FOR UPDATE is rendered only on the inner subquery when this is used so that the selectable can be targeted to the table within the SELECT statement. Change-Id: Ie5520d08d82bf0afd9e1bd2d43a0b2a0db0de16d Fixes: #4246 --- doc/build/changelog/migration_13.rst | 64 ++++++++ doc/build/changelog/unreleased_13/4246.rst | 17 +++ lib/sqlalchemy/orm/query.py | 11 +- test/dialect/mysql/test_for_update.py | 161 +++++++++++++++++++++ test/orm/test_lockmode.py | 141 +++++++++++++++++- 5 files changed, 391 insertions(+), 3 deletions(-) create mode 100644 doc/build/changelog/unreleased_13/4246.rst create mode 100644 test/dialect/mysql/test_for_update.py diff --git a/doc/build/changelog/migration_13.rst b/doc/build/changelog/migration_13.rst index 3786067b3f..71ef4d0113 100644 --- a/doc/build/changelog/migration_13.rst +++ b/doc/build/changelog/migration_13.rst @@ -23,6 +23,70 @@ New Features and Improvements - ORM Key Behavioral Changes - ORM ============================= +.. _change_4246: + +FOR UPDATE clause is rendered within the joined eager load subquery as well as outside +-------------------------------------------------------------------------------------- + +This change applies specifically to the use of the :func:`.joinedload` loading +strategy in conjunction with a row limited query, e.g. using :meth:`.Query.first` +or :meth:`.Query.limit`, as well as with use of the :class:`.Query.with_for_update` method. + +Given a query as:: + + session.query(A).options(joinedload(A.b)).limit(5) + +The :class:`.Query` object renders a SELECT of the following form when joined +eager loading is combined with LIMIT:: + + SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM ( + SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5 + ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id + +This is so that the limit of rows takes place for the primary entity without +affecting the joined eager load of related items. When the above query is +combined with "SELECT..FOR UPDATE", the behavior has been this:: + + SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM ( + SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5 + ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id FOR UPDATE + +However, MySQL due to https://bugs.mysql.com/bug.php?id=90693 does not lock +the rows inside the subquery, unlike that of Postgresql and other databases. +So the above query now renders as:: + + SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM ( + SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5 FOR UPDATE + ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id FOR UPDATE + +On the Oracle dialect, the inner "FOR UPDATE" is not rendered as Oracle does +not support this syntax and the dialect skips any "FOR UPDATE" that is against +a subquery; it isn't necessary in any case since Oracle, like Postgresql, +correctly locks all elements of the returned row. + +When using the :paramref:`.Query.with_for_update.of` modifier, typically on +Postgresql, the outer "FOR UPDATE" is omitted, and the OF is now rendered +on the inside; previously, the OF target would not be converted to accommodate +for the subquery correctly. So +given:: + + session.query(A).options(joinedload(A.b)).with_for_update(of=A).limit(5) + +The query would now render as:: + + SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM ( + SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5 FOR UPDATE OF a + ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id + +The above form should be helpful on Postgresql additionally since Postgresql +will not allow the FOR UPDATE clause to be rendered after the LEFT OUTER JOIN +target. + +Overall, FOR UPDATE remains highly specific to the target database in use +and can't easily be generalized for more complex queries. + +:ticket:`4246` + New Features and Improvements - Core ==================================== diff --git a/doc/build/changelog/unreleased_13/4246.rst b/doc/build/changelog/unreleased_13/4246.rst new file mode 100644 index 0000000000..0b47c9e5ca --- /dev/null +++ b/doc/build/changelog/unreleased_13/4246.rst @@ -0,0 +1,17 @@ +.. change:: + :tags: bug, orm, mysql, postgresql + :tickets: 4246 + + The ORM now doubles the "FOR UPDATE" clause within the subquery that + renders in conjunction with joined eager loading in some cases, as it has + been observed that MySQL does not lock the rows from a subquery. This + means the query renders with two FOR UPDATE clauses; note that on some + backends such as Oracle, FOR UPDATE clauses on subqueries are silently + ignored since they are unnecessary. Additionally, in the case of the "OF" + clause used primarily with Postgresql, the FOR UPDATE is rendered only on + the inner subquery when this is used so that the selectable can be targeted + to the table within the SELECT statement. + + .. seealso:: + + :ref:`change_4246` \ No newline at end of file diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 6d2b144e32..ea8371f506 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -3493,6 +3493,9 @@ class Query(object): order_by=context.order_by, **self._select_args ) + # put FOR UPDATE on the inner query, where MySQL will honor it, + # as well as if it has an OF so Postgresql can use it. + inner._for_update_arg = context._for_update_arg for hint in self._with_hints: inner = inner.with_hint(*hint) @@ -3510,7 +3513,13 @@ class Query(object): [inner] + context.secondary_columns, use_labels=context.labels) - statement._for_update_arg = context._for_update_arg + # Oracle however does not allow FOR UPDATE on the subquery, + # and the Oracle dialect ignores it, plus for Postgresql, MySQL + # we expect that all elements of the row are locked, so also put it + # on the outside (except in the case of PG when OF is used) + if context._for_update_arg is not None and \ + context._for_update_arg.of is None: + statement._for_update_arg = context._for_update_arg from_clause = inner for eager_join in context.eager_joins.values(): diff --git a/test/dialect/mysql/test_for_update.py b/test/dialect/mysql/test_for_update.py new file mode 100644 index 0000000000..af467f920a --- /dev/null +++ b/test/dialect/mysql/test_for_update.py @@ -0,0 +1,161 @@ +"""Test MySQL FOR UPDATE behavior. + +See #4246 + +""" +import contextlib + +from sqlalchemy import Column, Integer, ForeignKey, update +from sqlalchemy.orm import relationship, Session, joinedload +from sqlalchemy import exc + +from sqlalchemy.testing import fixtures +from sqlalchemy import testing + + +class MySQLForUpdateLockingTest(fixtures.DeclarativeMappedTest): + __backend__ = True + __only_on__ = 'mysql' + + @classmethod + def setup_classes(cls): + Base = cls.DeclarativeBasic + + class A(Base): + __tablename__ = 'a' + id = Column(Integer, primary_key=True) + x = Column(Integer) + y = Column(Integer) + bs = relationship("B") + __table_args__ = {"mysql_engine": "InnoDB"} + + class B(Base): + __tablename__ = 'b' + id = Column(Integer, primary_key=True) + a_id = Column(ForeignKey('a.id')) + x = Column(Integer) + y = Column(Integer) + __table_args__ = {"mysql_engine": "InnoDB"} + + @classmethod + def insert_data(cls): + A = cls.classes.A + B = cls.classes.B + + # all the x/y are < 10 + s = Session() + s.add_all( + [ + A(x=5, y=5, bs=[B(x=4, y=4), B(x=2, y=8), B(x=7, y=1)]), + A(x=7, y=5, bs=[B(x=4, y=4), B(x=5, y=8)]) + ] + ) + s.commit() + + @contextlib.contextmanager + def run_test(self): + connection = testing.db.connect() + connection.execute("set innodb_lock_wait_timeout=1") + main_trans = connection.begin() + try: + yield Session(bind=connection) + finally: + main_trans.rollback() + connection.close() + + def _assert_a_is_locked(self, should_be_locked): + A = self.classes.A + with testing.db.begin() as alt_trans: + alt_trans.execute("set innodb_lock_wait_timeout=1") + # set x/y > 10 + try: + alt_trans.execute( + update(A).values(x=15, y=19) + ) + except (exc.InternalError, exc.OperationalError) as err: + assert "Lock wait timeout exceeded" in str(err) + assert should_be_locked + else: + assert not should_be_locked + + def _assert_b_is_locked(self, should_be_locked): + B = self.classes.B + with testing.db.begin() as alt_trans: + alt_trans.execute("set innodb_lock_wait_timeout=1") + # set x/y > 10 + try: + alt_trans.execute( + update(B).values(x=15, y=19) + ) + except (exc.InternalError, exc.OperationalError) as err: + assert "Lock wait timeout exceeded" in str(err) + assert should_be_locked + else: + assert not should_be_locked + + def test_basic_lock(self): + A = self.classes.A + with self.run_test() as s: + s.query(A).with_for_update().all() + # test our fixture + self._assert_a_is_locked(True) + + def test_basic_not_lock(self): + A = self.classes.A + with self.run_test() as s: + s.query(A).all() + # test our fixture + self._assert_a_is_locked(False) + + def test_joined_lock_subquery(self): + A = self.classes.A + with self.run_test() as s: + s.query(A).options(joinedload(A.bs)).with_for_update().first() + + # test for issue #4246, should be locked + self._assert_a_is_locked(True) + self._assert_b_is_locked(True) + + def test_joined_lock_subquery_inner_for_update(self): + A = self.classes.A + B = self.classes.B + with self.run_test() as s: + q = s.query(A).with_for_update().subquery() + s.query(q).join(B).all() + + # FOR UPDATE is inside the subquery, should be locked + self._assert_a_is_locked(True) + + # FOR UPDATE is inside the subquery, B is not locked + self._assert_b_is_locked(False) + + def test_joined_lock_subquery_inner_for_update_outer(self): + A = self.classes.A + B = self.classes.B + with self.run_test() as s: + q = s.query(A).with_for_update().subquery() + s.query(q).join(B).with_for_update().all() + + # FOR UPDATE is inside the subquery, should be locked + self._assert_a_is_locked(True) + + # FOR UPDATE is also outside the subquery, B is locked + self._assert_b_is_locked(True) + + def test_joined_lock_subquery_order_for_update_outer(self): + A = self.classes.A + B = self.classes.B + with self.run_test() as s: + q = s.query(A).order_by(A.id).subquery() + s.query(q).join(B).with_for_update().all() + # FOR UPDATE is inside the subquery, should not be locked + self._assert_a_is_locked(False) + self._assert_b_is_locked(True) + + def test_joined_lock_no_subquery(self): + A = self.classes.A + with self.run_test() as s: + s.query(A).options(joinedload(A.bs)).with_for_update().all() + # no subquery, should be locked + self._assert_a_is_locked(True) + self._assert_b_is_locked(True) diff --git a/test/orm/test_lockmode.py b/test/orm/test_lockmode.py index cb35d6e27c..34ae52d3c7 100644 --- a/test/orm/test_lockmode.py +++ b/test/orm/test_lockmode.py @@ -2,10 +2,13 @@ from sqlalchemy.engine import default from sqlalchemy.databases import * from sqlalchemy.orm import mapper from sqlalchemy.orm import Session +from sqlalchemy.orm import joinedload +from sqlalchemy.orm import relationship from sqlalchemy.testing import AssertsCompiledSQL, eq_ from sqlalchemy.testing import assert_raises_message from sqlalchemy import exc from test.orm import _fixtures +from sqlalchemy import testing class LegacyLockModeTest(_fixtures.FixtureTest): @@ -102,6 +105,102 @@ class ForUpdateTest(_fixtures.FixtureTest): ) +class BackendTest(_fixtures.FixtureTest): + __backend__ = True + + # test against the major backends. We are naming specific databases + # here rather than using requirements rules since the behavior of + # "FOR UPDATE" as well as "OF" is very specific to each DB, and we need + # to run the query differently based on backend. + + @classmethod + def setup_mappers(cls): + User, users = cls.classes.User, cls.tables.users + Address, addresses = cls.classes.Address, cls.tables.addresses + mapper(User, users, properties={ + "addresses": relationship(Address) + }) + mapper(Address, addresses) + + def test_inner_joinedload_w_limit(self): + User = self.classes.User + sess = Session() + q = sess.query(User).options( + joinedload(User.addresses, innerjoin=True) + ).with_for_update().limit(1) + + if testing.against("oracle"): + assert_raises_message( + exc.DatabaseError, + "ORA-02014", + q.all + ) + else: + q.all() + sess.close() + + def test_inner_joinedload_wo_limit(self): + User = self.classes.User + sess = Session() + sess.query(User).options( + joinedload(User.addresses, innerjoin=True) + ).with_for_update().all() + sess.close() + + def test_outer_joinedload_w_limit(self): + User = self.classes.User + sess = Session() + q = sess.query(User).options( + joinedload(User.addresses, innerjoin=False) + ) + + if testing.against("postgresql"): + q = q.with_for_update(of=User) + else: + q = q.with_for_update() + + q = q.limit(1) + + if testing.against("oracle"): + assert_raises_message( + exc.DatabaseError, + "ORA-02014", + q.all + ) + else: + q.all() + sess.close() + + def test_outer_joinedload_wo_limit(self): + User = self.classes.User + sess = Session() + q = sess.query(User).options( + joinedload(User.addresses, innerjoin=False) + ) + + if testing.against("postgresql"): + q = q.with_for_update(of=User) + else: + q = q.with_for_update() + + q.all() + sess.close() + + def test_join_w_subquery(self): + User = self.classes.User + Address = self.classes.Address + sess = Session() + q1 = sess.query(User).with_for_update().subquery() + sess.query(q1).join(Address).all() + sess.close() + + def test_plain(self): + User = self.classes.User + sess = Session() + sess.query(User).with_for_update().all() + sess.close() + + class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL): """run some compile tests, even though these are redundant.""" run_inserts = None @@ -110,7 +209,9 @@ class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL): def setup_mappers(cls): User, users = cls.classes.User, cls.tables.users Address, addresses = cls.classes.Address, cls.tables.addresses - mapper(User, users) + mapper(User, users, properties={ + "addresses": relationship(Address) + }) mapper(Address, addresses) def test_default_update(self): @@ -214,7 +315,7 @@ class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL): .with_for_update(of=[User.id, User.id, User.id]), "SELECT users.id AS users_id FROM users FOR UPDATE OF users", dialect="postgresql" - ) + ) def test_postgres_update_skip_locked(self): User = self.classes.User @@ -251,3 +352,39 @@ class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL): "SELECT users.id AS users_id FROM users LOCK IN SHARE MODE", dialect="mysql" ) + + def test_for_update_on_inner_w_joinedload(self): + User = self.classes.User + sess = Session() + self.assert_compile( + sess.query(User).options( + joinedload(User.addresses)).with_for_update().limit(1), + "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name " + "AS anon_1_users_name, addresses_1.id AS addresses_1_id, " + "addresses_1.user_id AS addresses_1_user_id, " + "addresses_1.email_address AS addresses_1_email_address " + "FROM (SELECT users.id AS users_id, users.name AS users_name " + "FROM users LIMIT %s FOR UPDATE) AS anon_1 " + "LEFT OUTER JOIN addresses AS addresses_1 " + "ON anon_1.users_id = addresses_1.user_id FOR UPDATE", + dialect="mysql" + ) + + def test_for_update_on_inner_w_joinedload_no_render_oracle(self): + User = self.classes.User + sess = Session() + self.assert_compile( + sess.query(User).options( + joinedload(User.addresses)).with_for_update().limit(1), + "SELECT anon_1.users_id AS anon_1_users_id, " + "anon_1.users_name AS anon_1_users_name, " + "addresses_1.id AS addresses_1_id, " + "addresses_1.user_id AS addresses_1_user_id, " + "addresses_1.email_address AS addresses_1_email_addres_1 " + "FROM (SELECT users_id, users_name FROM " + "(SELECT users.id AS users_id, users.name AS users_name " + "FROM users) WHERE ROWNUM <= :param_1) anon_1 " + "LEFT OUTER JOIN addresses addresses_1 " + "ON anon_1.users_id = addresses_1.user_id FOR UPDATE", + dialect="oracle" + ) -- 2.47.2