]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
feat: Support MySQL FOR SHARE locking syntax.
authorJetDrag <taptube@gmail.com>
Wed, 5 Nov 2025 03:47:02 +0000 (22:47 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 11 Nov 2025 00:20:54 +0000 (19:20 -0500)
Added support for MySQL 8.0.1 + ``FOR SHARE`` to be emitted for the
:meth:`.Select.with_for_uddate` method, which offers compatibility with
``NOWAIT`` and ``SKIP LOCKED``.  The new syntax is used only for MySQL when
version 8.0.1 or higher is detected. Pull request courtesy JetDrag.

Fixes: #10134
Closes: #12964
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/12964
Pull-request-sha: 03d5e37cfda5da9dab8ae00aa682521f8ab9190b

Change-Id: Iafb7a24363284edcfeead94a348f50a470a88403

doc/build/changelog/unreleased_20/12964.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mysql/base.py
test/dialect/mysql/test_for_update.py
test/requirements.py

diff --git a/doc/build/changelog/unreleased_20/12964.rst b/doc/build/changelog/unreleased_20/12964.rst
new file mode 100644 (file)
index 0000000..27c445c
--- /dev/null
@@ -0,0 +1,8 @@
+.. change::
+    :tags: usecase, mysql
+    :tickets: 12964
+
+    Added support for MySQL 8.0.1 + ``FOR SHARE`` to be emitted for the
+    :meth:`.Select.with_for_uddate` method, which offers compatibility with
+    ``NOWAIT`` and ``SKIP LOCKED``.  The new syntax is used only for MySQL when
+    version 8.0.1 or higher is detected. Pull request courtesy JetDrag.
index 1c51302ba2a03dfd8a106766df16bd70f402d048..129c6e36ff0c2a83cb69c3002f44c7af5c689829 100644 (file)
@@ -1781,7 +1781,10 @@ class MySQLCompiler(compiler.SQLCompiler):
     ) -> str:
         assert select._for_update_arg is not None
         if select._for_update_arg.read:
-            tmp = " LOCK IN SHARE MODE"
+            if self.dialect.use_mysql_for_share:
+                tmp = " FOR SHARE"
+            else:
+                tmp = " LOCK IN SHARE MODE"
         else:
             tmp = " FOR UPDATE"
 
@@ -2729,16 +2732,19 @@ class MySQLDialect(default.DefaultDialect):
 
     returns_native_bytes = True
 
-    supports_sequences = False  # default for MySQL ...
     # ... may be updated to True for MariaDB 10.3+ in initialize()
+    supports_sequences = False
 
     sequences_optional = False
 
-    supports_for_update_of = False  # default for MySQL ...
     # ... may be updated to True for MySQL 8+ in initialize()
+    supports_for_update_of = False
+
+    # mysql 8.0.1 uses this syntax
+    use_mysql_for_share = False
 
-    _requires_alias_for_on_duplicate_key = False  # Only available ...
-    # ... in MySQL 8+
+    # Only available ... ... in MySQL 8+
+    _requires_alias_for_on_duplicate_key = False
 
     # MySQL doesn't support "DEFAULT VALUES" but *does* support
     # "VALUES (DEFAULT)"
@@ -3194,6 +3200,10 @@ class MySQLDialect(default.DefaultDialect):
             self._is_mysql and self.server_version_info >= (8,)
         )
 
+        self.use_mysql_for_share = (
+            self._is_mysql and self.server_version_info >= (8, 0, 1)
+        )
+
         self._needs_correct_for_88718_96365 = (
             not self.is_mariadb and self.server_version_info >= (8,)
         )
index 5c26d8eb6d5a009ece7ceeb3c2061e51ee25fd6c..eebba71b614d6adfb797f081f7ea22995a41a59e 100644 (file)
@@ -184,6 +184,17 @@ class MySQLForUpdateLockingTest(fixtures.DeclarativeMappedTest):
             self._assert_a_is_locked(True)
             self._assert_b_is_locked(True)
 
