--- /dev/null
+.. change::
+ :tags: engine, usecase
+ :tickets: 6342
+
+ Generalized the :paramref:`_sa.create_engine.isolation_level` parameter to
+ the base dialect so that it is no longer dependent on individual dialects
+ to be present. This parameter sets up the "isolation level" setting to
+ occur for all new database connections as soon as they are created by the
+ connection pool, where the value then stays set without being reset on
+ every checkin.
+
+ The :paramref:`_sa.create_engine.isolation_level` parameter is essentially
+ equivalent in functionality to using the
+ :paramref:`_engine.Engine.execution_options.isolation_level` parameter via
+ :meth:`_engine.Engine.execution_options` for an engine-wide setting. The
+ difference is in that the former setting assigns the isolation level just
+ once when a connection is created, the latter sets and resets the given
+ level on each connection checkout.
not change autocommit mode**).
SQLAlchemy dialects should support these isolation levels as well as autocommit
-to as great a degree as possible. The levels are set via family of
-"execution_options" parameters and methods that are throughout the Core, such
-as the :meth:`_engine.Connection.execution_options` method. The parameter is
-known as :paramref:`_engine.Connection.execution_options.isolation_level` and
-the values are strings which are typically a subset of the following names::
+to as great a degree as possible.
+
+Setting Isolation Level or DBAPI Autocommit for a Connection
+------------------------------------------------------------
+
+For an individual :class:`_engine.Connection` object that's acquired from
+:meth:`.Engine.connect`, the isolation level can be set for the duration of
+that :class:`_engine.Connection` object using the
+:meth:`_engine.Connection.execution_options` method. The parameter is known as
+:paramref:`_engine.Connection.execution_options.isolation_level` and the values
+are strings which are typically a subset of the following names::
# possible values for Connection.execution_options(isolation_level="<value>")
objects with different execution options, which nonetheless share the same
dialect and connection pool.
+.. note:: The :paramref:`_engine.Connection.execution_options.isolation_level`
+ parameter necessarily does not apply to statement level options, such as
+ that of :meth:`_sql.Executable.execution_options`, and will be rejected if
+ set at this level. This because the option must be set on a DBAPI connection
+ on a per-transaction basis.
+
+Setting Isolation Level or DBAPI Autocommit for an Engine
+----------------------------------------------------------
+
The :paramref:`_engine.Connection.execution_options.isolation_level` option may
-also be set engine wide, as is often preferable. This is achieved by
-passing it within the :paramref:`_sa.create_engine.execution_options`
-parameter to :func:`_sa.create_engine`::
+also be set engine wide, as is often preferable. This may be
+achieved by passing the :paramref:`_sa.create_engine.isolation_level`
+parameter to :func:`.sa.create_engine`::
+
+ from sqlalchemy import create_engine
+ eng = create_engine(
+ "postgresql://scott:tiger@localhost/test",
+ isolation_level="REPEATABLE READ"
+ )
+
+With the above setting, each new DBAPI connection the moment it's created will
+be set to use a ``"REPEATABLE READ"`` isolation level setting for all
+subsequent operations.
+
+.. _dbapi_autocommit_multiple:
+
+Maintaining Multiple Isolation Levels for a Single Engine
+----------------------------------------------------------
+
+The isolation level may also be set per engine, with a potentially greater
+level of flexibility, using either the
+:paramref:`_sa.create_engine.execution_options` parameter to
+:func:`_sa.create_engine` or the :meth:`_engine.Engine.execution_options`
+method, the latter of which will create a copy of the :class:`.Engine` that
+shares the dialect and connection pool of the original engine, but has its own
+per-connection isolation level setting::
from sqlalchemy import create_engine
With the above setting, the DBAPI connection will be set to use a
``"REPEATABLE READ"`` isolation level setting for each new transaction
-begun.
+begun; but the connection as pooled will be reset to the original isolation
+level that was present when the connection first occurred. At the level
+of :func:`_sa.create_engine`, the end effect is not any different
+from using the :paramref:`_sa.create_engine.isolation_level` parameter.
-An application that frequently chooses to run operations within different
-isolation levels may wish to create multiple "sub-engines" of a lead
+However, an application that frequently chooses to run operations within
+different isolation levels may wish to create multiple "sub-engines" of a lead
:class:`_engine.Engine`, each of which will be configured to a different
-isolation level. One such use case is an application that has operations
-that break into "transactional" and "read-only" operations, a separate
-:class:`_engine.Engine` that makes use of ``"AUTOCOMMIT"`` may be
-separated off from the main engine::
+isolation level. One such use case is an application that has operations that
+break into "transactional" and "read-only" operations, a separate
+:class:`_engine.Engine` that makes use of ``"AUTOCOMMIT"`` may be separated off
+from the main engine::
from sqlalchemy import create_engine
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")
-
Above, the :meth:`_engine.Engine.execution_options` method creates a shallow
copy of the original :class:`_engine.Engine`. Both ``eng`` and
``autocommit_engine`` share the same dialect and connection pool. However, the
reverted when a connection is returned to the connection pool.
-.. note:: The :paramref:`_engine.Connection.execution_options.isolation_level`
- parameter necessarily does not apply to statement level options, such as
- that of :meth:`_sql.Executable.execution_options`. This because the option
- must be set on a DBAPI connection on a per-transaction basis.
-
.. seealso::
:ref:`SQLite Transaction Isolation <sqlite_isolation_level>`
:ref:`SQL Server Transaction Isolation <mssql_isolation_level>`
+ :ref:`Oracle Transaction Isolation <oracle_isolation_level>`
+
:ref:`session_transaction_isolation` - for the ORM
:ref:`faq_execute_retry_autocommit` - a recipe that uses DBAPI autocommit
]
)
+ def get_isolation_level_values(self, dbapi_conn):
+ return super().get_isolation_level_values(dbapi_conn) + ["AUTOCOMMIT"]
+
def set_isolation_level(self, connection, level):
# adjust for ConnectionFairy being present
# allows attribute set e.g. "connection.autocommit = True"
query_timeout=None,
use_scope_identity=True,
schema_name="dbo",
- isolation_level=None,
deprecate_large_types=None,
json_serializer=None,
json_deserializer=None,
super(MSDialect, self).__init__(**opts)
- self.isolation_level = isolation_level
self._json_serializer = json_serializer
self._json_deserializer = json_deserializer
]
)
+ def get_isolation_level_values(self, dbapi_conn):
+ return list(self._isolation_lookup)
+
def set_isolation_level(self, connection, level):
- level = level.replace("_", " ")
- if level not in self._isolation_lookup:
- raise exc.ArgumentError(
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s"
- % (level, self.name, ", ".join(self._isolation_lookup))
- )
cursor = connection.cursor()
cursor.execute("SET TRANSACTION ISOLATION LEVEL %s" % level)
cursor.close()
self._setup_version_attributes()
self._setup_supports_nvarchar_max(connection)
- def on_connect(self):
- if self.isolation_level is not None:
-
- def connect(conn):
- self.set_isolation_level(conn, self.isolation_level)
-
- return connect
- else:
- return None
-
def _setup_version_attributes(self):
if self.server_version_info[0] not in list(range(8, 17)):
util.warn(
else:
return False
+ def get_isolation_level_values(self, dbapi_conn):
+ return super().get_isolation_level_values(dbapi_conn) + ["AUTOCOMMIT"]
+
def set_isolation_level(self, connection, level):
if level == "AUTOCOMMIT":
connection.autocommit(True)
def __init__(
self,
- isolation_level=None,
json_serializer=None,
json_deserializer=None,
is_mariadb=None,
):
kwargs.pop("use_ansiquotes", None) # legacy
default.DefaultDialect.__init__(self, **kwargs)
- self.isolation_level = isolation_level
self._json_serializer = json_serializer
self._json_deserializer = json_deserializer
self._set_mariadb(is_mariadb, None)
- def on_connect(self):
- if self.isolation_level is not None:
-
- def connect(conn):
- self.set_isolation_level(conn, self.isolation_level)
-
- return connect
- else:
- return None
-
- _isolation_lookup = set(
- [
+ def get_isolation_level_values(self, dbapi_conn):
+ return (
"SERIALIZABLE",
"READ UNCOMMITTED",
"READ COMMITTED",
"REPEATABLE READ",
- ]
- )
-
- def set_isolation_level(self, connection, level):
- level = level.replace("_", " ")
-
- # adjust for ConnectionFairy being present
- # allows attribute set e.g. "connection.autocommit = True"
- # to work properly
- if hasattr(connection, "dbapi_connection"):
- connection = connection.dbapi_connection
-
- self._set_isolation_level(connection, level)
+ )
- def _set_isolation_level(self, connection, level):
- if level not in self._isolation_lookup:
- raise exc.ArgumentError(
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s"
- % (level, self.name, ", ".join(self._isolation_lookup))
- )
- cursor = connection.cursor()
+ def set_isolation_level(self, dbapi_conn, level):
+ cursor = dbapi_conn.cursor()
cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL %s" % level)
cursor.execute("COMMIT")
cursor.close()
else:
return cset_name()
- _isolation_lookup = set(
- [
+ def get_isolation_level_values(self, dbapi_conn):
+ return (
"SERIALIZABLE",
"READ UNCOMMITTED",
"READ COMMITTED",
"REPEATABLE READ",
"AUTOCOMMIT",
- ]
- )
+ )
- def _set_isolation_level(self, connection, level):
+ def set_isolation_level(self, dbapi_conn, level):
if level == "AUTOCOMMIT":
- connection.autocommit(True)
+ dbapi_conn.autocommit(True)
else:
- connection.autocommit(False)
- super(MySQLDialect_mysqldb, self)._set_isolation_level(
- connection, level
+ dbapi_conn.autocommit(False)
+ super(MySQLDialect_mysqldb, self).set_isolation_level(
+ dbapi_conn, level
)
# use the default
return None
- _isolation_lookup = ["READ COMMITTED", "SERIALIZABLE"]
-
- def get_isolation_level(self, connection):
- raise NotImplementedError("implemented by cx_Oracle dialect")
+ def get_isolation_level_values(self, dbapi_conn):
+ return ["READ COMMITTED", "SERIALIZABLE"]
def get_default_isolation_level(self, dbapi_conn):
try:
except:
return "READ COMMITTED"
- def set_isolation_level(self, connection, level):
- raise NotImplementedError("implemented by cx_Oracle dialect")
-
def has_table(self, connection, table_name, schema=None):
self._ensure_has_table_connection(connection)
return result
+ def get_isolation_level_values(self, dbapi_conn):
+ return super().get_isolation_level_values(dbapi_conn) + ["AUTOCOMMIT"]
+
def set_isolation_level(self, connection, level):
if hasattr(connection, "dbapi_connection"):
dbapi_connection = connection.dbapi_connection
"SERIALIZABLE": "serializable",
}
- def set_isolation_level(self, connection, level):
- try:
- level = self._isolation_lookup[level.replace("_", " ")]
- except KeyError as err:
- util.raise_(
- exc.ArgumentError(
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s"
- % (level, self.name, ", ".join(self._isolation_lookup))
- ),
- replace_context=err,
- )
+ def get_isolation_level_values(self, dbapi_conn):
+ return list(self._isolation_lookup)
- connection.set_isolation_level(level)
+ def set_isolation_level(self, connection, level):
+ connection.set_isolation_level(self._isolation_lookup[level])
def set_readonly(self, connection, value):
connection.readonly = value
preparer = PGIdentifierPreparer
execution_ctx_cls = PGExecutionContext
inspector = PGInspector
- isolation_level = None
implicit_returning = True
full_returning = True
_supports_create_index_concurrently = True
_supports_drop_index_concurrently = True
- def __init__(
- self,
- isolation_level=None,
- json_serializer=None,
- json_deserializer=None,
- **kwargs
- ):
+ def __init__(self, json_serializer=None, json_deserializer=None, **kwargs):
default.DefaultDialect.__init__(self, **kwargs)
- # the isolation_level parameter to the PGDialect itself is legacy.
- # still works however the execution_options method is the one that
- # is documented.
- self.isolation_level = isolation_level
self._json_deserializer = json_deserializer
self._json_serializer = json_serializer
)
self.supports_identity_columns = self.server_version_info >= (10,)
- def on_connect(self):
- if self.isolation_level is not None:
-
- def connect(conn):
- self.set_isolation_level(conn, self.isolation_level)
-
- return connect
- else:
- return None
-
- _isolation_lookup = set(
- [
+ def get_isolation_level_values(self, dbapi_conn):
+ # note the generic dialect doesn't have AUTOCOMMIT, however
+ # all postgresql dialects should include AUTOCOMMIT.
+ return (
"SERIALIZABLE",
"READ UNCOMMITTED",
"READ COMMITTED",
"REPEATABLE READ",
- ]
- )
+ )
def set_isolation_level(self, connection, level):
- level = level.replace("_", " ")
- if level not in self._isolation_lookup:
- raise exc.ArgumentError(
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s"
- % (level, self.name, ", ".join(self._isolation_lookup))
- )
cursor = connection.cursor()
cursor.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION "
# connection was closed normally
return "connection is closed" in str(e)
+ def get_isolation_level_values(self, dbapi_conn):
+ return (
+ "AUTOCOMMIT",
+ "READ COMMITTED",
+ "READ UNCOMMITTED",
+ "REPEATABLE READ",
+ "SERIALIZABLE",
+ )
+
def set_isolation_level(self, connection, level):
level = level.replace("_", " ")
if level == "AUTOCOMMIT":
connection.autocommit = True
- elif level in self._isolation_lookup:
+ else:
connection.autocommit = False
cursor = connection.cursor()
cursor.execute(
)
cursor.execute("COMMIT")
cursor.close()
- else:
- raise exc.ArgumentError(
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s or AUTOCOMMIT"
- % (level, self.name, ", ".join(self._isolation_lookup))
- )
def set_readonly(self, connection, value):
cursor = connection.cursor()
fns.append(on_connect)
- if self.isolation_level is not None:
-
- def on_connect(conn):
- self.set_isolation_level(conn, self.isolation_level)
-
- fns.append(on_connect)
-
if self._json_deserializer:
def on_connect(conn):
"SERIALIZABLE": extensions.ISOLATION_LEVEL_SERIALIZABLE,
}
- def set_isolation_level(self, connection, level):
- try:
- level = self._isolation_lookup[level.replace("_", " ")]
- except KeyError as err:
- util.raise_(
- exc.ArgumentError(
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s"
- % (level, self.name, ", ".join(self._isolation_lookup))
- ),
- replace_context=err,
- )
+ def get_isolation_level_values(self, dbapi_conn):
+ return list(self._isolation_lookup)
- connection.set_isolation_level(level)
+ def set_isolation_level(self, connection, level):
+ connection.set_isolation_level(self._isolation_lookup[level])
def set_readonly(self, connection, value):
connection.readonly = value
fns.append(on_connect)
- if self.isolation_level is not None:
-
- def on_connect(conn):
- self.set_isolation_level(conn, self.isolation_level)
-
- fns.append(on_connect)
-
if self.dbapi and self.use_native_uuid:
def on_connect(conn):
preparer = SQLiteIdentifierPreparer
ischema_names = ischema_names
colspecs = colspecs
- isolation_level = None
construct_arguments = [
(
)
def __init__(
self,
- isolation_level=None,
native_datetime=False,
json_serializer=None,
json_deserializer=None,
**kwargs
):
default.DefaultDialect.__init__(self, **kwargs)
- self.isolation_level = isolation_level
if _json_serializer:
json_serializer = _json_serializer
{"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
)
+ def get_isolation_level_values(self, dbapi_conn):
+ return list(self._isolation_lookup)
+
def set_isolation_level(self, connection, level):
- try:
- isolation_level = self._isolation_lookup[level.replace("_", " ")]
- except KeyError as err:
- util.raise_(
- exc.ArgumentError(
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s"
- % (
- level,
- self.name,
- ", ".join(self._isolation_lookup),
- )
- ),
- replace_context=err,
- )
+ isolation_level = self._isolation_lookup[level]
+
cursor = connection.cursor()
cursor.execute("PRAGMA read_uncommitted = %d" % isolation_level)
cursor.close()
else:
assert False, "Unknown isolation level %s" % value
- def on_connect(self):
- if self.isolation_level is not None:
-
- def connect(conn):
- self.set_isolation_level(conn, self.isolation_level)
-
- return connect
- else:
- return None
-
@reflection.cache
def get_schema_names(self, connection, **kw):
s = "PRAGMA database_list"
)
def on_connect(self):
- connect = super(SQLiteDialect_pysqlite, self).on_connect()
-
def regexp(a, b):
if b is None:
return None
fns = [set_regexp]
- if self.isolation_level is not None:
-
- def iso_level(conn):
- self.set_isolation_level(conn, self.isolation_level)
-
- fns.append(iso_level)
-
def connect(conn):
for fn in fns:
fn(conn)
* The logging configuration and logging_name is copied from the parent
:class:`_engine.Engine`.
- .. TODO: the below autocommit link will have a more specific ref
- for the example in an upcoming commit
-
The intent of the :meth:`_engine.Engine.execution_options` method is
to implement schemes where multiple :class:`_engine.Engine`
objects refer to the same connection pool, but are differentiated
:class:`_engine.Engine`
has a lower :term:`isolation level` setting configured or is even
transaction-disabled using "autocommit". An example of this
- configuration is at :ref:`dbapi_autocommit`.
+ configuration is at :ref:`dbapi_autocommit_multiple`.
Another example is one that
uses a custom option ``shard_id`` which is consumed by an event
dialect.reset_isolation_level(dbapi_conn)
def set_characteristic(self, dialect, dbapi_conn, value):
- dialect.set_isolation_level(dbapi_conn, value)
+ dialect._assert_and_set_isolation_level(dbapi_conn, value)
def get_characteristic(self, dialect, dbapi_conn):
return dialect.get_isolation_level(dbapi_conn)
should **always be set to True**. Some SQLAlchemy features will
fail to function properly if this flag is set to ``False``.
- :param isolation_level: this string parameter is interpreted by various
- dialects in order to affect the transaction isolation level of the
- database connection. The parameter essentially accepts some subset of
- these string arguments: ``"SERIALIZABLE"``, ``"REPEATABLE READ"``,
- ``"READ COMMITTED"``, ``"READ UNCOMMITTED"`` and ``"AUTOCOMMIT"``.
- Behavior here varies per backend, and
- individual dialects should be consulted directly.
-
- Note that the isolation level can also be set on a
- per-:class:`_engine.Connection` basis as well, using the
+ :param isolation_level: optional string name of an isolation level
+ which will be set on all new connections unconditionally.
+ Isolation levels are typically some subset of the string names
+ ``"SERIALIZABLE"``, ``"REPEATABLE READ"``,
+ ``"READ COMMITTED"``, ``"READ UNCOMMITTED"`` and ``"AUTOCOMMIT"``
+ based on backend.
+
+ The :paramref:`_sa.create_engine.isolation_level` parameter is
+ in contrast to the
:paramref:`.Connection.execution_options.isolation_level`
- feature.
+ execution option, which may be set on an individual
+ :class:`.Connection`, as well as the same parameter passed to
+ :meth:`.Engine.execution_options`, where it may be used to create
+ multiple engines with different isolation levels that share a common
+ connection pool and dialect.
+
+ .. versionchanged:: 2.0 The
+ :paramref:`_sa.create_engine.isolation_level`
+ parameter has been generalized to work on all dialects which support
+ the concept of isolation level, and is provided as a more succinct,
+ up front configuration switch in contrast to the execution option
+ which is more of an ad-hoc programmatic option.
.. seealso::
- :attr:`_engine.Connection.default_isolation_level`
- - view default level
-
- :paramref:`.Connection.execution_options.isolation_level`
- - set per :class:`_engine.Connection` isolation level
-
- :ref:`SQLite Transaction Isolation <sqlite_isolation_level>`
-
- :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>`
-
- :ref:`MySQL Transaction Isolation <mysql_isolation_level>`
-
- :ref:`session_transaction_isolation` - for the ORM
+ :ref:`dbapi_autocommit`
:param json_deserializer: for dialects that support the
:class:`_types.JSON`
event.listen(pool, "connect", on_connect)
+ builtin_on_connect = dialect._builtin_onconnect()
+ if builtin_on_connect:
+ event.listen(pool, "connect", builtin_on_connect)
+
def first_connect(dbapi_connection, connection_record):
c = base.Connection(
engine,
self,
encoding="utf-8",
paramstyle=None,
+ isolation_level=None,
dbapi=None,
implicit_returning=None,
supports_native_boolean=None,
**kwargs
):
- if not getattr(self, "ported_sqla_06", True):
- util.warn(
- "The %s dialect is not yet ported to the 0.6 format"
- % self.name
- )
-
if server_side_cursors:
if not self.supports_server_side_cursors:
raise exc.ArgumentError(
self.implicit_returning = implicit_returning
self.positional = self.paramstyle in ("qmark", "format", "numeric")
self.identifier_preparer = self.preparer(self)
+ self._on_connect_isolation_level = isolation_level
self.type_compiler = self.type_compiler(self)
if supports_native_boolean is not None:
self.supports_native_boolean = supports_native_boolean
except ImportError:
pass
+ def _builtin_onconnect(self):
+ if self._on_connect_isolation_level is not None:
+
+ def builtin_connect(dbapi_conn, conn_rec):
+ self._assert_and_set_isolation_level(
+ dbapi_conn, self._on_connect_isolation_level
+ )
+
+ return builtin_connect
+ else:
+ return None
+
def initialize(self, connection):
try:
self.server_version_info = self._get_server_version_info(
def is_disconnect(self, e, connection, cursor):
return False
+ @util.memoized_instancemethod
+ def _gen_allowed_isolation_levels(self, dbapi_conn):
+
+ try:
+ raw_levels = list(self.get_isolation_level_values(dbapi_conn))
+ except NotImplementedError:
+ return None
+ else:
+ normalized_levels = [
+ level.replace("_", " ").upper() for level in raw_levels
+ ]
+ if raw_levels != normalized_levels:
+ raise ValueError(
+ f"Dialect {self.name!r} get_isolation_level_values() "
+ f"method should return names as UPPERCASE using spaces, "
+ f"not underscores; got "
+ f"{sorted(set(raw_levels).difference(normalized_levels))}"
+ )
+ return tuple(normalized_levels)
+
+ def _assert_and_set_isolation_level(self, dbapi_conn, level):
+ level = level.replace("_", " ").upper()
+
+ _allowed_isolation_levels = self._gen_allowed_isolation_levels(
+ dbapi_conn
+ )
+ if (
+ _allowed_isolation_levels
+ and level not in _allowed_isolation_levels
+ ):
+ raise exc.ArgumentError(
+ f"Invalid value {level!r} for isolation_level. "
+ f"Valid isolation levels for {self.name!r} are "
+ f"{', '.join(_allowed_isolation_levels)}"
+ )
+
+ self.set_isolation_level(dbapi_conn, level)
+
def reset_isolation_level(self, dbapi_conn):
# default_isolation_level is read from the first connection
# after the initial set of 'isolation_level', if any, so is
# the configured default of this dialect.
- self.set_isolation_level(dbapi_conn, self.default_isolation_level)
+ self._assert_and_set_isolation_level(
+ dbapi_conn, self.default_isolation_level
+ )
def normalize_name(self, name):
if name is None:
isolation level facilities; these APIs should be preferred for
most typical use cases.
+ If the dialect also implements the
+ :meth:`.Dialect.get_isolation_level_values` method, then the given
+ level is guaranteed to be one of the string names within that sequence,
+ and the method will not need to anticipate a lookup failure.
+
.. seealso::
:meth:`_engine.Connection.get_isolation_level`
"""
raise NotImplementedError()
+ def get_isolation_level_values(self, dbapi_conn):
+ """return a sequence of string isolation level names that are accepted
+ by this dialect.
+
+ The available names should use the following conventions:
+
+ * use UPPERCASE names. isolation level methods will accept lowercase
+ names but these are normalized into UPPERCASE before being passed
+ along to the dialect.
+ * separate words should be separated by spaces, not underscores, e.g.
+ ``REPEATABLE READ``. isolation level names will have underscores
+ converted to spaces before being passed along to the dialect.
+ * The names for the four standard isolation names to the extent that
+ they are supported by the backend should be ``READ UNCOMMITTED``
+ ``READ COMMITTED``, ``REPEATABLE READ``, ``SERIALIZABLE``
+ * if the dialect supports an autocommit option it should be provided
+ using the isolation level name ``AUTOCOMMIT``.
+ * Other isolation modes may also be present, provided that they
+ are named in UPPERCASE and use spaces not underscores.
+
+ This function is used so that the default dialect can check that
+ a given isolation level parameter is valid, else raises an
+ :class:`_exc.ArgumentError`.
+
+ A DBAPI connection is passed to the method, in the unlikely event that
+ the dialect needs to interrogate the connection itself to determine
+ this list, however it is expected that most backends will return
+ a hardcoded list of values. If the dialect supports "AUTOCOMMIT",
+ that value should also be present in the sequence returned.
+
+ The method raises ``NotImplementedError`` by default. If a dialect
+ does not implement this method, then the default dialect will not
+ perform any checking on a given isolation level value before passing
+ it onto the :meth:`.Dialect.set_isolation_level` method. This is
+ to allow backwards-compatibility with third party dialects that may
+ not yet be implementing this method.
+
+ .. versionadded:: 2.0
+
+ """
+ raise NotImplementedError()
+
@classmethod
def get_dialect_cls(cls, url):
"""Given a URL, return the :class:`.Dialect` that will be used.
ec.error = err
success = True
if msg is not None:
- assert re.search(
- msg, util.text_type(err), re.UNICODE
- ), "%r !~ %s" % (msg, err)
+ # I'm often pdbing here, and "err" above isn't
+ # in scope, so assign the string explicitly
+ error_as_string = util.text_type(err)
+ assert re.search(msg, error_as_string, re.UNICODE), "%r !~ %s" % (
+ msg,
+ error_as_string,
+ )
if check_context and not are_we_already_in_a_traceback:
_assert_proper_exception_context(err)
print(util.text_type(err).encode("utf-8"))
from . import exclusions
from . import only_on
+from .. import create_engine
from .. import util
from ..pool import QueuePool
]
}
"""
+ with config.db.connect() as conn:
+
+ try:
+ supported = conn.dialect.get_isolation_level_values(
+ conn.connection.dbapi_connection
+ )
+ except NotImplementedError:
+ return None
+ else:
+ return {
+ "default": conn.dialect.default_isolation_level,
+ "supported": supported,
+ }
+
+ @property
+ def get_isolation_level_values(self):
+ """target dialect supports the
+ :meth:`_engine.Dialect.get_isolation_level_values`
+ method added in SQLAlchemy 2.0.
+
+ """
+
+ def go(config):
+ with config.db.connect() as conn:
+ try:
+ conn.dialect.get_isolation_level_values(
+ conn.connection.dbapi_connection
+ )
+ except NotImplementedError:
+ return False
+ else:
+ return True
+
+ return exclusions.only_if(go)
+
+ @property
+ def dialect_level_isolation_level_param(self):
+ """test that the dialect allows the 'isolation_level' argument
+ to be handled by DefaultDialect"""
+
+ def go(config):
+ try:
+ e = create_engine(
+ config.db.url, isolation_level="READ COMMITTED"
+ )
+ except:
+ return False
+ else:
+ return (
+ e.dialect._on_connect_isolation_level == "READ COMMITTED"
+ )
+
+ return exclusions.only_if(go)
@property
def json_type(self):
from .. import fixtures
from .. import ne_
from .. import provide_metadata
+from ..assertions import expect_raises_message
from ..config import requirements
from ..provision import set_default_schema_on_connection
from ..schema import Column
levels["default"],
)
+ @testing.requires.get_isolation_level_values
+ def test_invalid_level_execution_option(self, connection_no_trans):
+ """test for the new get_isolation_level_values() method"""
+
+ connection = connection_no_trans
+ with expect_raises_message(
+ exc.ArgumentError,
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for '%s' are %s"
+ % (
+ "FOO",
+ connection.dialect.name,
+ ", ".join(
+ requirements.get_isolation_levels(config)["supported"]
+ ),
+ ),
+ ):
+ connection.execution_options(isolation_level="FOO")
+
+ @testing.requires.get_isolation_level_values
+ @testing.requires.dialect_level_isolation_level_param
+ def test_invalid_level_engine_param(self, testing_engine):
+ """test for the new get_isolation_level_values() method
+ and support for the dialect-level 'isolation_level' parameter.
+
+ """
+
+ eng = testing_engine(options=dict(isolation_level="FOO"))
+ with expect_raises_message(
+ exc.ArgumentError,
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for '%s' are %s"
+ % (
+ "FOO",
+ eng.dialect.name,
+ ", ".join(
+ requirements.get_isolation_levels(config)["supported"]
+ ),
+ ),
+ ):
+ eng.connect()
+
class AutocommitIsolationTest(fixtures.TablesTest):
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import mock
-from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.types import Boolean
from sqlalchemy.types import Date
from sqlalchemy.types import DateTime
)
)
- @testing.only_on("sqlite+pysqlite")
- def test_isolation_level_message(self):
- # needs to test that all three words are present and we also
- # dont want to default all isolation level messages to use
- # sorted(), so rely on python 3.7 for ordering of keywords
- # in the message
- with expect_raises_message(
- exc.ArgumentError,
- "Invalid value 'invalid' for "
- "isolation_level. Valid isolation levels for "
- "sqlite are READ UNCOMMITTED, SERIALIZABLE, AUTOCOMMIT",
- ):
- with testing.db.connect() as conn:
- conn.execution_options(isolation_level="invalid")
-
@testing.only_on("sqlite+pysqlcipher")
def test_pysqlcipher_connects(self):
"""test #6586"""
class IsolationLevelTest(fixtures.TestBase):
+ """see also sqlalchemy/testing/suite/test_dialect.py::IsolationLevelTest"""
+
__requires__ = (
"isolation_level",
"ad_hoc_engines",
else:
assert False, "no non-default isolation level available"
- @testing.requires.legacy_isolation_level
def test_engine_param_stays(self):
eng = testing_engine()
conn.close()
- @testing.requires.legacy_isolation_level
def test_reset_level_with_setting(self):
eng = testing_engine(
options=dict(isolation_level=self._non_default_isolation_level())
)
conn.close()
- @testing.requires.legacy_isolation_level
+ def test_underscore_replacement(self, connection_no_trans):
+ conn = connection_no_trans
+ with mock.patch.object(
+ conn.dialect, "set_isolation_level"
+ ) as mock_sil, mock.patch.object(
+ conn.dialect,
+ "_gen_allowed_isolation_levels",
+ mock.Mock(return_value=["READ COMMITTED", "REPEATABLE READ"]),
+ ):
+ conn.execution_options(isolation_level="REPEATABLE_READ")
+ dbapi_conn = conn.connection.dbapi_connection
+
+ eq_(mock_sil.mock_calls, [mock.call(dbapi_conn, "REPEATABLE READ")])
+
+ def test_casing_replacement(self, connection_no_trans):
+ conn = connection_no_trans
+ with mock.patch.object(
+ conn.dialect, "set_isolation_level"
+ ) as mock_sil, mock.patch.object(
+ conn.dialect,
+ "_gen_allowed_isolation_levels",
+ mock.Mock(return_value=["READ COMMITTED", "REPEATABLE READ"]),
+ ):
+ conn.execution_options(isolation_level="repeatable_read")
+ dbapi_conn = conn.connection.dbapi_connection
+
+ eq_(mock_sil.mock_calls, [mock.call(dbapi_conn, "REPEATABLE READ")])
+
+ def test_dialect_doesnt_follow_naming_guidelines(
+ self, connection_no_trans
+ ):
+ conn = connection_no_trans
+
+ conn.dialect.__dict__.pop("_gen_allowed_isolation_levels", None)
+ with mock.patch.object(
+ conn.dialect,
+ "get_isolation_level_values",
+ mock.Mock(
+ return_value=[
+ "READ COMMITTED",
+ "REPEATABLE_READ",
+ "serializable",
+ ]
+ ),
+ ):
+ with expect_raises_message(
+ ValueError,
+ f"Dialect {conn.dialect.name!r} "
+ r"get_isolation_level_values\(\) method "
+ r"should "
+ r"return names as UPPERCASE using spaces, not underscores; "
+ r"got \['REPEATABLE_READ', 'serializable'\]",
+ ):
+ conn.execution_options(isolation_level="READ COMMITTED")
+
def test_invalid_level_engine_param(self):
eng = testing_engine(options=dict(isolation_level="FOO"))
assert_raises_message(
exc.ArgumentError,
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s"
- % (
- "FOO",
- eng.dialect.name,
- ", ".join(eng.dialect._isolation_lookup),
- ),
+ f"Invalid value 'FOO' for isolation_level. "
+ f"Valid isolation levels for {eng.dialect.name!r} are "
+ f"""{', '.join(
+ testing.requires.get_isolation_levels(
+ testing.config
+ )['supported']
+ )}""",
eng.connect,
)
- # TODO: all the dialects seem to be manually raising ArgumentError
- # individually within their set_isolation_level() methods, when this
- # should be a default dialect feature so that
- # error messaging etc. is consistent, including that it works for 3rd
- # party dialects.
- # TODO: barring that, at least implement this for the Oracle dialect
- @testing.fails_on(
- "oracle",
- "cx_oracle dialect doesnt have argument error here, "
- "raises it via the DB rejecting it",
- )
def test_invalid_level_execution_option(self):
eng = testing_engine(
options=dict(execution_options=dict(isolation_level="FOO"))
)
assert_raises_message(
exc.ArgumentError,
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s"
- % (
- "FOO",
- eng.dialect.name,
- ", ".join(eng.dialect._isolation_lookup),
- ),
+ f"Invalid value 'FOO' for isolation_level. "
+ f"Valid isolation levels for {eng.dialect.name!r} are "
+ f"""{', '.join(
+ testing.requires.get_isolation_levels(
+ testing.config
+ )['supported']
+ )}""",
eng.connect,
)
"DBAPI has no isolation level support",
)
- @property
- def legacy_isolation_level(self):
- # refers dialects where "isolation_level" can be passed to
- # create_engine
- return only_on(
- ("postgresql", "sqlite", "mysql", "mariadb", "mssql"),
- "DBAPI has no isolation level support",
- )
-
- def get_isolation_levels(self, config):
- levels = set(config.db.dialect._isolation_lookup)
-
- if against(config, "sqlite"):
- default = "SERIALIZABLE"
- levels.add("AUTOCOMMIT")
- elif against(config, "postgresql"):
- default = "READ COMMITTED"
- levels.add("AUTOCOMMIT")
- elif against(config, "mysql"):
- default = "REPEATABLE READ"
- levels.add("AUTOCOMMIT")
- elif against(config, "mariadb"):
- default = "REPEATABLE READ"
- levels.add("AUTOCOMMIT")
- elif against(config, "mssql"):
- default = "READ COMMITTED"
- levels.add("AUTOCOMMIT")
- elif against(config, "oracle"):
- default = "READ COMMITTED"
- levels.add("AUTOCOMMIT")
- else:
- raise NotImplementedError()
-
- return {"default": default, "supported": levels}
-
@property
def autocommit(self):
"""target dialect supports 'AUTOCOMMIT' as an isolation_level"""