]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Fix 'No transaction found' error on Synapse.
authorGord Thompson <gord@gordthompson.com>
Fri, 22 Jul 2022 12:31:24 +0000 (08:31 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 2 Aug 2022 20:21:26 +0000 (16:21 -0400)
Fixed issue where the SQL Server dialect's query for the current isolation
level would fail on Azure Synapse Analytics, due to the way in which this
database handles transaction rollbacks after an error has occurred. The
initial query has been modified to no longer rely upon catching an error
when attempting to detect the appropriate system view. Additionally, to
better support this database's very specific "rollback" behavior,
implemented new parameter ``ignore_no_transaction_on_rollback`` indicating
that a rollback should ignore Azure Synapse error 'No corresponding
transaction found. (111214)', which is raised if no transaction is present
in conflict with the Python DBAPI.

Fixes: #8231
Closes: #8233
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/8233
Pull-request-sha: c48bd44a9f53d00e5e94f1b8bf996711b6419562

Change-Id: I6407a03148f45cc9eba8fe1d31d4f59ebf9c7ef7
(cherry picked from commit 8fe3cd69c5f2d8f73e75fb19ae929273282fba57)

doc/build/changelog/unreleased_14/8231.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/dialects/mssql/pyodbc.py
test/dialect/mssql/test_engine.py

diff --git a/doc/build/changelog/unreleased_14/8231.rst b/doc/build/changelog/unreleased_14/8231.rst
new file mode 100644 (file)
index 0000000..401ab71
--- /dev/null
@@ -0,0 +1,20 @@
+.. change::
+    :tags: bug, mssql
+    :tickets: 8231
+
+    Fixed issue where the SQL Server dialect's query for the current isolation
+    level would fail on Azure Synapse Analytics, due to the way in which this
+    database handles transaction rollbacks after an error has occurred. The
+    initial query has been modified to no longer rely upon catching an error
+    when attempting to detect the appropriate system view. Additionally, to
+    better support this database's very specific "rollback" behavior,
+    implemented new parameter ``ignore_no_transaction_on_rollback`` indicating
+    that a rollback should ignore Azure Synapse error 'No corresponding
+    transaction found. (111214)', which is raised if no transaction is present
+    in conflict with the Python DBAPI.
+
+    Initial patch and valuable debugging assistance courtesy of @ww2406.
+
+    .. seealso::
+
+        :ref:`azure_synapse_ignore_no_transaction_on_rollback`
index 3c22b9b7c964637f5bd8b5197415f48bd0714203..2d1d0f7008eb3f06b7c7969d740ab42229c22c40 100644 (file)
@@ -2752,6 +2752,7 @@ class MSDialect(default.DefaultDialect):
         json_serializer=None,
         json_deserializer=None,
         legacy_schema_aliasing=None,
+        ignore_no_transaction_on_rollback=False,
         **opts
     ):
         self.query_timeout = int(query_timeout or 0)
@@ -2759,6 +2760,9 @@ class MSDialect(default.DefaultDialect):
 
         self.use_scope_identity = use_scope_identity
         self.deprecate_large_types = deprecate_large_types
