database itself, especially if database reflection features are
to be used.
+Transaction Isolation Level
+---------------------------
+
+:func:`.create_engine` accepts an ``isolation_level``
+parameter which results in the command ``SET SESSION
+TRANSACTION ISOLATION LEVEL <level>`` being invoked for
+every new connection. Valid values for this parameter are
+``READ COMMITTED``, ``READ UNCOMMITTED``,
+``REPEATABLE READ``, and ``SERIALIZABLE``::
+
+ engine = create_engine(
+ "mysql://scott:tiger@localhost/test",
+ isolation_level="READ UNCOMMITTED"
+ )
+
+(new in 0.7.6)
+
Keys
----
_backslash_escapes = True
_server_ansiquotes = False
- def __init__(self, use_ansiquotes=None, **kwargs):
+ def __init__(self, use_ansiquotes=None, isolation_level=None, **kwargs):
default.DefaultDialect.__init__(self, **kwargs)
+ self.isolation_level = isolation_level
+
+ def on_connect(self):
+ if self.isolation_level is not None:
+ def connect(conn):
+ self.set_isolation_level(conn, self.isolation_level)
+ return connect
+ else:
+ return None
+
+ _isolation_lookup = set(['SERIALIZABLE',
+ 'READ UNCOMMITTED', 'READ COMMITTED', 'REPEATABLE READ'])
+
+ def set_isolation_level(self, connection, level):
+ level = level.replace('_', ' ')
+ if level not in self._isolation_lookup:
+ raise exc.ArgumentError(
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for %s are %s" %
+ (level, self.name, ", ".join(self._isolation_lookup))
+ )
+ cursor = connection.cursor()
+ cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL %s" % level)
+ cursor.execute("COMMIT")
+ cursor.close()
+
+ def get_isolation_level(self, connection):
+ cursor = connection.cursor()
+ cursor.execute('SELECT @@tx_isolation')
+ val = cursor.fetchone()[0]
+ cursor.close()
+ return val.upper().replace("-", " ")
def do_commit(self, connection):
"""Execute a COMMIT."""
return 'SERIALIZABLE'
elif testing.against('postgresql'):
return 'READ COMMITTED'
+ elif testing.against('mysql'):
+ return "REPEATABLE READ"
else:
assert False, "default isolation level not known"
return 'READ UNCOMMITTED'
elif testing.against('postgresql'):
return 'SERIALIZABLE'
+ elif testing.against('mysql'):
+ return "SERIALIZABLE"
else:
assert False, "non default isolation level not known"
def test_engine_param_stays(self):
eng = testing_engine()
- isolation_level = eng.dialect.get_isolation_level(eng.connect().connection)
+ isolation_level = eng.dialect.get_isolation_level(
+ eng.connect().connection)
level = self._non_default_isolation_level()
ne_(isolation_level, level)
eng = testing_engine(options=dict(isolation_level=level))
eq_(
- eng.dialect.get_isolation_level(eng.connect().connection),
+ eng.dialect.get_isolation_level(
+ eng.connect().connection),
level
)
def test_default_level(self):
eng = testing_engine(options=dict())
- isolation_level = eng.dialect.get_isolation_level(eng.connect().connection)
+ isolation_level = eng.dialect.get_isolation_level(
+ eng.connect().connection)
eq_(isolation_level, self._default_isolation_level())
def test_reset_level(self):
eng = testing_engine(options=dict())
conn = eng.connect()
- eq_(eng.dialect.get_isolation_level(conn.connection), self._default_isolation_level())
+ eq_(
+ eng.dialect.get_isolation_level(conn.connection),
+ self._default_isolation_level()
+ )
- eng.dialect.set_isolation_level(conn.connection, self._non_default_isolation_level())
- eq_(eng.dialect.get_isolation_level(conn.connection), self._non_default_isolation_level())
+ eng.dialect.set_isolation_level(
+ conn.connection, self._non_default_isolation_level()
+ )
+ eq_(
+ eng.dialect.get_isolation_level(conn.connection),
+ self._non_default_isolation_level()
+ )
eng.dialect.reset_isolation_level(conn.connection)
- eq_(eng.dialect.get_isolation_level(conn.connection), self._default_isolation_level())
+ eq_(
+ eng.dialect.get_isolation_level(conn.connection),
+ self._default_isolation_level()
+ )
conn.close()
def test_reset_level_with_setting(self):
- eng = testing_engine(options=dict(isolation_level=self._non_default_isolation_level()))
+ eng = testing_engine(options=dict(
+ isolation_level=
+ self._non_default_isolation_level()))
conn = eng.connect()
- eq_(eng.dialect.get_isolation_level(conn.connection), self._non_default_isolation_level())
-
- eng.dialect.set_isolation_level(conn.connection, self._default_isolation_level())
- eq_(eng.dialect.get_isolation_level(conn.connection), self._default_isolation_level())
-
+ eq_(eng.dialect.get_isolation_level(conn.connection),
+ self._non_default_isolation_level())
+ eng.dialect.set_isolation_level(conn.connection,
+ self._default_isolation_level())
+ eq_(eng.dialect.get_isolation_level(conn.connection),
+ self._default_isolation_level())
eng.dialect.reset_isolation_level(conn.connection)
- eq_(eng.dialect.get_isolation_level(conn.connection), self._non_default_isolation_level())
-
+ eq_(eng.dialect.get_isolation_level(conn.connection),
+ self._non_default_isolation_level())
conn.close()
def test_invalid_level(self):
exc.ArgumentError,
"Invalid value '%s' for isolation_level. "
"Valid isolation levels for %s are %s" %
- ("FOO", eng.dialect.name, ", ".join(eng.dialect._isolation_lookup)),
+ ("FOO", eng.dialect.name,
+ ", ".join(eng.dialect._isolation_lookup)),
eng.connect)
def test_per_connection(self):
from sqlalchemy.pool import QueuePool
- eng = testing_engine(options=dict(poolclass=QueuePool, pool_size=2, max_overflow=0))
+ eng = testing_engine(options=dict(
+ poolclass=QueuePool,
+ pool_size=2, max_overflow=0))
c1 = eng.connect()
- c1 = c1.execution_options(isolation_level=self._non_default_isolation_level())
-
+ c1 = c1.execution_options(
+ isolation_level=self._non_default_isolation_level()
+ )
c2 = eng.connect()
- eq_(eng.dialect.get_isolation_level(c1.connection), self._non_default_isolation_level())
- eq_(eng.dialect.get_isolation_level(c2.connection), self._default_isolation_level())
-
+ eq_(
+ eng.dialect.get_isolation_level(c1.connection),
+ self._non_default_isolation_level()
+ )
+ eq_(
+ eng.dialect.get_isolation_level(c2.connection),
+ self._default_isolation_level()
+ )
c1.close()
c2.close()
c3 = eng.connect()
- eq_(eng.dialect.get_isolation_level(c3.connection), self._default_isolation_level())
-
+ eq_(
+ eng.dialect.get_isolation_level(c3.connection),
+ self._default_isolation_level()
+ )
c4 = eng.connect()
- eq_(eng.dialect.get_isolation_level(c4.connection), self._default_isolation_level())
+ eq_(
+ eng.dialect.get_isolation_level(c4.connection),
+ self._default_isolation_level()
+ )
c3.close()
c4.close()
r"on Connection.execution_options\(\), or "
r"per-engine using the isolation_level "
r"argument to create_engine\(\).",
- select([1]).execution_options, isolation_level=self._non_default_isolation_level()
+ select([1]).execution_options,
+ isolation_level=self._non_default_isolation_level()
)
r"To set engine-wide isolation level, "
r"use the isolation_level argument to create_engine\(\).",
create_engine,
- testing.db.url, execution_options={'isolation_level':self._non_default_isolation_level}
+ testing.db.url,
+ execution_options={'isolation_level':
+ self._non_default_isolation_level}
)