]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Fix 'No transaction found' error on Synapse.
authorGord Thompson <gord@gordthompson.com>
Fri, 22 Jul 2022 12:31:24 +0000 (08:31 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 2 Aug 2022 17:59:16 +0000 (13:59 -0400)
Fixed issue where the SQL Server dialect's query for the current isolation
level would fail on Azure Synapse Analytics, due to the way in which this
database handles transaction rollbacks after an error has occurred. The
initial query has been modified to no longer rely upon catching an error
when attempting to detect the appropriate system view. Additionally, to
better support this database's very specific "rollback" behavior,
implemented new parameter ``ignore_no_transaction_on_rollback`` indicating
that a rollback should ignore Azure Synapse error 'No corresponding
transaction found. (111214)', which is raised if no transaction is present
in conflict with the Python DBAPI.

Fixes: #8231
Closes: #8233
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/8233
Pull-request-sha: c48bd44a9f53d00e5e94f1b8bf996711b6419562

Change-Id: I6407a03148f45cc9eba8fe1d31d4f59ebf9c7ef7

doc/build/changelog/unreleased_14/8231.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/dialects/mssql/pyodbc.py
test/dialect/mssql/test_engine.py

diff --git a/doc/build/changelog/unreleased_14/8231.rst b/doc/build/changelog/unreleased_14/8231.rst
new file mode 100644 (file)
index 0000000..401ab71
--- /dev/null
@@ -0,0 +1,20 @@
+.. change::
+    :tags: bug, mssql
+    :tickets: 8231
+
+    Fixed issue where the SQL Server dialect's query for the current isolation
+    level would fail on Azure Synapse Analytics, due to the way in which this
+    database handles transaction rollbacks after an error has occurred. The
+    initial query has been modified to no longer rely upon catching an error
+    when attempting to detect the appropriate system view. Additionally, to
+    better support this database's very specific "rollback" behavior,
+    implemented new parameter ``ignore_no_transaction_on_rollback`` indicating
+    that a rollback should ignore Azure Synapse error 'No corresponding
+    transaction found. (111214)', which is raised if no transaction is present
+    in conflict with the Python DBAPI.
+
+    Initial patch and valuable debugging assistance courtesy of @ww2406.
+
+    .. seealso::
+
+        :ref:`azure_synapse_ignore_no_transaction_on_rollback`
index e33ae4dbb686ec2de0de0515a7b7b20875d41e60..73e35d4bb7190219cf1cd3729ba3702f6d149d58 100644 (file)
@@ -2863,6 +2863,7 @@ class MSDialect(default.DefaultDialect):
         json_serializer=None,
         json_deserializer=None,
         legacy_schema_aliasing=None,
+        ignore_no_transaction_on_rollback=False,
         **opts,
     ):
         self.query_timeout = int(query_timeout or 0)
@@ -2870,6 +2871,9 @@ class MSDialect(default.DefaultDialect):
 
         self.use_scope_identity = use_scope_identity
         self.deprecate_large_types = deprecate_large_types
