]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- jam 0.8's rework of the session tests here so future merges have a chance of working
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 6 Oct 2012 16:16:39 +0000 (12:16 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 6 Oct 2012 16:16:39 +0000 (12:16 -0400)
test/orm/test_session.py
test/orm/test_transaction.py

index 2c5e03c41792d2627a5bebd5f088be311ccce35a..4842b04f0e4cad47af384cf8c61fac8458bd4a64 100644 (file)
@@ -7,77 +7,20 @@ import inspect
 from sqlalchemy.orm import create_session, sessionmaker, attributes, \
     make_transient, Session
 import sqlalchemy as sa
-from test.lib import engines, testing, config
+from test.lib import engines, config
+from test.lib import testing
 from sqlalchemy import Integer, String, Sequence
 from test.lib.schema import Table, Column
 from sqlalchemy.orm import mapper, relationship, backref, joinedload, \
     exc as orm_exc, object_session
 from sqlalchemy.util import pypy
 from test.lib import fixtures
-from test.lib import fixtures
 from test.orm import _fixtures
+from sqlalchemy import event, ForeignKey
 
-class SessionTest(_fixtures.FixtureTest):
+class BindTest(_fixtures.FixtureTest):
     run_inserts = None
 
-    def test_no_close_on_flush(self):
-        """Flush() doesn't close a connection the session didn't open"""
-
-        User, users = self.classes.User, self.tables.users
-
-        c = testing.db.connect()
-        c.execute("select * from users")
-
-        mapper(User, users)
-        s = create_session(bind=c)
-        s.add(User(name='first'))
-        s.flush()
-        c.execute("select * from users")
-
-    def test_close(self):
-        """close() doesn't close a connection the session didn't open"""
-
-        User, users = self.classes.User, self.tables.users
-
-        c = testing.db.connect()
-        c.execute("select * from users")
-
-        mapper(User, users)
-        s = create_session(bind=c)
-        s.add(User(name='first'))
-        s.flush()
-        c.execute("select * from users")
-        s.close()
-        c.execute("select * from users")
-
-    def test_object_session_raises(self):
-        User = self.classes.User
-
-        assert_raises(
-            orm_exc.UnmappedInstanceError,
-            object_session,
-            object()
-        )
-
-        assert_raises(
-            orm_exc.UnmappedInstanceError,
-            object_session,
-            User()
-        )
-
-    @testing.requires.sequences
-    def test_sequence_execute(self):
-        seq = Sequence("some_sequence")
-        seq.create(testing.db)
-        try:
-            sess = create_session(bind=testing.db)
-            eq_(sess.execute(seq), 1)
-        finally:
-            seq.drop(testing.db)
-
-
-
-    @engines.close_open_connections
     def test_mapped_binds(self):
         Address, addresses, users, User = (self.classes.Address,
                                 self.tables.addresses,
@@ -120,7 +63,6 @@ class SessionTest(_fixtures.FixtureTest):
 
         sess.close()
 
-    @engines.close_open_connections
     def test_table_binds(self):
         Address, addresses, users, User = (self.classes.Address,
                                 self.tables.addresses,
@@ -161,7 +103,6 @@ class SessionTest(_fixtures.FixtureTest):
 
         sess.close()
 
-    @engines.close_open_connections
     def test_bind_from_metadata(self):
         users, User = self.tables.users, self.classes.User
 
@@ -177,149 +118,344 @@ class SessionTest(_fixtures.FixtureTest):
         assert len(session.query(User).filter_by(name='Johnny').all()) == 0
         session.close()
 
-    @testing.requires.independent_connections
-    @engines.close_open_connections
-    def test_transaction(self):
-        User, users = self.classes.User, self.tables.users
+    def test_bind_arguments(self):
+        users, Address, addresses, User = (self.tables.users,
+                                self.classes.Address,
+                                self.tables.addresses,
+                                self.classes.User)
 
         mapper(User, users)
-        conn1 = testing.db.connect()
-        conn2 = testing.db.connect()
+        mapper(Address, addresses)
 
-        sess = create_session(autocommit=False, bind=conn1)
-        u = User(name='x')
-        sess.add(u)
-        sess.flush()
-        assert conn1.execute("select count(1) from users").scalar() == 1
-        assert conn2.execute("select count(1) from users").scalar() == 0
-        sess.commit()
-        assert conn1.execute("select count(1) from users").scalar() == 1
+        e1 = engines.testing_engine()
+        e2 = engines.testing_engine()
+        e3 = engines.testing_engine()
+
+        sess = Session(e3)
+        sess.bind_mapper(User, e1)
+        sess.bind_mapper(Address, e2)
+
+        assert sess.connection().engine is e3
+        assert sess.connection(bind=e1).engine is e1
+        assert sess.connection(mapper=Address, bind=e1).engine is e1
+        assert sess.connection(mapper=Address).engine is e2
+        assert sess.connection(clause=addresses.select()).engine is e2
+        assert sess.connection(mapper=User,
+                                clause=addresses.select()).engine is e1
+        assert sess.connection(mapper=User,
+                                clause=addresses.select(),
+                                bind=e2).engine is e2
 
-        assert testing.db.connect().execute('select count(1) from users'
-                ).scalar() == 1
         sess.close()
 
-    @testing.requires.independent_connections
     @engines.close_open_connections
-    def test_autoflush(self):
-        User, users = self.classes.User, self.tables.users
+    def test_bound_connection(self):
+        users, User = self.tables.users, self.classes.User
 
-        bind = self.metadata.bind
         mapper(User, users)
-        conn1 = bind.connect()
-        conn2 = bind.connect()
-
-        sess = create_session(bind=conn1, autocommit=False, autoflush=True)
-        u = User()
-        u.name='ed'
+        c = testing.db.connect()
+        sess = create_session(bind=c)
+        sess.begin()
+        transaction = sess.transaction
+        u = User(name='u1')
         sess.add(u)
-        u2 = sess.query(User).filter_by(name='ed').one()
-        assert u2 is u
-        eq_(conn1.execute("select count(1) from users").scalar(), 1)
-        eq_(conn2.execute("select count(1) from users").scalar(), 0)
-        sess.commit()
-        eq_(conn1.execute("select count(1) from users").scalar(), 1)
-        eq_(bind.connect().execute("select count(1) from users").scalar(), 1)
+        sess.flush()
+        assert transaction._connection_for_bind(testing.db) \
+            is transaction._connection_for_bind(c) is c
+
+        assert_raises_message(sa.exc.InvalidRequestError,
+                              'Session already has a Connection '
+                              'associated',
+                              transaction._connection_for_bind,
+                              testing.db.connect())
+        transaction.rollback()
+        assert len(sess.query(User).all()) == 0
         sess.close()
 
-    @testing.requires.python26
-    def test_with_no_autoflush(self):
+    def test_bound_connection_transactional(self):
         User, users = self.classes.User, self.tables.users
 
         mapper(User, users)
-        sess = Session()
+        c = testing.db.connect()
 
-        u = User()
-        u.name = 'ed'
+        sess = create_session(bind=c, autocommit=False)
+        u = User(name='u1')
         sess.add(u)
-        def go(obj):
-            assert u not in sess.query(User).all()
-        testing.run_as_contextmanager(sess.no_autoflush, go)
-        assert u in sess.new
-        assert u in sess.query(User).all()
-        assert u not in sess.new
-
-    def test_make_transient(self):
-        users, User = self.tables.users, self.classes.User
-
-        mapper(User, users)
-        sess = create_session()
-        sess.add(User(name='test'))
         sess.flush()
-
-        u1 = sess.query(User).first()
-        make_transient(u1)
-        assert u1 not in sess
-        sess.add(u1)
-        assert u1 in sess.new
-
-        u1 = sess.query(User).first()
-        sess.expunge(u1)
-        make_transient(u1)
-        sess.add(u1)
-        assert u1 in sess.new
-
-        # test expired attributes
-        # get unexpired
-        u1 = sess.query(User).first()
-        sess.expire(u1)
-        make_transient(u1)
-        assert u1.id is None
-        assert u1.name is None
-
-        # works twice
-        make_transient(u1)
-
         sess.close()
+        assert not c.in_transaction()
+        assert c.scalar("select count(1) from users") == 0
 
-        u1.name = 'test2'
-        sess.add(u1)
-        sess.flush()
-        assert u1 in sess
-        sess.delete(u1)
+        sess = create_session(bind=c, autocommit=False)
+        u = User(name='u2')
+        sess.add(u)
         sess.flush()
-        assert u1 not in sess
+        sess.commit()
+        assert not c.in_transaction()
+        assert c.scalar("select count(1) from users") == 1
+        c.execute("delete from users")
+        assert c.scalar("select count(1) from users") == 0
 
-        assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
-        make_transient(u1)
-        sess.add(u1)
+        c = testing.db.connect()
+
+        trans = c.begin()
+        sess = create_session(bind=c, autocommit=True)
+        u = User(name='u3')
+        sess.add(u)
         sess.flush()
-        assert u1 in sess
+        assert c.in_transaction()
+        trans.commit()
+        assert not c.in_transaction()
+        assert c.scalar("select count(1) from users") == 1
 
-    def test_make_transient_plus_rollback(self):
-        # test for [ticket:2182]
-        users, User = self.tables.users, self.classes.User
+class ExecutionTest(_fixtures.FixtureTest):
+    run_inserts = None
 
-        mapper(User, users)
-        sess = Session()
-        u1 = User(name='test')
-        sess.add(u1)
-        sess.commit()
+    @testing.requires.sequences
+    def test_sequence_execute(self):
+        seq = Sequence("some_sequence")
+        seq.create(testing.db)
+        try:
+            sess = create_session(bind=testing.db)
+            eq_(sess.execute(seq), 1)
+        finally:
+            seq.drop(testing.db)
 
-        sess.delete(u1)
-        sess.flush()
-        make_transient(u1)
-        sess.rollback()
+    def test_textual_execute(self):
+        """test that Session.execute() converts to text()"""
 
-    def test_deleted_flag(self):
-        users, User = self.tables.users, self.classes.User
+        users = self.tables.users
 
-        mapper(User, users)
 
-        sess = sessionmaker()()
+        sess = create_session(bind=self.metadata.bind)
+        users.insert().execute(id=7, name='jack')
 
-        u1 = User(name='u1')
-        sess.add(u1)
-        sess.commit()
+        # use :bindparam style
+        eq_(sess.execute("select * from users where id=:id",
+                         {'id':7}).fetchall(),
+            [(7, u'jack')])
 
-        sess.delete(u1)
-        sess.flush()
-        assert u1 not in sess
-        assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
-        sess.rollback()
-        assert u1 in sess
 
-        sess.delete(u1)
+        # use :bindparam style
+        eq_(sess.scalar("select id from users where id=:id",
+                         {'id':7}),
+            7)
+
+    def test_parameter_execute(self):
+        users = self.tables.users
+        sess = Session(bind=testing.db)
+        sess.execute(users.insert(), [
+                {"id": 7, "name": "u7"},
+                {"id": 8, "name": "u8"}
+            ]
+        )
+        sess.execute(users.insert(), {"id": 9, "name": "u9"})
+        eq_(
+            sess.execute(sa.select([users.c.id]).\
+                    order_by(users.c.id)).fetchall(),
+            [(7, ), (8, ), (9, )]
+        )
+
+
+class TransScopingTest(_fixtures.FixtureTest):
+    run_inserts = None
+
+    def test_no_close_on_flush(self):
+        """Flush() doesn't close a connection the session didn't open"""
+
+        User, users = self.classes.User, self.tables.users
+
+        c = testing.db.connect()
+        c.execute("select * from users")
+
+        mapper(User, users)
+        s = create_session(bind=c)
+        s.add(User(name='first'))
+        s.flush()
+        c.execute("select * from users")
+
+    def test_close(self):
+        """close() doesn't close a connection the session didn't open"""
+
+        User, users = self.classes.User, self.tables.users
+
+        c = testing.db.connect()
+        c.execute("select * from users")
+
+        mapper(User, users)
+        s = create_session(bind=c)
+        s.add(User(name='first'))
+        s.flush()
+        c.execute("select * from users")
+        s.close()
+        c.execute("select * from users")
+
+    @testing.requires.independent_connections
+    @engines.close_open_connections
+    def test_transaction(self):
+        User, users = self.classes.User, self.tables.users
+
+        mapper(User, users)
+        conn1 = testing.db.connect()
+        conn2 = testing.db.connect()
+
+        sess = create_session(autocommit=False, bind=conn1)
+        u = User(name='x')
+        sess.add(u)
+        sess.flush()
+        assert conn1.execute("select count(1) from users").scalar() == 1
+        assert conn2.execute("select count(1) from users").scalar() == 0
+        sess.commit()
+        assert conn1.execute("select count(1) from users").scalar() == 1
+
+        assert testing.db.connect().execute('select count(1) from users'
+                ).scalar() == 1
+        sess.close()
+
+class SessionUtilTest(_fixtures.FixtureTest):
+    run_inserts = None
+
+    def test_object_session_raises(self):
+        User = self.classes.User
+
+        assert_raises(
+            orm_exc.UnmappedInstanceError,
+            object_session,
+            object()
+        )
+
+        assert_raises(
+            orm_exc.UnmappedInstanceError,
+            object_session,
+            User()
+        )
+
+    def test_make_transient(self):
+        users, User = self.tables.users, self.classes.User
+
+        mapper(User, users)
+        sess = create_session()
+        sess.add(User(name='test'))
+        sess.flush()
+
+        u1 = sess.query(User).first()
+        make_transient(u1)
+        assert u1 not in sess
+        sess.add(u1)
+        assert u1 in sess.new
+
+        u1 = sess.query(User).first()
+        sess.expunge(u1)
+        make_transient(u1)
+        sess.add(u1)
+        assert u1 in sess.new
+
+        # test expired attributes
+        # get unexpired
+        u1 = sess.query(User).first()
+        sess.expire(u1)
+        make_transient(u1)
+        assert u1.id is None
+        assert u1.name is None
+
+        # works twice
+        make_transient(u1)
+
+        sess.close()
+
+        u1.name = 'test2'
+        sess.add(u1)
+        sess.flush()
+        assert u1 in sess
+        sess.delete(u1)
+        sess.flush()
+        assert u1 not in sess
+
+        assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
+        make_transient(u1)
+        sess.add(u1)
+        sess.flush()
+        assert u1 in sess
+
+    def test_make_transient_plus_rollback(self):
+        # test for [ticket:2182]
+        users, User = self.tables.users, self.classes.User
+
+        mapper(User, users)
+        sess = Session()
+        u1 = User(name='test')
+        sess.add(u1)
+        sess.commit()
+
+        sess.delete(u1)
+        sess.flush()
+        make_transient(u1)
+        sess.rollback()
+
+class SessionStateTest(_fixtures.FixtureTest):
+    run_inserts = None
+
+
+    @testing.requires.independent_connections
+    @engines.close_open_connections
+    def test_autoflush(self):
+        User, users = self.classes.User, self.tables.users
+
+        bind = self.metadata.bind
+        mapper(User, users)
+        conn1 = bind.connect()
+        conn2 = bind.connect()
+
+        sess = create_session(bind=conn1, autocommit=False, autoflush=True)
+        u = User()
+        u.name = 'ed'
+        sess.add(u)
+        u2 = sess.query(User).filter_by(name='ed').one()
+        assert u2 is u
+        eq_(conn1.execute("select count(1) from users").scalar(), 1)
+        eq_(conn2.execute("select count(1) from users").scalar(), 0)
+        sess.commit()
+        eq_(conn1.execute("select count(1) from users").scalar(), 1)
+        eq_(bind.connect().execute("select count(1) from users").scalar(), 1)
+        sess.close()
+
+    @testing.requires.python26
+    def test_with_no_autoflush(self):
+        User, users = self.classes.User, self.tables.users
+
+        mapper(User, users)
+        sess = Session()
+
+        u = User()
+        u.name = 'ed'
+        sess.add(u)
+        def go(obj):
+            assert u not in sess.query(User).all()
+        testing.run_as_contextmanager(sess.no_autoflush, go)
+        assert u in sess.new
+        assert u in sess.query(User).all()
+        assert u not in sess.new
+
+
+    def test_deleted_flag(self):
+        users, User = self.tables.users, self.classes.User
+
+        mapper(User, users)
+
+        sess = sessionmaker()()
+
+        u1 = User(name='u1')
+        sess.add(u1)
+        sess.commit()
+
+        sess.delete(u1)
+        sess.flush()
+        assert u1 not in sess
+        assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
+        sess.rollback()
+        assert u1 in sess
+
+        sess.delete(u1)
         sess.commit()
         assert u1 not in sess
         assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
@@ -429,194 +565,243 @@ class SessionTest(_fixtures.FixtureTest):
         sess.rollback()
         assert not sess.is_active
 
-    def test_textual_execute(self):
-        """test that Session.execute() converts to text()"""
 
-        users = self.tables.users
+    @engines.close_open_connections
+    def test_add_delete(self):
+        User, Address, addresses, users = (self.classes.User,
+                                self.classes.Address,
+                                self.tables.addresses,
+                                self.tables.users)
 
 
-        sess = create_session(bind=self.metadata.bind)
-        users.insert().execute(id=7, name='jack')
+        s = create_session()
+        mapper(User, users, properties={
+            'addresses':relationship(Address, cascade="all, delete")
+        })
+        mapper(Address, addresses)
 
-        # use :bindparam style
-        eq_(sess.execute("select * from users where id=:id",
-                         {'id':7}).fetchall(),
-            [(7, u'jack')])
+        user = User(name='u1')
 
+        assert_raises_message(sa.exc.InvalidRequestError,
+                              'is not persisted', s.delete, user)
 
-        # use :bindparam style
-        eq_(sess.scalar("select id from users where id=:id",
-                         {'id':7}),
-            7)
+        s.add(user)
+        s.flush()
+        user = s.query(User).one()
+        s.expunge(user)
+        assert user not in s
 
-    def test_parameter_execute(self):
-        users = self.tables.users
-        sess = Session(bind=testing.db)
-        sess.execute(users.insert(), [
-                {"id": 7, "name": "u7"},
-                {"id": 8, "name": "u8"}
-            ]
-        )
-        sess.execute(users.insert(), {"id": 9, "name": "u9"})
-        eq_(
-            sess.execute(sa.select([users.c.id]).\
-                    order_by(users.c.id)).fetchall(),
-            [(7, ), (8, ), (9, )]
-        )
+        # modify outside of session, assert changes remain/get saved
+        user.name = "fred"
+        s.add(user)
+        assert user in s
+        assert user in s.dirty
+        s.flush()
+        s.expunge_all()
+        assert s.query(User).count() == 1
+        user = s.query(User).one()
+        assert user.name == 'fred'
 
+        # ensure its not dirty if no changes occur
+        s.expunge_all()
+        assert user not in s
+        s.add(user)
+        assert user in s
+        assert user not in s.dirty
 
-    @engines.close_open_connections
-    def test_bound_connection(self):
+        s2 = create_session()
+        assert_raises_message(sa.exc.InvalidRequestError,
+                              'is already attached to session',
+                              s2.delete, user)
+        u2 = s2.query(User).get(user.id)
+        assert_raises_message(sa.exc.InvalidRequestError,
+                              'another instance with key', s.delete, u2)
+        s.expire(user)
+        s.expunge(user)
+        assert user not in s
+        s.delete(user)
+        assert user in s
+
+        s.flush()
+        assert user not in s
+        assert s.query(User).count() == 0
+
+
+    @testing.uses_deprecated()
+    def test_identity_conflict(self):
         users, User = self.tables.users, self.classes.User
 
         mapper(User, users)
-        c = testing.db.connect()
-        sess = create_session(bind=c)
-        sess.begin()
-        transaction = sess.transaction
-        u = User(name='u1')
-        sess.add(u)
-        sess.flush()
-        assert transaction._connection_for_bind(testing.db) \
-            is transaction._connection_for_bind(c) is c
+        for s in (
+            create_session(),
+            create_session(weak_identity_map=False),
+        ):
+            users.delete().execute()
+            u1 = User(name="ed")
+            s.add(u1)
+            s.flush()
+            s.expunge(u1)
+            u2 = s.query(User).first()
+            s.expunge(u2)
+            s.identity_map.add(sa.orm.attributes.instance_state(u1))
 
+            assert_raises(AssertionError, s.identity_map.add,
+                          sa.orm.attributes.instance_state(u2))
+
+    def test_pickled_update(self):
+        users, User = self.tables.users, pickleable.User
+
+        mapper(User, users)
+        sess1 = create_session()
+        sess2 = create_session()
+        u1 = User(name='u1')
+        sess1.add(u1)
         assert_raises_message(sa.exc.InvalidRequestError,
-                              'Session already has a Connection '
-                              'associated',
-                              transaction._connection_for_bind,
-                              testing.db.connect())
-        transaction.rollback()
-        assert len(sess.query(User).all()) == 0
-        sess.close()
+                              'already attached to session', sess2.add,
+                              u1)
+        u2 = pickle.loads(pickle.dumps(u1))
+        sess2.add(u2)
 
-    def test_bound_connection_transactional(self):
-        User, users = self.classes.User, self.tables.users
+    def test_duplicate_update(self):
+        users, User = self.tables.users, self.classes.User
 
         mapper(User, users)
-        c = testing.db.connect()
+        Session = sessionmaker()
+        sess = Session()
 
-        sess = create_session(bind=c, autocommit=False)
-        u = User(name='u1')
-        sess.add(u)
+        u1 = User(name='u1')
+        sess.add(u1)
         sess.flush()
-        sess.close()
-        assert not c.in_transaction()
-        assert c.scalar("select count(1) from users") == 0
+        assert u1.id is not None
 
-        sess = create_session(bind=c, autocommit=False)
-        u = User(name='u2')
-        sess.add(u)
-        sess.flush()
-        sess.commit()
-        assert not c.in_transaction()
-        assert c.scalar("select count(1) from users") == 1
-        c.execute("delete from users")
-        assert c.scalar("select count(1) from users") == 0
+        sess.expunge(u1)
 
-        c = testing.db.connect()
+        assert u1 not in sess
+        assert Session.object_session(u1) is None
+
+        u2 = sess.query(User).get(u1.id)
+        assert u2 is not None and u2 is not u1
+        assert u2 in sess
+
+        assert_raises(Exception, lambda: sess.add(u1))
+
+        sess.expunge(u2)
+        assert u2 not in sess
+        assert Session.object_session(u2) is None
+
+        u1.name = "John"
+        u2.name = "Doe"
+
+        sess.add(u1)
+        assert u1 in sess
+        assert Session.object_session(u1) is sess
 
-        trans = c.begin()
-        sess = create_session(bind=c, autocommit=True)
-        u = User(name='u3')
-        sess.add(u)
         sess.flush()
-        assert c.in_transaction()
-        trans.commit()
-        assert not c.in_transaction()
-        assert c.scalar("select count(1) from users") == 1
 
-    def test_bind_arguments(self):
-        users, Address, addresses, User = (self.tables.users,
-                                self.classes.Address,
-                                self.tables.addresses,
-                                self.classes.User)
+        sess.expunge_all()
+
+        u3 = sess.query(User).get(u1.id)
+        assert u3 is not u1 and u3 is not u2 and u3.name == u1.name
+
+    def test_no_double_save(self):
+        users = self.tables.users
+
+        sess = create_session()
+        class Foo(object):
+            def __init__(self):
+                sess.add(self)
+        class Bar(Foo):
+            def __init__(self):
+                sess.add(self)
+                Foo.__init__(self)
+        mapper(Foo, users)
+        mapper(Bar, users)
+
+        b = Bar()
+        assert b in sess
+        assert len(list(sess)) == 1
+
+    def test_identity_map_mutate(self):
+        users, User = self.tables.users, self.classes.User
 
         mapper(User, users)
-        mapper(Address, addresses)
 
-        e1 = engines.testing_engine()
-        e2 = engines.testing_engine()
-        e3 = engines.testing_engine()
+        sess = Session()
 
-        sess = Session(e3)
-        sess.bind_mapper(User, e1)
-        sess.bind_mapper(Address, e2)
+        sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')])
+        sess.commit()
 
-        assert sess.connection().engine is e3
-        assert sess.connection(bind=e1).engine is e1
-        assert sess.connection(mapper=Address, bind=e1).engine is e1
-        assert sess.connection(mapper=Address).engine is e2
-        assert sess.connection(clause=addresses.select()).engine is e2
-        assert sess.connection(mapper=User,
-                                clause=addresses.select()).engine is e1
-        assert sess.connection(mapper=User,
-                                clause=addresses.select(),
-                                bind=e2).engine is e2
+        # TODO: what are we testing here ?   that iteritems() can
+        # withstand a change?  should this be
+        # more directly attempting to manipulate the identity_map ?
+        u1, u2, u3 = sess.query(User).all()
+        for i, (key, value) in enumerate(sess.identity_map.iteritems()):
+            if i == 2:
+                del u3
+                gc_collect()
 
-        sess.close()
+class SessionStateWFixtureTest(_fixtures.FixtureTest):
 
-    @engines.close_open_connections
-    def test_add_delete(self):
-        User, Address, addresses, users = (self.classes.User,
-                                self.classes.Address,
+    def test_autoflush_rollback(self):
+        Address, addresses, users, User = (self.classes.Address,
                                 self.tables.addresses,
-                                self.tables.users)
-
+                                self.tables.users,
+                                self.classes.User)
 
-        s = create_session()
-        mapper(User, users, properties={
-            'addresses':relationship(Address, cascade="all, delete")
-        })
         mapper(Address, addresses)
+        mapper(User, users, properties={
+            'addresses':relationship(Address)})
 
-        user = User(name='u1')
+        sess = create_session(autocommit=False, autoflush=True)
+        u = sess.query(User).get(8)
+        newad = Address(email_address='a new address')
+        u.addresses.append(newad)
+        u.name = 'some new name'
+        assert u.name == 'some new name'
+        assert len(u.addresses) == 4
+        assert newad in u.addresses
+        sess.rollback()
+        assert u.name == 'ed'
+        assert len(u.addresses) == 3
 
-        assert_raises_message(sa.exc.InvalidRequestError,
-                              'is not persisted', s.delete, user)
+        assert newad not in u.addresses
+        # pending objects dont get expired
+        assert newad.email_address == 'a new address'
 
-        s.add(user)
-        s.flush()
-        user = s.query(User).one()
-        s.expunge(user)
-        assert user not in s
+    def test_expunge_cascade(self):
+        Address, addresses, users, User = (self.classes.Address,
+                                self.tables.addresses,
+                                self.tables.users,
+                                self.classes.User)
 
-        # modify outside of session, assert changes remain/get saved
-        user.name = "fred"
-        s.add(user)
-        assert user in s
-        assert user in s.dirty
-        s.flush()
-        s.expunge_all()
-        assert s.query(User).count() == 1
-        user = s.query(User).one()
-        assert user.name == 'fred'
+        mapper(Address, addresses)
+        mapper(User, users, properties={
+            'addresses':relationship(Address,
+                                 backref=backref("user", cascade="all"),
+                                 cascade="all")})
+
+        session = create_session()
+        u = session.query(User).filter_by(id=7).one()
+
+        # get everything to load in both directions
+        print [a.user for a in u.addresses]
+
+        # then see if expunge fails
+        session.expunge(u)
 
-        # ensure its not dirty if no changes occur
-        s.expunge_all()
-        assert user not in s
-        s.add(user)
-        assert user in s
-        assert user not in s.dirty
+        assert sa.orm.object_session(u) is None
+        assert sa.orm.attributes.instance_state(u).session_id is None
+        for a in u.addresses:
+            assert sa.orm.object_session(a) is None
+            assert sa.orm.attributes.instance_state(a).session_id is None
 
-        s2 = create_session()
-        assert_raises_message(sa.exc.InvalidRequestError,
-                              'is already attached to session',
-                              s2.delete, user)
-        u2 = s2.query(User).get(user.id)
-        assert_raises_message(sa.exc.InvalidRequestError,
-                              'another instance with key', s.delete, u2)
-        s.expire(user)
-        s.expunge(user)
-        assert user not in s
-        s.delete(user)
-        assert user in s
 
-        s.flush()
-        assert user not in s
-        assert s.query(User).count() == 0
 
+class WeakIdentityMapTest(_fixtures.FixtureTest):
+    run_inserts = None
 
-    def test_weak_ref(self):
+    @testing.requires.predictable_gc
+    def test_weakref(self):
         """test the weak-referencing identity map, which strongly-
         references modified items."""
 
@@ -651,7 +836,8 @@ class SessionTest(_fixtures.FixtureTest):
         assert user.name == 'fred'
         assert s.identity_map
 
-    def test_weak_ref_pickled(self):
+    @testing.requires.predictable_gc
+    def test_weakref_pickled(self):
         users, User = self.tables.users, pickleable.User
 
         s = create_session()
@@ -682,28 +868,7 @@ class SessionTest(_fixtures.FixtureTest):
 
         assert not s.identity_map
 
-    @testing.uses_deprecated()
-    def test_identity_conflict(self):
-        users, User = self.tables.users, self.classes.User
-
-        mapper(User, users)
-        for s in (
-            create_session(),
-            create_session(weak_identity_map=False),
-        ):
-            users.delete().execute()
-            u1 = User(name="ed")
-            s.add(u1)
-            s.flush()
-            s.expunge(u1)
-            u2 = s.query(User).first()
-            s.expunge(u2)
-            s.identity_map.add(sa.orm.attributes.instance_state(u1))
-
-            assert_raises(AssertionError, s.identity_map.add,
-                          sa.orm.attributes.instance_state(u2))
-
-
+    @testing.requires.predictable_gc
     def test_weakref_with_cycles_o2m(self):
         Address, addresses, users, User = (self.classes.Address,
                                 self.tables.addresses,
@@ -737,6 +902,7 @@ class SessionTest(_fixtures.FixtureTest):
         user = s.query(User).options(joinedload(User.addresses)).one()
         eq_(user, User(name="ed", addresses=[Address(email_address="ed2")]))
 
+    @testing.requires.predictable_gc
     def test_weakref_with_cycles_o2o(self):
         Address, addresses, users, User = (self.classes.Address,
                                 self.tables.addresses,
@@ -771,6 +937,39 @@ class SessionTest(_fixtures.FixtureTest):
         user = s.query(User).options(joinedload(User.address)).one()
         eq_(user, User(name="ed", address=Address(email_address="ed2")))
 
+    def test_auto_detach_on_gc_session(self):
+        users, User = self.tables.users, self.classes.User
+
+        mapper(User, users)
+
+        sess = Session()
+
+        u1 = User(name='u1')
+        sess.add(u1)
+        sess.commit()
+
+        # can't add u1 to Session,
+        # already belongs to u2
+        s2 = Session()
+        assert_raises_message(
+            sa.exc.InvalidRequestError,
+            r".*is already attached to session",
+            s2.add, u1
+        )
+
+        # garbage collect sess
+        del sess
+        gc_collect()
+
+        # s2 lets it in now despite u1 having
+        # session_key
+        s2.add(u1)
+        assert u1 in s2
+
+
+class StrongIdentityMapTest(_fixtures.FixtureTest):
+    run_inserts = None
+
     @testing.uses_deprecated()
     def test_strong_ref(self):
         users, User = self.tables.users, self.classes.User
@@ -848,178 +1047,6 @@ class SessionTest(_fixtures.FixtureTest):
         self.assert_(len(s.identity_map) == 0)
 
 
-    def test_pickled_update(self):
-        users, User = self.tables.users, pickleable.User
-
-        mapper(User, users)
-        sess1 = create_session()
-        sess2 = create_session()
-        u1 = User(name='u1')
-        sess1.add(u1)
-        assert_raises_message(sa.exc.InvalidRequestError,
-                              'already attached to session', sess2.add,
-                              u1)
-        u2 = pickle.loads(pickle.dumps(u1))
-        sess2.add(u2)
-
-    def test_duplicate_update(self):
-        users, User = self.tables.users, self.classes.User
-
-        mapper(User, users)
-        Session = sessionmaker()
-        sess = Session()
-
-        u1 = User(name='u1')
-        sess.add(u1)
-        sess.flush()
-        assert u1.id is not None
-
-        sess.expunge(u1)
-
-        assert u1 not in sess
-        assert Session.object_session(u1) is None
-
-        u2 = sess.query(User).get(u1.id)
-        assert u2 is not None and u2 is not u1
-        assert u2 in sess
-
-        assert_raises(Exception, lambda: sess.add(u1))
-
-        sess.expunge(u2)
-        assert u2 not in sess
-        assert Session.object_session(u2) is None
-
-        u1.name = "John"
-        u2.name = "Doe"
-
-        sess.add(u1)
-        assert u1 in sess
-        assert Session.object_session(u1) is sess
-
-        sess.flush()
-
-        sess.expunge_all()
-
-        u3 = sess.query(User).get(u1.id)
-        assert u3 is not u1 and u3 is not u2 and u3.name == u1.name
-
-    def test_no_double_save(self):
-        users = self.tables.users
-
-        sess = create_session()
-        class Foo(object):
-            def __init__(self):
-                sess.add(self)
-        class Bar(Foo):
-            def __init__(self):
-                sess.add(self)
-                Foo.__init__(self)
-        mapper(Foo, users)
-        mapper(Bar, users)
-
-        b = Bar()
-        assert b in sess
-        assert len(list(sess)) == 1
-
-    def test_identity_map_mutate(self):
-        users, User = self.tables.users, self.classes.User
-
-        mapper(User, users)
-
-        sess = Session()
-
-        sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')])
-        sess.commit()
-
-        u1, u2, u3 = sess.query(User).all()
-        for i, (key, value) in enumerate(sess.identity_map.iteritems()):
-            if i == 2:
-                del u3
-                gc_collect()
-
-    def test_auto_detach_on_gc_session(self):
-        users, User = self.tables.users, self.classes.User
-
-        mapper(User, users)
-
-        sess = Session()
-
-        u1 = User(name='u1')
-        sess.add(u1)
-        sess.commit()
-
-        # can't add u1 to Session,
-        # already belongs to u2
-        s2 = Session()
-        assert_raises_message(
-            sa.exc.InvalidRequestError,
-            r".*is already attached to session",
-            s2.add, u1
-        )
-
-        # garbage collect sess
-        del sess
-        gc_collect()
-
-        # s2 lets it in now despite u1 having
-        # session_key
-        s2.add(u1)
-        assert u1 in s2
-
-class SessionDataTest(_fixtures.FixtureTest):
-    def test_expunge_cascade(self):
-        Address, addresses, users, User = (self.classes.Address,
-                                self.tables.addresses,
-                                self.tables.users,
-                                self.classes.User)
-
-        mapper(Address, addresses)
-        mapper(User, users, properties={
-            'addresses':relationship(Address,
-                                 backref=backref("user", cascade="all"),
-                                 cascade="all")})
-
-        session = create_session()
-        u = session.query(User).filter_by(id=7).one()
-
-        # get everything to load in both directions
-        print [a.user for a in u.addresses]
-
-        # then see if expunge fails
-        session.expunge(u)
-
-        assert sa.orm.object_session(u) is None
-        assert sa.orm.attributes.instance_state(u).session_id is None
-        for a in u.addresses:
-            assert sa.orm.object_session(a) is None
-            assert sa.orm.attributes.instance_state(a).session_id is None
-
-    def test_autoflush_rollback(self):
-        Address, addresses, users, User = (self.classes.Address,
-                                self.tables.addresses,
-                                self.tables.users,
-                                self.classes.User)
-
-        mapper(Address, addresses)
-        mapper(User, users, properties={
-            'addresses':relationship(Address)})
-
-        sess = create_session(autocommit=False, autoflush=True)
-        u = sess.query(User).get(8)
-        newad = Address(email_address='a new address')
-        u.addresses.append(newad)
-        u.name = 'some new name'
-        assert u.name == 'some new name'
-        assert len(u.addresses) == 4
-        assert newad in u.addresses
-        sess.rollback()
-        assert u.name == 'ed'
-        assert len(u.addresses) == 3
-
-        assert newad not in u.addresses
-        # pending objects dont get expired
-        assert newad.email_address == 'a new address'
-
 class IsModifiedTest(_fixtures.FixtureTest):
     run_inserts = None
 
@@ -1380,3 +1407,5 @@ class TLTransactionTest(fixtures.MappedTest):
         sess.flush()
         self.bind.commit()
 
+
+
index 7eb91485d837d5b2495def5b48353032b8b00db7..c98f89e85b812562a89c56e7f176aa6a3f78b76c 100644 (file)
@@ -361,10 +361,6 @@ class SessionTransactionTest(FixtureTest):
         )
 
 
-
-
-
-
     def test_error_on_using_inactive_session_commands(self):
         users, User = self.tables.users, self.classes.User