]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Added support for create_engine(isolation_level=...); postgres &
authorJason Kirtland <jek@discorporate.us>
Tue, 31 Mar 2009 19:49:13 +0000 (19:49 +0000)
committerJason Kirtland <jek@discorporate.us>
Tue, 31 Mar 2009 19:49:13 +0000 (19:49 +0000)
  sqlite initially [ticket:443]
- Dialects gained visit_pool
- Pools gained a first_connect event
Patch from Adam Lowry.  Thank you Adam!

06CHANGES
lib/sqlalchemy/dialects/postgres/base.py
lib/sqlalchemy/dialects/postgres/psycopg2.py
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/engine/strategies.py
lib/sqlalchemy/interfaces.py
lib/sqlalchemy/pool.py
test/dialect/postgres.py
test/engine/pool.py

index d8a48846363ce5b57e3d9d160d72181405fb7834..70834b35c13040318c315e005bf2e4b79334ceeb 100644 (file)
--- a/06CHANGES
+++ b/06CHANGES
@@ -7,11 +7,18 @@
       There is no implicit fallback onto "fetch".   Failure of evaluation is based
       on the structure of criteria, so success/failure is deterministic based on 
       code structure.
-    
+
+- engines
+    - transaction isolation level may be specified with
+      create_engine(... isolation_level="..."); available on
+      postgresql and sqlite. [ticket:443]
+
 - dialect refactor
     - server_version_info becomes a static attribute.
-    - create_engine() now establishes an initial connection immediately upon
-      creation, which is passed to the dialect to determine connection properties.
+    - dialects receive an initialize() event on initial connection to
+      determine connection properties.
+    - dialects receive a visit_pool event have an opportunity to
+      establish pool listeners.
     - cached TypeEngine classes are cached per-dialect class instead of per-dialect.
     
 - mysql
index 7ab1ac7a483dad587429c89d0fc8c51cb7dc81cb..c541cff95ef13d23433ec11eaf6fcb4278600b76 100644 (file)
@@ -445,7 +445,25 @@ class PGDialect(default.DefaultDialect):
     preparer = PGIdentifierPreparer
     defaultrunner = PGDefaultRunner
     inspector = PGInspector
-
+    isolation_level = None
+
+    def __init__(self, isolation_level=None, **kwargs):
+        default.DefaultDialect.__init__(self, **kwargs)
+        self.isolation_level = isolation_level
+
+    def visit_pool(self, pool):
+        if self.isolation_level is not None:
+            class SetIsolationLevel(object):
+                def __init__(self, isolation_level):
+                    self.isolation_level = isolation_level
+
+                def connect(self, conn, rec):
+                    cursor = conn.cursor()
+                    cursor.execute("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL %s"
+                                   % self.isolation_level)
+                    cursor.execute("COMMIT")
+                    cursor.close()
+            pool.add_listener(SetIsolationLevel(self.isolation_level))
 
     def do_begin_twophase(self, connection, xid):
         self.do_begin(connection.connection)
