]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- parent transactions weren't started on the connection when adding a connection...
authorAnts Aasma <ants.aasma@gmail.com>
Sun, 20 Jan 2008 03:22:00 +0000 (03:22 +0000)
committerAnts Aasma <ants.aasma@gmail.com>
Sun, 20 Jan 2008 03:22:00 +0000 (03:22 +0000)
- session.transaction now always refers to the innermost active transaction, even when commit/rollback are called directly on the session transaction object.
- when preparing a two-phase transaction fails on one connection all the connections are rolled back.
- two phase transactions can now be prepared.
- session.close() didn't close all transactions when nested transactions were used.
- rollback() previously erroneously set the current transaction directly to the parent of the transaction that could be rolled back to.
- autoflush for commit() wasn't flushing for simple subtransactions.

CHANGES
doc/build/content/session.txt
lib/sqlalchemy/ext/activemapper.py
lib/sqlalchemy/orm/session.py
test/orm/session.py

diff --git a/CHANGES b/CHANGES
index 5d2d83375428473915dee32fad389d329036da8a..3b7a65d62ec5aaad51a300cc63a2742fa2c394e5 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -74,6 +74,29 @@ CHANGES
       down polymorphic queries
 
     - miscellaneous tickets:  [ticket:940]
+    
+    - session transaction management fixes:
+        - fixed bug with session transaction management. Parent transactions
+        weren't started on the connection when adding a connection to a
+        nested transaction.
+        - session.transaction now always refers to the innermost active
+        transaction, even when commit/rollback are called directly on the
+        session transaction object.
+        - when preparing a two-phase transaction fails on one connection
+        all the connections are rolled back.
+        - two phase transactions can now be prepared.
+        - session.close() didn't close all transactions when nested
+        transactions were used.
+        - rollback() previously erroneously set the current transaction
+        directly to the parent of the transaction that could be rolled back
+        to. Now it rolls back the next transaction up that can handle it, but
+        sets the current transaction to it's parent and inactivates the
+        transactions in between. Inactive transactions can only be
+        rolled back or closed, any other call results in an error. In effect
+        this means that every begin() must be accompanied by a corresponding
+        commit() or rollback(), including the implicit begin() in
+        transactional sessions.
+        - autoflush for commit() wasn't flushing for simple subtransactions.
         
 - general
     - warnings are now issued as type exceptions.SAWarning.
@@ -89,6 +112,11 @@ CHANGES
       allows columns to map properly to result sets even
       if long-name truncation kicks in [ticket:941]
 
+- ext
+    - Changed ext.activemapper to use a non-transactional session
+      for the objectstore as the test seem to imply that this is
+      the expected behavior.
+
 0.4.2p3
 ------
 - general
index 7698a1b5cf3abafa689bb4cd37aaf793c642f062..e17987cf539b19b55c7d99ea5ac496523236d79f 100644 (file)
@@ -476,7 +476,7 @@ Session also supports Python 2.5's with statement so that the example above can
         item1.foo = 'bar'
         item2.bar = 'foo'
 
-For MySQL and Postgres (and soon Oracle), "nested" transactions can be accomplished which use SAVEPOINT behavior, via the `begin_nested()` method:
+Subtransactions can be created by calling the `begin()` method repeatedly. For each transaction you `begin()` you must always call either `commit()` or `rollback()`. Note that this includes the implicit transaction created by the transactional session. When a subtransaction is created the current transaction of the session is set to that transaction. Commiting the subtransaction will return you to the next outer transaction. Rolling it back will also return you to the next outer transaction, but in addition it will roll back database state to the innermost transaction that supports rolling back to. Usually this means the root transaction, unless you use the nested transaction functionality via the `begin_nested()` method. MySQL and Postgres (and soon Oracle) support using "nested" transactions by creating SAVEPOINTs, :
 
     {python}
     Session = sessionmaker(transactional=False)
@@ -492,7 +492,7 @@ For MySQL and Postgres (and soon Oracle), "nested" transactions can be accomplis
 
     sess.commit() # commits u1 and u2
 
-Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instructed to use two-phase commit semantics using the flag `twophase=True`, which coordinates transactions across multiple databases:
+Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instructed to use two-phase commit semantics. This will coordinate the commiting of transactions across databases so that the transaction is either committed or rolled back in all databases. You can also `prepare()` the session for interacting with transactions not managed by SQLAlchemy. To use two phase transactions set the flag `twophase=True` on the session:
 
     {python}
     engine1 = create_engine('postgres://db1')