+        self.ignore_no_transaction_on_rollback = (
+            ignore_no_transaction_on_rollback
+        )
 
         if legacy_schema_aliasing is not None:
             util.warn_deprecated(
@@ -2783,6 +2787,22 @@ class MSDialect(default.DefaultDialect):
         # SQL Server does not support RELEASE SAVEPOINT
         pass
 
+    def do_rollback(self, dbapi_connection):
+        try:
+            super(MSDialect, self).do_rollback(dbapi_connection)
+        except self.dbapi.ProgrammingError as e:
+            if self.ignore_no_transaction_on_rollback and re.match(
+                r".*\b111214\b", str(e)
+            ):
+                util.warn(
+                    "ProgrammingError 111214 "
+                    "'No corresponding transaction found.' "
+                    "has been suppressed via "
+                    "ignore_no_transaction_on_rollback=True"
+                )
+            else:
+                raise
+
     _isolation_lookup = set(
         [
             "SERIALIZABLE",
@@ -2807,48 +2827,42 @@ class MSDialect(default.DefaultDialect):
         if level == "SNAPSHOT":
             connection.commit()
 
-    def get_isolation_level(self, connection):
-        last_error = None
+    def get_isolation_level(self, dbapi_connection):
+        cursor = dbapi_connection.cursor()
+        try:
+            cursor.execute(
+                "SELECT name FROM sys.system_views WHERE name IN "
+                "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')"
+            )
+            row = cursor.fetchone()
+            if not row:
+                raise NotImplementedError(
+                    "Can't fetch isolation level on this particular "
+                    "SQL Server version."
+                )
 
-        views = ("sys.dm_exec_sessions", "sys.dm_pdw_nodes_exec_sessions")
-        for view in views:
-            cursor = connection.cursor()
-            try:
-                cursor.execute(
-                    """
-                  SELECT CASE transaction_isolation_level
+            view_name = "sys.{}".format(row[0])
+            cursor.execute(
+                """
+                    SELECT CASE transaction_isolation_level
                     WHEN 0 THEN NULL
                     WHEN 1 THEN 'READ UNCOMMITTED'
                     WHEN 2 THEN 'READ COMMITTED'
                     WHEN 3 THEN 'REPEATABLE READ'
                     WHEN 4 THEN 'SERIALIZABLE'
                     WHEN 5 THEN 'SNAPSHOT' END AS TRANSACTION_ISOLATION_LEVEL
-                    FROM %s
+                    FROM {}
                     where session_id = @@SPID
-                  """
-                    % view
+                """.format(
+                    view_name
                 )
-                val = cursor.fetchone()[0]
-            except self.dbapi.Error as err:
-                # Python3 scoping rules
-                last_error = err
-                continue
-            else:
-                return val.upper()
-            finally:
-                cursor.close()
-        else:
-            # note that the NotImplementedError is caught by
-            # DefaultDialect, so the warning here is all that displays
-            util.warn(
-                "Could not fetch transaction isolation level, "
-                "tried views: %s; final error was: %s" % (views, last_error)
-            )
-            raise NotImplementedError(
-                "Can't fetch isolation level on this particular "
-                "SQL Server version. tried views: %s; final error was: %s"
-                % (views, last_error)
             )
+            row = cursor.fetchone()
+            assert row is not None
+            val = row[0]
+        finally:
+            cursor.close()
+        return val.upper()
 
     def initialize(self, connection):
         super(MSDialect, self).initialize(connection)
index 91e8fd6b5a07bf235bdd343ab4e9db17ecd8c553..edb76f26525e8bcc272298e57f7851bfd4040f07 100644 (file)
@@ -155,6 +155,34 @@ database using Azure credentials::
     stating that a connection string when using an access token must not contain
     ``UID``, ``PWD``, ``Authentication`` or ``Trusted_Connection`` parameters.
 
+.. _azure_synapse_ignore_no_transaction_on_rollback:
+
+Avoiding transaction-related exceptions on Azure Synapse Analytics
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Azure Synapse Analytics has a significant difference in its transaction
+handling compared to plain SQL Server; in some cases an error within a Synapse
+transaction can cause it to be arbitrarily terminated on the server side, which
+then causes the DBAPI ``.rollback()`` method (as well as ``.commit()``) to
+fail. The issue prevents the usual DBAPI contract of allowing ``.rollback()``
+to pass silently if no transaction is present as the driver does not expect
+this condition. The symptom of this failure is an exception with a message
+resembling 'No corresponding transaction found. (111214)' when attempting to
+emit a ``.rollback()`` after an operation had a failure of some kind.
+
+This specific case can be handled by passing ``ignore_no_transaction_on_rollback=True`` to
+the SQL Server dialect via the :func:`_sa.create_engine` function as follows::
+
+    engine = create_engine(connection_url, ignore_no_transaction_on_rollback=True)
+
+Using the above parameter, the dialect will catch ``ProgrammingError``
+exceptions raised during ``connection.rollback()`` and emit a warning
+if the error message contains code ``111214``, however will not raise
+an exception.
+
+.. versionadded:: 1.4.40  Added the
+   ``ignore_no_transaction_on_rollback=True`` parameter.
+
 Enable autocommit for Azure SQL Data Warehouse (DW) connections
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
index b5a04f1405bf95efe3ac02476f995745197e2da4..af8db861611f8738af27ff1a0001773627c8f710 100644 (file)
@@ -1,6 +1,7 @@
 # -*- encoding: utf-8
 
 from decimal import Decimal
+import re
 
 from sqlalchemy import Column
 from sqlalchemy import event
@@ -23,6 +24,7 @@ from sqlalchemy.testing import assert_warnings
 from sqlalchemy.testing import engines
 from sqlalchemy.testing import eq_
 from sqlalchemy.testing import expect_raises
+from sqlalchemy.testing import expect_raises_message
 from sqlalchemy.testing import expect_warnings
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing import mock
@@ -647,7 +649,7 @@ class RealIsolationLevelTest(fixtures.TestBase):
 
 
 class IsolationLevelDetectTest(fixtures.TestBase):
-    def _fixture(self, view):
+    def _fixture(self, view_result):
         class Error(Exception):
             pass
 
@@ -660,15 +662,25 @@ class IsolationLevelDetectTest(fixtures.TestBase):
         def fail_on_exec(
             stmt,
         ):
-            if view is not None and view in stmt:
+            result[:] = []
+            if "SELECT name FROM sys.system_views" in stmt:
+                if view_result:
+                    result.append((view_result,))
+            elif re.match(
+                ".*SELECT CASE transaction_isolation_level.*FROM sys.%s"
+                % (view_result,),
+                stmt,
+                re.S,
+            ):
                 result.append(("SERIALIZABLE",))
             else:
-                raise Error("that didn't work")
+                assert False
 
         connection = Mock(
             cursor=Mock(
                 return_value=Mock(
-                    execute=fail_on_exec, fetchone=lambda: result[0]
+                    execute=fail_on_exec,
+                    fetchone=lambda: result[0] if result else None,
                 )
             )
         )
@@ -688,13 +700,12 @@ class IsolationLevelDetectTest(fixtures.TestBase):
     def test_not_supported(self):
         dialect, connection = self._fixture(None)
 
-        with expect_warnings("Could not fetch transaction isolation level"):
-            assert_raises_message(
-                NotImplementedError,
-                "Can't fetch isolation",
-                dialect.get_isolation_level,
-                connection,
-            )
+        assert_raises_message(
+            NotImplementedError,
+            "Can't fetch isolation level on this particular ",
+            dialect.get_isolation_level,
+            connection,
+        )
 
 
 class InvalidTransactionFalsePositiveTest(fixtures.TablesTest):
@@ -732,3 +743,44 @@ class InvalidTransactionFalsePositiveTest(fixtures.TablesTest):
         # "Can't reconnect until invalid transaction is rolled back."
         result = connection.execute(t.select()).fetchall()
         eq_(len(result), 1)
+
+
+class IgnoreNotransOnRollbackTest(fixtures.TestBase):
+    def test_ignore_no_transaction_on_rollback(self):
+        """test #8231"""
+
+        class ProgrammingError(Exception):
+            pass
+
+        dialect = base.dialect(ignore_no_transaction_on_rollback=True)
+        dialect.dbapi = mock.Mock(ProgrammingError=ProgrammingError)
+
+        connection = mock.Mock(
+            rollback=mock.Mock(
+                side_effect=ProgrammingError("Error 111214 happened")
+            )
+        )
+        with expect_warnings(
+            "ProgrammingError 111214 'No corresponding transaction found.' "
+            "has been suppressed via ignore_no_transaction_on_rollback=True"
+        ):
+            dialect.do_rollback(connection)
+
+    def test_other_programming_error_on_rollback(self):
+        """test #8231"""
+
+        class ProgrammingError(Exception):
+            pass
+
+        dialect = base.dialect(ignore_no_transaction_on_rollback=True)
+        dialect.dbapi = mock.Mock(ProgrammingError=ProgrammingError)
+
+        connection = mock.Mock(
+            rollback=mock.Mock(
+                side_effect=ProgrammingError("Some other error happened")
+            )
+        )
+        with expect_raises_message(
+            ProgrammingError, "Some other error happened"
+        ):
+            dialect.do_rollback(connection)