+    @testing.requires.mysql_for_update_read
+    def test_for_update_read(self):
+        A = self.classes.A
+        with self.run_test() as s:
+            s.query(A).options(joinedload(A.bs)).with_for_update(
+                read=True
+            ).all()
+            # no subquery, should be locked
+            self._assert_a_is_locked(True)
+            self._assert_b_is_locked(True)
+
 
 class MySQLForUpdateCompileTest(fixtures.TestBase, AssertsCompiledSQL):
     __dialect__ = mysql.dialect()
@@ -197,6 +208,11 @@ class MySQLForUpdateCompileTest(fixtures.TestBase, AssertsCompiledSQL):
     for_update_of_dialect.server_version_info = (8, 0, 0)
     for_update_of_dialect.supports_for_update_of = True
 
+    for_share_dialect = mysql.dialect()
+    for_share_dialect.server_version_info = (8, 0, 1)
+    for_share_dialect.supports_for_update_of = True
+    for_share_dialect.use_mysql_for_share = True
+
     def test_for_update_basic(self):
         self.assert_compile(
             self.table1.select()
@@ -206,13 +222,17 @@ class MySQLForUpdateCompileTest(fixtures.TestBase, AssertsCompiledSQL):
             "FROM mytable WHERE mytable.myid = %s FOR UPDATE",
         )
 
-    def test_for_update_read(self):
+    @testing.variation("dialect_type", ["generic", "mysql801"])
+    def test_for_update_read(self, dialect_type):
         self.assert_compile(
             self.table1.select()
             .where(self.table1.c.myid == 7)
             .with_for_update(read=True),
             "SELECT mytable.myid, mytable.name, mytable.description "
-            "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE",
+            "FROM mytable WHERE mytable.myid = %s "
+            f"""{'FOR SHARE'
+                 if dialect_type.mysql801 else 'LOCK IN SHARE MODE'}""",
+            dialect=self.for_share_dialect if dialect_type.mysql801 else None,
         )
 
     def test_for_update_skip_locked(self):
@@ -225,14 +245,18 @@ class MySQLForUpdateCompileTest(fixtures.TestBase, AssertsCompiledSQL):
             "FOR UPDATE SKIP LOCKED",
         )
 
-    def test_for_update_read_and_skip_locked(self):
+    @testing.variation("dialect_type", ["generic", "mysql801"])
+    def test_for_update_read_and_skip_locked(self, dialect_type):
         self.assert_compile(
             self.table1.select()
             .where(self.table1.c.myid == 7)
             .with_for_update(read=True, skip_locked=True),
             "SELECT mytable.myid, mytable.name, mytable.description "
             "FROM mytable WHERE mytable.myid = %s "
-            "LOCK IN SHARE MODE SKIP LOCKED",
+            f"""{'FOR SHARE'
+                 if dialect_type.mysql801 else 'LOCK IN SHARE MODE'}"""
+            " SKIP LOCKED",
+            dialect=self.for_share_dialect if dialect_type.mysql801 else None,
         )
 
     def test_for_update_nowait(self):
@@ -245,14 +269,18 @@ class MySQLForUpdateCompileTest(fixtures.TestBase, AssertsCompiledSQL):
             "FOR UPDATE NOWAIT",
         )
 
-    def test_for_update_read_and_nowait(self):
+    @testing.variation("dialect_type", ["generic", "mysql801"])
+    def test_for_update_read_and_nowait(self, dialect_type):
         self.assert_compile(
             self.table1.select()
             .where(self.table1.c.myid == 7)
             .with_for_update(read=True, nowait=True),
             "SELECT mytable.myid, mytable.name, mytable.description "
             "FROM mytable WHERE mytable.myid = %s "
-            "LOCK IN SHARE MODE NOWAIT",
+            f"""{'FOR SHARE'
+                 if dialect_type.mysql801 else 'LOCK IN SHARE MODE'}"""
+            " NOWAIT",
+            dialect=self.for_share_dialect if dialect_type.mysql801 else None,
         )
 
     def test_for_update_of_nowait(self):
@@ -333,29 +361,44 @@ class MySQLForUpdateCompileTest(fixtures.TestBase, AssertsCompiledSQL):
             dialect=self.for_update_of_dialect,
         )
 
