Fixed issue where the SQL Server dialect's query for the current isolation
level would fail on Azure Synapse Analytics, due to the way in which this
database handles transaction rollbacks after an error has occurred. The
initial query has been modified to no longer rely upon catching an error
when attempting to detect the appropriate system view. Additionally, to
better support this database's very specific "rollback" behavior,
implemented new parameter ``ignore_no_transaction_on_rollback`` indicating
that a rollback should ignore Azure Synapse error 'No corresponding
transaction found. (111214)', which is raised if no transaction is present
in conflict with the Python DBAPI.
Fixes: #8231
Closes: #8233
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/8233
Pull-request-sha:
c48bd44a9f53d00e5e94f1b8bf996711b6419562
Change-Id: I6407a03148f45cc9eba8fe1d31d4f59ebf9c7ef7
(cherry picked from commit
8fe3cd69c5f2d8f73e75fb19ae929273282fba57)
--- /dev/null
+.. change::
+ :tags: bug, mssql
+ :tickets: 8231
+
+ Fixed issue where the SQL Server dialect's query for the current isolation
+ level would fail on Azure Synapse Analytics, due to the way in which this
+ database handles transaction rollbacks after an error has occurred. The
+ initial query has been modified to no longer rely upon catching an error
+ when attempting to detect the appropriate system view. Additionally, to
+ better support this database's very specific "rollback" behavior,
+ implemented new parameter ``ignore_no_transaction_on_rollback`` indicating
+ that a rollback should ignore Azure Synapse error 'No corresponding
+ transaction found. (111214)', which is raised if no transaction is present
+ in conflict with the Python DBAPI.
+
+ Initial patch and valuable debugging assistance courtesy of @ww2406.
+
+ .. seealso::
+
+ :ref:`azure_synapse_ignore_no_transaction_on_rollback`
json_serializer=None,
json_deserializer=None,
legacy_schema_aliasing=None,
+ ignore_no_transaction_on_rollback=False,
**opts
):
self.query_timeout = int(query_timeout or 0)
self.use_scope_identity = use_scope_identity
self.deprecate_large_types = deprecate_large_types
+ self.ignore_no_transaction_on_rollback = (
+ ignore_no_transaction_on_rollback
+ )
if legacy_schema_aliasing is not None:
util.warn_deprecated(
# SQL Server does not support RELEASE SAVEPOINT
pass
+ def do_rollback(self, dbapi_connection):
+ try:
+ super(MSDialect, self).do_rollback(dbapi_connection)
+ except self.dbapi.ProgrammingError as e:
+ if self.ignore_no_transaction_on_rollback and re.match(
+ r".*\b111214\b", str(e)
+ ):
+ util.warn(
+ "ProgrammingError 111214 "
+ "'No corresponding transaction found.' "
+ "has been suppressed via "
+ "ignore_no_transaction_on_rollback=True"
+ )
+ else:
+ raise
+
_isolation_lookup = set(
[
"SERIALIZABLE",
if level == "SNAPSHOT":
connection.commit()
- def get_isolation_level(self, connection):
- last_error = None
+ def get_isolation_level(self, dbapi_connection):
+ cursor = dbapi_connection.cursor()
+ try:
+ cursor.execute(
+ "SELECT name FROM sys.system_views WHERE name IN "
+ "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')"
+ )
+ row = cursor.fetchone()
+ if not row:
+ raise NotImplementedError(
+ "Can't fetch isolation level on this particular "
+ "SQL Server version."
+ )
- views = ("sys.dm_exec_sessions", "sys.dm_pdw_nodes_exec_sessions")
- for view in views:
- cursor = connection.cursor()
- try:
- cursor.execute(
- """
- SELECT CASE transaction_isolation_level
+ view_name = "sys.{}".format(row[0])
+ cursor.execute(
+ """
+ SELECT CASE transaction_isolation_level
WHEN 0 THEN NULL
WHEN 1 THEN 'READ UNCOMMITTED'
WHEN 2 THEN 'READ COMMITTED'
WHEN 3 THEN 'REPEATABLE READ'
WHEN 4 THEN 'SERIALIZABLE'
WHEN 5 THEN 'SNAPSHOT' END AS TRANSACTION_ISOLATION_LEVEL
- FROM %s
+ FROM {}
where session_id = @@SPID
- """
- % view
+ """.format(
+ view_name
)
- val = cursor.fetchone()[0]
- except self.dbapi.Error as err:
- # Python3 scoping rules
- last_error = err
- continue
- else:
- return val.upper()
- finally:
- cursor.close()
- else:
- # note that the NotImplementedError is caught by
- # DefaultDialect, so the warning here is all that displays
- util.warn(
- "Could not fetch transaction isolation level, "
- "tried views: %s; final error was: %s" % (views, last_error)
- )
- raise NotImplementedError(
- "Can't fetch isolation level on this particular "
- "SQL Server version. tried views: %s; final error was: %s"
- % (views, last_error)
)
+ row = cursor.fetchone()
+ assert row is not None
+ val = row[0]
+ finally:
+ cursor.close()
+ return val.upper()
def initialize(self, connection):
super(MSDialect, self).initialize(connection)
stating that a connection string when using an access token must not contain
``UID``, ``PWD``, ``Authentication`` or ``Trusted_Connection`` parameters.
+.. _azure_synapse_ignore_no_transaction_on_rollback:
+
+Avoiding transaction-related exceptions on Azure Synapse Analytics
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Azure Synapse Analytics has a significant difference in its transaction
+handling compared to plain SQL Server; in some cases an error within a Synapse
+transaction can cause it to be arbitrarily terminated on the server side, which
+then causes the DBAPI ``.rollback()`` method (as well as ``.commit()``) to
+fail. The issue prevents the usual DBAPI contract of allowing ``.rollback()``
+to pass silently if no transaction is present as the driver does not expect
+this condition. The symptom of this failure is an exception with a message
+resembling 'No corresponding transaction found. (111214)' when attempting to
+emit a ``.rollback()`` after an operation had a failure of some kind.
+
+This specific case can be handled by passing ``ignore_no_transaction_on_rollback=True`` to
+the SQL Server dialect via the :func:`_sa.create_engine` function as follows::
+
+ engine = create_engine(connection_url, ignore_no_transaction_on_rollback=True)
+
+Using the above parameter, the dialect will catch ``ProgrammingError``
+exceptions raised during ``connection.rollback()`` and emit a warning
+if the error message contains code ``111214``, however will not raise
+an exception.
+
+.. versionadded:: 1.4.40 Added the
+ ``ignore_no_transaction_on_rollback=True`` parameter.
+
Enable autocommit for Azure SQL Data Warehouse (DW) connections
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# -*- encoding: utf-8
from decimal import Decimal
+import re
from sqlalchemy import Column
from sqlalchemy import event
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
class IsolationLevelDetectTest(fixtures.TestBase):
- def _fixture(self, view):
+ def _fixture(self, view_result):
class Error(Exception):
pass
def fail_on_exec(
stmt,
):
- if view is not None and view in stmt:
+ result[:] = []
+ if "SELECT name FROM sys.system_views" in stmt:
+ if view_result:
+ result.append((view_result,))
+ elif re.match(
+ ".*SELECT CASE transaction_isolation_level.*FROM sys.%s"
+ % (view_result,),
+ stmt,
+ re.S,
+ ):
result.append(("SERIALIZABLE",))
else:
- raise Error("that didn't work")
+ assert False
connection = Mock(
cursor=Mock(
return_value=Mock(
- execute=fail_on_exec, fetchone=lambda: result[0]
+ execute=fail_on_exec,
+ fetchone=lambda: result[0] if result else None,
)
)
)
def test_not_supported(self):
dialect, connection = self._fixture(None)
- with expect_warnings("Could not fetch transaction isolation level"):
- assert_raises_message(
- NotImplementedError,
- "Can't fetch isolation",
- dialect.get_isolation_level,
- connection,
- )
+ assert_raises_message(
+ NotImplementedError,
+ "Can't fetch isolation level on this particular ",
+ dialect.get_isolation_level,
+ connection,
+ )
class InvalidTransactionFalsePositiveTest(fixtures.TablesTest):
# "Can't reconnect until invalid transaction is rolled back."
result = connection.execute(t.select()).fetchall()
eq_(len(result), 1)
+
+
+class IgnoreNotransOnRollbackTest(fixtures.TestBase):
+ def test_ignore_no_transaction_on_rollback(self):
+ """test #8231"""
+
+ class ProgrammingError(Exception):
+ pass
+
+ dialect = base.dialect(ignore_no_transaction_on_rollback=True)
+ dialect.dbapi = mock.Mock(ProgrammingError=ProgrammingError)
+
+ connection = mock.Mock(
+ rollback=mock.Mock(
+ side_effect=ProgrammingError("Error 111214 happened")
+ )
+ )
+ with expect_warnings(
+ "ProgrammingError 111214 'No corresponding transaction found.' "
+ "has been suppressed via ignore_no_transaction_on_rollback=True"
+ ):
+ dialect.do_rollback(connection)
+
+ def test_other_programming_error_on_rollback(self):
+ """test #8231"""
+
+ class ProgrammingError(Exception):
+ pass
+
+ dialect = base.dialect(ignore_no_transaction_on_rollback=True)
+ dialect.dbapi = mock.Mock(ProgrammingError=ProgrammingError)
+
+ connection = mock.Mock(
+ rollback=mock.Mock(
+ side_effect=ProgrammingError("Some other error happened")
+ )
+ )
+ with expect_raises_message(
+ ProgrammingError, "Some other error happened"
+ ):
+ dialect.do_rollback(connection)