From: Mike Bayer Date: Wed, 28 Nov 2007 22:03:14 +0000 (+0000) Subject: - fixed bug which could arise when using session.begin_nested() in conjunction X-Git-Tag: rel_0_4_2~125 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=efdd638cd253f656cb1d6d47e961040c0ca0b855;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - fixed bug which could arise when using session.begin_nested() in conjunction with more than one level deep of enclosing session.begin() statements --- diff --git a/CHANGES b/CHANGES index 936204e381..d6607e935d 100644 --- a/CHANGES +++ b/CHANGES @@ -48,6 +48,9 @@ CHANGES We also copy the instances over without using any events now, so that the 'dirty' list on the new session remains unaffected. + - fixed bug which could arise when using session.begin_nested() in conjunction + with more than one level deep of enclosing session.begin() statements + - dialects - MSSQL/PyODBC no longer has a global "set nocount on". diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index b8995140e9..6b6d678359 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -175,8 +175,9 @@ class SessionTransaction(object): if bind in self.__connections: return self.__connections[bind][0] - if bind in self.__parent._connection_dict(): - (conn, trans, autoclose) = self.__parent.__connections[bind] + conn_dict = self.__parent._connection_dict() + if bind in conn_dict: + (conn, trans, autoclose) = conn_dict[bind] self.__connections[conn] = self.__connections[bind.engine] = (conn, conn.begin_nested(), autoclose) return conn elif bind in self.__connections: diff --git a/test/orm/session.py b/test/orm/session.py index 0639a7d950..a662371480 100644 --- a/test/orm/session.py +++ b/test/orm/session.py @@ -241,6 +241,29 @@ class SessionTest(AssertMixin): conn.close() raise + @testing.supported('postgres', 'mysql') + @engines.close_open_connections + def test_heavy_nesting(self): + session = create_session(bind=testbase.db) + + session.begin() + session.connection().execute("insert into users (user_name) values ('user1')") + + session.begin() + + session.begin_nested() + + session.connection().execute("insert into users (user_name) values ('user2')") + assert session.connection().execute("select count(1) from users").scalar() == 2 + + session.rollback() + assert session.connection().execute("select count(1) from users").scalar() == 1 + session.connection().execute("insert into users (user_name) values ('user3')") + + session.commit() + assert session.connection().execute("select count(1) from users").scalar() == 2 + + @testing.supported('postgres', 'mysql') @testing.exclude('mysql', '<', (5, 0, 3)) def test_twophase(self):