--- /dev/null
+.. change::
+ :tags: oracle, usecase
+ :tickets: 5200
+
+ Implemented AUTOCOMMIT isolation level for Oracle when using cx_Oracle.
+ Also added a fixed default isolation level of READ COMMITTED for Oracle.
+
autoload=True
)
+Transaction Isolation Level / Autocommit
+----------------------------------------
+
+The Oracle database supports "READ COMMITTED" and "SERIALIZABLE" modes
+of isolation, however the SQLAlchemy Oracle dialect currently only has
+explicit support for "READ COMMITTED". It is possible to emit a
+"SET TRANSACTION" statement on a connection in order to use SERIALIZABLE
+isolation, however the SQLAlchemy dialect will remain unaware of this setting,
+such as if the :meth:`.Connection.get_isolation_level` method is used;
+this method is hardcoded to return "READ COMMITTED" right now.
+
+The AUTOCOMMIT isolation level is also supported by the cx_Oracle dialect.
+
+To set using per-connection execution options::
+
+ connection = engine.connect()
+ connection = connection.execution_options(
+ isolation_level="AUTOCOMMIT"
+ )
+
+Valid values for ``isolation_level`` include:
+
+* ``READ COMMITTED``
+* ``AUTOCOMMIT``
+
+
+.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_oracle dialect
+ as well as the notion of a default isolation level, currently harcoded
+ to "READ COMMITTED".
+
Identifier Casing
-----------------
connection, additional_tests
)
+ _isolation_lookup = ["READ COMMITTED"]
+
+ def get_isolation_level(self, connection):
+ return "READ COMMITTED"
+
+ def set_isolation_level(self, connection, level):
+ # prior to adding AUTOCOMMIT support for cx_Oracle, the Oracle dialect
+ # had no notion of setting the isolation level. As Oracle
+ # does not have a straightforward way of getting the isolation level
+ # if a server-side transaction is not yet in progress, we currently
+ # hardcode to only support "READ COMMITTED" and "AUTOCOMMIT" at the
+ # cx_oracle level. See #5200.
+ pass
+
def has_table(self, connection, table_name, schema=None):
if not schema:
schema = self.default_schema_name
def do_recover_twophase(self, connection):
connection.info.pop("cx_oracle_prepared", None)
+ def set_isolation_level(self, connection, level):
+ if hasattr(connection, "connection"):
+ dbapi_connection = connection.connection
+ else:
+ dbapi_connection = connection
+ if level == "AUTOCOMMIT":
+ dbapi_connection.autocommit = True
+ else:
+ dbapi_connection.autocommit = False
+ super(OracleDialect_cx_oracle, self).set_isolation_level(
+ dbapi_connection, level
+ )
+
dialect = OracleDialect_cx_oracle
"""target dialect supports 'AUTOCOMMIT' as an isolation_level"""
return exclusions.closed()
+ @property
+ def isolation_level(self):
+ """target dialect supports general isolation level settings.
+
+ Note that this requirement, when enabled, also requires that
+ the get_isolation_levels() method be implemented.
+
+ """
+ return exclusions.closed()
+
+ def get_isolation_levels(self, config):
+ """Return a structure of supported isolation levels for the current
+ testing dialect.
+
+ The structure indicates to the testing suite what the expected
+ "default" isolation should be, as well as the other values that
+ are accepted. The dictionary has two keys, "default" and "supported".
+ The "supported" key refers to a list of all supported levels and
+ it should include AUTOCOMMIT if the dialect supports it.
+
+ If the :meth:`.DefaultRequirements.isolation_level` requirement is
+ not open, then this method has no return value.
+
+ E.g.::
+
+ >>> testing.requirements.get_isolation_levels()
+ {
+ "default": "READ_COMMITED",
+ "supported": [
+ "SERIALIZABLE", "READ UNCOMMITTED",
+ "READ COMMITTED", "REPEATABLE READ",
+ "AUTOCOMMIT"
+ ]
+ }
+ """
+
@property
def json_type(self):
"""target platform implements a native JSON type."""
from .. import config
from .. import eq_
from .. import fixtures
+from .. import ne_
from .. import provide_metadata
from ..config import requirements
from ..schema import Column
assert isinstance(err_str, str)
+class IsolationLevelTest(fixtures.TestBase):
+ __backend__ = True
+
+ __requires__ = ("isolation_level",)
+
+ def _get_non_default_isolation_level(self):
+ levels = requirements.get_isolation_levels(config)
+
+ default = levels["default"]
+ supported = levels["supported"]
+
+ s = set(supported).difference(["AUTOCOMMIT", default])
+ if s:
+ return s.pop()
+ else:
+ config.skip_test("no non-default isolation level available")
+
+ def test_default_isolation_level(self):
+ eq_(
+ config.db.dialect.default_isolation_level,
+ requirements.get_isolation_levels(config)["default"],
+ )
+
+ def test_non_default_isolation_level(self):
+ non_default = self._get_non_default_isolation_level()
+
+ with config.db.connect() as conn:
+ existing = conn.get_isolation_level()
+
+ ne_(existing, non_default)
+
+ conn.execution_options(isolation_level=non_default)
+
+ eq_(conn.get_isolation_level(), non_default)
+
+ conn.dialect.reset_isolation_level(conn.connection)
+
+ eq_(conn.get_isolation_level(), existing)
+
+
class AutocommitTest(fixtures.TablesTest):
run_deletes = "each"
conn = config.db.connect()
c2 = conn.execution_options(isolation_level="AUTOCOMMIT")
self._test_conn_autocommits(c2, True)
- conn.invalidate()
+
+ c2.dialect.reset_isolation_level(c2.connection)
+
self._test_conn_autocommits(conn, False)
def test_autocommit_off(self):
conn = config.db.connect()
self._test_conn_autocommits(conn, False)
+ def test_turn_autocommit_off_via_default_iso_level(self):
+ conn = config.db.connect()
+ conn.execution_options(isolation_level="AUTOCOMMIT")
+ self._test_conn_autocommits(conn, True)
+
+ conn.execution_options(
+ isolation_level=requirements.get_isolation_levels(config)[
+ "default"
+ ]
+ )
+ self._test_conn_autocommits(conn, False)
+
class EscapingTest(fixtures.TestBase):
@provide_metadata
__backend__ = True
def _default_isolation_level(self):
- if testing.against("sqlite"):
- return "SERIALIZABLE"
- elif testing.against("postgresql"):
- return "READ COMMITTED"
- elif testing.against("mysql"):
- return "REPEATABLE READ"
- elif testing.against("mssql"):
- return "READ COMMITTED"
- else:
- assert False, "default isolation level not known"
+ return testing.requires.get_isolation_levels(testing.config)["default"]
def _non_default_isolation_level(self):
- if testing.against("sqlite"):
- return "READ UNCOMMITTED"
- elif testing.against("postgresql"):
- return "SERIALIZABLE"
- elif testing.against("mysql"):
- return "SERIALIZABLE"
- elif testing.against("mssql"):
- return "SERIALIZABLE"
+ levels = testing.requires.get_isolation_levels(testing.config)
+
+ default = levels["default"]
+ supported = levels["supported"]
+
+ s = set(supported).difference(["AUTOCOMMIT", default])
+ if s:
+ return s.pop()
else:
- assert False, "non default isolation level not known"
+ assert False, "no non-default isolation level available"
def test_engine_param_stays(self):
@property
def isolation_level(self):
return only_on(
- ("postgresql", "sqlite", "mysql", "mssql"),
+ ("postgresql", "sqlite", "mysql", "mssql", "oracle"),
"DBAPI has no isolation level support",
) + fails_on(
"postgresql+pypostgresql",
"pypostgresql bombs on multiple isolation level calls",
)
+ def get_isolation_levels(self, config):
+ levels = set(config.db.dialect._isolation_lookup)
+
+ if against(config, "sqlite"):
+ default = "SERIALIZABLE"
+ elif against(config, "postgresql"):
+ default = "READ COMMITTED"
+ levels.add("AUTOCOMMIT")
+ elif against(config, "mysql"):
+ default = "REPEATABLE READ"
+ levels.add("AUTOCOMMIT")
+ elif against(config, "mssql"):
+ default = "READ COMMITTED"
+ levels.add("AUTOCOMMIT")
+ elif against(config, "oracle"):
+ default = "READ COMMITTED"
+ levels.add("AUTOCOMMIT")
+ else:
+ raise NotImplementedError()
+
+ return {"default": default, "supported": levels}
+
@property
def autocommit(self):
"""target dialect supports 'AUTOCOMMIT' as an isolation_level"""
- return only_on(
- ("postgresql", "mysql", "mssql+pyodbc", "mssql+pymssql"),
- "dialect does not support AUTOCOMMIT isolation mode",
+
+ return self.isolation_level + only_if(
+ lambda config: "AUTOCOMMIT"
+ in self.get_isolation_levels(config)["supported"]
)
@property