From 5c611f702cf1df6e072ddf17040c20e3a5fb96de Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 20 Nov 2010 14:52:21 -0500 Subject: [PATCH] - the in-flush check for "pending orphan being flushed" has been removed. It's now possible to issue INSERT for an object that would otherwise be an orphan due to no parent object specified, along a relationship that specifies "delete-orphan" cascade. The detection of this condition as an error should now be accomplished via NOT NULL foreign keys. [ticket:1912] --- lib/sqlalchemy/orm/session.py | 15 +- test/orm/inheritance/test_basic.py | 13 +- test/orm/inheritance/test_polymorph.py | 69 +-- test/orm/test_cascade.py | 724 +++++++++++++------------ 4 files changed, 410 insertions(+), 411 deletions(-) diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index a1eb4c46d1..2920746634 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -1441,20 +1441,7 @@ class Session(object): proc = new.union(dirty).difference(deleted) for state in proc: - is_orphan = _state_mapper(state)._is_orphan(state) - if is_orphan and not state.has_identity: - path = ", nor ".join( - ["any parent '%s' instance " - "via that classes' '%s' attribute" % - (cls.__name__, key) - for (key, cls) in chain(*( - m.delete_orphans for m in _state_mapper(state).iterate_to_root() - )) - ]) - raise exc.FlushError( - "Instance %s is an unsaved, pending instance and is an " - "orphan (is not attached to %s)" % ( - mapperutil.state_str(state), path)) + is_orphan = _state_mapper(state)._is_orphan(state) and state.has_identity flush_context.register_object(state, isdelete=is_orphan) processed.add(state) diff --git a/test/orm/inheritance/test_basic.py b/test/orm/inheritance/test_basic.py index b9956f2863..3e4d50d96b 100644 --- a/test/orm/inheritance/test_basic.py +++ b/test/orm/inheritance/test_basic.py @@ -1276,6 +1276,15 @@ class PKDiscriminatorTest(_base.MappedTest): class DeleteOrphanTest(_base.MappedTest): + """Test the fairly obvious, that an error is raised + when attempting to insert an orphan. + + Previous SQLA versions would check this constraint + in memory which is the original rationale for this test. + + """ + + @classmethod def define_tables(cls, metadata): global single, parent @@ -1310,8 +1319,6 @@ class DeleteOrphanTest(_base.MappedTest): sess = create_session() s1 = SubClass(data='s1') sess.add(s1) - assert_raises_message(orm_exc.FlushError, - r"is not attached to any parent 'Parent' instance via " - "that classes' 'related' attribute", sess.flush) + assert_raises(sa_exc.DBAPIError, sess.flush) diff --git a/test/orm/inheritance/test_polymorph.py b/test/orm/inheritance/test_polymorph.py index eed2395d7f..7296bec98e 100644 --- a/test/orm/inheritance/test_polymorph.py +++ b/test/orm/inheritance/test_polymorph.py @@ -4,6 +4,7 @@ from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message from sqlalchemy import * from sqlalchemy.orm import * from sqlalchemy.orm import exc as orm_exc +from sqlalchemy import exc as sa_exc from sqlalchemy.test import Column, testing from sqlalchemy.util import function_named from test.orm import _fixtures, _base @@ -30,7 +31,7 @@ class PolymorphTest(_base.MappedTest): people = Table('people', metadata, Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('company_id', Integer, ForeignKey('companies.company_id')), + Column('company_id', Integer, ForeignKey('companies.company_id'), nullable=False), Column('name', String(50)), Column('type', String(30))) @@ -65,7 +66,10 @@ class InsertOrderTest(PolymorphTest): 'person':people.select(people.c.type=='person'), }, None, 'pjoin') - person_mapper = mapper(Person, people, with_polymorphic=('*', person_join), polymorphic_on=person_join.c.type, polymorphic_identity='person') + person_mapper = mapper(Person, people, + with_polymorphic=('*', person_join), + polymorphic_on=person_join.c.type, + polymorphic_identity='person') mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager') @@ -77,49 +81,22 @@ class InsertOrderTest(PolymorphTest): session = create_session() c = Company(name='company1') - c.employees.append(Manager(status='AAB', manager_name='manager1', name='pointy haired boss')) - c.employees.append(Engineer(status='BBA', engineer_name='engineer1', primary_language='java', name='dilbert')) + c.employees.append(Manager(status='AAB', manager_name='manager1' + , name='pointy haired boss')) + c.employees.append(Engineer(status='BBA', + engineer_name='engineer1', + primary_language='java', name='dilbert')) c.employees.append(Person(status='HHH', name='joesmith')) - c.employees.append(Engineer(status='CGG', engineer_name='engineer2', primary_language='python', name='wally')) - c.employees.append(Manager(status='ABA', manager_name='manager2', name='jsmith')) + c.employees.append(Engineer(status='CGG', + engineer_name='engineer2', + primary_language='python', name='wally')) + c.employees.append(Manager(status='ABA', manager_name='manager2' + , name='jsmith')) session.add(c) session.flush() session.expunge_all() eq_(session.query(Company).get(c.company_id), c) -class RelationshipToSubclassTest(PolymorphTest): - def test_basic(self): - """test a relationship to an inheriting mapper where the relationship is to a subclass - but the join condition is expressed by the parent table. - - also test that backrefs work in this case. - - this test touches upon a lot of the join/foreign key determination code in properties.py - and creates the need for properties.py to search for conditions individually within - the mapper's local table as well as the mapper's 'mapped' table, so that relationships - requiring lots of specificity (like self-referential joins) as well as relationships requiring - more generalization (like the example here) both come up with proper results.""" - - mapper(Person, people) - - mapper(Engineer, engineers, inherits=Person) - mapper(Manager, managers, inherits=Person) - - mapper(Company, companies, properties={ - 'managers': relationship(Manager, backref="company") - }) - - sess = create_session() - - c = Company(name='company1') - c.managers.append(Manager(status='AAB', manager_name='manager1', name='pointy haired boss')) - sess.add(c) - sess.flush() - sess.expunge_all() - - eq_(sess.query(Company).filter_by(company_id=c.company_id).one(), c) - assert c.managers[0].company is c - class RoundTripTest(PolymorphTest): pass @@ -163,9 +140,16 @@ def _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, manager_with_polymorphic = None if redefine_colprop: - person_mapper = mapper(Person, people, with_polymorphic=person_with_polymorphic, polymorphic_on=people.c.type, polymorphic_identity='person', properties= {'person_name':people.c.name}) + person_mapper = mapper(Person, people, + with_polymorphic=person_with_polymorphic, + polymorphic_on=people.c.type, + polymorphic_identity='person', + properties= {'person_name':people.c.name}) else: - person_mapper = mapper(Person, people, with_polymorphic=person_with_polymorphic, polymorphic_on=people.c.type, polymorphic_identity='person') + person_mapper = mapper(Person, people, + with_polymorphic=person_with_polymorphic, + polymorphic_on=people.c.type, + polymorphic_identity='person') mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') mapper(Manager, managers, inherits=person_mapper, with_polymorphic=manager_with_polymorphic, polymorphic_identity='manager') @@ -270,7 +254,8 @@ def _generate_round_trip_test(include_base, lazy_relationship, redefine_colprop, # test standalone orphans daboss = Boss(status='BBB', manager_name='boss', golf_swing='fore', **{person_attribute_name:'daboss'}) session.add(daboss) - assert_raises(orm_exc.FlushError, session.flush) + assert_raises(sa_exc.DBAPIError, session.flush) + c = session.query(Company).first() daboss.company = c manager_list = [e for e in c.employees if isinstance(e, Manager)] diff --git a/test/orm/test_cascade.py b/test/orm/test_cascade.py index f7a22dad68..f69a7c346b 100644 --- a/test/orm/test_cascade.py +++ b/test/orm/test_cascade.py @@ -11,51 +11,93 @@ from sqlalchemy.test.testing import eq_ from test.orm import _base, _fixtures -class O2MCascadeTest(_fixtures.FixtureTest): +class O2MCascadeDeleteOrphanTest(_base.MappedTest): run_inserts = None + @classmethod + def define_tables(cls, metadata): + Table('users', metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('name', String(30), nullable=False), + ) + Table('addresses', metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('user_id', Integer, ForeignKey('users.id')), + Column('email_address', String(50), nullable=False), + ) + Table('orders', metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('user_id', Integer, ForeignKey('users.id'), nullable=False), + Column('description', String(30)), + ) + Table("dingalings", metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('address_id', Integer, ForeignKey('addresses.id')), + Column('data', String(30)) + ) + + @classmethod + def setup_classes(cls): + class User(_base.ComparableEntity): + pass + class Address(_base.ComparableEntity): + pass + class Order(_base.ComparableEntity): + pass + class Dingaling(_base.ComparableEntity): + pass + @classmethod @testing.resolve_artifact_names def setup_mappers(cls): mapper(Address, addresses) - mapper(User, users, - properties=dict(addresses=relationship(Address, - cascade='all, delete-orphan', backref='user'), - orders=relationship(mapper(Order, orders), - cascade='all, delete-orphan', order_by=orders.c.id))) - mapper(Dingaling, dingalings, properties={'address' - : relationship(Address)}) + mapper(Order, orders) + mapper(User, users, properties={ + 'addresses':relationship(Address, + cascade='all, delete-orphan', backref='user'), + + 'orders':relationship(Order, + cascade='all, delete-orphan', order_by=orders.c.id) + }) + + mapper(Dingaling, dingalings, properties={ + 'address' : relationship(Address) + }) @testing.resolve_artifact_names def test_list_assignment(self): - sess = create_session() + sess = Session() u = User(name='jack', orders=[ - Order(description='someorder'), - Order(description='someotherorder')]) + Order(description='order 1'), + Order(description='order 2')]) sess.add(u) - sess.flush() - sess.expunge_all() + sess.commit() - u = sess.query(User).get(u.id) eq_(u, User(name='jack', - orders=[Order(description='someorder'), - Order(description='someotherorder')])) + orders=[Order(description='order 1'), + Order(description='order 2')])) - u.orders=[Order(description="order 3"), Order(description="order 4")] - sess.flush() - sess.expunge_all() + u.orders= [Order(description="order 3"), Order(description="order 4")] + sess.commit() - u = sess.query(User).get(u.id) eq_(u, User(name='jack', orders=[Order(description="order 3"), Order(description="order 4")])) + # order 1, order 2 have been deleted eq_(sess.query(Order).order_by(Order.id).all(), [Order(description="order 3"), Order(description="order 4")]) + # no issue with modification + u.orders[0].description = "order 3 modified" + + eq_(sess.query(Order).order_by(Order.id).all(), + [Order(description="order 3 modified"), Order(description="order 4")]) + + # a standalone order raises a NOT NULL constraint error. o5 = Order(description="order 5") sess.add(o5) - assert_raises_message(orm_exc.FlushError, "is an orphan", sess.flush) + assert_raises(sa_exc.DBAPIError, sess.flush) @testing.resolve_artifact_names def test_save_update_sends_pending(self): @@ -206,7 +248,54 @@ class O2MCascadeTest(_fixtures.FixtureTest): assert users.count().scalar() == 1 assert orders.count().scalar() == 0 -class O2OCascadeTest(_fixtures.FixtureTest): +class O2MCascadeDeleteNoOrphanTest(_base.MappedTest): + run_inserts = None + + @classmethod + def define_tables(cls, metadata): + Table('users', metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('name', String(30)) + ) + Table('orders', metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('user_id', Integer, ForeignKey('users.id')), + Column('description', String(30)) + ) + + @classmethod + def setup_classes(cls): + class User(_base.ComparableEntity): + pass + class Order(_base.ComparableEntity): + pass + + @classmethod + @testing.resolve_artifact_names + def setup_mappers(cls): + mapper(User, users, properties = dict( + orders = relationship( + mapper(Order, orders), cascade="all") + )) + + @testing.resolve_artifact_names + def test_cascade_delete_noorphans(self): + sess = create_session() + u = User(name='jack', + orders=[Order(description='someorder'), + Order(description='someotherorder')]) + sess.add(u) + sess.flush() + assert users.count().scalar() == 1 + assert orders.count().scalar() == 2 + + del u.orders[0] + sess.delete(u) + sess.flush() + assert users.count().scalar() == 0 + assert orders.count().scalar() == 1 + +class O2OSingleParentTest(_fixtures.FixtureTest): run_inserts = None @classmethod @@ -228,33 +317,6 @@ class O2OCascadeTest(_fixtures.FixtureTest): assert u1.address is not a1 assert a1.user is None - - -class O2MBackrefTest(_fixtures.FixtureTest): - run_inserts = None - - @classmethod - @testing.resolve_artifact_names - def setup_mappers(cls): - mapper(User, users, - properties=dict(orders=relationship(mapper(Order, - orders), cascade='all, delete-orphan', backref='user'))) - - @testing.resolve_artifact_names - def test_lazyload_bug(self): - sess = create_session() - - u = User(name="jack") - sess.add(u) - sess.expunge(u) - - o1 = Order(description='someorder') - o1.user = u - sess.add(u) - assert u in sess - assert o1 in sess - - class NoSaveCascadeTest(_fixtures.FixtureTest): """test that backrefs don't force save-update cascades to occur when the cascade initiated from the forwards side.""" @@ -334,36 +396,7 @@ class NoSaveCascadeTest(_fixtures.FixtureTest): assert k1 not in sess -class O2MCascadeNoOrphanTest(_fixtures.FixtureTest): - run_inserts = None - - @classmethod - @testing.resolve_artifact_names - def setup_mappers(cls): - mapper(User, users, properties = dict( - orders = relationship( - mapper(Order, orders), cascade="all") - )) - - @testing.resolve_artifact_names - def test_cascade_delete_noorphans(self): - sess = create_session() - u = User(name='jack', - orders=[Order(description='someorder'), - Order(description='someotherorder')]) - sess.add(u) - sess.flush() - assert users.count().scalar() == 1 - assert orders.count().scalar() == 2 - - del u.orders[0] - sess.delete(u) - sess.flush() - assert users.count().scalar() == 0 - assert orders.count().scalar() == 1 - - -class M2OCascadeTest(_base.MappedTest): +class M2OCascadeDeleteOrphanTestOne(_base.MappedTest): @classmethod def define_tables(cls, metadata): @@ -524,24 +557,25 @@ class M2OCascadeTest(_base.MappedTest): eq_(sess.query(Pref).order_by(Pref.id).all(), [Pref(data="pref 1"), Pref(data="pref 3"), Pref(data="newpref")]) -class M2OCascadeDeleteTest(_base.MappedTest): +class M2OCascadeDeleteOrphanTestTwo(_base.MappedTest): @classmethod def define_tables(cls, metadata): - Table('t1', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('data',String(50)), + Table('t1', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data', String(50)), Column('t2id', Integer, ForeignKey('t2.id'))) - Table('t2', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('data',String(50)), + Table('t2', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data', String(50)), Column('t3id', Integer, ForeignKey('t3.id'))) - Table('t3', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + Table('t3', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50))) @classmethod @@ -556,8 +590,11 @@ class M2OCascadeDeleteTest(_base.MappedTest): @classmethod @testing.resolve_artifact_names def setup_mappers(cls): - mapper(T1, t1, properties={'t2': relationship(T2, cascade="all")}) - mapper(T2, t2, properties={'t3': relationship(T3, cascade="all")}) + mapper(T1, t1, properties=dict(t2=relationship(T2, + cascade='all, delete-orphan', single_parent=True))) + mapper(T2, t2, properties=dict(t3=relationship(T3, + cascade='all, delete-orphan', single_parent=True, + backref=backref('t2', uselist=False)))) mapper(T3, t3) @testing.resolve_artifact_names @@ -574,100 +611,92 @@ class M2OCascadeDeleteTest(_base.MappedTest): eq_(sess.query(T3).all(), []) @testing.resolve_artifact_names - def test_cascade_delete_postappend_onelevel(self): + def test_deletes_orphans_onelevel(self): sess = create_session() - x1 = T1(data='t1', ) - x2 = T2(data='t2') - x3 = T3(data='t3') - sess.add_all((x1, x2, x3)) + x2 = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b'))) + sess.add(x2) sess.flush() + x2.t2 = None - sess.delete(x1) - x1.t2 = x2 - x2.t3 = x3 + sess.delete(x2) sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), []) @testing.resolve_artifact_names - def test_cascade_delete_postappend_twolevel(self): + def test_deletes_orphans_twolevel(self): sess = create_session() - x1 = T1(data='t1', t2=T2(data='t2')) - x3 = T3(data='t3') - sess.add_all((x1, x3)) + x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a'))) + sess.add(x) sess.flush() - sess.delete(x1) - x1.t2.t3 = x3 + x.t2.t3 = None + sess.delete(x) sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), []) @testing.resolve_artifact_names - def test_preserves_orphans_onelevel(self): + def test_finds_orphans_twolevel(self): sess = create_session() - x2 = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b'))) - sess.add(x2) + x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a'))) + sess.add(x) sess.flush() - x2.t2 = None - sess.delete(x2) + x.t2.t3 = None sess.flush() - eq_(sess.query(T1).all(), []) + eq_(sess.query(T1).all(), [T1()]) eq_(sess.query(T2).all(), [T2()]) - eq_(sess.query(T3).all(), [T3()]) + eq_(sess.query(T3).all(), []) - @testing.future @testing.resolve_artifact_names - def test_preserves_orphans_onelevel_postremove(self): - sess = create_session() - x2 = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b'))) - sess.add(x2) - sess.flush() + def test_single_parent_raise(self): - sess.delete(x2) - x2.t2 = None - sess.flush() - eq_(sess.query(T1).all(), []) - eq_(sess.query(T2).all(), [T2()]) - eq_(sess.query(T3).all(), [T3()]) + sess = create_session() + + y = T2(data='T2a') + x = T1(data='T1a', t2=y) + assert_raises(sa_exc.InvalidRequestError, T1, data='T1b', t2=y) @testing.resolve_artifact_names - def test_preserves_orphans_twolevel(self): + def test_single_parent_backref(self): + sess = create_session() - x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a'))) - sess.add(x) - sess.flush() + + y = T3(data='T3a') + x = T2(data='T2a', t3=y) - x.t2.t3 = None - sess.delete(x) - sess.flush() - eq_(sess.query(T1).all(), []) - eq_(sess.query(T2).all(), []) - eq_(sess.query(T3).all(), [T3()]) + # cant attach the T3 to another T2 + assert_raises(sa_exc.InvalidRequestError, T2, data='T2b', t3=y) + + # set via backref tho is OK, unsets from previous parent + # first + z = T2(data='T2b') + y.t2 = z + assert z.t3 is y + assert x.t3 is None -class M2OCascadeDeleteOrphanTest(_base.MappedTest): +class M2OCascadeDeleteNoOrphanTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): - Table('t1', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('data', String(50)), + Table('t1', metadata, Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data',String(50)), Column('t2id', Integer, ForeignKey('t2.id'))) - Table('t2', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('data', String(50)), + Table('t2', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data',String(50)), Column('t3id', Integer, ForeignKey('t3.id'))) - Table('t3', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + Table('t3', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50))) @classmethod @@ -682,11 +711,8 @@ class M2OCascadeDeleteOrphanTest(_base.MappedTest): @classmethod @testing.resolve_artifact_names def setup_mappers(cls): - mapper(T1, t1, properties=dict(t2=relationship(T2, - cascade='all, delete-orphan', single_parent=True))) - mapper(T2, t2, properties=dict(t3=relationship(T3, - cascade='all, delete-orphan', single_parent=True, - backref=backref('t2', uselist=False)))) + mapper(T1, t1, properties={'t2': relationship(T2, cascade="all")}) + mapper(T2, t2, properties={'t3': relationship(T3, cascade="all")}) mapper(T3, t3) @testing.resolve_artifact_names @@ -703,73 +729,81 @@ class M2OCascadeDeleteOrphanTest(_base.MappedTest): eq_(sess.query(T3).all(), []) @testing.resolve_artifact_names - def test_deletes_orphans_onelevel(self): + def test_cascade_delete_postappend_onelevel(self): sess = create_session() - x2 = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b'))) - sess.add(x2) + x1 = T1(data='t1', ) + x2 = T2(data='t2') + x3 = T3(data='t3') + sess.add_all((x1, x2, x3)) sess.flush() - x2.t2 = None - sess.delete(x2) + sess.delete(x1) + x1.t2 = x2 + x2.t3 = x3 sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), []) @testing.resolve_artifact_names - def test_deletes_orphans_twolevel(self): + def test_cascade_delete_postappend_twolevel(self): sess = create_session() - x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a'))) - sess.add(x) + x1 = T1(data='t1', t2=T2(data='t2')) + x3 = T3(data='t3') + sess.add_all((x1, x3)) sess.flush() - x.t2.t3 = None - sess.delete(x) + sess.delete(x1) + x1.t2.t3 = x3 sess.flush() eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), []) eq_(sess.query(T3).all(), []) @testing.resolve_artifact_names - def test_finds_orphans_twolevel(self): + def test_preserves_orphans_onelevel(self): sess = create_session() - x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a'))) - sess.add(x) + x2 = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b'))) + sess.add(x2) sess.flush() + x2.t2 = None - x.t2.t3 = None + sess.delete(x2) sess.flush() - eq_(sess.query(T1).all(), [T1()]) + eq_(sess.query(T1).all(), []) eq_(sess.query(T2).all(), [T2()]) - eq_(sess.query(T3).all(), []) + eq_(sess.query(T3).all(), [T3()]) + @testing.future @testing.resolve_artifact_names - def test_single_parent_raise(self): - + def test_preserves_orphans_onelevel_postremove(self): sess = create_session() - - y = T2(data='T2a') - x = T1(data='T1a', t2=y) - assert_raises(sa_exc.InvalidRequestError, T1, data='T1b', t2=y) + x2 = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b'))) + sess.add(x2) + sess.flush() - @testing.resolve_artifact_names - def test_single_parent_backref(self): + sess.delete(x2) + x2.t2 = None + sess.flush() + eq_(sess.query(T1).all(), []) + eq_(sess.query(T2).all(), [T2()]) + eq_(sess.query(T3).all(), [T3()]) + @testing.resolve_artifact_names + def test_preserves_orphans_twolevel(self): sess = create_session() - - y = T3(data='T3a') - x = T2(data='T2a', t3=y) + x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a'))) + sess.add(x) + sess.flush() + + x.t2.t3 = None + sess.delete(x) + sess.flush() + eq_(sess.query(T1).all(), []) + eq_(sess.query(T2).all(), []) + eq_(sess.query(T3).all(), [T3()]) - # cant attach the T3 to another T2 - assert_raises(sa_exc.InvalidRequestError, T2, data='T2b', t3=y) - - # set via backref tho is OK, unsets from previous parent - # first - z = T2(data='T2b') - y.t2 = z - assert z.t3 is y - assert x.t3 is None class M2MCascadeTest(_base.MappedTest): @classmethod @@ -1000,7 +1034,7 @@ class NoBackrefCascadeTest(_fixtures.FixtureTest): sess.commit() -class UnsavedOrphansTest(_base.MappedTest): +class PendingOrphanTestSingleLevel(_base.MappedTest): """Pending entities that are orphans""" @classmethod @@ -1015,32 +1049,60 @@ class UnsavedOrphansTest(_base.MappedTest): test_needs_autoincrement=True), Column('user_id', Integer, ForeignKey('users.user_id')), Column('email_address', String(40))) - + Table('orders', metadata, + Column('order_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False), + ) @classmethod def setup_classes(cls): class User(_fixtures.Base): pass class Address(_fixtures.Base): pass - + class Order(_fixtures.Base): + pass + @testing.resolve_artifact_names def test_pending_standalone_orphan(self): - """An entity that never had a parent on a delete-orphan cascade - can't be saved.""" - + """Standalone 'orphan' objects can now be persisted, if the underlying + constraints of the database allow it. + + This now supports persisting of objects based on foreign key + values alone. + + """ + + mapper(Order, orders) mapper(Address, addresses) mapper(User, users, properties=dict( addresses=relationship(Address, cascade="all,delete-orphan", - backref="user") + backref="user"), + orders=relationship(Order, cascade='all, delete-orphan') )) - s = create_session() + s = Session() + + # the standalone Address goes in, its foreign key + # allows NULL a = Address() s.add(a) - try: - s.flush() - except orm_exc.FlushError, e: - pass - assert a.address_id is None, "Error: address should not be persistent" + s.commit() + + # the standalone Order does not. + o = Order() + s.add(o) + assert_raises(sa_exc.DBAPIError, s.commit) + s.rollback() + + # can assign o.user_id by foreign key, + # flush succeeds + u = User() + s.add(u) + s.flush() + o = Order(user_id=u.user_id) + s.add(o) + s.commit() + assert o in s and o not in s.new + @testing.resolve_artifact_names def test_pending_collection_expunge(self): @@ -1088,69 +1150,93 @@ class UnsavedOrphansTest(_base.MappedTest): eq_(s.query(Address).all(), [Address(email_address='ad1')]) -class UnsavedOrphansTest2(_base.MappedTest): - """same test as UnsavedOrphans only three levels deep""" - +class PendingOrphanTestTwoLevel(_base.MappedTest): + """test usages stated at + + http://article.gmane.org/gmane.comp.python.sqlalchemy.user/3085 + http://article.gmane.org/gmane.comp.python.sqlalchemy.user/3119 + """ + @classmethod - def define_tables(cls, meta): - Table('orders', meta, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('name', String(50))) - - Table('items', meta, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('order_id', Integer, ForeignKey('orders.id'), - nullable=False), - Column('name', String(50))) - - Table('attributes', meta, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('item_id', Integer, ForeignKey('items.id'), - nullable=False), - Column('name', String(50))) - - @testing.resolve_artifact_names - def test_pending_expunge(self): - class Order(_fixtures.Base): + def define_tables(cls, metadata): + Table('order', metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True) + ) + Table('item', metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('order_id', Integer, ForeignKey('order.id'), nullable=False) + ) + Table('attribute', metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('item_id', Integer, ForeignKey('item.id'), nullable=False) + ) + + @classmethod + def setup_classes(cls): + class Order(_base.ComparableEntity): pass - class Item(_fixtures.Base): + class Item(_base.ComparableEntity): pass - class Attribute(_fixtures.Base): + class Attribute(_base.ComparableEntity): pass + + @testing.resolve_artifact_names + def test_singlelevel_remove(self): + mapper(Order, order, properties={ + 'items':relationship(Item, cascade="all, delete-orphan") + }) + mapper(Item, item) + s = Session() + o1 = Order() + s.add(o1) + + i1 = Item() + o1.items.append(i1) + o1.items.remove(i1) + s.commit() + assert i1 not in o1.items + + @testing.resolve_artifact_names + def test_multilevel_remove(self): + mapper(Order, order, properties={ + 'items':relationship(Item, cascade="all, delete-orphan") + }) + mapper(Item, item, properties={ + 'attributes':relationship(Attribute, cascade="all, delete-orphan") + }) + mapper(Attribute, attribute) + s = Session() + o1 = Order() + s.add(o1) - mapper(Attribute, attributes) - mapper(Item, items, properties=dict( - attributes=relationship(Attribute, cascade="all,delete-orphan", - backref="item") - )) - mapper(Order, orders, properties=dict( - items=relationship(Item, cascade="all,delete-orphan", - backref="order") - )) - - s = create_session() - order = Order(name="order1") - s.add(order) - - attr = Attribute(name="attr1") - item = Item(name="item1", attributes=[attr]) - - order.items.append(item) - order.items.remove(item) + i1 = Item() + a1 = Attribute() + i1.attributes.append(a1) - assert item not in s - assert attr not in s + o1.items.append(i1) - s.flush() - assert orders.count().scalar() == 1 - assert items.count().scalar() == 0 - assert attributes.count().scalar() == 0 + assert i1 in s + assert a1 in s + + # i1 is an orphan so the operation + # removes 'i1'. The operation + # cascades down to 'a1'. + o1.items.remove(i1) -class UnsavedOrphansTest3(_base.MappedTest): - """test not expunging double parents""" + assert i1 not in s + assert a1 not in s + + s.commit() + assert o1 in s + assert a1 not in s + assert i1 not in s + assert a1 not in o1.items + +class DoubleParentO2MOrphanTest(_base.MappedTest): + """Test orphan behavior on an entity that requires + two parents via many-to-one (one-to-many collection.). + + """ @classmethod def define_tables(cls, meta): @@ -1162,6 +1248,7 @@ class UnsavedOrphansTest3(_base.MappedTest): Column('account_id', Integer,primary_key=True, test_needs_autoincrement=True), Column('balance', Integer)) + Table('customers', meta, Column('customer_id', Integer,primary_key=True, test_needs_autoincrement=True), @@ -1254,8 +1341,11 @@ class UnsavedOrphansTest3(_base.MappedTest): 'Should expunge customer when both parents are gone' -class DoubleParentOrphanTest(_base.MappedTest): - """test orphan detection for an entity with two parent relationships""" +class DoubleParentM2OOrphanTest(_base.MappedTest): + """Test orphan behavior on an entity that requires + two parents via one-to-many (many-to-one reference to the orphan). + + """ @classmethod def define_tables(cls, metadata): @@ -1340,62 +1430,12 @@ class DoubleParentOrphanTest(_base.MappedTest): session = create_session() a1 = Address() session.add(a1) - try: - session.flush() - assert False - except orm_exc.FlushError, e: - assert True - -class CollectionAssignmentOrphanTest(_base.MappedTest): - @classmethod - def define_tables(cls, metadata): - Table('table_a', metadata, - Column('id', Integer, - primary_key=True, test_needs_autoincrement=True), - Column('name', String(30))) - Table('table_b', metadata, - Column('id', Integer, - primary_key=True, test_needs_autoincrement=True), - Column('name', String(30)), - Column('a_id', Integer, ForeignKey('table_a.id'))) - - @testing.resolve_artifact_names - def test_basic(self): - class A(_fixtures.Base): - pass - class B(_fixtures.Base): - pass - - mapper(A, table_a, properties={ - 'bs':relationship(B, cascade="all, delete-orphan") - }) - mapper(B, table_b) - - a1 = A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')]) - - sess = create_session() - sess.add(a1) - sess.flush() - - sess.expunge_all() - - eq_(sess.query(A).get(a1.id), - A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')])) - - a1 = sess.query(A).get(a1.id) - assert not class_mapper(B)._is_orphan( - attributes.instance_state(a1.bs[0])) - a1.bs[0].foo='b2modified' - a1.bs[1].foo='b3modified' - sess.flush() - - sess.expunge_all() - eq_(sess.query(A).get(a1.id), - A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')])) + session.flush() -class O2MConflictTest(_base.MappedTest): - """test that O2M dependency detects a change in parent, does the - right thing, and even updates the collection/attribute. + +class OrphanMoveTest(_base.MappedTest): + """Test that movement of objects that would be orphaned + to a new parent behaves properly. """ @@ -1420,7 +1460,7 @@ class O2MConflictTest(_base.MappedTest): pass @testing.resolve_artifact_names - def _do_delete_old_test(self): + def _do_move_test(self, delete_old): sess = create_session() p1, p2, c1 = Parent(), Parent(), Child() @@ -1431,7 +1471,8 @@ class O2MConflictTest(_base.MappedTest): sess.add_all([p1, c1]) sess.flush() - sess.delete(p1) + if delete_old: + sess.delete(p1) if Parent.child.property.uselist: p2.child.append(c1) @@ -1442,35 +1483,14 @@ class O2MConflictTest(_base.MappedTest): sess.flush() eq_(sess.query(Child).filter(Child.parent_id==p2.id).all(), [c1]) - @testing.resolve_artifact_names - def _do_move_test(self): - sess = create_session() - - p1, p2, c1 = Parent(), Parent(), Child() - if Parent.child.property.uselist: - p1.child.append(c1) - else: - p1.child = c1 - sess.add_all([p1, c1]) - sess.flush() - - if Parent.child.property.uselist: - p2.child.append(c1) - else: - p2.child = c1 - sess.add(p2) - - sess.flush() - eq_(sess.query(Child).filter(Child.parent_id==p2.id).all(), [c1]) - @testing.resolve_artifact_names def test_o2o_delete_old(self): mapper(Parent, parent, properties={ 'child':relationship(Child, uselist=False) }) mapper(Child, child) - self._do_delete_old_test() - self._do_move_test() + self._do_move_test(True) + self._do_move_test(False) @testing.resolve_artifact_names def test_o2m_delete_old(self): @@ -1478,8 +1498,8 @@ class O2MConflictTest(_base.MappedTest): 'child':relationship(Child, uselist=True) }) mapper(Child, child) - self._do_delete_old_test() - self._do_move_test() + self._do_move_test(True) + self._do_move_test(False) @testing.resolve_artifact_names def test_o2o_backref_delete_old(self): @@ -1487,8 +1507,8 @@ class O2MConflictTest(_base.MappedTest): 'child':relationship(Child, uselist=False, backref='parent') }) mapper(Child, child) - self._do_delete_old_test() - self._do_move_test() + self._do_move_test(True) + self._do_move_test(False) @testing.resolve_artifact_names def test_o2o_delcascade_delete_old(self): @@ -1496,8 +1516,8 @@ class O2MConflictTest(_base.MappedTest): 'child':relationship(Child, uselist=False, cascade="all, delete") }) mapper(Child, child) - self._do_delete_old_test() - self._do_move_test() + self._do_move_test(True) + self._do_move_test(False) @testing.resolve_artifact_names def test_o2o_delorphan_delete_old(self): @@ -1506,8 +1526,8 @@ class O2MConflictTest(_base.MappedTest): cascade="all, delete, delete-orphan") }) mapper(Child, child) - self._do_delete_old_test() - self._do_move_test() + self._do_move_test(True) + self._do_move_test(False) @testing.resolve_artifact_names def test_o2o_delorphan_backref_delete_old(self): @@ -1517,8 +1537,8 @@ class O2MConflictTest(_base.MappedTest): backref='parent') }) mapper(Child, child) - self._do_delete_old_test() - self._do_move_test() + self._do_move_test(True) + self._do_move_test(False) @testing.resolve_artifact_names def test_o2o_backref_delorphan_delete_old(self): @@ -1528,8 +1548,8 @@ class O2MConflictTest(_base.MappedTest): backref=backref('child', uselist=False), cascade="all,delete,delete-orphan") }) - self._do_delete_old_test() - self._do_move_test() + self._do_move_test(True) + self._do_move_test(False) @testing.resolve_artifact_names def test_o2m_backref_delorphan_delete_old(self): @@ -1539,8 +1559,8 @@ class O2MConflictTest(_base.MappedTest): backref=backref('child', uselist=True), cascade="all,delete,delete-orphan") }) - self._do_delete_old_test() - self._do_move_test() + self._do_move_test(True) + self._do_move_test(False) class PartialFlushTest(_base.MappedTest): -- 2.47.2