@@ -511,6 +511,8 @@ Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instru
     # before committing both transactions
     sess.commit()
 
+Be aware that when a crash occurs in one of the databases while the the transactions are prepared you have to manually commit or rollback the prepared transactions in your database as appropriate.
+
 ## Embedding SQL Insert/Update Expressions into a Flush {@name=flushsql}
 
 This feature allows the value of a database column to be set to a SQL expression instead of a literal value.  It's especially useful for atomic updates, calling stored procedures, etc.  All you do is assign an expression to an attribute:
index b28ada0af817171d387301770a155d840724bed0..02f4b5b356252d0b3ab11cce2fd87072fe1c7125 100644 (file)
@@ -13,7 +13,7 @@ import sys
 #
 metadata = ThreadLocalMetaData()
 Objectstore = scoped_session
-objectstore = scoped_session(sessionmaker(autoflush=True))
+objectstore = scoped_session(sessionmaker(autoflush=True, transactional=False))
 
 #
 # declarative column declaration - this is so that we can infer the colname
index e2bc71b92e36731b3eb3d62eed218c17a3ba8146..f32ec25ec24a96df4185b9d23ba0c71754981d58 100644 (file)
@@ -144,110 +144,164 @@ class SessionTransaction(object):
 
     def __init__(self, session, parent=None, autoflush=True, nested=False):
         self.session = session
-        self.__connections = {}
-        self.__parent = parent
+        self._connections = {}
+        self._parent = parent
         self.autoflush = autoflush
         self.nested = nested
+        self._active = True
+        self._prepared = False
+
+    is_active = property(lambda s: s.session is not None and s._active)
+    
+    def _assert_is_active(self):
+        self._assert_is_open()
+        if not self._active:
+            raise exceptions.InvalidRequestError("The transaction is inactive due to a rollback in a subtransaction and should be closed")
+
+    def _assert_is_open(self):
+        if self.session is None:
+            raise exceptions.InvalidRequestError("The transaction is closed")
 
     def connection(self, bindkey, **kwargs):
+        self._assert_is_active()
         engine = self.session.get_bind(bindkey, **kwargs)
         return self.get_or_add(engine)
 
     def _begin(self, **kwargs):
+        self._assert_is_active()
         return SessionTransaction(self.session, self, **kwargs)
 
+    def _iterate_parents(self, upto=None):
+        if self._parent is upto:
+            return (self,)
+        else:
+            if self._parent is None:
+                raise exceptions.InvalidRequestError("Transaction %s is not on the active transaction list" % upto)
+            return (self,) + self._parent._iterate_parents(upto)
+
     def add(self, bind):
-        if self.__parent is not None:
-            return self.__parent.add(bind)
+        self._assert_is_active()
+        if self._parent is not None and not self.nested:
+            return self._parent.add(bind)
 
-        if bind.engine in self.__connections:
+        if bind.engine in self._connections:
             raise exceptions.InvalidRequestError("Session already has a Connection associated for the given %sEngine" % (isinstance(bind, engine.Connection) and "Connection's " or ""))
         return self.get_or_add(bind)
 
-    def _connection_dict(self):
-        if self.__parent is not None and not self.nested:
-            return self.__parent._connection_dict()
-        else:
-            return self.__connections
-
     def get_or_add(self, bind):
-        if self.__parent is not None:
+        self._assert_is_active()
+        
+        if bind in self._connections:
+            return self._connections[bind][0]
+        
+        if self._parent is not None:
+            conn = self._parent.get_or_add(bind)
             if not self.nested:
-                return self.__parent.get_or_add(bind)
-
-            if bind in self.__connections:
-                return self.__connections[bind][0]
-
-            conn_dict = self.__parent._connection_dict()
-            if bind in conn_dict:
-                (conn, trans, autoclose) = conn_dict[bind]
-                self.__connections[conn] = self.__connections[bind.engine] = (conn, conn.begin_nested(), autoclose)
                 return conn
-        elif bind in self.__connections:
-            return self.__connections[bind][0]
-
-        if not isinstance(bind, engine.Connection):
-            e = bind
-            c = bind.contextual_connect()
-        else:
-            e = bind.engine
-            c = bind
-            if e in self.__connections:
-                raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
-        if self.nested:
-            trans = c.begin_nested()
-        elif self.session.twophase:
-            trans = c.begin_twophase()
         else:
