down polymorphic queries
- miscellaneous tickets: [ticket:940]
+
+ - session transaction management fixes:
+ - fixed bug with session transaction management. Parent transactions
+ weren't started on the connection when adding a connection to a
+ nested transaction.
+ - session.transaction now always refers to the innermost active
+ transaction, even when commit/rollback are called directly on the
+ session transaction object.
+ - when preparing a two-phase transaction fails on one connection
+ all the connections are rolled back.
+ - two phase transactions can now be prepared.
+ - session.close() didn't close all transactions when nested
+ transactions were used.
+ - rollback() previously erroneously set the current transaction
+ directly to the parent of the transaction that could be rolled back
+ to. Now it rolls back the next transaction up that can handle it, but
+ sets the current transaction to it's parent and inactivates the
+ transactions in between. Inactive transactions can only be
+ rolled back or closed, any other call results in an error. In effect
+ this means that every begin() must be accompanied by a corresponding
+ commit() or rollback(), including the implicit begin() in
+ transactional sessions.
+ - autoflush for commit() wasn't flushing for simple subtransactions.
- general
- warnings are now issued as type exceptions.SAWarning.
allows columns to map properly to result sets even
if long-name truncation kicks in [ticket:941]
+- ext
+ - Changed ext.activemapper to use a non-transactional session
+ for the objectstore as the test seem to imply that this is
+ the expected behavior.
+
0.4.2p3
------
- general
def __init__(self, session, parent=None, autoflush=True, nested=False):
self.session = session
- self.__connections = {}
- self.__parent = parent
+ self._connections = {}
+ self._parent = parent
self.autoflush = autoflush
self.nested = nested
+ self._active = True
+ self._prepared = False
+
+ is_active = property(lambda s: s.session is not None and s._active)
+
+ def _assert_is_active(self):
+ self._assert_is_open()
+ if not self._active:
+ raise exceptions.InvalidRequestError("The transaction is inactive due to a rollback in a subtransaction and should be closed")
+
+ def _assert_is_open(self):
+ if self.session is None:
+ raise exceptions.InvalidRequestError("The transaction is closed")
def connection(self, bindkey, **kwargs):
+ self._assert_is_active()
engine = self.session.get_bind(bindkey, **kwargs)
return self.get_or_add(engine)
def _begin(self, **kwargs):
+ self._assert_is_active()
return SessionTransaction(self.session, self, **kwargs)
+ def _iterate_parents(self, upto=None):
+ if self._parent is upto:
+ return (self,)
+ else:
+ if self._parent is None:
+ raise exceptions.InvalidRequestError("Transaction %s is not on the active transaction list" % upto)
+ return (self,) + self._parent._iterate_parents(upto)
+
def add(self, bind):
- if self.__parent is not None:
- return self.__parent.add(bind)
+ self._assert_is_active()
+ if self._parent is not None and not self.nested:
+ return self._parent.add(bind)
- if bind.engine in self.__connections:
+ if bind.engine in self._connections:
raise exceptions.InvalidRequestError("Session already has a Connection associated for the given %sEngine" % (isinstance(bind, engine.Connection) and "Connection's " or ""))
return self.get_or_add(bind)
- def _connection_dict(self):
- if self.__parent is not None and not self.nested:
- return self.__parent._connection_dict()
- else:
- return self.__connections
-
def get_or_add(self, bind):
- if self.__parent is not None:
+ self._assert_is_active()
+
+ if bind in self._connections:
+ return self._connections[bind][0]
+
+ if self._parent is not None:
+ conn = self._parent.get_or_add(bind)
if not self.nested:
- return self.__parent.get_or_add(bind)
-
- if bind in self.__connections:
- return self.__connections[bind][0]
-
- conn_dict = self.__parent._connection_dict()
- if bind in conn_dict:
- (conn, trans, autoclose) = conn_dict[bind]
- self.__connections[conn] = self.__connections[bind.engine] = (conn, conn.begin_nested(), autoclose)
return conn
- elif bind in self.__connections:
- return self.__connections[bind][0]
-
- if not isinstance(bind, engine.Connection):
- e = bind
- c = bind.contextual_connect()
- else:
- e = bind.engine
- c = bind
- if e in self.__connections:
- raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
- if self.nested:
- trans = c.begin_nested()
- elif self.session.twophase:
- trans = c.begin_twophase()
else:
- trans = c.begin()
- self.__connections[c] = self.__connections[e] = (c, trans, c is not bind)
- return self.__connections[c][0]
+ if isinstance(bind, engine.Connection):
+ conn = bind
+ if conn.engine in self._connections:
+ raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
+ else:
+ conn = bind.contextual_connect()
- def commit(self):
- if self.__parent is not None and not self.nested:
- return self.__parent
+ if self.session.twophase and self._parent is None:
+ transaction = conn.begin_twophase()
+ elif self.nested:
+ transaction = conn.begin_nested()
+ else:
+ transaction = conn.begin()
+
+ self._connections[conn] = self._connections[conn.engine] = (conn, transaction, conn is not bind)
+ return conn
- if self.session.extension is not None:
+ def prepare(self):
+ if self._parent is not None or not self.session.twophase:
+ raise exceptions.InvalidRequestError("Only root two phase transactions of can be prepared")
+ self._prepare_impl()
+
+ def _prepare_impl(self):
+ self._assert_is_active()
+ if self.session.extension is not None and (self._parent is None or self.nested):
self.session.extension.before_commit(self.session)
-
+
+ if self.session.transaction is not self:
+ for subtransaction in self.session.transaction._iterate_parents(upto=self):
+ subtransaction.commit()
+
if self.autoflush:
self.session.flush()
+
+ if self._parent is None and self.session.twophase:
+ try:
+ for t in util.Set(self._connections.values()):
+ t[1].prepare()
+ except:
+ for t in util.Set(self._connections.values()):
+ try:
+ t[1].rollback()
+ except:
+ pass
+ raise
+
+ self._deactivate()
+ self._prepared = True
+
+ def commit(self):
+ self._assert_is_open()
+ if not self._prepared:
+ self._prepare_impl()
+
+ if self._parent is None or self.nested:
+ for t in util.Set(self._connections.values()):
+ t[1].commit()
- if self.session.twophase:
- for t in util.Set(self.__connections.values()):
- t[1].prepare()
-
- for t in util.Set(self.__connections.values()):
- t[1].commit()
-
- if self.session.extension is not None:
- self.session.extension.after_commit(self.session)
+ if self.session.extension is not None:
+ self.session.extension.after_commit(self.session)
self.close()
- return self.__parent
+ return self._parent
def rollback(self):
- if self.__parent is not None and not self.nested:
- return self.__parent.rollback()
-
- for t in util.Set(self.__connections.values()):
+ self._assert_is_open()
+
+ if self.session.transaction is not self:
+ for subtransaction in self.session.transaction._iterate_parents(upto=self):
+ subtransaction.close()
+
+ if self.is_active:
+ for transaction in self._iterate_parents():
+ if transaction._parent is None or transaction.nested:
+ transaction._rollback_impl()
+ transaction._deactivate()
+ break
+ else:
+ transaction._deactivate()
+ self.close()
+ return self._parent
+
+ def _rollback_impl(self):
+ for t in util.Set(self._connections.values()):
t[1].rollback()
if self.session.extension is not None:
self.session.extension.after_rollback(self.session)
- self.close()
- return self.__parent
+ def _deactivate(self):
+ self._active = False
def close(self):
- if self.__parent is not None:
- return
- for t in util.Set(self.__connections.values()):
- if t[2]:
- t[0].close()
- else:
- t[1].close()
- self.session.transaction = None
+ self.session.transaction = self._parent
+ if self._parent is None:
+ for connection, transaction, autoclose in util.Set(self._connections.values()):
+ if autoclose:
+ connection.close()
+ else:
+ transaction.close()
+ self._deactivate()
+ self.session = None
+ self._connections = None
def __enter__(self):
return self
if self.transaction is None:
pass
else:
- self.transaction = self.transaction.rollback()
+ self.transaction.rollback()
# TODO: we can rollback attribute values. however
# we would want to expand attributes.py to be able to save *two* rollback points, one to the
# last flush() and the other to when the object first entered the transaction.
if self.transaction is None:
if self.transactional:
self.begin()
- self.transaction = self.transaction.commit()
else:
raise exceptions.InvalidRequestError("No transaction is begun.")
- else:
- self.transaction = self.transaction.commit()
+
+ self.transaction.commit()
if self.transaction is None and self.transactional:
self.begin()
+
+ def prepare(self):
+ """Prepare the current transaction in progress for two phase commit.
+
+ If no transaction is in progress, this method raises
+ an InvalidRequestError.
+
+ Only root transactions of two phase sessions can be prepared. If the current transaction is
+ not such, an InvalidRequestError is raised.
+ """
+ if self.transaction is None:
+ if self.transactional:
+ self.begin()
+ else:
+ raise exceptions.InvalidRequestError("No transaction is begun.")
+
+ self.transaction.prepare()
def connection(self, mapper=None, **kwargs):
"""Return a ``Connection`` corresponding to this session's
self.clear()
if self.transaction is not None:
- self.transaction.close()
+ for transaction in self.transaction._iterate_parents():
+ transaction.close()
if self.transactional:
# note this doesnt use any connection resources
self.begin()
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
-from sqlalchemy import exceptions
+from sqlalchemy import exceptions, util
from sqlalchemy.orm import *
from sqlalchemy.orm.session import SessionExtension
from sqlalchemy.orm.session import Session as SessionCls
assert len(sess.query(User).all()) == 1
sess.close()
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
+ @testing.exclude('mysql', '<', (5, 0, 3))
+ def test_nested_transaction_connection_add(self):
+ class User(object): pass
+ mapper(User, users)
+
+ sess = create_session(transactional=False)
+
+ sess.begin()
+ sess.begin_nested()
+
+ u1 = User()
+ sess.save(u1)
+ sess.flush()
+
+ sess.rollback()
+
+ u2 = User()
+ sess.save(u2)
+
+ sess.commit()
+
+ self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2]))
+
+ sess.begin()
+ sess.begin_nested()
+
+ u3 = User()
+ sess.save(u3)
+ sess.commit() # commit the nested transaction
+ sess.rollback()
+
+ self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2]))
+
+ sess.close()
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
+ @testing.exclude('mysql', '<', (5, 0, 3))
+ def test_mixed_transaction_control(self):
+ class User(object): pass
+ mapper(User, users)
+
+ sess = create_session(transactional=False)
+
+ sess.begin()
+ sess.begin_nested()
+ transaction = sess.begin()
+
+ sess.save(User())
+
+ transaction.commit()
+ sess.commit()
+ sess.commit()
+
+ sess.close()
+
+ self.assertEquals(len(sess.query(User).all()), 1)
+
+ t1 = sess.begin()
+ t2 = sess.begin_nested()
+
+ sess.save(User())
+
+ t2.commit()
+ assert sess.transaction is t1
+
+ sess.close()
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
+ @testing.exclude('mysql', '<', (5, 0, 3))
+ def test_mixed_transaction_close(self):
+ class User(object): pass
+ mapper(User, users)
+
+ sess = create_session(transactional=True)
+
+ sess.begin_nested()
+
+ sess.save(User())
+ sess.flush()
+
+ sess.close()
+
+ sess.save(User())
+ sess.commit()
+
+ sess.close()
+
+ self.assertEquals(len(sess.query(User).all()), 1)
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
+ @testing.exclude('mysql', '<', (5, 0, 3))
+ def test_error_on_using_inactive_session(self):
+ class User(object): pass
+ mapper(User, users)
+
+ sess = create_session(transactional=False)
+
+ try:
+ sess.begin()
+ sess.begin()
+
+ sess.save(User())
+ sess.flush()
+
+ sess.rollback()
+ sess.begin()
+ assert False
+ except exceptions.InvalidRequestError, e:
+ self.assertEquals(str(e), "The transaction is inactive due to a rollback in a subtransaction and should be closed")
+ sess.close()
+
@engines.close_open_connections
def test_bound_connection(self):
class User(object):pass