]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Render FOR UPDATE on the inner subquery as well as the outer
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 30 Apr 2018 15:31:48 +0000 (11:31 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Wed, 2 May 2018 00:52:44 +0000 (20:52 -0400)
The ORM now doubles the "FOR UPDATE" clause within the subquery that
renders in conjunction with joined eager loading in some cases, as it has
been observed that MySQL does not lock the rows from a subquery.   This
means the query renders with two FOR UPDATE clauses; note that on some
backends such as Oracle, FOR UPDATE clauses on subqueries are silently
ignored since they are unnecessary.  Additionally, in the case of the "OF"
clause used primarily with Postgresql, the FOR UPDATE is rendered only on
the inner subquery when this is used so that the selectable can be targeted
to the table within the SELECT statement.

Change-Id: Ie5520d08d82bf0afd9e1bd2d43a0b2a0db0de16d
Fixes: #4246
doc/build/changelog/migration_13.rst
doc/build/changelog/unreleased_13/4246.rst [new file with mode: 0644]
lib/sqlalchemy/orm/query.py
test/dialect/mysql/test_for_update.py [new file with mode: 0644]
test/orm/test_lockmode.py

index 3786067b3fd877dd99d3d97b4e1d026747a9a5c5..71ef4d0113348bf61a66ac5305e882363a97ecf2 100644 (file)
@@ -23,6 +23,70 @@ New Features and Improvements - ORM
 Key Behavioral Changes - ORM
 =============================
 
+.. _change_4246:
+
+FOR UPDATE clause is rendered within the joined eager load subquery as well as outside
+--------------------------------------------------------------------------------------
+
+This change applies specifically to the use of the :func:`.joinedload` loading
+strategy in conjunction with a row limited query, e.g. using :meth:`.Query.first`
+or :meth:`.Query.limit`, as well as with use of the :class:`.Query.with_for_update` method.
+
+Given a query as::
+
+    session.query(A).options(joinedload(A.b)).limit(5)
+
+The :class:`.Query` object renders a SELECT of the following form when joined
+eager loading is combined with LIMIT::
+
+    SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM (
+        SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5
+    ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id
+
+This is so that the limit of rows takes place for the primary entity without
+affecting the joined eager load of related items.   When the above query is
+combined with "SELECT..FOR UPDATE", the behavior has been this::
+
+    SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM (
+        SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5
+    ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id FOR UPDATE
+
+However, MySQL due to https://bugs.mysql.com/bug.php?id=90693 does not lock
+the rows inside the subquery, unlike that of Postgresql and other databases.
+So the above query now renders as::
+
+    SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM (
+        SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5 FOR UPDATE
+    ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id FOR UPDATE
+
+On the Oracle dialect, the inner "FOR UPDATE" is not rendered as Oracle does
+not support this syntax and the dialect skips any "FOR UPDATE" that is against
+a subquery; it isn't necessary in any case since Oracle, like Postgresql,
+correctly locks all elements of the returned row.
+
+When using the :paramref:`.Query.with_for_update.of` modifier, typically on
+Postgresql, the outer "FOR UPDATE" is omitted, and the OF is now rendered
+on the inside; previously, the OF target would not be converted to accommodate
+for the subquery correctly.  So
+given::
+
+    session.query(A).options(joinedload(A.b)).with_for_update(of=A).limit(5)
+
+The query would now render as::
+
+    SELECT subq.a_id, subq.a_data, b_alias.id, b_alias.data FROM (
+        SELECT a.id AS a_id, a.data AS a_data FROM a LIMIT 5 FOR UPDATE OF a
+    ) AS subq LEFT OUTER JOIN b ON subq.a_id=b.a_id
+
+The above form should be helpful on Postgresql additionally since Postgresql
+will not allow the FOR UPDATE clause to be rendered after the LEFT OUTER JOIN
+target.
+
+Overall, FOR UPDATE remains highly specific to the target database in use
+and can't easily be generalized for more complex queries.
+
+:ticket:`4246`
+
 New Features and Improvements - Core
 ====================================
 
diff --git a/doc/build/changelog/unreleased_13/4246.rst b/doc/build/changelog/unreleased_13/4246.rst
new file mode 100644 (file)
index 0000000..0b47c9e
--- /dev/null
@@ -0,0 +1,17 @@
+.. change::
+    :tags: bug, orm, mysql, postgresql
+    :tickets: 4246
+
+    The ORM now doubles the "FOR UPDATE" clause within the subquery that
+    renders in conjunction with joined eager loading in some cases, as it has
+    been observed that MySQL does not lock the rows from a subquery.   This
+    means the query renders with two FOR UPDATE clauses; note that on some
+    backends such as Oracle, FOR UPDATE clauses on subqueries are silently
+    ignored since they are unnecessary.  Additionally, in the case of the "OF"
+    clause used primarily with Postgresql, the FOR UPDATE is rendered only on
+    the inner subquery when this is used so that the selectable can be targeted
+    to the table within the SELECT statement.
+
+    .. seealso::
+
+        :ref:`change_4246`
\ No newline at end of file
index 6d2b144e32c08109161cec1370a2cd867e29318e..ea8371f506112d6786ab922cfd0d7144d4990591 100644 (file)
@@ -3493,6 +3493,9 @@ class Query(object):
             order_by=context.order_by,
             **self._select_args
         )
+        # put FOR UPDATE on the inner query, where MySQL will honor it,
+        # as well as if it has an OF so Postgresql can use it.
+        inner._for_update_arg = context._for_update_arg
 
         for hint in self._with_hints:
             inner = inner.with_hint(*hint)
@@ -3510,7 +3513,13 @@ class Query(object):
             [inner] + context.secondary_columns,
             use_labels=context.labels)
 
-        statement._for_update_arg = context._for_update_arg
+        # Oracle however does not allow FOR UPDATE on the subquery,
+        # and the Oracle dialect ignores it, plus for Postgresql, MySQL
+        # we expect that all elements of the row are locked, so also put it
+        # on the outside (except in the case of PG when OF is used)
+        if context._for_update_arg is not None and \
+                context._for_update_arg.of is None:
+            statement._for_update_arg = context._for_update_arg
 
         from_clause = inner
         for eager_join in context.eager_joins.values():
diff --git a/test/dialect/mysql/test_for_update.py b/test/dialect/mysql/test_for_update.py
new file mode 100644 (file)
index 0000000..af467f9
--- /dev/null
@@ -0,0 +1,161 @@
+"""Test MySQL FOR UPDATE behavior.
+
+See #4246
+
+"""
+import contextlib
+
+from sqlalchemy import Column, Integer, ForeignKey, update
+from sqlalchemy.orm import relationship, Session, joinedload
+from sqlalchemy import exc
+
+from sqlalchemy.testing import fixtures
+from sqlalchemy import testing
+
+
+class MySQLForUpdateLockingTest(fixtures.DeclarativeMappedTest):
+    __backend__ = True
+    __only_on__ = 'mysql'
+
+    @classmethod
+    def setup_classes(cls):
+        Base = cls.DeclarativeBasic
+
+        class A(Base):
+            __tablename__ = 'a'
+            id = Column(Integer, primary_key=True)
+            x = Column(Integer)
+            y = Column(Integer)
+            bs = relationship("B")
+            __table_args__ = {"mysql_engine": "InnoDB"}
+
+        class B(Base):
+            __tablename__ = 'b'
+            id = Column(Integer, primary_key=True)
+            a_id = Column(ForeignKey('a.id'))
+            x = Column(Integer)
+            y = Column(Integer)
+            __table_args__ = {"mysql_engine": "InnoDB"}
+
+    @classmethod
+    def insert_data(cls):
+        A = cls.classes.A
+        B = cls.classes.B
+
+        # all the x/y are < 10
+        s = Session()
+        s.add_all(
+            [
+                A(x=5, y=5, bs=[B(x=4, y=4), B(x=2, y=8), B(x=7, y=1)]),
+                A(x=7, y=5, bs=[B(x=4, y=4), B(x=5, y=8)])
+            ]
+        )
+        s.commit()
+
+    @contextlib.contextmanager
+    def run_test(self):
+        connection = testing.db.connect()
+        connection.execute("set innodb_lock_wait_timeout=1")
+        main_trans = connection.begin()
+        try:
+            yield Session(bind=connection)
+        finally:
+            main_trans.rollback()
+            connection.close()
+
+    def _assert_a_is_locked(self, should_be_locked):
+        A = self.classes.A
+        with testing.db.begin() as alt_trans:
+            alt_trans.execute("set innodb_lock_wait_timeout=1")
+            # set x/y > 10
+            try:
+                alt_trans.execute(
+                    update(A).values(x=15, y=19)
+                )
+            except (exc.InternalError, exc.OperationalError) as err:
+                assert "Lock wait timeout exceeded" in str(err)
+                assert should_be_locked
+            else:
+                assert not should_be_locked
+
+    def _assert_b_is_locked(self, should_be_locked):
+        B = self.classes.B
+        with testing.db.begin() as alt_trans:
+            alt_trans.execute("set innodb_lock_wait_timeout=1")
+            # set x/y > 10
+            try:
+                alt_trans.execute(
+                    update(B).values(x=15, y=19)
+                )
+            except (exc.InternalError, exc.OperationalError) as err:
+                assert "Lock wait timeout exceeded" in str(err)
+                assert should_be_locked
+            else:
+                assert not should_be_locked
+
+    def test_basic_lock(self):
+        A = self.classes.A
+        with self.run_test() as s:
+            s.query(A).with_for_update().all()
+            # test our fixture
+            self._assert_a_is_locked(True)
+
+    def test_basic_not_lock(self):
+        A = self.classes.A
+        with self.run_test() as s:
+            s.query(A).all()
+            # test our fixture
+            self._assert_a_is_locked(False)
+
+    def test_joined_lock_subquery(self):
+        A = self.classes.A
+        with self.run_test() as s:
+            s.query(A).options(joinedload(A.bs)).with_for_update().first()
+
+            # test for issue #4246, should be locked
+            self._assert_a_is_locked(True)
+            self._assert_b_is_locked(True)
+
+    def test_joined_lock_subquery_inner_for_update(self):
+        A = self.classes.A
+        B = self.classes.B
+        with self.run_test() as s:
+            q = s.query(A).with_for_update().subquery()
+            s.query(q).join(B).all()
+
+            # FOR UPDATE is inside the subquery, should be locked
+            self._assert_a_is_locked(True)
+
+            # FOR UPDATE is inside the subquery, B is not locked
+            self._assert_b_is_locked(False)
+
+    def test_joined_lock_subquery_inner_for_update_outer(self):
+        A = self.classes.A
+        B = self.classes.B
+        with self.run_test() as s:
+            q = s.query(A).with_for_update().subquery()
+            s.query(q).join(B).with_for_update().all()
+
+            # FOR UPDATE is inside the subquery, should be locked
+            self._assert_a_is_locked(True)
+
+            # FOR UPDATE is also outside the subquery, B is locked
+            self._assert_b_is_locked(True)
+
+    def test_joined_lock_subquery_order_for_update_outer(self):
+        A = self.classes.A
+        B = self.classes.B
+        with self.run_test() as s:
+            q = s.query(A).order_by(A.id).subquery()
+            s.query(q).join(B).with_for_update().all()
+            # FOR UPDATE is inside the subquery, should not be locked
+            self._assert_a_is_locked(False)
+            self._assert_b_is_locked(True)
+
+    def test_joined_lock_no_subquery(self):
+        A = self.classes.A
+        with self.run_test() as s:
+            s.query(A).options(joinedload(A.bs)).with_for_update().all()
+            # no subquery, should be locked
+            self._assert_a_is_locked(True)
+            self._assert_b_is_locked(True)
index cb35d6e27c774450b90ac861555b50f263249863..34ae52d3c7f12f680b4ca470723a0f0029c5ae0e 100644 (file)
@@ -2,10 +2,13 @@ from sqlalchemy.engine import default
 from sqlalchemy.databases import *
 from sqlalchemy.orm import mapper
 from sqlalchemy.orm import Session
+from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import relationship
 from sqlalchemy.testing import AssertsCompiledSQL, eq_
 from sqlalchemy.testing import assert_raises_message
 from sqlalchemy import exc
 from test.orm import _fixtures
+from sqlalchemy import testing
 
 
 class LegacyLockModeTest(_fixtures.FixtureTest):
@@ -102,6 +105,102 @@ class ForUpdateTest(_fixtures.FixtureTest):
         )
 
 
+class BackendTest(_fixtures.FixtureTest):
+    __backend__ = True
+
+    # test against the major backends.   We are naming specific databases
+    # here rather than using requirements rules since the behavior of
+    # "FOR UPDATE" as well as "OF" is very specific to each DB, and we need
+    # to run the query differently based on backend.
+
+    @classmethod
+    def setup_mappers(cls):
+        User, users = cls.classes.User, cls.tables.users
+        Address, addresses = cls.classes.Address, cls.tables.addresses
+        mapper(User, users, properties={
+            "addresses": relationship(Address)
+        })
+        mapper(Address, addresses)
+
+    def test_inner_joinedload_w_limit(self):
+        User = self.classes.User
+        sess = Session()
+        q = sess.query(User).options(
+            joinedload(User.addresses, innerjoin=True)
+        ).with_for_update().limit(1)
+
+        if testing.against("oracle"):
+            assert_raises_message(
+                exc.DatabaseError,
+                "ORA-02014",
+                q.all
+            )
+        else:
+            q.all()
+        sess.close()
+
+    def test_inner_joinedload_wo_limit(self):
+        User = self.classes.User
+        sess = Session()
+        sess.query(User).options(
+            joinedload(User.addresses, innerjoin=True)
+        ).with_for_update().all()
+        sess.close()
+
+    def test_outer_joinedload_w_limit(self):
+        User = self.classes.User
+        sess = Session()
+        q = sess.query(User).options(
+            joinedload(User.addresses, innerjoin=False)
+        )
+
+        if testing.against("postgresql"):
+            q = q.with_for_update(of=User)
+        else:
+            q = q.with_for_update()
+
+        q = q.limit(1)
+
+        if testing.against("oracle"):
+            assert_raises_message(
+                exc.DatabaseError,
+                "ORA-02014",
+                q.all
+            )
+        else:
+            q.all()
+        sess.close()
+
+    def test_outer_joinedload_wo_limit(self):
+        User = self.classes.User
+        sess = Session()
+        q = sess.query(User).options(
+            joinedload(User.addresses, innerjoin=False)
+        )
+
+        if testing.against("postgresql"):
+            q = q.with_for_update(of=User)
+        else:
+            q = q.with_for_update()
+
+        q.all()
+        sess.close()
+
+    def test_join_w_subquery(self):
+        User = self.classes.User
+        Address = self.classes.Address
+        sess = Session()
+        q1 = sess.query(User).with_for_update().subquery()
+        sess.query(q1).join(Address).all()
+        sess.close()
+
+    def test_plain(self):
+        User = self.classes.User
+        sess = Session()
+        sess.query(User).with_for_update().all()
+        sess.close()
+
+
 class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL):
     """run some compile tests, even though these are redundant."""
     run_inserts = None
