definition, using strings as column names, as an alternative
to the creation of the index outside of the Table.
+ - execution_options() on Connection accepts
+ "isolation_level" argument, sets transaction isolation
+ level for that connection only until returned to the
+ connection pool, for thsoe backends which support it
+ (SQLite, Postgresql) [ticket:2001]
+
- A TypeDecorator of Integer can be used with a primary key
column, and the "autoincrement" feature of various dialects
as well as the "sqlite_autoincrement" flag will honor
+.. _postgresql_toplevel:
+
PostgreSQL
==========
raise exc.ArgumentError(
"Invalid value '%s' for isolation_level. "
"Valid isolation levels for %s are %s" %
- (self.name, level, ", ".join(self._isolation_lookup))
+ (level, self.name, ", ".join(self._isolation_lookup))
)
cursor = connection.cursor()
cursor.execute(
logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)
-Per-Statement Execution Options
--------------------------------
-
-The following per-statement execution options are respected:
-
-* *stream_results* - Enable or disable usage of server side cursors for the SELECT-statement.
- If *None* or not set, the *server_side_cursors* option of the connection is used. If
+Per-Statement/Connection Execution Options
+-------------------------------------------
+
+The following DBAPI-specific options are respected when used with
+:meth:`.Connection.execution_options`, :meth:`.Executable.execution_options`,
+:meth:`.Query.execution_options`, in addition to those not specific to DBAPIs:
+
+* isolation_level - Set the transaction isolation level for the lifespan of a
+ :class:`.Connection` (can only be set on a connection, not a statement or query).
+ This includes the options ``SERIALIZABLE``, ``READ COMMITTED``,
+ ``READ UNCOMMITTED`` and ``REPEATABLE READ``.
+* stream_results - Enable or disable usage of server side cursors.
+ If ``None`` or not set, the ``server_side_cursors`` option of the :class:`.Engine` is used. If
auto-commit is enabled, the option is ignored.
"""
def _isolation_lookup(self):
extensions = __import__('psycopg2.extensions').extensions
return {
- 'READ_COMMITTED':extensions.ISOLATION_LEVEL_READ_COMMITTED,
- 'READ_UNCOMMITTED':extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
- 'REPEATABLE_READ':extensions.ISOLATION_LEVEL_REPEATABLE_READ,
+ 'READ COMMITTED':extensions.ISOLATION_LEVEL_READ_COMMITTED,
+ 'READ UNCOMMITTED':extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
+ 'REPEATABLE READ':extensions.ISOLATION_LEVEL_REPEATABLE_READ,
'SERIALIZABLE':extensions.ISOLATION_LEVEL_SERIALIZABLE
}
def set_isolation_level(self, connection, level):
try:
- level = self._isolation_lookup[level.replace(' ', '_')]
+ level = self._isolation_lookup[level.replace('_', ' ')]
except KeyError:
raise exc.ArgumentError(
"Invalid value '%s' for isolation_level. "
"Valid isolation levels for %s are %s" %
- (self.name, level, ", ".join(self._isolation_lookup))
+ (level, self.name, ", ".join(self._isolation_lookup))
)
connection.set_isolation_level(level)
raise exc.ArgumentError(
"Invalid value '%s' for isolation_level. "
"Valid isolation levels for %s are %s" %
- (self.name, level, ", ".join(self._isolation_lookup))
+ (level, self.name, ", ".join(self._isolation_lookup))
)
cursor = connection.cursor()
cursor.execute("PRAGMA read_uncommitted = %d" % isolation_level)
underlying resource, it is probably best to ensure that the copies
would be discarded immediately, which is implicit if used as in::
- result = connection.execution_options(stream_results=True).\
+ result = connection.execution_options(stream_results=True).\\
execute(stmt)
- The options are the same as those accepted by
- :meth:`sqlalchemy.sql.expression.Executable.execution_options`.
+ :meth:`.Connection.execution_options` accepts all options as those
+ accepted by :meth:`.Executable.execution_options`. Additionally,
+ it includes options that are applicable only to
+ :class:`.Connection`.
+
+ :param isolation_level: Set the transaction isolation level for
+ the lifespan of this connection. Valid values include
+ those string values accepted by the ``isolation_level``
+ parameter passed to :func:`.create_engine`, and are
+ database specific, including those for :ref:`sqlite_toplevel`,
+ :ref:`postgresql_toplevel` - see those dialect's documentation
+ for further info.
+
+ Note that this option necessarily affects the underying
+ DBAPI connection for the lifespan of the originating
+ :class:`.Connection`, and is not per-execution. This
+ setting is not removed until the underying DBAPI connection
+ is returned to the connection pool, i.e.
+ the :meth:`.Connection.close` method is called.
+
+ :param \**kw: All options accepted by :meth:`.Executable.execution_options`
+ are also accepted.
"""
c = self._clone()
c._execution_options = c._execution_options.union(opt)
+ if 'isolation_level' in opt:
+ c._set_isolation_level()
return c
+ def _set_isolation_level(self):
+ self.dialect.set_isolation_level(self.connection,
+ self._execution_options['isolation_level'])
+ self.connection._connection_record.finalize_callback = \
+ self.dialect.reset_isolation_level
+
@property
def closed(self):
"""Return True if this connection is closed."""
if proxy:
interfaces.ConnectionProxy._adapt_listener(self, proxy)
if execution_options:
+ if 'isolation_level' in execution_options:
+ raise exc.ArgumentError(
+ "'isolation_level' execution option may "
+ "only be specified on Connection.execution_options(). "
+ "To set engine-wide isolation level, "
+ "use the isolation_level argument to create_engine()."
+ )
self.update_execution_options(**execution_options)
:meth:`Connection.execution_options` as well as
:meth:`sqlalchemy.sql.expression.Executable.execution_options`.
-
"""
self._execution_options = \
self._execution_options.union(opt)
class _ConnectionRecord(object):
+ finalize_callback = None
+
def __init__(self, pool):
self.__pool = pool
self.connection = self.__connect()
if echo:
pool.logger.debug("Connection %r being returned to pool",
connection)
+ if connection_record.finalize_callback:
+ connection_record.finalize_callback(connection)
+ del connection_record.finalize_callback
if pool.dispatch.checkin:
pool.dispatch.checkin(connection, connection_record)
pool._return_conn(connection_record)
""" Set non-SQL options for the statement which take effect during
execution.
- Current options include:
-
- * autocommit - when True, a COMMIT will be invoked after execution
+ :param autocommit: when True, a COMMIT will be invoked after execution
when executed in 'autocommit' mode, i.e. when an explicit
transaction is not begun on the connection. Note that DBAPI
connections by default are always in a transaction - SQLAlchemy uses
specific SQL construct where COMMIT is desired (typically when
calling stored procedures and such).
- * stream_results - indicate to the dialect that results should be
+ :param stream_results: indicate to the dialect that results should be
"streamed" and not pre-buffered, if possible. This is a limitation
of many DBAPIs. The flag is currently understood only by the
psycopg2 dialect.
- * compiled_cache - a dictionary where :class:`Compiled` objects
- will be cached when the :class:`Connection` compiles a clause
+ :param compiled_cache: a dictionary where :class:`.Compiled` objects
+ will be cached when the :class:`.Connection` compiles a clause
expression into a dialect- and parameter-specific
- :class:`Compiled` object. It is the user's responsibility to
+ :class:`.Compiled` object. It is the user's responsibility to
manage the size of this dictionary, which will have keys
corresponding to the dialect, clause element, the column
names within the VALUES or SET clause of an INSERT or UPDATE,
This option is usually more appropriate
to use via the
- :meth:`sqlalchemy.engine.base.Connection.execution_options()`
- method of :class:`Connection`, rather than upon individual
+ :meth:`.Connection.execution_options()`
+ method of :class:`.Connection`, rather than upon individual
statement objects, though the effect is the same.
+
+ Note that the ORM makes use of its own "compiled" caches for
+ some operations, including flush operations. The caching
+ used by the ORM internally supercedes a cache dictionary
+ specified here.
See also:
- :meth:`sqlalchemy.engine.base.Connection.execution_options()`
+ :meth:`.Connection.execution_options()` -
+ includes a connection-only option to specify transaction isolation
+ level.
- :meth:`sqlalchemy.orm.query.Query.execution_options()`
+ :meth:`.Query.execution_options()` - applies options to the statement
+ generated by a :class:`.orm.Query` object.
"""
+ if 'isolation_level' in kw:
+ raise exc.ArgumentError(
+ "'isolation_level' execution option may only be specified "
+ "on Connection.execution_options(), or "
+ "per-engine using the isolation_level "
+ "argument to create_engine()."
+ )
+
self._execution_options = self._execution_options.union(kw)
def execute(self, *multiparams, **params):
self.assert_(len(errors) != 0)
class IsolationLevelTest(TestBase):
+ __requires__ = ('isolation_level',)
+
def _default_isolation_level(self):
if testing.against('sqlite'):
return 'SERIALIZABLE'
else:
assert False, "non default isolation level not known"
- @testing.requires.isolation_level
def test_engine_param_stays(self):
eng = create_engine(testing.db.url)
)
conn.close()
- @testing.requires.isolation_level
def test_default_level(self):
eng = create_engine(testing.db.url)
isolation_level = eng.dialect.get_isolation_level(eng.connect().connection)
eq_(isolation_level, self._default_isolation_level())
- @testing.requires.isolation_level
def test_reset_level(self):
eng = create_engine(testing.db.url)
conn = eng.connect()
conn.close()
- @testing.requires.isolation_level
def test_reset_level_with_setting(self):
eng = create_engine(testing.db.url, isolation_level=self._non_default_isolation_level())
conn = eng.connect()
conn.close()
-
- @testing.requires.isolation_level
def test_invalid_level(self):
eng = create_engine(testing.db.url, isolation_level='FOO')
assert_raises_message(
exc.ArgumentError,
"Invalid value '%s' for isolation_level. "
"Valid isolation levels for %s are %s" %
- (eng.dialect.name, "FOO", ", ".join(eng.dialect._isolation_lookup)),
+ ("FOO", eng.dialect.name, ", ".join(eng.dialect._isolation_lookup)),
eng.connect)
+
+ def test_per_connection(self):
+ from sqlalchemy.pool import QueuePool
+ eng = create_engine(testing.db.url, poolclass=QueuePool, pool_size=2, max_overflow=0)
+
+ c1 = eng.connect()
+ c1 = c1.execution_options(isolation_level=self._non_default_isolation_level())
+
+ c2 = eng.connect()
+ eq_(eng.dialect.get_isolation_level(c1.connection), self._non_default_isolation_level())
+ eq_(eng.dialect.get_isolation_level(c2.connection), self._default_isolation_level())
+
+ c1.close()
+ c2.close()
+ c3 = eng.connect()
+ eq_(eng.dialect.get_isolation_level(c3.connection), self._default_isolation_level())
+
+ c4 = eng.connect()
+ eq_(eng.dialect.get_isolation_level(c4.connection), self._default_isolation_level())
+
+ c3.close()
+ c4.close()
+
+ def test_per_statement_bzzt(self):
+ assert_raises_message(
+ exc.ArgumentError,
+ r"'isolation_level' execution option may only be specified "
+ r"on Connection.execution_options\(\), or "
+ r"per-engine using the isolation_level "
+ r"argument to create_engine\(\).",
+ select([1]).execution_options, isolation_level=self._non_default_isolation_level()
+ )
+
+
+ def test_per_engine_bzzt(self):
+ assert_raises_message(
+ exc.ArgumentError,
+ r"'isolation_level' execution option may "
+ r"only be specified on Connection.execution_options\(\). "
+ r"To set engine-wide isolation level, "
+ r"use the isolation_level argument to create_engine\(\).",
+ create_engine,
+ testing.db.url, execution_options={'isolation_level':self._non_default_isolation_level}
+ )