-            trans = c.begin()
-        self.__connections[c] = self.__connections[e] = (c, trans, c is not bind)
-        return self.__connections[c][0]
+            if isinstance(bind, engine.Connection):
+                conn = bind
+                if conn.engine in self._connections:
+                    raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
+            else:
+                conn = bind.contextual_connect()
 
-    def commit(self):
-        if self.__parent is not None and not self.nested:
-            return self.__parent
+        if self.session.twophase and self._parent is None:
+            transaction = conn.begin_twophase()
+        elif self.nested:
+            transaction = conn.begin_nested()
+        else:
+            transaction = conn.begin()
+        
+        self._connections[conn] = self._connections[conn.engine] = (conn, transaction, conn is not bind)
+        return conn
 
-        if self.session.extension is not None:
+    def prepare(self):
+        if self._parent is not None or not self.session.twophase:
+            raise exceptions.InvalidRequestError("Only root two phase transactions of can be prepared")
+        self._prepare_impl()
+        
+    def _prepare_impl(self):
+        self._assert_is_active()
+        if self.session.extension is not None and (self._parent is None or self.nested):
             self.session.extension.before_commit(self.session)
-
+        
+        if self.session.transaction is not self:
+            for subtransaction in self.session.transaction._iterate_parents(upto=self):
+                subtransaction.commit()
+            
         if self.autoflush:
             self.session.flush()
+        
+        if self._parent is None and self.session.twophase:
+            try:
+                for t in util.Set(self._connections.values()):
+                    t[1].prepare()
+            except:
+                for t in util.Set(self._connections.values()):
+                    try:
+                        t[1].rollback()
+                    except:
+                        pass
+                raise
+        
+        self._deactivate()
+        self._prepared = True
+    
+    def commit(self):
+        self._assert_is_open()
+        if not self._prepared:
+            self._prepare_impl()
+        
+        if self._parent is None or self.nested:
+            for t in util.Set(self._connections.values()):
+                t[1].commit()
 
-        if self.session.twophase:
-            for t in util.Set(self.__connections.values()):
-                t[1].prepare()
-
-        for t in util.Set(self.__connections.values()):
-            t[1].commit()
-
-        if self.session.extension is not None:
-            self.session.extension.after_commit(self.session)
+            if self.session.extension is not None:
+                self.session.extension.after_commit(self.session)
 
         self.close()
-        return self.__parent
+        return self._parent
 
     def rollback(self):
-        if self.__parent is not None and not self.nested:
-            return self.__parent.rollback()
-
-        for t in util.Set(self.__connections.values()):
+        self._assert_is_open()
+        
+        if self.session.transaction is not self:
+            for subtransaction in self.session.transaction._iterate_parents(upto=self):
+                subtransaction.close()
+        
+        if self.is_active:
+            for transaction in self._iterate_parents():
+                if transaction._parent is None or transaction.nested:
+                    transaction._rollback_impl()
+                    transaction._deactivate()
+                    break
+                else:
+                    transaction._deactivate()
+        self.close()
+        return self._parent
+    
+    def _rollback_impl(self):
+        for t in util.Set(self._connections.values()):
             t[1].rollback()
 
         if self.session.extension is not None:
             self.session.extension.after_rollback(self.session)
 
-        self.close()
-        return self.__parent
+    def _deactivate(self):
+        self._active = False
 
     def close(self):
-        if self.__parent is not None:
-            return
-        for t in util.Set(self.__connections.values()):
-            if t[2]:
-                t[0].close()
-            else:
-                t[1].close()
-        self.session.transaction = None
+        self.session.transaction = self._parent
+        if self._parent is None:
+            for connection, transaction, autoclose in util.Set(self._connections.values()):
+                if autoclose:
+                    connection.close()
+                else:
+                    transaction.close()
+        self._deactivate()
+        self.session = None
+        self._connections = None
 
     def __enter__(self):
         return self
@@ -458,7 +512,7 @@ class Session(object):
         if self.transaction is None:
             pass
         else:
-            self.transaction = self.transaction.rollback()
+            self.transaction.rollback()
         # TODO: we can rollback attribute values.  however
         # we would want to expand attributes.py to be able to save *two* rollback points, one to the
         # last flush() and the other to when the object first entered the transaction.
@@ -484,13 +538,29 @@ class Session(object):
         if self.transaction is None:
             if self.transactional:
                 self.begin()
-                self.transaction = self.transaction.commit()
             else:
                 raise exceptions.InvalidRequestError("No transaction is begun.")
-        else:
-            self.transaction = self.transaction.commit()
+
+        self.transaction.commit()
         if self.transaction is None and self.transactional:
             self.begin()