@@ -110,7 +209,9 @@ class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL):
     def setup_mappers(cls):
         User, users = cls.classes.User, cls.tables.users
         Address, addresses = cls.classes.Address, cls.tables.addresses
-        mapper(User, users)
+        mapper(User, users, properties={
+            "addresses": relationship(Address)
+        })
         mapper(Address, addresses)
 
     def test_default_update(self):
@@ -214,7 +315,7 @@ class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL):
             .with_for_update(of=[User.id, User.id, User.id]),
             "SELECT users.id AS users_id FROM users FOR UPDATE OF users",
             dialect="postgresql"
-                            )
+        )
 
     def test_postgres_update_skip_locked(self):
         User = self.classes.User
@@ -251,3 +352,39 @@ class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL):
             "SELECT users.id AS users_id FROM users LOCK IN SHARE MODE",
             dialect="mysql"
         )
+
+    def test_for_update_on_inner_w_joinedload(self):
+        User = self.classes.User
+        sess = Session()
+        self.assert_compile(
+            sess.query(User).options(
+                joinedload(User.addresses)).with_for_update().limit(1),
+            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name "
+            "AS anon_1_users_name, addresses_1.id AS addresses_1_id, "
+            "addresses_1.user_id AS addresses_1_user_id, "
+            "addresses_1.email_address AS addresses_1_email_address "
+            "FROM (SELECT users.id AS users_id, users.name AS users_name "
+            "FROM users  LIMIT %s FOR UPDATE) AS anon_1 "
+            "LEFT OUTER JOIN addresses AS addresses_1 "
+            "ON anon_1.users_id = addresses_1.user_id FOR UPDATE",
+            dialect="mysql"
+        )
+
+    def test_for_update_on_inner_w_joinedload_no_render_oracle(self):
+        User = self.classes.User
+        sess = Session()
+        self.assert_compile(
+            sess.query(User).options(
+                joinedload(User.addresses)).with_for_update().limit(1),
+            "SELECT anon_1.users_id AS anon_1_users_id, "
+            "anon_1.users_name AS anon_1_users_name, "
+            "addresses_1.id AS addresses_1_id, "
+            "addresses_1.user_id AS addresses_1_user_id, "
+            "addresses_1.email_address AS addresses_1_email_addres_1 "
+            "FROM (SELECT users_id, users_name FROM "
+            "(SELECT users.id AS users_id, users.name AS users_name "
+            "FROM users) WHERE ROWNUM <= :param_1) anon_1 "
+            "LEFT OUTER JOIN addresses addresses_1 "
+            "ON anon_1.users_id = addresses_1.user_id FOR UPDATE",
+            dialect="oracle"
+        )