-    def test_for_update_of_read_nowait(self):
+    @testing.variation("dialect_type", ["mysql800", "mysql801"])
+    def test_for_update_of_read_nowait(self, dialect_type):
         self.assert_compile(
             self.table1.select()
             .where(self.table1.c.myid == 7)
             .with_for_update(read=True, of=self.table1, nowait=True),
             "SELECT mytable.myid, mytable.name, mytable.description "
             "FROM mytable WHERE mytable.myid = %s "
-            "LOCK IN SHARE MODE OF mytable NOWAIT",
-            dialect=self.for_update_of_dialect,
+            f"""{'FOR SHARE'
+                 if dialect_type.mysql801 else 'LOCK IN SHARE MODE'} """
+            "OF mytable NOWAIT",
+            dialect=(
+                self.for_update_of_dialect
+                if dialect_type.mysql800
+                else self.for_share_dialect
+            ),
         )
 
-    def test_for_update_of_read_skip_locked(self):
+    @testing.variation("dialect_type", ["mysql800", "mysql801"])
+    def test_for_update_of_read_skip_locked(self, dialect_type):
         self.assert_compile(
             self.table1.select()
             .where(self.table1.c.myid == 7)
             .with_for_update(read=True, of=self.table1, skip_locked=True),
             "SELECT mytable.myid, mytable.name, mytable.description "
             "FROM mytable WHERE mytable.myid = %s "
-            "LOCK IN SHARE MODE OF mytable SKIP LOCKED",
-            dialect=self.for_update_of_dialect,
+            f"""{'FOR SHARE'
+                 if dialect_type.mysql801 else 'LOCK IN SHARE MODE'} """
+            "OF mytable SKIP LOCKED",
+            dialect=(
+                self.for_update_of_dialect
+                if dialect_type.mysql800
+                else self.for_share_dialect
+            ),
         )
 
-    def test_for_update_of_read_nowait_column_list(self):
+    @testing.variation("dialect_type", ["mysql800", "mysql801"])
+    def test_for_update_of_read_nowait_column_list(self, dialect_type):
         self.assert_compile(
             self.table1.select()
             .where(self.table1.c.myid == 7)
@@ -366,19 +409,32 @@ class MySQLForUpdateCompileTest(fixtures.TestBase, AssertsCompiledSQL):
             ),
             "SELECT mytable.myid, mytable.name, mytable.description "
             "FROM mytable WHERE mytable.myid = %s "
-            "LOCK IN SHARE MODE OF mytable NOWAIT",
-            dialect=self.for_update_of_dialect,
+            f"""{'FOR SHARE'
+                 if dialect_type.mysql801 else 'LOCK IN SHARE MODE'} """
+            "OF mytable NOWAIT",
+            dialect=(
+                self.for_update_of_dialect
+                if dialect_type.mysql800
+                else self.for_share_dialect
+            ),
         )
 
-    def test_for_update_of_read(self):
+    @testing.variation("dialect_type", ["mysql800", "mysql801"])
+    def test_for_update_of_read(self, dialect_type):
         self.assert_compile(
             self.table1.select()
             .where(self.table1.c.myid == 7)
             .with_for_update(read=True, of=self.table1),
             "SELECT mytable.myid, mytable.name, mytable.description "
             "FROM mytable WHERE mytable.myid = %s "
-            "LOCK IN SHARE MODE OF mytable",
-            dialect=self.for_update_of_dialect,
+            f"""{'FOR SHARE'
+                 if dialect_type.mysql801 else 'LOCK IN SHARE MODE'} """
+            "OF mytable",
+            dialect=(
+                self.for_update_of_dialect
+                if dialect_type.mysql800
+                else self.for_share_dialect
+            ),
         )
 
     def test_for_update_textual_of(self):
index c23e29042849e933ee9b3d497053ff04ea5ca6bd..2695cdbcd0089e45f3733bcbb17e5b9aa3f7f2df 100644 (file)
@@ -1816,6 +1816,10 @@ class DefaultRequirements(SuiteRequirements):
             "lock-sensitive operations crash on mysqlconnector",
         )
 
+    @property
+    def mysql_for_update_read(self):
+        return self.mysql_for_update + only_on(["mysql >= 8.0.0", "mariadb"])
+
     @property
     def mysql_fsp(self):
         return only_if(["mysql >= 5.6.4", "mariadb"])