]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- execution_options() on Connection accepts
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 16 Jan 2011 22:04:07 +0000 (17:04 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 16 Jan 2011 22:04:07 +0000 (17:04 -0500)
"isolation_level" argument, sets transaction isolation
level for that connection only until returned to the
connection pool, for thsoe backends which support it
(SQLite, Postgresql) [ticket:2001]
- disallow the option on Engine (use isolation_level to create_engine()),
Executable (we don't want to check/set per statement)
- docs

CHANGES
doc/build/dialects/postgresql.rst
lib/sqlalchemy/dialects/postgresql/base.py
lib/sqlalchemy/dialects/postgresql/psycopg2.py
lib/sqlalchemy/dialects/sqlite/base.py
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/pool.py
lib/sqlalchemy/sql/expression.py
test/engine/test_transaction.py

diff --git a/CHANGES b/CHANGES
index 2670b4dea5a1be95694f479d27075d735432bb0f..9d6e25c651b7ff44ede31eb94e1373e118223fdd 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -19,6 +19,12 @@ CHANGES
     definition, using strings as column names, as an alternative
     to the creation of the index outside of the Table.
 
+  - execution_options() on Connection accepts 
+    "isolation_level" argument, sets transaction isolation
+    level for that connection only until returned to the
+    connection pool, for thsoe backends which support it
+    (SQLite, Postgresql) [ticket:2001]
+
   - A TypeDecorator of Integer can be used with a primary key
     column, and the "autoincrement" feature of various dialects
     as well as the "sqlite_autoincrement" flag will honor
index fe1fcb2f7f473db8a76a23b82c385174cdbd26d2..02e714b6a2c9cb33ff41c86fa84b32755836333b 100644 (file)
@@ -1,3 +1,5 @@
+.. _postgresql_toplevel:
+
 PostgreSQL
 ==========
 
index 9097c3a6eabbb8315b68676299e1f29ea62e0a27..4e7e114c9d7bc186cb234f64f52a40498d9e3373 100644 (file)
@@ -783,7 +783,7 @@ class PGDialect(default.DefaultDialect):
             raise exc.ArgumentError(
                 "Invalid value '%s' for isolation_level. "
                 "Valid isolation levels for %s are %s" % 
-                (self.name, level, ", ".join(self._isolation_lookup))
+                (level, self.name, ", ".join(self._isolation_lookup))
                 ) 
         cursor = connection.cursor()
         cursor.execute(
index 806ba41f8b62de45932532715bdbbf3b3e74e45b..21ce1211a144e4ba52560d6d5b76bb7a59a4cab2 100644 (file)
@@ -79,13 +79,19 @@ The psycopg2 dialect will log Postgresql NOTICE messages via the
     logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)
 
 
-Per-Statement Execution Options
--------------------------------
-
-The following per-statement execution options are respected:
-
-* *stream_results* - Enable or disable usage of server side cursors for the SELECT-statement.
-  If *None* or not set, the *server_side_cursors* option of the connection is used. If
+Per-Statement/Connection Execution Options
+-------------------------------------------
+
+The following DBAPI-specific options are respected when used with 
+:meth:`.Connection.execution_options`, :meth:`.Executable.execution_options`,
+:meth:`.Query.execution_options`, in addition to those not specific to DBAPIs:
+
+* isolation_level - Set the transaction isolation level for the lifespan of a 
+  :class:`.Connection` (can only be set on a connection, not a statement or query).
+  This includes the options ``SERIALIZABLE``, ``READ COMMITTED``,
+  ``READ UNCOMMITTED`` and ``REPEATABLE READ``.
+* stream_results - Enable or disable usage of server side cursors.
+  If ``None`` or not set, the ``server_side_cursors`` option of the :class:`.Engine` is used. If
   auto-commit is enabled, the option is ignored.
 
 """
@@ -247,20 +253,20 @@ class PGDialect_psycopg2(PGDialect):
     def _isolation_lookup(self):
         extensions = __import__('psycopg2.extensions').extensions
         return {
-            'READ_COMMITTED':extensions.ISOLATION_LEVEL_READ_COMMITTED, 
-            'READ_UNCOMMITTED':extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, 
-            'REPEATABLE_READ':extensions.ISOLATION_LEVEL_REPEATABLE_READ,
+            'READ COMMITTED':extensions.ISOLATION_LEVEL_READ_COMMITTED, 
+            'READ UNCOMMITTED':extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, 
+            'REPEATABLE READ':extensions.ISOLATION_LEVEL_REPEATABLE_READ,
             'SERIALIZABLE':extensions.ISOLATION_LEVEL_SERIALIZABLE
         }
 
     def set_isolation_level(self, connection, level):
         try:
-            level = self._isolation_lookup[level.replace(' ', '_')]
+            level = self._isolation_lookup[level.replace('_', ' ')]
         except KeyError:
             raise exc.ArgumentError(
                 "Invalid value '%s' for isolation_level. "
                 "Valid isolation levels for %s are %s" % 
-                (self.name, level, ", ".join(self._isolation_lookup))
+                (level, self.name, ", ".join(self._isolation_lookup))
                 ) 
 
         connection.set_isolation_level(level)
index 9f1f6432582700d74a6c4622db66ddabfad66abd..32fe4e612329ea90697190591dc1a95b2a351593 100644 (file)
@@ -397,7 +397,7 @@ class SQLiteDialect(default.DefaultDialect):
             raise exc.ArgumentError(
                 "Invalid value '%s' for isolation_level. "
                 "Valid isolation levels for %s are %s" % 
-                (self.name, level, ", ".join(self._isolation_lookup))
+                (level, self.name, ", ".join(self._isolation_lookup))
                 ) 
         cursor = connection.cursor()
         cursor.execute("PRAGMA read_uncommitted = %d" % isolation_level)
index 1727e6905a10e28546dfd9cc52fb4594c2c88230..8de2e2a3a9c5a8c436d9f1dc5d4a9bc442451a04 100644 (file)
@@ -877,17 +877,45 @@ class Connection(Connectable):
         underlying resource, it is probably best to ensure that the copies
         would be discarded immediately, which is implicit if used as in::
 
-            result = connection.execution_options(stream_results=True).\
+            result = connection.execution_options(stream_results=True).\\
                                 execute(stmt)
 
-        The options are the same as those accepted by 
-        :meth:`sqlalchemy.sql.expression.Executable.execution_options`.
+        :meth:`.Connection.execution_options` accepts all options as those
+        accepted by :meth:`.Executable.execution_options`.  Additionally,
+        it includes options that are applicable only to 
+        :class:`.Connection`.
+        
+        :param isolation_level: Set the transaction isolation level for
+          the lifespan of this connection.   Valid values include
+          those string values accepted by the ``isolation_level``
+          parameter passed to :func:`.create_engine`, and are
+          database specific, including those for :ref:`sqlite_toplevel`, 
+          :ref:`postgresql_toplevel` - see those dialect's documentation
+          for further info.
+          
+          Note that this option necessarily affects the underying 
+          DBAPI connection for the lifespan of the originating 
+          :class:`.Connection`, and is not per-execution. This 
+          setting is not removed until the underying DBAPI connection 
+          is returned to the connection pool, i.e.
+          the :meth:`.Connection.close` method is called.
+
+        :param \**kw: All options accepted by :meth:`.Executable.execution_options`
+          are also accepted.
 
         """
         c = self._clone()
         c._execution_options = c._execution_options.union(opt)
+        if 'isolation_level' in opt:
+            c._set_isolation_level()
         return c
 
+    def _set_isolation_level(self):
+        self.dialect.set_isolation_level(self.connection, 
+                                self._execution_options['isolation_level'])
+        self.connection._connection_record.finalize_callback = \
+                    self.dialect.reset_isolation_level
+
     @property
     def closed(self):
         """Return True if this connection is closed."""
@@ -1724,6 +1752,13 @@ class Engine(Connectable, log.Identified):
         if proxy:
             interfaces.ConnectionProxy._adapt_listener(self, proxy)
         if execution_options:
+            if 'isolation_level' in execution_options:
+                raise exc.ArgumentError(
+                    "'isolation_level' execution option may "
+                    "only be specified on Connection.execution_options(). "
+                    "To set engine-wide isolation level, "
+                    "use the isolation_level argument to create_engine()."
+                )
             self.update_execution_options(**execution_options)
 
 
@@ -1736,7 +1771,6 @@ class Engine(Connectable, log.Identified):
         :meth:`Connection.execution_options` as well as
         :meth:`sqlalchemy.sql.expression.Executable.execution_options`.
 
-
         """
         self._execution_options = \
                 self._execution_options.union(opt)
index 23a4c6946fa153533d65187e12e02a69aec4e4eb..5150d282c5b6c163f1abf5c25fa75aca058fd6d2 100644 (file)
@@ -249,6 +249,8 @@ class Pool(log.Identified):
 
 
 class _ConnectionRecord(object):
+    finalize_callback = None
+
     def __init__(self, pool):
         self.__pool = pool
         self.connection = self.__connect()
@@ -347,6 +349,9 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo):
         if echo:
             pool.logger.debug("Connection %r being returned to pool", 
                                     connection)
+        if connection_record.finalize_callback:
+            connection_record.finalize_callback(connection)
+            del connection_record.finalize_callback 
         if pool.dispatch.checkin:
             pool.dispatch.checkin(connection, connection_record)
         pool._return_conn(connection_record)
index 6a368b8c05d6c5607afadddb0ad909d6f8ccdc33..8a9d33d5536076ca5a197a7c21832eed327aedc7 100644 (file)
@@ -2563,9 +2563,7 @@ class Executable(_Generative):
         """ Set non-SQL options for the statement which take effect during
         execution.
 
-        Current options include:
-
-        * autocommit - when True, a COMMIT will be invoked after execution 
+        :param autocommit: when True, a COMMIT will be invoked after execution 
           when executed in 'autocommit' mode, i.e. when an explicit
           transaction is not begun on the connection. Note that DBAPI
           connections by default are always in a transaction - SQLAlchemy uses
@@ -2577,15 +2575,15 @@ class Executable(_Generative):
           specific SQL construct where COMMIT is desired (typically when
           calling stored procedures and such).
 
-        * stream_results - indicate to the dialect that results should be 
+        :param stream_results: indicate to the dialect that results should be 
           "streamed" and not pre-buffered, if possible.  This is a limitation
           of many DBAPIs.  The flag is currently understood only by the
           psycopg2 dialect.
 
-        * compiled_cache - a dictionary where :class:`Compiled` objects
-          will be cached when the :class:`Connection` compiles a clause 
+        :param compiled_cache: a dictionary where :class:`.Compiled` objects
+          will be cached when the :class:`.Connection` compiles a clause 
           expression into a dialect- and parameter-specific 
-          :class:`Compiled` object.   It is the user's responsibility to
+          :class:`.Compiled` object.   It is the user's responsibility to
           manage the size of this dictionary, which will have keys
           corresponding to the dialect, clause element, the column
           names within the VALUES or SET clause of an INSERT or UPDATE, 
@@ -2595,17 +2593,33 @@ class Executable(_Generative):
 
           This option is usually more appropriate
           to use via the 
-          :meth:`sqlalchemy.engine.base.Connection.execution_options()`
-          method of :class:`Connection`, rather than upon individual 
+          :meth:`.Connection.execution_options()`
+          method of :class:`.Connection`, rather than upon individual 
           statement objects, though the effect is the same.
+          
+          Note that the ORM makes use of its own "compiled" caches for 
+          some operations, including flush operations.  The caching
+          used by the ORM internally supercedes a cache dictionary
+          specified here.
 
         See also:
 
-            :meth:`sqlalchemy.engine.base.Connection.execution_options()`
+            :meth:`.Connection.execution_options()` -
+            includes a connection-only option to specify transaction isolation
+            level.
 
-            :meth:`sqlalchemy.orm.query.Query.execution_options()`
+            :meth:`.Query.execution_options()` - applies options to the statement
+            generated by a :class:`.orm.Query` object.
 
         """
+        if 'isolation_level' in kw:
+            raise exc.ArgumentError(
+                "'isolation_level' execution option may only be specified "
+                "on Connection.execution_options(), or "
+                "per-engine using the isolation_level "
+                "argument to create_engine()."
+            )
+
         self._execution_options = self._execution_options.union(kw)
 
     def execute(self, *multiparams, **params):
index 6878fea39fa14de9d32aabe9edd732fb1ed525e1..1c94d1f5f7e1145e13498ac20436b306bb31d40c 100644 (file)
@@ -1110,6 +1110,8 @@ class ForUpdateTest(TestBase):
         self.assert_(len(errors) != 0)
 
 class IsolationLevelTest(TestBase):
+    __requires__ = ('isolation_level',)
+
     def _default_isolation_level(self):
         if testing.against('sqlite'):
             return 'SERIALIZABLE'
@@ -1126,7 +1128,6 @@ class IsolationLevelTest(TestBase):
         else:
             assert False, "non default isolation level not known"
 
-    @testing.requires.isolation_level
     def test_engine_param_stays(self):
 
         eng = create_engine(testing.db.url)
@@ -1157,13 +1158,11 @@ class IsolationLevelTest(TestBase):
         )
         conn.close()
 
-    @testing.requires.isolation_level
     def test_default_level(self):
         eng = create_engine(testing.db.url)
         isolation_level = eng.dialect.get_isolation_level(eng.connect().connection)
         eq_(isolation_level, self._default_isolation_level())
 
-    @testing.requires.isolation_level
     def test_reset_level(self):
         eng = create_engine(testing.db.url)
         conn = eng.connect()
@@ -1177,7 +1176,6 @@ class IsolationLevelTest(TestBase):
 
         conn.close()
 
-    @testing.requires.isolation_level
     def test_reset_level_with_setting(self):
         eng = create_engine(testing.db.url, isolation_level=self._non_default_isolation_level())
         conn = eng.connect()
@@ -1191,13 +1189,55 @@ class IsolationLevelTest(TestBase):
 
         conn.close()
 
-
-    @testing.requires.isolation_level
     def test_invalid_level(self):
         eng = create_engine(testing.db.url, isolation_level='FOO')
         assert_raises_message(
             exc.ArgumentError, 
                 "Invalid value '%s' for isolation_level. "
                 "Valid isolation levels for %s are %s" % 
-                (eng.dialect.name, "FOO", ", ".join(eng.dialect._isolation_lookup)),
+                ("FOO", eng.dialect.name, ", ".join(eng.dialect._isolation_lookup)),
             eng.connect)
+
+    def test_per_connection(self):
+        from sqlalchemy.pool import QueuePool
+        eng = create_engine(testing.db.url, poolclass=QueuePool, pool_size=2, max_overflow=0)
+
+        c1 = eng.connect()
+        c1 = c1.execution_options(isolation_level=self._non_default_isolation_level())
+
+        c2 = eng.connect()
+        eq_(eng.dialect.get_isolation_level(c1.connection), self._non_default_isolation_level())
+        eq_(eng.dialect.get_isolation_level(c2.connection), self._default_isolation_level())
+
+        c1.close()
+        c2.close()
+        c3 = eng.connect()
+        eq_(eng.dialect.get_isolation_level(c3.connection), self._default_isolation_level())
+
+        c4 = eng.connect()
+        eq_(eng.dialect.get_isolation_level(c4.connection), self._default_isolation_level())
+
+        c3.close()
+        c4.close()
+
+    def test_per_statement_bzzt(self):
+        assert_raises_message(
+            exc.ArgumentError,
+            r"'isolation_level' execution option may only be specified "
+            r"on Connection.execution_options\(\), or "
+            r"per-engine using the isolation_level "
+            r"argument to create_engine\(\).",
+            select([1]).execution_options, isolation_level=self._non_default_isolation_level()
+        )
+
+
+    def test_per_engine_bzzt(self):
+        assert_raises_message(
+            exc.ArgumentError,
+            r"'isolation_level' execution option may "
+            r"only be specified on Connection.execution_options\(\). "
+            r"To set engine-wide isolation level, "
+            r"use the isolation_level argument to create_engine\(\).",
+            create_engine,
+            testing.db.url, execution_options={'isolation_level':self._non_default_isolation_level}
+        )