+        self.ignore_no_transaction_on_rollback = (
+            ignore_no_transaction_on_rollback
+        )
 
         if legacy_schema_aliasing is not None:
             util.warn_deprecated(
@@ -2893,6 +2897,22 @@ class MSDialect(default.DefaultDialect):
         # SQL Server does not support RELEASE SAVEPOINT
         pass
 
+    def do_rollback(self, dbapi_connection):
+        try:
+            super(MSDialect, self).do_rollback(dbapi_connection)
+        except self.dbapi.ProgrammingError as e:
+            if self.ignore_no_transaction_on_rollback and re.match(
+                r".*\b111214\b", str(e)
+            ):
+                util.warn(
+                    "ProgrammingError 111214 "
+                    "'No corresponding transaction found.' "
+                    "has been suppressed via "
+                    "ignore_no_transaction_on_rollback=True"
+                )
+            else:
+                raise
+
     _isolation_lookup = set(
         [
             "SERIALIZABLE",
@@ -2914,46 +2934,41 @@ class MSDialect(default.DefaultDialect):
             dbapi_connection.commit()
 
     def get_isolation_level(self, dbapi_connection):
-        last_error = None
+        cursor = dbapi_connection.cursor()
+        try:
+            cursor.execute(
+                "SELECT name FROM sys.system_views WHERE name IN "
+                "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')"
+            )
+            row = cursor.fetchone()
+            if not row:
+                raise NotImplementedError(
+                    "Can't fetch isolation level on this particular "
+                    "SQL Server version."
+                )
 
-        views = ("sys.dm_exec_sessions", "sys.dm_pdw_nodes_exec_sessions")
-        for view in views:
-            cursor = dbapi_connection.cursor()
-            try:
-                cursor.execute(
-                    f"""
-                  SELECT CASE transaction_isolation_level
+            view_name = "sys.{}".format(row[0])
+            cursor.execute(
+                """
+                    SELECT CASE transaction_isolation_level
                     WHEN 0 THEN NULL
                     WHEN 1 THEN 'READ UNCOMMITTED'
                     WHEN 2 THEN 'READ COMMITTED'
                     WHEN 3 THEN 'REPEATABLE READ'
                     WHEN 4 THEN 'SERIALIZABLE'
                     WHEN 5 THEN 'SNAPSHOT' END AS TRANSACTION_ISOLATION_LEVEL
-                    FROM {view}
+                    FROM {}
                     where session_id = @@SPID
-                  """
+                """.format(
+                    view_name
                 )
-                val = cursor.fetchone()[0]
-            except self.dbapi.Error as err:
-                # Python3 scoping rules
-                last_error = err
-                continue
-            else:
-                return val.upper()
-            finally:
-                cursor.close()
-        else:
-            # note that the NotImplementedError is caught by
-            # DefaultDialect, so the warning here is all that displays
-            util.warn(
-                "Could not fetch transaction isolation level, "
-                f"tried views: {views}; final error was: {last_error}"
-            )
-            raise NotImplementedError(
-                "Can't fetch isolation level on this particular "
-                f"SQL Server version. tried views: {views}; final "
-                f"error was: {last_error}"
             )
+            row = cursor.fetchone()
+            assert row is not None
+            val = row[0]
+        finally:
+            cursor.close()
+        return val.upper()
 
     def initialize(self, connection):
         super(MSDialect, self).initialize(connection)
index 9f73ed28c1a14a1536360644f40ec9053914e47b..22e385865e9c8e0c7e1104386d87fa7c040eed45 100644 (file)
@@ -157,6 +157,34 @@ database using Azure credentials::
     stating that a connection string when using an access token must not contain
     ``UID``, ``PWD``, ``Authentication`` or ``Trusted_Connection`` parameters.
 
+.. _azure_synapse_ignore_no_transaction_on_rollback:
+
+Avoiding transaction-related exceptions on Azure Synapse Analytics
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Azure Synapse Analytics has a significant difference in its transaction
+handling compared to plain SQL Server; in some cases an error within a Synapse
+transaction can cause it to be arbitrarily terminated on the server side, which
+then causes the DBAPI ``.rollback()`` method (as well as ``.commit()``) to
+fail. The issue prevents the usual DBAPI contract of allowing ``.rollback()``
+to pass silently if no transaction is present as the driver does not expect
+this condition. The symptom of this failure is an exception with a message
+resembling 'No corresponding transaction found. (111214)' when attempting to
+emit a ``.rollback()`` after an operation had a failure of some kind.
+
+This specific case can be handled by passing ``ignore_no_transaction_on_rollback=True`` to
+the SQL Server dialect via the :func:`_sa.create_engine` function as follows::
+
+    engine = create_engine(connection_url, ignore_no_transaction_on_rollback=True)
+
+Using the above parameter, the dialect will catch ``ProgrammingError``
+exceptions raised during ``connection.rollback()`` and emit a warning
+if the error message contains code ``111214``, however will not raise
+an exception.
+
+.. versionadded:: 1.4.40  Added the
+   ``ignore_no_transaction_on_rollback=True`` parameter.
+
 Enable autocommit for Azure SQL Data Warehouse (DW) connections
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -591,7 +619,10 @@ class MSDialect_pyodbc(PyODBCConnector, MSDialect):
     )
 
     def __init__(
-        self, fast_executemany=False, use_setinputsizes=True, **params
+        self,
+        fast_executemany=False,
+        use_setinputsizes=True,
+        **params,
     ):
         super(MSDialect_pyodbc, self).__init__(
             use_setinputsizes=use_setinputsizes, **params
index 967ca85fe9327356a04e63c52cd5657e19c3adef..0289202035fd5c3b41e403fa6a0174a9411f27ea 100644 (file)
@@ -1,6 +1,7 @@
 # -*- encoding: utf-8
 
 from decimal import Decimal
+import re
 from unittest.mock import Mock
 
 from sqlalchemy import Column
@@ -24,6 +25,7 @@ from sqlalchemy.testing import assert_warnings
 from sqlalchemy.testing import engines
 from sqlalchemy.testing import eq_
 from sqlalchemy.testing import expect_raises
+from sqlalchemy.testing import expect_raises_message
 from sqlalchemy.testing import expect_warnings
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing import mock
@@ -645,7 +647,7 @@ class RealIsolationLevelTest(fixtures.TestBase):
 
 
 class IsolationLevelDetectTest(fixtures.TestBase):
-    def _fixture(self, view):
+    def _fixture(self, view_result):
         class Error(Exception):
             pass
 
@@ -658,15 +660,25 @@ class IsolationLevelDetectTest(fixtures.TestBase):
         def fail_on_exec(
             stmt,
         ):
-            if view is not None and view in stmt:
+            result.clear()
+            if "SELECT name FROM sys.system_views" in stmt:
+                if view_result:
+                    result.append((view_result,))
+            elif re.match(
+                ".*SELECT CASE transaction_isolation_level.*FROM sys.%s"
+                % (view_result,),
+                stmt,
+                re.S,
+            ):
                 result.append(("SERIALIZABLE",))
             else:
-                raise Error("that didn't work")
+                assert False
 
         connection = Mock(
             cursor=Mock(
                 return_value=Mock(
-                    execute=fail_on_exec, fetchone=lambda: result[0]
+                    execute=fail_on_exec,
+                    fetchone=lambda: result[0] if result else None,
                 )
             )
         )
@@ -686,13 +698,12 @@ class IsolationLevelDetectTest(fixtures.TestBase):
     def test_not_supported(self):
         dialect, connection = self._fixture(None)
 
-        with expect_warnings("Could not fetch transaction isolation level"):
-            assert_raises_message(
-                NotImplementedError,
-                "Can't fetch isolation",
-                dialect.get_isolation_level,
-                connection,
-            )
+        assert_raises_message(
+            NotImplementedError,
+            "Can't fetch isolation level on this particular ",
+            dialect.get_isolation_level,
+            connection,
+        )
 
 
 class InvalidTransactionFalsePositiveTest(fixtures.TablesTest):
@@ -730,3 +741,44 @@ class InvalidTransactionFalsePositiveTest(fixtures.TablesTest):
         # "Can't reconnect until invalid transaction is rolled back."
         result = connection.execute(t.select()).fetchall()
         eq_(len(result), 1)
+
+
+class IgnoreNotransOnRollbackTest(fixtures.TestBase):
+    def test_ignore_no_transaction_on_rollback(self):
+        """test #8231"""
+
+        class ProgrammingError(Exception):
+            pass
+
+        dialect = base.dialect(ignore_no_transaction_on_rollback=True)
+        dialect.dbapi = mock.Mock(ProgrammingError=ProgrammingError)
+
+        connection = mock.Mock(
+            rollback=mock.Mock(
+                side_effect=ProgrammingError("Error 111214 happened")
+            )
+        )
+        with expect_warnings(
+            "ProgrammingError 111214 'No corresponding transaction found.' "
+            "has been suppressed via ignore_no_transaction_on_rollback=True"
+        ):
+            dialect.do_rollback(connection)
+
+    def test_other_programming_error_on_rollback(self):
+        """test #8231"""
+
+        class ProgrammingError(Exception):
+            pass
+
+        dialect = base.dialect(ignore_no_transaction_on_rollback=True)
+        dialect.dbapi = mock.Mock(ProgrammingError=ProgrammingError)
+
+        connection = mock.Mock(
+            rollback=mock.Mock(
+                side_effect=ProgrammingError("Some other error happened")
+            )
+        )
+        with expect_raises_message(
+            ProgrammingError, "Some other error happened"
+        ):
+            dialect.do_rollback(connection)