From: Mike Bayer Date: Sat, 20 Nov 2010 20:11:12 +0000 (-0500) Subject: - merge no-save-cascade collection flushes branch, [ticket:1973] X-Git-Tag: rel_0_7b1~247 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=99859369d1f6964cc22cb8a06de2e97b29b03d58;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - merge no-save-cascade collection flushes branch, [ticket:1973] --- 99859369d1f6964cc22cb8a06de2e97b29b03d58 diff --cc test/orm/test_cascade.py index ce297982e4,ca38f918c8..03cdd45616 --- a/test/orm/test_cascade.py +++ b/test/orm/test_cascade.py @@@ -51,35 -18,41 +51,43 @@@ class O2MCascadeDeleteOrphanTest(_base. @testing.resolve_artifact_names def setup_mappers(cls): mapper(Address, addresses) - mapper(User, users, - properties=dict(addresses=relationship(Address, - cascade='all, delete-orphan', backref='user'), - orders=relationship(mapper(Order, orders), - cascade='all, delete-orphan', order_by=orders.c.id))) - mapper(Dingaling, dingalings, properties={'address' - : relationship(Address)}) + mapper(Order, orders) + mapper(User, users, properties={ + 'addresses':relationship(Address, + cascade='all, delete-orphan', backref='user'), + + 'orders':relationship(Order, + cascade='all, delete-orphan', order_by=orders.c.id) + }) + + mapper(Dingaling, dingalings, properties={ + 'address' : relationship(Address) + }) @testing.resolve_artifact_names - def test_list_assignment(self): + def test_list_assignment_new(self): sess = Session() u = User(name='jack', orders=[ - Order(description='someorder'), - Order(description='someotherorder')]) + Order(description='order 1'), + Order(description='order 2')]) sess.add(u) sess.commit() - u = sess.query(User).get(u.id) eq_(u, User(name='jack', - orders=[Order(description='someorder'), - Order(description='someotherorder')])) + orders=[Order(description='order 1'), + Order(description='order 2')])) - u.orders= [Order(description="order 3"), Order(description="order 4")] + @testing.resolve_artifact_names + def test_list_assignment_replace(self): + sess = Session() + u = User(name='jack', orders=[ + Order(description='someorder'), + Order(description='someotherorder')]) + sess.add(u) - sess.commit() + + u.orders=[Order(description="order 3"), Order(description="order 4")] sess.commit() - u = sess.query(User).get(u.id) eq_(u, User(name='jack', orders=[Order(description="order 3"), Order(description="order 4")])) @@@ -88,16 -60,12 +96,12 @@@ eq_(sess.query(Order).order_by(Order.id).all(), [Order(description="order 3"), Order(description="order 4")]) - # no issue with modification - u.orders[0].description = "order 3 modified" - - eq_(sess.query(Order).order_by(Order.id).all(), - [Order(description="order 3 modified"), Order(description="order 4")]) - - # a standalone order raises a NOT NULL constraint error. + @testing.resolve_artifact_names + def test_standalone_orphan(self): + sess = Session() o5 = Order(description="order 5") sess.add(o5) - assert_raises_message(orm_exc.FlushError, "is an orphan", sess.flush) + assert_raises(sa_exc.DBAPIError, sess.flush) @testing.resolve_artifact_names def test_save_update_sends_pending(self): @@@ -331,7 -238,478 +335,478 @@@ class O2OSingleParentTest(_fixtures.Fix assert u1.address is not a1 assert a1.user is None - class NoSaveCascadeTest(_fixtures.FixtureTest): + class NoSaveCascadeFlushTest(_fixtures.FixtureTest): + """Test related item not present in session, commit proceeds.""" + + @testing.resolve_artifact_names + def _one_to_many_fixture(self, o2m_cascade=True, + m2o_cascade=True, + o2m=False, + m2o=False, + o2m_cascade_backrefs=True, + m2o_cascade_backrefs=True): + + if o2m: + if m2o: + addresses_rel = {'addresses':relationship( + Address, + cascade_backrefs=o2m_cascade_backrefs, + cascade=o2m_cascade and 'save-update' or '', + backref=backref('user', + cascade=m2o_cascade and 'save-update' or '', + cascade_backrefs=m2o_cascade_backrefs + ) + )} + + else: + addresses_rel = {'addresses':relationship( + Address, + cascade=o2m_cascade and 'save-update' or '', + cascade_backrefs=o2m_cascade_backrefs, + )} + user_rel = {} + elif m2o: + user_rel = {'user':relationship(User, + cascade=m2o_cascade and 'save-update' or '', + cascade_backrefs=m2o_cascade_backrefs + )} + addresses_rel = {} + else: + addresses_rel = {} + user_rel = {} + + mapper(User, users, properties=addresses_rel) + mapper(Address, addresses, properties=user_rel) + + @testing.resolve_artifact_names + def _many_to_many_fixture(self, fwd_cascade=True, + bkd_cascade=True, + fwd=False, + bkd=False, + fwd_cascade_backrefs=True, + bkd_cascade_backrefs=True): + + if fwd: + if bkd: + keywords_rel = {'keywords':relationship( + Keyword, + secondary=item_keywords, + cascade_backrefs=fwd_cascade_backrefs, + cascade=fwd_cascade and 'save-update' or '', + backref=backref('items', + cascade=bkd_cascade and 'save-update' or '', + cascade_backrefs=bkd_cascade_backrefs + ) + )} + + else: + keywords_rel = {'keywords':relationship( + Keyword, + secondary=item_keywords, + cascade=fwd_cascade and 'save-update' or '', + cascade_backrefs=fwd_cascade_backrefs, + )} + items_rel = {} + elif bkd: + items_rel = {'items':relationship(Item, + secondary=item_keywords, + cascade=bkd_cascade and 'save-update' or '', + cascade_backrefs=bkd_cascade_backrefs + )} + keywords_rel = {} + else: + keywords_rel = {} + items_rel = {} + + mapper(Item, items, properties=keywords_rel) + mapper(Keyword, keywords, properties=items_rel) + + @testing.resolve_artifact_names + def test_o2m_only_child_pending(self): + self._one_to_many_fixture(o2m=True, m2o=False) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + u1.addresses.append(a1) + sess.add(u1) + assert u1 in sess + assert a1 in sess + sess.flush() + + @testing.resolve_artifact_names + def test_o2m_only_child_transient(self): + self._one_to_many_fixture(o2m=True, m2o=False, o2m_cascade=False) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + u1.addresses.append(a1) + sess.add(u1) + assert u1 in sess + assert a1 not in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_o2m_only_child_persistent(self): + self._one_to_many_fixture(o2m=True, m2o=False, o2m_cascade=False) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + sess.add(a1) + sess.flush() + + sess.expunge_all() + + u1.addresses.append(a1) + sess.add(u1) + assert u1 in sess + assert a1 not in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_o2m_backref_child_pending(self): + self._one_to_many_fixture(o2m=True, m2o=True) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + u1.addresses.append(a1) + sess.add(u1) + assert u1 in sess + assert a1 in sess + sess.flush() + + @testing.resolve_artifact_names + def test_o2m_backref_child_transient(self): + self._one_to_many_fixture(o2m=True, m2o=True, + o2m_cascade=False) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + u1.addresses.append(a1) + sess.add(u1) + assert u1 in sess + assert a1 not in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_o2m_backref_child_transient_nochange(self): + self._one_to_many_fixture(o2m=True, m2o=True, + o2m_cascade=False) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + u1.addresses.append(a1) + sess.add(u1) + assert u1 in sess + assert a1 not in sess + @testing.emits_warning(r'.*not in session') + def go(): + sess.commit() + go() + eq_(u1.addresses, []) + + @testing.resolve_artifact_names + def test_o2m_backref_child_expunged(self): + self._one_to_many_fixture(o2m=True, m2o=True, + o2m_cascade=False) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + sess.add(a1) + sess.flush() + + sess.add(u1) + u1.addresses.append(a1) + sess.expunge(a1) + assert u1 in sess + assert a1 not in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_o2m_backref_child_expunged_nochange(self): + self._one_to_many_fixture(o2m=True, m2o=True, + o2m_cascade=False) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + sess.add(a1) + sess.flush() + + sess.add(u1) + u1.addresses.append(a1) + sess.expunge(a1) + assert u1 in sess + assert a1 not in sess + @testing.emits_warning(r'.*not in session') + def go(): + sess.commit() + go() + eq_(u1.addresses, []) + + @testing.resolve_artifact_names + def test_m2o_only_child_pending(self): + self._one_to_many_fixture(o2m=False, m2o=True) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + a1.user = u1 + sess.add(a1) + assert u1 in sess + assert a1 in sess + sess.flush() + + @testing.resolve_artifact_names + def test_m2o_only_child_transient(self): + self._one_to_many_fixture(o2m=False, m2o=True, m2o_cascade=False) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + a1.user = u1 + sess.add(a1) + assert u1 not in sess + assert a1 in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_m2o_only_child_expunged(self): + self._one_to_many_fixture(o2m=False, m2o=True, m2o_cascade=False) + sess = Session() + u1 = User(name='u1') + sess.add(u1) + sess.flush() + + a1 = Address(email_address='a1') + a1.user = u1 + sess.add(a1) + sess.expunge(u1) + assert u1 not in sess + assert a1 in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_m2o_backref_child_pending(self): + self._one_to_many_fixture(o2m=True, m2o=True) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + a1.user = u1 + sess.add(a1) + assert u1 in sess + assert a1 in sess + sess.flush() + + @testing.resolve_artifact_names + def test_m2o_backref_child_transient(self): + self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False) + sess = Session() + u1 = User(name='u1') + a1 = Address(email_address='a1') + a1.user = u1 + sess.add(a1) + assert u1 not in sess + assert a1 in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_m2o_backref_child_expunged(self): + self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False) + sess = Session() + u1 = User(name='u1') + sess.add(u1) + sess.flush() + + a1 = Address(email_address='a1') + a1.user = u1 + sess.add(a1) + sess.expunge(u1) + assert u1 not in sess + assert a1 in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_m2o_backref_child_pending_nochange(self): + self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False) + sess = Session() + u1 = User(name='u1') + + a1 = Address(email_address='a1') + a1.user = u1 + sess.add(a1) + assert u1 not in sess + assert a1 in sess + @testing.emits_warning(r'.*not in session') + def go(): + sess.commit() + go() + # didn't get flushed + assert a1.user is None + + @testing.resolve_artifact_names + def test_m2o_backref_child_expunged_nochange(self): + self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False) + sess = Session() + u1 = User(name='u1') + sess.add(u1) + sess.flush() + + a1 = Address(email_address='a1') + a1.user = u1 + sess.add(a1) + sess.expunge(u1) + assert u1 not in sess + assert a1 in sess + @testing.emits_warning(r'.*not in session') + def go(): + sess.commit() + go() + # didn't get flushed + assert a1.user is None + + @testing.resolve_artifact_names + def test_m2m_only_child_pending(self): + self._many_to_many_fixture(fwd=True, bkd=False) + sess = Session() + i1 = Item(description='i1') + k1 = Keyword(name='k1') + i1.keywords.append(k1) + sess.add(i1) + assert i1 in sess + assert k1 in sess + sess.flush() + + @testing.resolve_artifact_names + def test_m2m_only_child_transient(self): + self._many_to_many_fixture(fwd=True, bkd=False, fwd_cascade=False) + sess = Session() + i1 = Item(description='i1') + k1 = Keyword(name='k1') + i1.keywords.append(k1) + sess.add(i1) + assert i1 in sess + assert k1 not in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_m2m_only_child_persistent(self): + self._many_to_many_fixture(fwd=True, bkd=False, fwd_cascade=False) + sess = Session() + i1 = Item(description='i1') + k1 = Keyword(name='k1') + sess.add(k1) + sess.flush() + + sess.expunge_all() + + i1.keywords.append(k1) + sess.add(i1) + assert i1 in sess + assert k1 not in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_m2m_backref_child_pending(self): + self._many_to_many_fixture(fwd=True, bkd=True) + sess = Session() + i1 = Item(description='i1') + k1 = Keyword(name='k1') + i1.keywords.append(k1) + sess.add(i1) + assert i1 in sess + assert k1 in sess + sess.flush() + + @testing.resolve_artifact_names + def test_m2m_backref_child_transient(self): + self._many_to_many_fixture(fwd=True, bkd=True, + fwd_cascade=False) + sess = Session() + i1 = Item(description='i1') + k1 = Keyword(name='k1') + i1.keywords.append(k1) + sess.add(i1) + assert i1 in sess + assert k1 not in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_m2m_backref_child_transient_nochange(self): + self._many_to_many_fixture(fwd=True, bkd=True, + fwd_cascade=False) + sess = Session() + i1 = Item(description='i1') + k1 = Keyword(name='k1') + i1.keywords.append(k1) + sess.add(i1) + assert i1 in sess + assert k1 not in sess + @testing.emits_warning(r'.*not in session') + def go(): + sess.commit() + go() + eq_(i1.keywords, []) + + @testing.resolve_artifact_names + def test_m2m_backref_child_expunged(self): + self._many_to_many_fixture(fwd=True, bkd=True, + fwd_cascade=False) + sess = Session() + i1 = Item(description='i1') + k1 = Keyword(name='k1') + sess.add(k1) + sess.flush() + + sess.add(i1) + i1.keywords.append(k1) + sess.expunge(k1) + assert i1 in sess + assert k1 not in sess + assert_raises_message( + sa_exc.SAWarning, "not in session", sess.flush + ) + + @testing.resolve_artifact_names + def test_m2m_backref_child_expunged_nochange(self): + self._many_to_many_fixture(fwd=True, bkd=True, + fwd_cascade=False) + sess = Session() + i1 = Item(description='i1') + k1 = Keyword(name='k1') + sess.add(k1) + sess.flush() + + sess.add(i1) + i1.keywords.append(k1) + sess.expunge(k1) + assert i1 in sess + assert k1 not in sess + @testing.emits_warning(r'.*not in session') + def go(): + sess.commit() + go() + eq_(i1.keywords, []) + -class NoSaveCascadeTest(_fixtures.FixtureTest): ++class NoSaveCascadeBackrefTest(_fixtures.FixtureTest): """test that backrefs don't force save-update cascades to occur when the cascade initiated from the forwards side.""" @@@ -408,9 -786,38 +883,9 @@@ k1.items.append(i1) assert i1 in sess assert k1 not in sess - + -class O2MCascadeNoOrphanTest(_fixtures.FixtureTest): - run_inserts = None - - @classmethod - @testing.resolve_artifact_names - def setup_mappers(cls): - mapper(User, users, properties = dict( - orders = relationship( - mapper(Order, orders), cascade="all") - )) - - @testing.resolve_artifact_names - def test_cascade_delete_noorphans(self): - sess = create_session() - u = User(name='jack', - orders=[Order(description='someorder'), - Order(description='someotherorder')]) - sess.add(u) - sess.flush() - assert users.count().scalar() == 1 - assert orders.count().scalar() == 2 - - del u.orders[0] - sess.delete(u) - sess.flush() - assert users.count().scalar() == 0 - assert orders.count().scalar() == 1 - - -class M2OCascadeTest(_base.MappedTest): +class M2OCascadeDeleteOrphanTestOne(_base.MappedTest): @classmethod def define_tables(cls, metadata): @@@ -1038,17 -1465,36 +1534,36 @@@ class NoBackrefCascadeTest(_fixtures.Fi a1.dingalings.append(d1) assert a1 not in sess - - a2 = Address(email_address='a2') - sess.add(a2) + + @testing.resolve_artifact_names + def test_m2o_flag_on_backref(self): + sess = Session() + + a1 = Address(email_address='a1') + sess.add(a1) u1 = User(name='u1') - u1.addresses.append(a2) + u1.addresses.append(a1) assert u1 in sess - sess.commit() + @testing.resolve_artifact_names + def test_m2o_commit_warns(self): + sess = Session() + + a1 = Address(email_address='a1') + d1 = Dingaling() + sess.add(d1) + + a1.dingalings.append(d1) + assert a1 not in sess + + assert_raises_message( + sa_exc.SAWarning, + "not in session", + sess.commit + ) -class UnsavedOrphansTest(_base.MappedTest): +class PendingOrphanTestSingleLevel(_base.MappedTest): """Pending entities that are orphans""" @classmethod @@@ -1444,12 -1834,62 +1959,58 @@@ class DoubleParentM2OOrphanTest(_base.M session = create_session() a1 = Address() session.add(a1) - try: - session.flush() - assert False - except orm_exc.FlushError, e: - assert True + session.flush() - - class OrphanMoveTest(_base.MappedTest): - """Test that movement of objects that would be orphaned - to a new parent behaves properly. + class CollectionAssignmentOrphanTest(_base.MappedTest): + @classmethod + def define_tables(cls, metadata): + Table('table_a', metadata, + Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column('name', String(30))) + Table('table_b', metadata, + Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column('name', String(30)), + Column('a_id', Integer, ForeignKey('table_a.id'))) + + @testing.resolve_artifact_names + def test_basic(self): + class A(_fixtures.Base): + pass + class B(_fixtures.Base): + pass + + mapper(A, table_a, properties={ + 'bs':relationship(B, cascade="all, delete-orphan") + }) + mapper(B, table_b) + + a1 = A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')]) + + sess = create_session() + sess.add(a1) + sess.flush() + + sess.expunge_all() + + eq_(sess.query(A).get(a1.id), + A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')])) + + a1 = sess.query(A).get(a1.id) + assert not class_mapper(B)._is_orphan( + attributes.instance_state(a1.bs[0])) + a1.bs[0].foo='b2modified' + a1.bs[1].foo='b3modified' + sess.flush() + + sess.expunge_all() + eq_(sess.query(A).get(a1.id), + A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')])) + + class O2MConflictTest(_base.MappedTest): + """test that O2M dependency detects a change in parent, does the + right thing, and updates the collection/attribute. """ diff --cc test/orm/test_session.py index 5994c41dac,7bfaf51316..62047970c9 --- a/test/orm/test_session.py +++ b/test/orm/test_session.py @@@ -1125,242 -1125,46 +1125,203 @@@ class SessionTest(_fixtures.FixtureTest self.assert_(s.prune() == 0) self.assert_(len(s.identity_map) == 0) - @testing.resolve_artifact_names - def test_no_save_cascade_1(self): - mapper(Address, addresses) - mapper(User, users, properties=dict( - addresses=relationship(Address, cascade="none", backref="user"))) - s = create_session() - - u = User(name='u1') - s.add(u) - a = Address(email_address='u1@e') - u.addresses.append(a) - assert u in s - assert a not in s - s.flush() - print "\n".join([repr(x.__dict__) for x in s]) - s.expunge_all() - assert s.query(User).one().id == u.id - assert s.query(Address).first() is None - - @testing.resolve_artifact_names - def test_no_save_cascade_2(self): - mapper(Address, addresses) - mapper(User, users, properties=dict( - addresses=relationship(Address, - cascade="all", - backref=backref("user", cascade="none")))) - - s = create_session() - u = User(name='u1') - a = Address(email_address='u1@e') - a.user = u - s.add(a) - assert u not in s - assert a in s - s.flush() - s.expunge_all() - assert s.query(Address).one().id == a.id - assert s.query(User).first() is None - + @testing.resolve_artifact_names - def test_extension(self): + def test_pickled_update(self): mapper(User, users) - log = [] - class MyExt(sa.orm.session.SessionExtension): - def before_commit(self, session): - log.append('before_commit') - def after_commit(self, session): - log.append('after_commit') - def after_rollback(self, session): - log.append('after_rollback') - def before_flush(self, session, flush_context, objects): - log.append('before_flush') - def after_flush(self, session, flush_context): - log.append('after_flush') - def after_flush_postexec(self, session, flush_context): - log.append('after_flush_postexec') - def after_begin(self, session, transaction, connection): - log.append('after_begin') - def after_attach(self, session, instance): - log.append('after_attach') - def after_bulk_update( - self, - session, - query, - query_context, - result, - ): - log.append('after_bulk_update') + sess1 = create_session() + sess2 = create_session() + u1 = User(name='u1') + sess1.add(u1) + assert_raises_message(sa.exc.InvalidRequestError, + 'already attached to session', sess2.add, + u1) + u2 = pickle.loads(pickle.dumps(u1)) + sess2.add(u2) - def after_bulk_delete( - self, - session, - query, - query_context, - result, - ): - log.append('after_bulk_delete') + @testing.resolve_artifact_names + def test_duplicate_update(self): + mapper(User, users) + Session = sessionmaker() + sess = Session() - sess = create_session(extension = MyExt()) + u1 = User(name='u1') + sess.add(u1) + sess.flush() + assert u1.id is not None + + sess.expunge(u1) + + assert u1 not in sess + assert Session.object_session(u1) is None + + u2 = sess.query(User).get(u1.id) + assert u2 is not None and u2 is not u1 + assert u2 in sess + + assert_raises(Exception, lambda: sess.add(u1)) + + sess.expunge(u2) + assert u2 not in sess + assert Session.object_session(u2) is None + + u1.name = "John" + u2.name = "Doe" + + sess.add(u1) + assert u1 in sess + assert Session.object_session(u1) is sess + + sess.flush() + + sess.expunge_all() + + u3 = sess.query(User).get(u1.id) + assert u3 is not u1 and u3 is not u2 and u3.name == u1.name + + @testing.resolve_artifact_names + def test_no_double_save(self): + sess = create_session() + class Foo(object): + def __init__(self): + sess.add(self) + class Bar(Foo): + def __init__(self): + sess.add(self) + Foo.__init__(self) + mapper(Foo, users) + mapper(Bar, users) + + b = Bar() + assert b in sess + assert len(list(sess)) == 1 + + @testing.resolve_artifact_names + def test_identity_map_mutate(self): + mapper(User, users) + + sess = Session() + + sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')]) + sess.commit() + + u1, u2, u3 = sess.query(User).all() + for i, (key, value) in enumerate(sess.identity_map.iteritems()): + if i == 2: + del u3 + gc_collect() + +class SessionEventsTest(_fixtures.FixtureTest): + run_inserts = None + + def test_class_listen(self): + def my_listener(*arg, **kw): + pass + + event.listen(my_listener, 'on_before_flush', Session) + + s = Session() + assert my_listener in s.dispatch.on_before_flush + + def test_sessionmaker_listen(self): + """test that listen can be applied to individual scoped_session() classes.""" + + def my_listener_one(*arg, **kw): + pass + def my_listener_two(*arg, **kw): + pass + + S1 = sessionmaker() + S2 = sessionmaker() + + event.listen(my_listener_one, 'on_before_flush', Session) + event.listen(my_listener_two, 'on_before_flush', S1) + + s1 = S1() + assert my_listener_one in s1.dispatch.on_before_flush + assert my_listener_two in s1.dispatch.on_before_flush + + s2 = S2() + assert my_listener_one in s2.dispatch.on_before_flush + assert my_listener_two not in s2.dispatch.on_before_flush + + def test_scoped_session_invalid_callable(self): + from sqlalchemy.orm import scoped_session + + def my_listener_one(*arg, **kw): + pass + + scope = scoped_session(lambda:Session()) + + assert_raises_message( + sa.exc.ArgumentError, + "Session event listen on a ScopedSession " + "requries that its creation callable is a Session subclass.", + event.listen, my_listener_one, "on_before_flush", scope + ) + + def test_scoped_session_invalid_class(self): + from sqlalchemy.orm import scoped_session + + def my_listener_one(*arg, **kw): + pass + + class NotASession(object): + def __call__(self): + return Session() + + scope = scoped_session(NotASession) + + assert_raises_message( + sa.exc.ArgumentError, + "Session event listen on a ScopedSession " + "requries that its creation callable is a Session subclass.", + event.listen, my_listener_one, "on_before_flush", scope + ) + + def test_scoped_session_listen(self): + from sqlalchemy.orm import scoped_session + + def my_listener_one(*arg, **kw): + pass + + scope = scoped_session(sessionmaker()) + event.listen(my_listener_one, "on_before_flush", scope) + + assert my_listener_one in scope().dispatch.on_before_flush + + def _listener_fixture(self, **kw): + canary = [] + def listener(name): + def go(*arg, **kw): + canary.append(name) + return go + + sess = Session(**kw) + + for evt in [ + 'on_before_commit', + 'on_after_commit', + 'on_after_rollback', + 'on_before_flush', + 'on_after_flush', + 'on_after_flush_postexec', + 'on_after_begin', + 'on_after_attach', + 'on_after_bulk_update', + 'on_after_bulk_delete' + ]: + event.listen(listener(evt), evt, sess) + + return sess, canary + + @testing.resolve_artifact_names + def test_flush_autocommit_hook(self): + + mapper(User, users) + + sess, canary = self._listener_fixture(autoflush=False, autocommit=True) + u = User(name='u1') sess.add(u) sess.flush()