From: Ants Aasma Date: Sun, 20 Jan 2008 03:22:00 +0000 (+0000) Subject: - parent transactions weren't started on the connection when adding a connection... X-Git-Tag: rel_0_4_3~94 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=9f366afdda4b508eb4ef3e626da2fec98ad04773;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - parent transactions weren't started on the connection when adding a connection to a nested session transaction. - session.transaction now always refers to the innermost active transaction, even when commit/rollback are called directly on the session transaction object. - when preparing a two-phase transaction fails on one connection all the connections are rolled back. - two phase transactions can now be prepared. - session.close() didn't close all transactions when nested transactions were used. - rollback() previously erroneously set the current transaction directly to the parent of the transaction that could be rolled back to. - autoflush for commit() wasn't flushing for simple subtransactions. --- diff --git a/CHANGES b/CHANGES index 5d2d833754..3b7a65d62e 100644 --- a/CHANGES +++ b/CHANGES @@ -74,6 +74,29 @@ CHANGES down polymorphic queries - miscellaneous tickets: [ticket:940] + + - session transaction management fixes: + - fixed bug with session transaction management. Parent transactions + weren't started on the connection when adding a connection to a + nested transaction. + - session.transaction now always refers to the innermost active + transaction, even when commit/rollback are called directly on the + session transaction object. + - when preparing a two-phase transaction fails on one connection + all the connections are rolled back. + - two phase transactions can now be prepared. + - session.close() didn't close all transactions when nested + transactions were used. + - rollback() previously erroneously set the current transaction + directly to the parent of the transaction that could be rolled back + to. Now it rolls back the next transaction up that can handle it, but + sets the current transaction to it's parent and inactivates the + transactions in between. Inactive transactions can only be + rolled back or closed, any other call results in an error. In effect + this means that every begin() must be accompanied by a corresponding + commit() or rollback(), including the implicit begin() in + transactional sessions. + - autoflush for commit() wasn't flushing for simple subtransactions. - general - warnings are now issued as type exceptions.SAWarning. @@ -89,6 +112,11 @@ CHANGES allows columns to map properly to result sets even if long-name truncation kicks in [ticket:941] +- ext + - Changed ext.activemapper to use a non-transactional session + for the objectstore as the test seem to imply that this is + the expected behavior. + 0.4.2p3 ------ - general diff --git a/doc/build/content/session.txt b/doc/build/content/session.txt index 7698a1b5cf..e17987cf53 100644 --- a/doc/build/content/session.txt +++ b/doc/build/content/session.txt @@ -476,7 +476,7 @@ Session also supports Python 2.5's with statement so that the example above can item1.foo = 'bar' item2.bar = 'foo' -For MySQL and Postgres (and soon Oracle), "nested" transactions can be accomplished which use SAVEPOINT behavior, via the `begin_nested()` method: +Subtransactions can be created by calling the `begin()` method repeatedly. For each transaction you `begin()` you must always call either `commit()` or `rollback()`. Note that this includes the implicit transaction created by the transactional session. When a subtransaction is created the current transaction of the session is set to that transaction. Commiting the subtransaction will return you to the next outer transaction. Rolling it back will also return you to the next outer transaction, but in addition it will roll back database state to the innermost transaction that supports rolling back to. Usually this means the root transaction, unless you use the nested transaction functionality via the `begin_nested()` method. MySQL and Postgres (and soon Oracle) support using "nested" transactions by creating SAVEPOINTs, : {python} Session = sessionmaker(transactional=False) @@ -492,7 +492,7 @@ For MySQL and Postgres (and soon Oracle), "nested" transactions can be accomplis sess.commit() # commits u1 and u2 -Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instructed to use two-phase commit semantics using the flag `twophase=True`, which coordinates transactions across multiple databases: +Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instructed to use two-phase commit semantics. This will coordinate the commiting of transactions across databases so that the transaction is either committed or rolled back in all databases. You can also `prepare()` the session for interacting with transactions not managed by SQLAlchemy. To use two phase transactions set the flag `twophase=True` on the session: {python} engine1 = create_engine('postgres://db1') @@ -511,6 +511,8 @@ Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instru # before committing both transactions sess.commit() +Be aware that when a crash occurs in one of the databases while the the transactions are prepared you have to manually commit or rollback the prepared transactions in your database as appropriate. + ## Embedding SQL Insert/Update Expressions into a Flush {@name=flushsql} This feature allows the value of a database column to be set to a SQL expression instead of a literal value. It's especially useful for atomic updates, calling stored procedures, etc. All you do is assign an expression to an attribute: diff --git a/lib/sqlalchemy/ext/activemapper.py b/lib/sqlalchemy/ext/activemapper.py index b28ada0af8..02f4b5b356 100644 --- a/lib/sqlalchemy/ext/activemapper.py +++ b/lib/sqlalchemy/ext/activemapper.py @@ -13,7 +13,7 @@ import sys # metadata = ThreadLocalMetaData() Objectstore = scoped_session -objectstore = scoped_session(sessionmaker(autoflush=True)) +objectstore = scoped_session(sessionmaker(autoflush=True, transactional=False)) # # declarative column declaration - this is so that we can infer the colname diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index e2bc71b92e..f32ec25ec2 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -144,110 +144,164 @@ class SessionTransaction(object): def __init__(self, session, parent=None, autoflush=True, nested=False): self.session = session - self.__connections = {} - self.__parent = parent + self._connections = {} + self._parent = parent self.autoflush = autoflush self.nested = nested + self._active = True + self._prepared = False + + is_active = property(lambda s: s.session is not None and s._active) + + def _assert_is_active(self): + self._assert_is_open() + if not self._active: + raise exceptions.InvalidRequestError("The transaction is inactive due to a rollback in a subtransaction and should be closed") + + def _assert_is_open(self): + if self.session is None: + raise exceptions.InvalidRequestError("The transaction is closed") def connection(self, bindkey, **kwargs): + self._assert_is_active() engine = self.session.get_bind(bindkey, **kwargs) return self.get_or_add(engine) def _begin(self, **kwargs): + self._assert_is_active() return SessionTransaction(self.session, self, **kwargs) + def _iterate_parents(self, upto=None): + if self._parent is upto: + return (self,) + else: + if self._parent is None: + raise exceptions.InvalidRequestError("Transaction %s is not on the active transaction list" % upto) + return (self,) + self._parent._iterate_parents(upto) + def add(self, bind): - if self.__parent is not None: - return self.__parent.add(bind) + self._assert_is_active() + if self._parent is not None and not self.nested: + return self._parent.add(bind) - if bind.engine in self.__connections: + if bind.engine in self._connections: raise exceptions.InvalidRequestError("Session already has a Connection associated for the given %sEngine" % (isinstance(bind, engine.Connection) and "Connection's " or "")) return self.get_or_add(bind) - def _connection_dict(self): - if self.__parent is not None and not self.nested: - return self.__parent._connection_dict() - else: - return self.__connections - def get_or_add(self, bind): - if self.__parent is not None: + self._assert_is_active() + + if bind in self._connections: + return self._connections[bind][0] + + if self._parent is not None: + conn = self._parent.get_or_add(bind) if not self.nested: - return self.__parent.get_or_add(bind) - - if bind in self.__connections: - return self.__connections[bind][0] - - conn_dict = self.__parent._connection_dict() - if bind in conn_dict: - (conn, trans, autoclose) = conn_dict[bind] - self.__connections[conn] = self.__connections[bind.engine] = (conn, conn.begin_nested(), autoclose) return conn - elif bind in self.__connections: - return self.__connections[bind][0] - - if not isinstance(bind, engine.Connection): - e = bind - c = bind.contextual_connect() - else: - e = bind.engine - c = bind - if e in self.__connections: - raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine") - if self.nested: - trans = c.begin_nested() - elif self.session.twophase: - trans = c.begin_twophase() else: - trans = c.begin() - self.__connections[c] = self.__connections[e] = (c, trans, c is not bind) - return self.__connections[c][0] + if isinstance(bind, engine.Connection): + conn = bind + if conn.engine in self._connections: + raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine") + else: + conn = bind.contextual_connect() - def commit(self): - if self.__parent is not None and not self.nested: - return self.__parent + if self.session.twophase and self._parent is None: + transaction = conn.begin_twophase() + elif self.nested: + transaction = conn.begin_nested() + else: + transaction = conn.begin() + + self._connections[conn] = self._connections[conn.engine] = (conn, transaction, conn is not bind) + return conn - if self.session.extension is not None: + def prepare(self): + if self._parent is not None or not self.session.twophase: + raise exceptions.InvalidRequestError("Only root two phase transactions of can be prepared") + self._prepare_impl() + + def _prepare_impl(self): + self._assert_is_active() + if self.session.extension is not None and (self._parent is None or self.nested): self.session.extension.before_commit(self.session) - + + if self.session.transaction is not self: + for subtransaction in self.session.transaction._iterate_parents(upto=self): + subtransaction.commit() + if self.autoflush: self.session.flush() + + if self._parent is None and self.session.twophase: + try: + for t in util.Set(self._connections.values()): + t[1].prepare() + except: + for t in util.Set(self._connections.values()): + try: + t[1].rollback() + except: + pass + raise + + self._deactivate() + self._prepared = True + + def commit(self): + self._assert_is_open() + if not self._prepared: + self._prepare_impl() + + if self._parent is None or self.nested: + for t in util.Set(self._connections.values()): + t[1].commit() - if self.session.twophase: - for t in util.Set(self.__connections.values()): - t[1].prepare() - - for t in util.Set(self.__connections.values()): - t[1].commit() - - if self.session.extension is not None: - self.session.extension.after_commit(self.session) + if self.session.extension is not None: + self.session.extension.after_commit(self.session) self.close() - return self.__parent + return self._parent def rollback(self): - if self.__parent is not None and not self.nested: - return self.__parent.rollback() - - for t in util.Set(self.__connections.values()): + self._assert_is_open() + + if self.session.transaction is not self: + for subtransaction in self.session.transaction._iterate_parents(upto=self): + subtransaction.close() + + if self.is_active: + for transaction in self._iterate_parents(): + if transaction._parent is None or transaction.nested: + transaction._rollback_impl() + transaction._deactivate() + break + else: + transaction._deactivate() + self.close() + return self._parent + + def _rollback_impl(self): + for t in util.Set(self._connections.values()): t[1].rollback() if self.session.extension is not None: self.session.extension.after_rollback(self.session) - self.close() - return self.__parent + def _deactivate(self): + self._active = False def close(self): - if self.__parent is not None: - return - for t in util.Set(self.__connections.values()): - if t[2]: - t[0].close() - else: - t[1].close() - self.session.transaction = None + self.session.transaction = self._parent + if self._parent is None: + for connection, transaction, autoclose in util.Set(self._connections.values()): + if autoclose: + connection.close() + else: + transaction.close() + self._deactivate() + self.session = None + self._connections = None def __enter__(self): return self @@ -458,7 +512,7 @@ class Session(object): if self.transaction is None: pass else: - self.transaction = self.transaction.rollback() + self.transaction.rollback() # TODO: we can rollback attribute values. however # we would want to expand attributes.py to be able to save *two* rollback points, one to the # last flush() and the other to when the object first entered the transaction. @@ -484,13 +538,29 @@ class Session(object): if self.transaction is None: if self.transactional: self.begin() - self.transaction = self.transaction.commit() else: raise exceptions.InvalidRequestError("No transaction is begun.") - else: - self.transaction = self.transaction.commit() + + self.transaction.commit() if self.transaction is None and self.transactional: self.begin() + + def prepare(self): + """Prepare the current transaction in progress for two phase commit. + + If no transaction is in progress, this method raises + an InvalidRequestError. + + Only root transactions of two phase sessions can be prepared. If the current transaction is + not such, an InvalidRequestError is raised. + """ + if self.transaction is None: + if self.transactional: + self.begin() + else: + raise exceptions.InvalidRequestError("No transaction is begun.") + + self.transaction.prepare() def connection(self, mapper=None, **kwargs): """Return a ``Connection`` corresponding to this session's @@ -554,7 +624,8 @@ class Session(object): self.clear() if self.transaction is not None: - self.transaction.close() + for transaction in self.transaction._iterate_parents(): + transaction.close() if self.transactional: # note this doesnt use any connection resources self.begin() diff --git a/test/orm/session.py b/test/orm/session.py index 0a6157e0ac..930efe07ac 100644 --- a/test/orm/session.py +++ b/test/orm/session.py @@ -1,6 +1,6 @@ import testenv; testenv.configure_for_tests() from sqlalchemy import * -from sqlalchemy import exceptions +from sqlalchemy import exceptions, util from sqlalchemy.orm import * from sqlalchemy.orm.session import SessionExtension from sqlalchemy.orm.session import Session as SessionCls @@ -355,6 +355,122 @@ class SessionTest(AssertMixin): assert len(sess.query(User).all()) == 1 sess.close() + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') + @testing.exclude('mysql', '<', (5, 0, 3)) + def test_nested_transaction_connection_add(self): + class User(object): pass + mapper(User, users) + + sess = create_session(transactional=False) + + sess.begin() + sess.begin_nested() + + u1 = User() + sess.save(u1) + sess.flush() + + sess.rollback() + + u2 = User() + sess.save(u2) + + sess.commit() + + self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2])) + + sess.begin() + sess.begin_nested() + + u3 = User() + sess.save(u3) + sess.commit() # commit the nested transaction + sess.rollback() + + self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2])) + + sess.close() + + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') + @testing.exclude('mysql', '<', (5, 0, 3)) + def test_mixed_transaction_control(self): + class User(object): pass + mapper(User, users) + + sess = create_session(transactional=False) + + sess.begin() + sess.begin_nested() + transaction = sess.begin() + + sess.save(User()) + + transaction.commit() + sess.commit() + sess.commit() + + sess.close() + + self.assertEquals(len(sess.query(User).all()), 1) + + t1 = sess.begin() + t2 = sess.begin_nested() + + sess.save(User()) + + t2.commit() + assert sess.transaction is t1 + + sess.close() + + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') + @testing.exclude('mysql', '<', (5, 0, 3)) + def test_mixed_transaction_close(self): + class User(object): pass + mapper(User, users) + + sess = create_session(transactional=True) + + sess.begin_nested() + + sess.save(User()) + sess.flush() + + sess.close() + + sess.save(User()) + sess.commit() + + sess.close() + + self.assertEquals(len(sess.query(User).all()), 1) + + @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', + 'oracle', 'maxdb') + @testing.exclude('mysql', '<', (5, 0, 3)) + def test_error_on_using_inactive_session(self): + class User(object): pass + mapper(User, users) + + sess = create_session(transactional=False) + + try: + sess.begin() + sess.begin() + + sess.save(User()) + sess.flush() + + sess.rollback() + sess.begin() + assert False + except exceptions.InvalidRequestError, e: + self.assertEquals(str(e), "The transaction is inactive due to a rollback in a subtransaction and should be closed") + sess.close() + @engines.close_open_connections def test_bound_connection(self): class User(object):pass