+    
+    def prepare(self):
+        """Prepare the current transaction in progress for two phase commit.
+
+        If no transaction is in progress, this method raises
+        an InvalidRequestError.
+
+        Only root transactions of two phase sessions can be prepared. If the current transaction is
+        not such, an InvalidRequestError is raised.
+        """
+        if self.transaction is None:
+            if self.transactional:
+                self.begin()
+            else:
+                raise exceptions.InvalidRequestError("No transaction is begun.")
+
+        self.transaction.prepare()
 
     def connection(self, mapper=None, **kwargs):
         """Return a ``Connection`` corresponding to this session's
@@ -554,7 +624,8 @@ class Session(object):
 
         self.clear()
         if self.transaction is not None:
-            self.transaction.close()
+            for transaction in self.transaction._iterate_parents():
+                transaction.close()
         if self.transactional:
             # note this doesnt use any connection resources
             self.begin()
index 0a6157e0ac8d7c654f5d710c5165e0dc91029cac..930efe07ac3e804036feb343696a98129e655b63 100644 (file)
@@ -1,6 +1,6 @@
 import testenv; testenv.configure_for_tests()
 from sqlalchemy import *
-from sqlalchemy import exceptions
+from sqlalchemy import exceptions, util
 from sqlalchemy.orm import *
 from sqlalchemy.orm.session import SessionExtension
 from sqlalchemy.orm.session import Session as SessionCls
@@ -355,6 +355,122 @@ class SessionTest(AssertMixin):
         assert len(sess.query(User).all()) == 1
         sess.close()
 
+    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+                         'oracle', 'maxdb')
+    @testing.exclude('mysql', '<', (5, 0, 3))
+    def test_nested_transaction_connection_add(self):
+        class User(object): pass
+        mapper(User, users)
+        
+        sess = create_session(transactional=False)
+        
+        sess.begin()
+        sess.begin_nested()
+        
+        u1 = User()
+        sess.save(u1)
+        sess.flush()
+        
+        sess.rollback()
+        
+        u2 = User()
+        sess.save(u2)
+        
+        sess.commit()
+        
+        self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2]))
+        
+        sess.begin()
+        sess.begin_nested()
+        
+        u3 = User()
+        sess.save(u3)
+        sess.commit() # commit the nested transaction
+        sess.rollback()
+        
+        self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2]))
+        
+        sess.close()
+    
+    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+                         'oracle', 'maxdb')
+    @testing.exclude('mysql', '<', (5, 0, 3))
+    def test_mixed_transaction_control(self):
+        class User(object): pass
+        mapper(User, users)
+        
+        sess = create_session(transactional=False)
+        
+        sess.begin()
+        sess.begin_nested()
+        transaction = sess.begin()
+    
+        sess.save(User())
+        
+        transaction.commit()
+        sess.commit()
+        sess.commit()
+        
+        sess.close()
+        
+        self.assertEquals(len(sess.query(User).all()), 1)
+        
+        t1 = sess.begin()
+        t2 = sess.begin_nested()
+    
+        sess.save(User())
+        
+        t2.commit()
+        assert sess.transaction is t1
+        
+        sess.close()
+    
+    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+                         'oracle', 'maxdb')
+    @testing.exclude('mysql', '<', (5, 0, 3))
+    def test_mixed_transaction_close(self):
+        class User(object): pass
+        mapper(User, users)
+        
+        sess = create_session(transactional=True)
+        
+        sess.begin_nested()
+    
+        sess.save(User())
+        sess.flush()
+        
+        sess.close()
+        
+        sess.save(User())
+        sess.commit()
+        
+        sess.close()
+        
+        self.assertEquals(len(sess.query(User).all()), 1)
+    
+    @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+                         'oracle', 'maxdb')
+    @testing.exclude('mysql', '<', (5, 0, 3))
+    def test_error_on_using_inactive_session(self):
+        class User(object): pass
+        mapper(User, users)
+        
+        sess = create_session(transactional=False)
+        
+        try:
+            sess.begin()
+            sess.begin()
+        
+            sess.save(User())
+            sess.flush()
+            
+            sess.rollback()
+            sess.begin()
+            assert False
+        except exceptions.InvalidRequestError, e:
+            self.assertEquals(str(e), "The transaction is inactive due to a rollback in a subtransaction and should be closed")
+        sess.close()
+    
     @engines.close_open_connections
     def test_bound_connection(self):
         class User(object):pass