index 2a3eef47f00f0299a92a674875f479ab55ae2325..c865c9decfbe1b8e7f1556d1b648302f238773a2 100644 (file)
@@ -24,6 +24,10 @@ psycopg2-specific keyword arguments which are accepted by :func:`~sqlalchemy.cre
   uses special row-buffering behavior when this feature is enabled, such that groups of 100 rows 
   at a time are fetched over the wire to reduce conversational overhead.
 
+* *isolation_level* - Sets the transaction isolation level for each transaction
+  within the engine. Valid isolation levels are `READ_COMMITTED`,
+  `READ_UNCOMMITTED`, `REPEATABLE_READ`, and `SERIALIZABLE`.
+
 Transactions
 ------------
 
@@ -116,7 +120,6 @@ class Postgres_psycopg2(PGDialect):
         }
     )
 
-    
     def __init__(self, server_side_cursors=False, **kwargs):
         PGDialect.__init__(self, **kwargs)
         self.server_side_cursors = server_side_cursors
@@ -145,4 +148,4 @@ class Postgres_psycopg2(PGDialect):
             return False
 
 dialect = Postgres_psycopg2
-    
\ No newline at end of file
+    
index fb5e6c22182bf1c6a90c047e7c5ab9c183cb4448..a884221bd32d5c8a98bb790de9a1160fc19cb956 100644 (file)
@@ -390,6 +390,9 @@ class Dialect(object):
 
         raise NotImplementedError()
 
+    def visit_pool(self, pool):
+        """Executed after a pool is created."""
+
 
 class ExecutionContext(object):
     """A messenger object for a Dialect that corresponds to a single execution.
index 6b2e2a6834b962c977fc5a640a3e8c44c7deca30..d938c644f508ea35f8d9630fb2ee5456c408b42d 100644 (file)
@@ -102,7 +102,10 @@ class DefaultDialect(base.Dialect):
     def validate_identifier(self, ident):
         if len(ident) > self.max_identifier_length:
             raise exc.IdentifierError("Identifier '%s' exceeds maximum length of %d characters" % (ident, self.max_identifier_length))
-        
+
+    def connect(self, *cargs, **cparams):
+        return self.dbapi.connect(*cargs, **cparams)
+
     def do_begin(self, connection):
         """Implementations might want to put logic here for turning
         autocommit on/off, etc.
index d31f7855457671a324f8fa23f82601ccf9542ce4..c57acf242f9ab172cad4387783eb2a22b8176a0b 100644 (file)
@@ -13,7 +13,6 @@ from operator import attrgetter
 from sqlalchemy.engine import base, threadlocal, url
 from sqlalchemy import util, exc
 from sqlalchemy import pool as poollib
-from sqlalchemy import interfaces
 
 strategies = {}
 
@@ -75,7 +74,7 @@ class DefaultEngineStrategy(EngineStrategy):
         if pool is None:
             def connect():
                 try:
-                    return dbapi.connect(*cargs, **cparams)
+                    return dialect.connect(*cargs, **cparams)
                 except Exception, e:
                     import sys
                     raise exc.DBAPIError.instance(None, None, e), None, sys.exc_info()[2]
@@ -126,13 +125,14 @@ class DefaultEngineStrategy(EngineStrategy):
         engine = engineclass(pool, dialect, u, **engine_args)
 
         if _initialize:
-            class OnInit(interfaces.PoolListener):
-                def connect(self, conn, rec):
+            class OnInit(object):
+                def first_connect(self, conn, rec):
                     c = base.Connection(engine, connection=conn)
                     dialect.initialize(c)
-                    pool._on_connect.remove(self)
-            pool._on_connect.insert(0, OnInit())
-        
+            pool._on_first_connect.insert(0, OnInit())
+
+        dialect.visit_pool(pool)
+
         return engine
 
     def pool_threadlocal(self):
index dfceffe4451a0440d6d6b89a9a8c3adce9e4fb7f..e4a9adee1fb6351d5100632ff45d22341c82cf69 100644 (file)
@@ -71,6 +71,18 @@ class PoolListener(object):
 
         """
 
+    def first_connect(self, dbapi_con, con_record):
+        """Called exactly once for the first DB-API connection.
+
+        dbapi_con
+          A newly connected raw DB-API connection (not a SQLAlchemy
+          ``Connection`` wrapper).
+
+        con_record
+          The ``_ConnectionRecord`` that persistently manages the connection
+
+        """
+
     def checkout(self, dbapi_con, con_record, con_proxy):
         """Called when a connection is retrieved from the Pool.
 
index aca4663774d990adfc9462b5d133aef13149b3b7..c39b4b225bf29be9ca61bad09852a702c4f64856 100644 (file)
@@ -108,6 +108,7 @@ class Pool(object):
         self.echo = echo
         self.listeners = []
         self._on_connect = []
+        self._on_first_connect = []
         self._on_checkout = []
         self._on_checkin = []
 
@@ -178,12 +179,14 @@ class Pool(object):
 
         """
 
-        listener = as_interface(
-            listener, methods=('connect', 'checkout', 'checkin'))
+        listener = as_interface(listener,
+            methods=('connect', 'first_connect', 'checkout', 'checkin'))
 
         self.listeners.append(listener)
         if hasattr(listener, 'connect'):
             self._on_connect.append(listener)
+        if hasattr(listener, 'first_connect'):
+            self._on_first_connect.append(listener)
         if hasattr(listener, 'checkout'):
             self._on_checkout.append(listener)
         if hasattr(listener, 'checkin'):
@@ -197,6 +200,10 @@ class _ConnectionRecord(object):
         self.__pool = pool
         self.connection = self.__connect()
         self.info = {}
+        ls = pool.__dict__.pop('_on_first_connect', None)
+        if ls is not None:
+            for l in ls:
+                l.first_connect(self.connection, self)
         if pool._on_connect:
             for l in pool._on_connect:
                 l.connect(self.connection, self)
index bb4a10e605a2be049ec8a81a31aaa38f815fae31..b25270f3a9931af332ce9dcb204d4f2fab2f4195 100644 (file)
@@ -708,6 +708,20 @@ class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
             warnings.warn = capture_warnings._orig_showwarning
             m1.drop_all()
 
+    def test_set_isolation_level(self):
+        """Test setting the isolation level with create_engine"""
+        eng = create_engine(testing.db.url)
+        self.assertEquals(
+            eng.execute("show transaction isolation level").scalar(),
+            'read committed')
+        eng = create_engine(testing.db.url, isolation_level="SERIALIZABLE")
+        self.assertEquals(
+            eng.execute("show transaction isolation level").scalar(),
+            'serializable')
+        eng = create_engine(testing.db.url, isolation_level="FOO")
+        self.assertRaises(eng.dialect.dbapi.ProgrammingError, eng.execute,
+            "show transaction isolation level")
+
 
 class TimezoneTest(TestBase, AssertsExecutionResults):
     """Test timezone-aware datetimes.
index b712e24128d9fdff472b45b59dcbda3571e29000..fdac6c0d85a51daf5d574b796ad8290acf4ec92e 100644 (file)
@@ -1,8 +1,8 @@
 import testenv; testenv.configure_for_tests()
 import threading, time, gc
-from sqlalchemy import pool, interfaces
+from sqlalchemy import pool, interfaces, create_engine
 import testlib.sa as tsa
-from testlib import TestBase
+from testlib import TestBase, testing
 
 
 mcid = 1
@@ -164,6 +164,8 @@ class PoolTest(PoolTestBase):
             def __init__(self):
                 if hasattr(self, 'connect'):
                     self.connect = self.inst_connect
+                if hasattr(self, 'first_connect'):
+                    self.first_connect = self.inst_first_connect
                 if hasattr(self, 'checkout'):
                     self.checkout = self.inst_checkout
                 if hasattr(self, 'checkin'):
@@ -171,14 +173,17 @@ class PoolTest(PoolTestBase):
                 self.clear()
             def clear(self):
                 self.connected = []
+                self.first_connected = []
                 self.checked_out = []
                 self.checked_in = []
-            def assert_total(innerself, conn, cout, cin):
+            def assert_total(innerself, conn, fconn, cout, cin):
                 self.assert_(len(innerself.connected) == conn)
+                self.assert_(len(innerself.first_connected) == fconn)
                 self.assert_(len(innerself.checked_out) == cout)
                 self.assert_(len(innerself.checked_in) == cin)
-            def assert_in(innerself, item, in_conn, in_cout, in_cin):
+            def assert_in(innerself, item, in_conn, in_fconn, in_cout, in_cin):
                 self.assert_((item in innerself.connected) == in_conn)
+                self.assert_((item in innerself.first_connected) == in_fconn)
                 self.assert_((item in innerself.checked_out) == in_cout)
                 self.assert_((item in innerself.checked_in) == in_cin)
             def inst_connect(self, con, record):
@@ -186,6 +191,11 @@ class PoolTest(PoolTestBase):
                 assert con is not None
                 assert record is not None
                 self.connected.append(con)
+            def inst_first_connect(self, con, record):
+                print "first_connect(%s, %s)" % (con, record)
+                assert con is not None
+                assert record is not None
+                self.first_connected.append(con)
             def inst_checkout(self, con, record, proxy):
                 print "checkout(%s, %s, %s)" % (con, record, proxy)
                 assert con is not None
@@ -203,6 +213,9 @@ class PoolTest(PoolTestBase):
         class ListenConnect(InstrumentingListener):
             def connect(self, con, record):
                 pass
+        class ListenFirstConnect(InstrumentingListener):
+            def first_connect(self, con, record):
+                pass
         class ListenCheckOut(InstrumentingListener):
             def checkout(self, con, record, proxy, num):
                 pass
@@ -214,40 +227,44 @@ class PoolTest(PoolTestBase):
             return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
                                   use_threadlocal=False, **kw)
 
-        def assert_listeners(p, total, conn, cout, cin):
+        def assert_listeners(p, total, conn, fconn, cout, cin):
             for instance in (p, p.recreate()):
                 self.assert_(len(instance.listeners) == total)
                 self.assert_(len(instance._on_connect) == conn)
+                self.assert_(len(instance._on_first_connect) == fconn)
                 self.assert_(len(instance._on_checkout) == cout)
                 self.assert_(len(instance._on_checkin) == cin)
 
         p = _pool()
-        assert_listeners(p, 0, 0, 0, 0)
+        assert_listeners(p, 0, 0, 0, 0, 0)
 
         p.add_listener(ListenAll())
-        assert_listeners(p, 1, 1, 1, 1)
+        assert_listeners(p, 1, 1, 1, 1, 1)
 
         p.add_listener(ListenConnect())
-        assert_listeners(p, 2, 2, 1, 1)
+        assert_listeners(p, 2, 2, 1, 1, 1)
+
+        p.add_listener(ListenFirstConnect())
+        assert_listeners(p, 3, 2, 2, 1, 1)
 
         p.add_listener(ListenCheckOut())
-        assert_listeners(p, 3, 2, 2, 1)
+        assert_listeners(p, 4, 2, 2, 2, 1)
 
         p.add_listener(ListenCheckIn())
-        assert_listeners(p, 4, 2, 2, 2)
+        assert_listeners(p, 5, 2, 2, 2, 2)
         del p
 
         print "----"
         snoop = ListenAll()
         p = _pool(listeners=[snoop])
-        assert_listeners(p, 1, 1, 1, 1)
+        assert_listeners(p, 1, 1, 1, 1, 1)
 
         c = p.connect()
-        snoop.assert_total(1, 1, 0)
+        snoop.assert_total(1, 1, 1, 0)
         cc = c.connection
-        snoop.assert_in(cc, True, True, False)
+        snoop.assert_in(cc, True, True, True, False)
         c.close()
-        snoop.assert_in(cc, True, True, True)
+        snoop.assert_in(cc, True, True, True, True)
         del c, cc
 
         snoop.clear()
@@ -255,10 +272,10 @@ class PoolTest(PoolTestBase):
         # this one depends on immediate gc
         c = p.connect()
         cc = c.connection
-        snoop.assert_in(cc, False, True, False)
-        snoop.assert_total(0, 1, 0)
+        snoop.assert_in(cc, False, False, True, False)
+        snoop.assert_total(0, 0, 1, 0)
         del c, cc
-        snoop.assert_total(0, 1, 1)
+        snoop.assert_total(0, 0, 1, 1)
 
         p.dispose()
         snoop.clear()
@@ -266,44 +283,44 @@ class PoolTest(PoolTestBase):
         c = p.connect()
         c.close()
         c = p.connect()
-        snoop.assert_total(1, 2, 1)
+        snoop.assert_total(1, 0, 2, 1)
         c.close()
-        snoop.assert_total(1, 2, 2)
+        snoop.assert_total(1, 0, 2, 2)
 
         # invalidation
         p.dispose()
         snoop.clear()
 
         c = p.connect()
-        snoop.assert_total(1, 1, 0)
+        snoop.assert_total(1, 0, 1, 0)
         c.invalidate()
-        snoop.assert_total(1, 1, 1)
+        snoop.assert_total(1, 0, 1, 1)
         c.close()
-        snoop.assert_total(1, 1, 1)
+        snoop.assert_total(1, 0, 1, 1)
         del c
-        snoop.assert_total(1, 1, 1)
+        snoop.assert_total(1, 0, 1, 1)
         c = p.connect()
-        snoop.assert_total(2, 2, 1)
+        snoop.assert_total(2, 0, 2, 1)
         c.close()
         del c
-        snoop.assert_total(2, 2, 2)
+        snoop.assert_total(2, 0, 2, 2)
 
         # detached
         p.dispose()
         snoop.clear()
 
         c = p.connect()
-        snoop.assert_total(1, 1, 0)
+        snoop.assert_total(1, 0, 1, 0)
         c.detach()
-        snoop.assert_total(1, 1, 0)
+        snoop.assert_total(1, 0, 1, 0)
         c.close()
         del c
-        snoop.assert_total(1, 1, 0)
+        snoop.assert_total(1, 0, 1, 0)
         c = p.connect()
-        snoop.assert_total(2, 2, 0)
+        snoop.assert_total(2, 0, 2, 0)
         c.close()
         del c
-        snoop.assert_total(2, 2, 1)
+        snoop.assert_total(2, 0, 2, 1)
 
     def test_listeners_callables(self):
         dbapi = MockDBAPI()
@@ -362,6 +379,18 @@ class PoolTest(PoolTestBase):
         c.close()
         assert counts == [1, 2, 3]
 
+    def test_listener_after_oninit(self):
+        """Test that listeners are called after OnInit is removed"""
+        called = []
+        def listener(*args):
+            called.append(True)
+        listener.connect = listener
+        engine = create_engine(testing.db.url)
+        engine.pool.add_listener(listener)
+        engine.execute('select 1')
+        assert called, "Listener not called on connect"
+
+
 class QueuePoolTest(PoolTestBase):
 
    def testqueuepool_del(self):