From: Mike Bayer Date: Sat, 10 Jul 2010 18:50:13 +0000 (-0400) Subject: - experimenting with pytidy with mods as a textmate plugin along X-Git-Tag: rel_0_6_3~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5cce6bf2a86cb318d80953cd3712fcb77bbc52db;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - experimenting with pytidy with mods as a textmate plugin along the path to 78 chars. eh --- diff --git a/test/orm/test_cascade.py b/test/orm/test_cascade.py index 7b07898a57..1c935df139 100644 --- a/test/orm/test_cascade.py +++ b/test/orm/test_cascade.py @@ -1,8 +1,10 @@ from sqlalchemy.test.testing import assert_raises, assert_raises_message -from sqlalchemy import Integer, String, ForeignKey, Sequence, exc as sa_exc +from sqlalchemy import Integer, String, ForeignKey, Sequence, \ + exc as sa_exc from sqlalchemy.test.schema import Table, Column -from sqlalchemy.orm import mapper, relationship, create_session, sessionmaker, class_mapper, backref +from sqlalchemy.orm import mapper, relationship, create_session, \ + sessionmaker, class_mapper, backref from sqlalchemy.orm import attributes, exc as orm_exc from sqlalchemy.test import testing from sqlalchemy.test.testing import eq_ @@ -16,14 +18,13 @@ class O2MCascadeTest(_fixtures.FixtureTest): @testing.resolve_artifact_names def setup_mappers(cls): mapper(Address, addresses) - mapper(User, users, properties = dict( - addresses = relationship(Address, cascade="all, delete-orphan", backref="user"), - orders = relationship( - mapper(Order, orders), cascade="all, delete-orphan", order_by=orders.c.id) - )) - mapper(Dingaling,dingalings, properties={ - 'address':relationship(Address) - }) + mapper(User, users, + properties=dict(addresses=relationship(Address, + cascade='all, delete-orphan', backref='user'), + orders=relationship(mapper(Order, orders), + cascade='all, delete-orphan', order_by=orders.c.id))) + mapper(Dingaling, dingalings, properties={'address' + : relationship(Address)}) @testing.resolve_artifact_names def test_list_assignment(self): @@ -58,18 +59,18 @@ class O2MCascadeTest(_fixtures.FixtureTest): @testing.resolve_artifact_names def test_save_update_sends_pending(self): - """test that newly added and deleted collection items are cascaded on save-update""" - + """test that newly added and deleted collection items are + cascaded on save-update""" + sess = sessionmaker(expire_on_commit=False)() - o1, o2, o3 = Order(description='o1'), Order(description='o2'), Order(description='o3') + o1, o2, o3 = Order(description='o1'), Order(description='o2'), \ + Order(description='o3') u = User(name='jack', orders=[o1, o2]) sess.add(u) sess.commit() sess.close() - u.orders.append(o3) u.orders.remove(o1) - sess.add(u) assert o1 in sess assert o2 in sess @@ -93,7 +94,8 @@ class O2MCascadeTest(_fixtures.FixtureTest): @testing.resolve_artifact_names def test_delete_unloaded_collections(self): - """Unloaded collections are still included in a delete-cascade by default.""" + """Unloaded collections are still included in a delete-cascade + by default.""" sess = create_session() u = User(name='jack', addresses=[Address(email_address="address1"), @@ -114,8 +116,8 @@ class O2MCascadeTest(_fixtures.FixtureTest): @testing.resolve_artifact_names def test_cascades_onlycollection(self): - """Cascade only reaches instances that are still part of the collection, - not those that have been removed""" + """Cascade only reaches instances that are still part of the + collection, not those that have been removed""" sess = create_session() u = User(name='jack', @@ -143,7 +145,8 @@ class O2MCascadeTest(_fixtures.FixtureTest): @testing.resolve_artifact_names def test_cascade_nosideeffects(self): - """test that cascade leaves the state of unloaded scalars/collections unchanged.""" + """test that cascade leaves the state of unloaded + scalars/collections unchanged.""" sess = create_session() u = User(name='jack') @@ -210,17 +213,16 @@ class O2OCascadeTest(_fixtures.FixtureTest): @testing.resolve_artifact_names def setup_mappers(cls): mapper(Address, addresses) - mapper(User, users, properties = { - 'address':relationship(Address, backref=backref("user", single_parent=True), uselist=False) - }) + mapper(User, users, properties={'address' + : relationship(Address, backref=backref('user', + single_parent=True), uselist=False)}) @testing.resolve_artifact_names def test_single_parent_raise(self): a1 = Address(email_address='some address') u1 = User(name='u1', address=a1) - - assert_raises(sa_exc.InvalidRequestError, Address, email_address='asd', user=u1) - + assert_raises(sa_exc.InvalidRequestError, Address, + email_address='asd', user=u1) a2 = Address(email_address='asd') u1.address = a2 assert u1.address is not a1 @@ -234,10 +236,9 @@ class O2MBackrefTest(_fixtures.FixtureTest): @classmethod @testing.resolve_artifact_names def setup_mappers(cls): - mapper(User, users, properties = dict( - orders = relationship( - mapper(Order, orders), cascade="all, delete-orphan", backref="user") - )) + mapper(User, users, + properties=dict(orders=relationship(mapper(Order, + orders), cascade='all, delete-orphan', backref='user'))) @testing.resolve_artifact_names def test_lazyload_bug(self): @@ -309,9 +310,9 @@ class NoSaveCascadeTest(_fixtures.FixtureTest): @testing.resolve_artifact_names def test_unidirectional_cascade_m2m(self): - mapper(Item, items, properties={ - 'keywords':relationship(Keyword, secondary=item_keywords, cascade="none", backref="items") - }) + mapper(Item, items, properties={'keywords' + : relationship(Keyword, secondary=item_keywords, + cascade='none', backref='items')}) mapper(Keyword, keywords) sess = create_session() @@ -363,27 +364,28 @@ class O2MCascadeNoOrphanTest(_fixtures.FixtureTest): class M2OCascadeTest(_base.MappedTest): + @classmethod def define_tables(cls, metadata): - Table("extra", metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True), - Column("prefs_id", Integer, ForeignKey("prefs.id"))) - - Table('prefs', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('data', String(40))) - - Table('users', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Table('extra', metadata, Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column('prefs_id', Integer, ForeignKey('prefs.id'))) + Table('prefs', metadata, Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column('data', String(40))) + Table( + 'users', + metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(40)), Column('pref_id', Integer, ForeignKey('prefs.id')), - Column('foo_id', Integer, ForeignKey('foo.id')) + Column('foo_id', Integer, ForeignKey('foo.id')), ) + Table('foo', metadata, Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', + String(40))) - Table('foo', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('data', String(40)) - ) @classmethod def setup_classes(cls): class User(_fixtures.Base): @@ -399,13 +401,11 @@ class M2OCascadeTest(_base.MappedTest): @testing.resolve_artifact_names def setup_mappers(cls): mapper(Extra, extra) - mapper(Pref, prefs, properties=dict( - extra = relationship(Extra, cascade="all, delete") - )) - mapper(User, users, properties = dict( - pref = relationship(Pref, lazy='joined', cascade="all, delete-orphan", single_parent=True ), - foo = relationship(Foo) # straight m2o - )) + mapper(Pref, prefs, properties=dict(extra=relationship(Extra, + cascade='all, delete'))) + mapper(User, users, properties=dict(pref=relationship(Pref, + lazy='joined', cascade='all, delete-orphan', + single_parent=True), foo=relationship(Foo))) # straight m2o mapper(Foo, foo) @classmethod @@ -458,7 +458,8 @@ class M2OCascadeTest(_base.MappedTest): @testing.resolve_artifact_names def test_save_update_sends_pending(self): - """test that newly added and deleted scalar items are cascaded on save-update""" + """test that newly added and deleted scalar items are cascaded + on save-update""" sess = sessionmaker(expire_on_commit=False)() p1, p2 = Pref(data='p1'), Pref(data='p2') @@ -524,18 +525,23 @@ class M2OCascadeTest(_base.MappedTest): [Pref(data="pref 1"), Pref(data="pref 3"), Pref(data="newpref")]) class M2OCascadeDeleteTest(_base.MappedTest): + @classmethod def define_tables(cls, metadata): - Table('t1', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('data', String(50)), + Table('t1', metadata, Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data',String(50)), Column('t2id', Integer, ForeignKey('t2.id'))) - Table('t2', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('data', String(50)), + + Table('t2', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data',String(50)), Column('t3id', Integer, ForeignKey('t3.id'))) - Table('t3', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + + Table('t3', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50))) @classmethod @@ -648,15 +654,20 @@ class M2OCascadeDeleteOrphanTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table('t1', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50)), Column('t2id', Integer, ForeignKey('t2.id'))) + Table('t2', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50)), Column('t3id', Integer, ForeignKey('t3.id'))) + Table('t3', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50))) @classmethod @@ -671,10 +682,11 @@ class M2OCascadeDeleteOrphanTest(_base.MappedTest): @classmethod @testing.resolve_artifact_names def setup_mappers(cls): - mapper(T1, t1, properties=dict( - t2=relationship(T2, cascade="all, delete-orphan", single_parent=True))) - mapper(T2, t2, properties=dict( - t3=relationship(T3, cascade="all, delete-orphan", single_parent=True, backref=backref('t2', uselist=False)))) + mapper(T1, t1, properties=dict(t2=relationship(T2, + cascade='all, delete-orphan', single_parent=True))) + mapper(T2, t2, properties=dict(t3=relationship(T3, + cascade='all, delete-orphan', single_parent=True, + backref=backref('t2', uselist=False)))) mapper(T3, t3) @testing.resolve_artifact_names @@ -763,12 +775,14 @@ class M2MCascadeTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table('a', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(30)), test_needs_fk=True ) Table('b', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(30)), test_needs_fk=True @@ -780,7 +794,8 @@ class M2MCascadeTest(_base.MappedTest): ) Table('c', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(30)), Column('bid', Integer, ForeignKey('b.id')), test_needs_fk=True @@ -798,11 +813,12 @@ class M2MCascadeTest(_base.MappedTest): @testing.resolve_artifact_names def test_delete_orphan(self): - mapper(A, a, properties={ - # if no backref here, delete-orphan failed until [ticket:427] was - # fixed - 'bs': relationship(B, secondary=atob, cascade="all, delete-orphan", single_parent=True) - }) + + # if no backref here, delete-orphan failed until [ticket:427] + # was fixed + + mapper(A, a, properties={'bs': relationship(B, secondary=atob, + cascade='all, delete-orphan', single_parent=True)}) mapper(B, b) sess = create_session() @@ -819,12 +835,10 @@ class M2MCascadeTest(_base.MappedTest): @testing.resolve_artifact_names def test_delete_orphan_dynamic(self): - mapper(A, a, properties={ - # if no backref here, delete-orphan failed until [ticket:427] was - # fixed - 'bs': relationship(B, secondary=atob, - cascade="all, delete-orphan", single_parent=True,lazy="dynamic") - }) + mapper(A, a, properties={'bs': relationship(B, secondary=atob, + cascade='all, delete-orphan', single_parent=True, + lazy='dynamic')}) # if no backref here, delete-orphan + # failed until [ticket:427] was fixed mapper(B, b) sess = create_session() @@ -844,9 +858,11 @@ class M2MCascadeTest(_base.MappedTest): mapper(A, a, properties={ # if no backref here, delete-orphan failed until [ticket:427] was # fixed - 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan", single_parent=True) + 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan", + single_parent=True) }) - mapper(B, b, properties={'cs':relationship(C, cascade="all, delete-orphan")}) + mapper(B, b, properties={'cs': + relationship(C, cascade="all, delete-orphan")}) mapper(C, c) sess = create_session() @@ -865,7 +881,8 @@ class M2MCascadeTest(_base.MappedTest): @testing.resolve_artifact_names def test_cascade_delete(self): mapper(A, a, properties={ - 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan", single_parent=True) + 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan", + single_parent=True) }) mapper(B, b) @@ -883,7 +900,8 @@ class M2MCascadeTest(_base.MappedTest): @testing.resolve_artifact_names def test_single_parent_raise(self): mapper(A, a, properties={ - 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan", single_parent=True) + 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan", + single_parent=True) }) mapper(B, b) @@ -927,11 +945,13 @@ class UnsavedOrphansTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table('users', metadata, - Column('user_id', Integer,primary_key=True, test_needs_autoincrement=True), + Column('user_id', Integer,primary_key=True, + test_needs_autoincrement=True), Column('name', String(40))) Table('addresses', metadata, - Column('address_id', Integer,primary_key=True, test_needs_autoincrement=True), + Column('address_id', Integer,primary_key=True, + test_needs_autoincrement=True), Column('user_id', Integer, ForeignKey('users.user_id')), Column('email_address', String(40))) @@ -944,11 +964,13 @@ class UnsavedOrphansTest(_base.MappedTest): @testing.resolve_artifact_names def test_pending_standalone_orphan(self): - """An entity that never had a parent on a delete-orphan cascade can't be saved.""" + """An entity that never had a parent on a delete-orphan cascade + can't be saved.""" mapper(Address, addresses) mapper(User, users, properties=dict( - addresses=relationship(Address, cascade="all,delete-orphan", backref="user") + addresses=relationship(Address, cascade="all,delete-orphan", + backref="user") )) s = create_session() a = Address() @@ -961,11 +983,13 @@ class UnsavedOrphansTest(_base.MappedTest): @testing.resolve_artifact_names def test_pending_collection_expunge(self): - """Removing a pending item from a collection expunges it from the session.""" + """Removing a pending item from a collection expunges it from + the session.""" mapper(Address, addresses) mapper(User, users, properties=dict( - addresses=relationship(Address, cascade="all,delete-orphan", backref="user") + addresses=relationship(Address, cascade="all,delete-orphan", + backref="user") )) s = create_session() @@ -989,7 +1013,8 @@ class UnsavedOrphansTest(_base.MappedTest): def test_nonorphans_ok(self): mapper(Address, addresses) mapper(User, users, properties=dict( - addresses=relationship(Address, cascade="all,delete", backref="user") + addresses=relationship(Address, cascade="all,delete", + backref="user") )) s = create_session() u = User(name='u1', addresses=[Address(email_address='ad1')]) @@ -1008,17 +1033,20 @@ class UnsavedOrphansTest2(_base.MappedTest): @classmethod def define_tables(cls, meta): Table('orders', meta, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(50))) Table('items', meta, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('order_id', Integer, ForeignKey('orders.id'), nullable=False), Column('name', String(50))) Table('attributes', meta, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('item_id', Integer, ForeignKey('items.id'), nullable=False), Column('name', String(50))) @@ -1034,10 +1062,12 @@ class UnsavedOrphansTest2(_base.MappedTest): mapper(Attribute, attributes) mapper(Item, items, properties=dict( - attributes=relationship(Attribute, cascade="all,delete-orphan", backref="item") + attributes=relationship(Attribute, cascade="all,delete-orphan", + backref="item") )) mapper(Order, orders, properties=dict( - items=relationship(Item, cascade="all,delete-orphan", backref="order") + items=relationship(Item, cascade="all,delete-orphan", + backref="order") )) s = create_session() @@ -1064,13 +1094,16 @@ class UnsavedOrphansTest3(_base.MappedTest): @classmethod def define_tables(cls, meta): Table('sales_reps', meta, - Column('sales_rep_id', Integer,primary_key=True, test_needs_autoincrement=True), + Column('sales_rep_id', Integer,primary_key=True, + test_needs_autoincrement=True), Column('name', String(50))) Table('accounts', meta, - Column('account_id', Integer,primary_key=True, test_needs_autoincrement=True), + Column('account_id', Integer,primary_key=True, + test_needs_autoincrement=True), Column('balance', Integer)) Table('customers', meta, - Column('customer_id', Integer,primary_key=True, test_needs_autoincrement=True), + Column('customer_id', Integer,primary_key=True, + test_needs_autoincrement=True), Column('name', String(50)), Column('sales_rep_id', Integer, ForeignKey('sales_reps.sales_rep_id')), @@ -1079,7 +1112,8 @@ class UnsavedOrphansTest3(_base.MappedTest): @testing.resolve_artifact_names def test_double_parent_expunge_o2m(self): - """test the delete-orphan uow event for multiple delete-orphan parent relationships.""" + """test the delete-orphan uow event for multiple delete-orphan + parent relationships.""" class Customer(_fixtures.Base): pass @@ -1114,11 +1148,13 @@ class UnsavedOrphansTest3(_base.MappedTest): assert c in s, "Should not expunge customer yet, still has one parent" sr.customers.remove(c) - assert c not in s, "Should expunge customer when both parents are gone" + assert c not in s, \ + 'Should expunge customer when both parents are gone' @testing.resolve_artifact_names def test_double_parent_expunge_o2o(self): - """test the delete-orphan uow event for multiple delete-orphan parent relationships.""" + """test the delete-orphan uow event for multiple delete-orphan + parent relationships.""" class Customer(_fixtures.Base): pass @@ -1153,7 +1189,8 @@ class UnsavedOrphansTest3(_base.MappedTest): assert c in s, "Should not expunge customer yet, still has one parent" sr.customer = None - assert c not in s, "Should expunge customer when both parents are gone" + assert c not in s, \ + 'Should expunge customer when both parents are gone' class DoubleParentOrphanTest(_base.MappedTest): @@ -1162,19 +1199,22 @@ class DoubleParentOrphanTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table('addresses', metadata, - Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('address_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('street', String(30)), ) Table('homes', metadata, - Column('home_id', Integer, primary_key=True, key="id", test_needs_autoincrement=True), + Column('home_id', Integer, primary_key=True, key="id", + test_needs_autoincrement=True), Column('description', String(30)), Column('address_id', Integer, ForeignKey('addresses.address_id'), nullable=False), ) Table('businesses', metadata, - Column('business_id', Integer, primary_key=True, key="id", test_needs_autoincrement=True), + Column('business_id', Integer, primary_key=True, key="id", + test_needs_autoincrement=True), Column('description', String(30), key="description"), Column('address_id', Integer, ForeignKey('addresses.address_id'), nullable=False), @@ -1182,7 +1222,8 @@ class DoubleParentOrphanTest(_base.MappedTest): @testing.resolve_artifact_names def test_non_orphan(self): - """test that an entity can have two parent delete-orphan cascades, and persists normally.""" + """test that an entity can have two parent delete-orphan + cascades, and persists normally.""" class Address(_fixtures.Base): pass @@ -1192,35 +1233,49 @@ class DoubleParentOrphanTest(_base.MappedTest): pass mapper(Address, addresses) - mapper(Home, homes, properties={'address':relationship(Address, cascade="all,delete-orphan", single_parent=True)}) - mapper(Business, businesses, properties={'address':relationship(Address, cascade="all,delete-orphan", single_parent=True)}) + mapper(Home, homes, properties={'address' + : relationship(Address, cascade='all,delete-orphan', + single_parent=True)}) + mapper(Business, businesses, properties={'address' + : relationship(Address, cascade='all,delete-orphan', + single_parent=True)}) session = create_session() h1 = Home(description='home1', address=Address(street='address1')) - b1 = Business(description='business1', address=Address(street='address2')) + b1 = Business(description='business1', + address=Address(street='address2')) session.add_all((h1,b1)) session.flush() session.expunge_all() - eq_(session.query(Home).get(h1.id), Home(description='home1', address=Address(street='address1'))) - eq_(session.query(Business).get(b1.id), Business(description='business1', address=Address(street='address2'))) + eq_(session.query(Home).get(h1.id), Home(description='home1', + address=Address(street='address1'))) + eq_(session.query(Business).get(b1.id), + Business(description='business1', + address=Address(street='address2'))) @testing.resolve_artifact_names def test_orphan(self): - """test that an entity can have two parent delete-orphan cascades, and is detected as an orphan - when saved without a parent.""" + """test that an entity can have two parent delete-orphan + cascades, and is detected as an orphan when saved without a + parent.""" class Address(_fixtures.Base): pass + class Home(_fixtures.Base): pass + class Business(_fixtures.Base): pass mapper(Address, addresses) - mapper(Home, homes, properties={'address':relationship(Address, cascade="all,delete-orphan", single_parent=True)}) - mapper(Business, businesses, properties={'address':relationship(Address, cascade="all,delete-orphan", single_parent=True)}) - + mapper(Home, homes, properties={'address' + : relationship(Address, cascade='all,delete-orphan', + single_parent=True)}) + mapper(Business, businesses, properties={'address' + : relationship(Address, cascade='all,delete-orphan', + single_parent=True)}) session = create_session() a1 = Address() session.add(a1) @@ -1233,12 +1288,14 @@ class DoubleParentOrphanTest(_base.MappedTest): class CollectionAssignmentOrphanTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): - Table('table_a', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Table('table_a', metadata, + Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), Column('name', String(30))) - Table('table_b', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('name', String(30)), + Table('table_b', metadata, + Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column('name', String(30)), Column('a_id', Integer, ForeignKey('table_a.id'))) @testing.resolve_artifact_names @@ -1284,11 +1341,14 @@ class O2MConflictTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table("parent", metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True) + Column("id", Integer, primary_key=True, + test_needs_autoincrement=True) ) Table("child", metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True), - Column('parent_id', Integer, ForeignKey('parent.id'), nullable=False) + Column("id", Integer, primary_key=True, + test_needs_autoincrement=True), + Column('parent_id', Integer, ForeignKey('parent.id'), + nullable=False) ) @classmethod @@ -1381,7 +1441,8 @@ class O2MConflictTest(_base.MappedTest): @testing.resolve_artifact_names def test_o2o_delorphan_delete_old(self): mapper(Parent, parent, properties={ - 'child':relationship(Child, uselist=False, cascade="all, delete, delete-orphan") + 'child':relationship(Child, uselist=False, + cascade="all, delete, delete-orphan") }) mapper(Child, child) self._do_delete_old_test() @@ -1428,12 +1489,14 @@ class PartialFlushTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table("base", metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True), + Column("id", Integer, primary_key=True, + test_needs_autoincrement=True), Column("descr", String(50)) ) Table("noninh_child", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('base_id', Integer, ForeignKey('base.id')) ) diff --git a/test/orm/test_session.py b/test/orm/test_session.py index 36709dbcb3..22de94e2b3 100644 --- a/test/orm/test_session.py +++ b/test/orm/test_session.py @@ -1,8 +1,10 @@ -from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message +from sqlalchemy.test.testing import eq_, assert_raises, \ + assert_raises_message from sqlalchemy.test.util import gc_collect import inspect import pickle -from sqlalchemy.orm import create_session, sessionmaker, attributes, make_transient +from sqlalchemy.orm import create_session, sessionmaker, attributes, \ + make_transient from sqlalchemy.orm.attributes import instance_state import sqlalchemy as sa from sqlalchemy.test import engines, testing, config @@ -127,9 +129,11 @@ class SessionTest(_fixtures.FixtureTest): [User(id=1, name='ed')]) # test expression binding - sess.execute(users_unbound.insert(), params=dict(id=2, name='jack')) - eq_(sess.execute(users_unbound.select(users_unbound.c.id == 2)).fetchall(), - [(2, 'jack')]) + + sess.execute(users_unbound.insert(), params=dict(id=2, + name='jack')) + eq_(sess.execute(users_unbound.select(users_unbound.c.id + == 2)).fetchall(), [(2, 'jack')]) eq_(sess.execute(users_unbound.select(User.id == 2)).fetchall(), [(2, 'jack')]) @@ -164,8 +168,9 @@ class SessionTest(_fixtures.FixtureTest): [User(id=1, name='ed')]) sess.execute(users_unbound.insert(), params=dict(id=2, name='jack')) - eq_(sess.execute(users_unbound.select(users_unbound.c.id == 2)).fetchall(), - [(2, 'jack')]) + + eq_(sess.execute(users_unbound.select(users_unbound.c.id + == 2)).fetchall(), [(2, 'jack')]) eq_(sess.execute(users_unbound.select(User.id == 2)).fetchall(), [(2, 'jack')]) @@ -206,7 +211,9 @@ class SessionTest(_fixtures.FixtureTest): assert conn2.execute("select count(1) from users").scalar() == 0 sess.commit() assert conn1.execute("select count(1) from users").scalar() == 1 - assert testing.db.connect().execute("select count(1) from users").scalar() == 1 + + assert testing.db.connect().execute('select count(1) from users' + ).scalar() == 1 sess.close() @testing.requires.independent_connections @@ -286,24 +293,28 @@ class SessionTest(_fixtures.FixtureTest): eq_(q.one(), Address(email_address='foo')) + @testing.requires.independent_connections @engines.close_open_connections @testing.resolve_artifact_names def test_autoflush_unbound(self): mapper(User, users) - try: sess = create_session(autocommit=False, autoflush=True) u = User() - u.name='ed' + u.name = 'ed' sess.add(u) u2 = sess.query(User).filter_by(name='ed').one() assert u2 is u - assert sess.execute("select count(1) from users", mapper=User).scalar() == 1 - assert testing.db.connect().execute("select count(1) from users").scalar() == 0 + assert sess.execute('select count(1) from users', + mapper=User).scalar() == 1 + assert testing.db.connect().execute('select count(1) from ' + 'users').scalar() == 0 sess.commit() - assert sess.execute("select count(1) from users", mapper=User).scalar() == 1 - assert testing.db.connect().execute("select count(1) from users").scalar() == 1 + assert sess.execute('select count(1) from users', + mapper=User).scalar() == 1 + assert testing.db.connect().execute('select count(1) from ' + 'users').scalar() == 1 sess.close() except: sess.rollback() @@ -315,14 +326,15 @@ class SessionTest(_fixtures.FixtureTest): mapper(User, users) conn1 = testing.db.connect() conn2 = testing.db.connect() - - sess = create_session(bind=conn1, autocommit=False, autoflush=True) + sess = create_session(bind=conn1, autocommit=False, + autoflush=True) u = User() - u.name='ed' + u.name = 'ed' sess.add(u) sess.commit() - assert conn1.execute("select count(1) from users").scalar() == 1 - assert testing.db.connect().execute("select count(1) from users").scalar() == 1 + assert conn1.execute('select count(1) from users').scalar() == 1 + assert testing.db.connect().execute('select count(1) from users' + ).scalar() == 1 sess.commit() @testing.resolve_artifact_names @@ -432,23 +444,23 @@ class SessionTest(_fixtures.FixtureTest): @testing.resolve_artifact_names def test_heavy_nesting(self): session = create_session(bind=testing.db) - session.begin() - session.connection().execute(users.insert().values(name='user1')) - + session.connection().execute(users.insert().values(name='user1' + )) session.begin(subtransactions=True) - session.begin_nested() - - session.connection().execute(users.insert().values(name='user2')) - assert session.connection().execute("select count(1) from users").scalar() == 2 - + session.connection().execute(users.insert().values(name='user2' + )) + assert session.connection().execute('select count(1) from users' + ).scalar() == 2 session.rollback() - assert session.connection().execute("select count(1) from users").scalar() == 1 - session.connection().execute(users.insert().values(name='user3')) - + assert session.connection().execute('select count(1) from users' + ).scalar() == 1 + session.connection().execute(users.insert().values(name='user3' + )) session.commit() - assert session.connection().execute("select count(1) from users").scalar() == 2 + assert session.connection().execute('select count(1) from users' + ).scalar() == 2 @testing.fails_on('sqlite', 'FIXME: unknown') @testing.resolve_artifact_names @@ -633,27 +645,26 @@ class SessionTest(_fixtures.FixtureTest): @testing.resolve_artifact_names def test_error_on_using_inactive_session(self): mapper(User, users) - sess = create_session(autocommit=True) - sess.begin() sess.begin(subtransactions=True) - sess.add(User(name='u1')) sess.flush() - sess.rollback() - assert_raises_message(sa.exc.InvalidRequestError, "inactive due to a rollback in a subtransaction", sess.begin, subtransactions=True) + assert_raises_message(sa.exc.InvalidRequestError, + 'inactive due to a rollback in a ' + 'subtransaction', sess.begin, + subtransactions=True) sess.close() @testing.resolve_artifact_names def test_no_autocommit_with_explicit_commit(self): mapper(User, users) session = create_session(autocommit=False) - session.add(User(name='ed')) session.transaction.commit() - assert session.transaction is not None, "autocommit=False should start a new transaction" + assert session.transaction is not None, \ + 'autocommit=False should start a new transaction' @engines.close_open_connections @testing.resolve_artifact_names @@ -666,10 +677,14 @@ class SessionTest(_fixtures.FixtureTest): u = User(name='u1') sess.add(u) sess.flush() - assert transaction._connection_for_bind(testing.db) is transaction._connection_for_bind(c) is c - - assert_raises_message(sa.exc.InvalidRequestError, "Session already has a Connection associated", transaction._connection_for_bind, testing.db.connect()) - + assert transaction._connection_for_bind(testing.db) \ + is transaction._connection_for_bind(c) is c + + assert_raises_message(sa.exc.InvalidRequestError, + 'Session already has a Connection ' + 'associated', + transaction._connection_for_bind, + testing.db.connect()) transaction.rollback() assert len(sess.query(User).all()) == 0 sess.close() @@ -722,7 +737,8 @@ class SessionTest(_fixtures.FixtureTest): user = User(name='u1') - assert_raises_message(sa.exc.InvalidRequestError, "is not persisted", s.delete, user) + assert_raises_message(sa.exc.InvalidRequestError, + 'is not persisted', s.delete, user) s.add(user) s.flush() @@ -749,11 +765,12 @@ class SessionTest(_fixtures.FixtureTest): assert user not in s.dirty s2 = create_session() - assert_raises_message(sa.exc.InvalidRequestError, "is already attached to session", s2.delete, user) - + assert_raises_message(sa.exc.InvalidRequestError, + 'is already attached to session', + s2.delete, user) u2 = s2.query(User).get(user.id) - assert_raises_message(sa.exc.InvalidRequestError, "another instance with key", s.delete, u2) - + assert_raises_message(sa.exc.InvalidRequestError, + 'another instance with key', s.delete, u2) s.expire(user) s.expunge(user) assert user not in s @@ -809,7 +826,8 @@ class SessionTest(_fixtures.FixtureTest): @testing.resolve_artifact_names def test_weak_ref(self): - """test the weak-referencing identity map, which strongly-references modified items.""" + """test the weak-referencing identity map, which strongly- + references modified items.""" s = create_session() mapper(User, users) @@ -885,7 +903,8 @@ class SessionTest(_fixtures.FixtureTest): s.expunge(u2) s.identity_map.add(sa.orm.attributes.instance_state(u1)) - assert_raises(AssertionError, s.identity_map.add, sa.orm.attributes.instance_state(u2)) + assert_raises(AssertionError, s.identity_map.add, + sa.orm.attributes.instance_state(u2)) @testing.resolve_artifact_names @@ -1078,43 +1097,61 @@ class SessionTest(_fixtures.FixtureTest): log.append('after_begin') def after_attach(self, session, instance): log.append('after_attach') - def after_bulk_update(self, session, query, query_context, result): + def after_bulk_update( + self, + session, + query, + query_context, + result, + ): log.append('after_bulk_update') - def after_bulk_delete(self, session, query, query_context, result): + + def after_bulk_delete( + self, + session, + query, + query_context, + result, + ): log.append('after_bulk_delete') sess = create_session(extension = MyExt()) u = User(name='u1') sess.add(u) sess.flush() - assert log == ['after_attach', 'before_flush', 'after_begin', 'after_flush', 'before_commit', 'after_commit', 'after_flush_postexec'] - + assert log == [ + 'after_attach', + 'before_flush', + 'after_begin', + 'after_flush', + 'before_commit', + 'after_commit', + 'after_flush_postexec', + ] log = [] sess = create_session(autocommit=False, extension=MyExt()) u = User(name='u1') sess.add(u) sess.flush() - assert log == ['after_attach', 'before_flush', 'after_begin', 'after_flush', 'after_flush_postexec'] - + assert log == ['after_attach', 'before_flush', 'after_begin', + 'after_flush', 'after_flush_postexec'] log = [] u.name = 'ed' sess.commit() - assert log == ['before_commit', 'before_flush', 'after_flush', 'after_flush_postexec', 'after_commit'] - + assert log == ['before_commit', 'before_flush', 'after_flush', + 'after_flush_postexec', 'after_commit'] log = [] sess.commit() assert log == ['before_commit', 'after_commit'] - log = [] sess.query(User).delete() assert log == ['after_begin', 'after_bulk_delete'] - log = [] sess.query(User).update({'name': 'foo'}) assert log == ['after_bulk_update'] - log = [] - sess = create_session(autocommit=False, extension=MyExt(), bind=testing.db) + sess = create_session(autocommit=False, extension=MyExt(), + bind=testing.db) conn = sess.connection() assert log == ['after_begin'] @@ -1131,7 +1168,8 @@ class SessionTest(_fixtures.FixtureTest): session.add(User(name='another %s' % obj.name)) for obj in list(session.deleted): if isinstance(obj, User): - x = session.query(User).filter(User.name=='another %s' % obj.name).one() + x = session.query(User).filter(User.name + == 'another %s' % obj.name).one() session.delete(x) sess = create_session(extension = MyExt(), autoflush=True) @@ -1211,21 +1249,20 @@ class SessionTest(_fixtures.FixtureTest): sess = create_session(extension=MyExt()) sess.add(User(name='foo')) - assert_raises_message(sa.exc.InvalidRequestError, "already flushing", sess.flush) + assert_raises_message(sa.exc.InvalidRequestError, + 'already flushing', sess.flush) @testing.resolve_artifact_names def test_pickled_update(self): mapper(User, users) sess1 = create_session() sess2 = create_session() - u1 = User(name='u1') sess1.add(u1) - - assert_raises_message(sa.exc.InvalidRequestError, "already attached to session", sess2.add, u1) - + assert_raises_message(sa.exc.InvalidRequestError, + 'already attached to session', sess2.add, + u1) u2 = pickle.loads(pickle.dumps(u1)) - sess2.add(u2) @testing.resolve_artifact_names @@ -1293,10 +1330,9 @@ class DisposedStates(_base.MappedTest): @classmethod def define_tables(cls, metadata): global t1 - t1 = Table('t1', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('data', String(50)) - ) + t1 = Table('t1', metadata, Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column('data', String(50))) @classmethod def setup_mappers(cls): @@ -1312,20 +1348,22 @@ class DisposedStates(_base.MappedTest): super(DisposedStates, self).teardown() def _set_imap_in_disposal(self, sess, *objs): - """remove selected objects from the given session, as though they - were dereferenced and removed from WeakIdentityMap. + """remove selected objects from the given session, as though + they were dereferenced and removed from WeakIdentityMap. - Hardcodes the identity map's "all_states()" method to return the full list - of states. This simulates the all_states() method returning results, afterwhich - some of the states get garbage collected (this normally only happens during - asynchronous gc). The Session now has one or more - InstanceState's which have been removed from the identity map and disposed. + Hardcodes the identity map's "all_states()" method to return the + full list of states. This simulates the all_states() method + returning results, afterwhich some of the states get garbage + collected (this normally only happens during asynchronous gc). + The Session now has one or more InstanceState's which have been + removed from the identity map and disposed. Will the Session not trip over this ??? Stay tuned. """ + all_states = sess.identity_map.all_states() - sess.identity_map.all_states = lambda: all_states + sess.identity_map.all_states = lambda : all_states for obj in objs: state = attributes.instance_state(obj) sess.identity_map.remove(state) @@ -1335,7 +1373,8 @@ class DisposedStates(_base.MappedTest): global sess sess = create_session(**kwargs) - data = o1, o2, o3, o4, o5 = [T('t1'), T('t2'), T('t3'), T('t4'), T('t5')] + data = o1, o2, o3, o4, o5 = [T('t1'), T('t2'), T('t3'), T('t4' + ), T('t5')] sess.add_all(data) @@ -1392,8 +1431,9 @@ class SessionInterface(testing.TestBase): return ok def _map_it(self, cls): - return mapper(cls, Table('t', sa.MetaData(), - Column('id', Integer, primary_key=True, test_needs_autoincrement=True))) + return mapper(cls, Table('t', sa.MetaData(), Column('id', + Integer, primary_key=True, + test_needs_autoincrement=True))) @testing.uses_deprecated() def _test_instance_guards(self, user_arg): @@ -1437,7 +1477,8 @@ class SessionInterface(testing.TestBase): raises_('refresh', user_arg) - instance_methods = self._public_session_methods() - self._class_methods + instance_methods = self._public_session_methods() \ + - self._class_methods eq_(watchdog, instance_methods, watchdog.symmetric_difference(instance_methods)) @@ -1506,10 +1547,9 @@ class TLTransactionTest(engine_base.AltEngineTest, _base.MappedTest): @classmethod def define_tables(cls, metadata): - Table('users', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('name', String(20)), - test_needs_acid=True) + Table('users', metadata, Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column('name', String(20)), test_needs_acid=True) @classmethod def setup_classes(cls): diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 2537c48965..062ed5f1b2 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -1,6 +1,7 @@ """Test various algorithmic properties of selectables.""" -from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message +from sqlalchemy.test.testing import eq_, assert_raises, \ + assert_raises_message from sqlalchemy import * from sqlalchemy.test import * from sqlalchemy.sql import util as sql_util, visitors @@ -24,39 +25,46 @@ table2 = Table('table2', metadata, Column('coly', Integer), ) + class SelectableTest(TestBase, AssertsExecutionResults): + def test_distance_on_labels(self): + # same column three times - s = select([table1.c.col1.label('c2'), table1.c.col1, table1.c.col1.label('c1')]) - # didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far - #assert s.corresponding_column(table1.c.col1) is s.c.col1 + s = select([table1.c.col1.label('c2'), table1.c.col1, + table1.c.col1.label('c1')]) + + # didnt do this yet...col.label().make_proxy() has same + # "distance" as col.make_proxy() so far assert + # s.corresponding_column(table1.c.col1) is s.c.col1 + assert s.corresponding_column(s.c.col1) is s.c.col1 assert s.corresponding_column(s.c.c1) is s.c.c1 def test_distance_on_aliases(self): a1 = table1.alias('a1') - - for s in ( - select([a1, table1], use_labels=True), - select([table1, a1], use_labels=True) - ): - assert s.corresponding_column(table1.c.col1) is s.c.table1_col1 + for s in (select([a1, table1], use_labels=True), + select([table1, a1], use_labels=True)): + assert s.corresponding_column(table1.c.col1) \ + is s.c.table1_col1 assert s.corresponding_column(a1.c.col1) is s.c.a1_col1 - def test_join_against_self(self): jj = select([table1.c.col1.label('bar_col1')]) - jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1) + jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1) # test column directly agaisnt itself - assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 + assert jjj.corresponding_column(jjj.c.table1_col1) \ + is jjj.c.table1_col1 assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1 # test alias of the join + j2 = jjj.alias('foo') - assert j2.corresponding_column(table1.c.col1) is j2.c.table1_col1 + assert j2.corresponding_column(table1.c.col1) \ + is j2.c.table1_col1 def test_against_cloned_non_table(self): # test that corresponding column digs across @@ -73,20 +81,27 @@ class SelectableTest(TestBase, AssertsExecutionResults): def test_select_on_table(self): sel = select([table1, table2], use_labels=True) - assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1 - assert sel.corresponding_column(table1.c.col1, require_embedded=True) is sel.c.table1_col1 - assert table1.corresponding_column(sel.c.table1_col1) is table1.c.col1 - assert table1.corresponding_column(sel.c.table1_col1, require_embedded=True) is None - def test_join_against_join(self): - j = outerjoin(table1, table2, table1.c.col1==table2.c.col2) - jj = select([ table1.c.col1.label('bar_col1')],from_obj=[j]).alias('foo') - jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1) - assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 + assert sel.corresponding_column(table1.c.col1) \ + is sel.c.table1_col1 + assert sel.corresponding_column(table1.c.col1, + require_embedded=True) is sel.c.table1_col1 + assert table1.corresponding_column(sel.c.table1_col1) \ + is table1.c.col1 + assert table1.corresponding_column(sel.c.table1_col1, + require_embedded=True) is None - j2 = jjj.alias('foo') - assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1 + def test_join_against_join(self): + j = outerjoin(table1, table2, table1.c.col1 == table2.c.col2) + jj = select([table1.c.col1.label('bar_col1')], + from_obj=[j]).alias('foo') + jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1) + assert jjj.corresponding_column(jjj.c.table1_col1) \ + is jjj.c.table1_col1 + j2 = jjj.alias('foo') + assert j2.corresponding_column(jjj.c.table1_col1) \ + is j2.c.table1_col1 assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1 def test_table_alias(self): @@ -97,12 +112,18 @@ class SelectableTest(TestBase, AssertsExecutionResults): criterion = a.c.col1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) + def test_union(self): - # tests that we can correspond a column in a Select statement with a certain Table, against - # a column in a Union where one of its underlying Selects matches to that same Table - u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ) + + # tests that we can correspond a column in a Select statement + # with a certain Table, against a column in a Union where one of + # its underlying Selects matches to that same Table + + u = select([table1.c.col1, table1.c.col2, table1.c.col3, + table1.c.colx, null().label('coly' + )]).union(select([table2.c.col1, table2.c.col2, + table2.c.col3, null().label('colx'), + table2.c.coly])) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) c = u.corresponding_column(s1.c.table1_col2) @@ -128,19 +149,25 @@ class SelectableTest(TestBase, AssertsExecutionResults): assert u1.corresponding_column(table1.c.colx) is u1.c.col2 assert u1.corresponding_column(table1.c.col3) is u1.c.col1 - def test_singular_union(self): - u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), select([table1.c.col1, table1.c.col2, table1.c.col3])) + def test_singular_union(self): + u = union(select([table1.c.col1, table1.c.col2, + table1.c.col3]), select([table1.c.col1, + table1.c.col2, table1.c.col3])) u = union(select([table1.c.col1, table1.c.col2, table1.c.col3])) assert u.c.col1 is not None assert u.c.col2 is not None assert u.c.col3 is not None - + def test_alias_union(self): + # same as testunion, except its an alias of the union - u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ).alias('analias') + + u = select([table1.c.col1, table1.c.col2, table1.c.col3, + table1.c.colx, null().label('coly' + )]).union(select([table2.c.col1, table2.c.col2, + table2.c.col3, null().label('colx'), + table2.c.coly])).alias('analias') s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 @@ -149,10 +176,14 @@ class SelectableTest(TestBase, AssertsExecutionResults): assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly def test_select_union(self): + # like testaliasunion, but off a Select off the union. - u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ).alias('analias') + + u = select([table1.c.col1, table1.c.col2, table1.c.col3, + table1.c.colx, null().label('coly' + )]).union(select([table2.c.col1, table2.c.col2, + table2.c.col3, null().label('colx'), + table2.c.coly])).alias('analias') s = select([u]) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) @@ -160,10 +191,14 @@ class SelectableTest(TestBase, AssertsExecutionResults): assert s.corresponding_column(s2.c.table2_col2) is s.c.col2 def test_union_against_join(self): + # same as testunion, except its an alias of the union - u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ).alias('analias') + + u = select([table1.c.col1, table1.c.col2, table1.c.col3, + table1.c.colx, null().label('coly' + )]).union(select([table2.c.col1, table2.c.col2, + table2.c.col3, null().label('colx'), + table2.c.coly])).alias('analias') j1 = table1.join(table2) assert u.corresponding_column(j1.c.table1_colx) is u.c.colx assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx @@ -191,8 +226,11 @@ class SelectableTest(TestBase, AssertsExecutionResults): criterion = a.c.table1_col1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) + def test_column_labels(self): - a = select([table1.c.col1.label('acol1'), table1.c.col2.label('acol2'), table1.c.col3.label('acol3')]) + a = select([table1.c.col1.label('acol1'), + table1.c.col2.label('acol2'), + table1.c.col3.label('acol3')]) j = join(a, table2) criterion = a.c.acol1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) @@ -243,31 +281,31 @@ class SelectableTest(TestBase, AssertsExecutionResults): assert_raises(exc.NoReferencedTableError, s.join, t1) + def test_join_condition(self): m = MetaData() t1 = Table('t1', m, Column('id', Integer)) - t2 = Table('t2', m, Column('id', Integer), Column('t1id', ForeignKey('t1.id'))) - t3 = Table('t3', m, Column('id', Integer), - Column('t1id', ForeignKey('t1.id')), - Column('t2id', ForeignKey('t2.id'))) - t4 = Table('t4', m, Column('id', Integer), Column('t2id', ForeignKey('t2.id'))) - + t2 = Table('t2', m, Column('id', Integer), Column('t1id', + ForeignKey('t1.id'))) + t3 = Table('t3', m, Column('id', Integer), Column('t1id', + ForeignKey('t1.id')), Column('t2id', + ForeignKey('t2.id'))) + t4 = Table('t4', m, Column('id', Integer), Column('t2id', + ForeignKey('t2.id'))) t1t2 = t1.join(t2) t2t3 = t2.join(t3) - - for left, right, a_subset, expected in [ - (t1, t2, None, t1.c.id==t2.c.t1id), - (t1t2, t3, t2, t1t2.c.t2_id==t3.c.t2id), - (t2t3, t1, t3, t1.c.id==t3.c.t1id), - (t2t3, t4, None, t2t3.c.t2_id==t4.c.t2id), - (t2t3, t4, t3, t2t3.c.t2_id==t4.c.t2id), - (t2t3.join(t1), t4, None, t2t3.c.t2_id==t4.c.t2id), - (t2t3.join(t1), t4, t1, t2t3.c.t2_id==t4.c.t2id), - (t1t2, t2t3, t2, t1t2.c.t2_id==t2t3.c.t3_t2id), - ]: - assert expected.compare( - sql_util.join_condition(left, right, a_subset=a_subset) - ) + for (left, right, a_subset, expected) in [ + (t1, t2, None, t1.c.id == t2.c.t1id), + (t1t2, t3, t2, t1t2.c.t2_id == t3.c.t2id), + (t2t3, t1, t3, t1.c.id == t3.c.t1id), + (t2t3, t4, None, t2t3.c.t2_id == t4.c.t2id), + (t2t3, t4, t3, t2t3.c.t2_id == t4.c.t2id), + (t2t3.join(t1), t4, None, t2t3.c.t2_id == t4.c.t2id), + (t2t3.join(t1), t4, t1, t2t3.c.t2_id == t4.c.t2id), + (t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id), + ]: + assert expected.compare(sql_util.join_condition(left, + right, a_subset=a_subset)) # these are ambiguous, or have no joins for left, right, a_subset in [ @@ -298,35 +336,39 @@ class SelectableTest(TestBase, AssertsExecutionResults): left.join(right).onclause ) - # TODO: this raises due to right side being "grouped", - # and no longer has FKs. Did we want to make - # _FromGrouping friendlier ? - assert_raises_message( - exc.ArgumentError, - r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?", - t1t2.join, t2t3 - ) - assert_raises_message( - exc.ArgumentError, - r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?", - t1t2.join, t2t3.select(use_labels=True) - ) + + # TODO: this raises due to right side being "grouped", and no + # longer has FKs. Did we want to make _FromGrouping friendlier + # ? + + assert_raises_message(exc.ArgumentError, + "Perhaps you meant to convert the right " + "side to a subquery using alias\(\)\?", + t1t2.join, t2t3) + assert_raises_message(exc.ArgumentError, + "Perhaps you meant to convert the right " + "side to a subquery using alias\(\)\?", + t1t2.join, t2t3.select(use_labels=True)) + class PrimaryKeyTest(TestBase, AssertsExecutionResults): + def test_join_pk_collapse_implicit(self): - """test that redundant columns in a join get 'collapsed' into a minimal primary key, - which is the root column along a chain of foreign key relationships.""" + """test that redundant columns in a join get 'collapsed' into a + minimal primary key, which is the root column along a chain of + foreign key relationships.""" meta = MetaData() a = Table('a', meta, Column('id', Integer, primary_key=True)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True)) - c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True)) - d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True)) - + b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), + primary_key=True)) + c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), + primary_key=True)) + d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), + primary_key=True)) assert c.c.id.references(b.c.id) assert not d.c.id.references(a.c.id) - assert list(a.join(b).primary_key) == [a.c.id] assert list(b.join(c).primary_key) == [b.c.id] assert list(a.join(b).join(c).primary_key) == [a.c.id] @@ -334,26 +376,36 @@ class PrimaryKeyTest(TestBase, AssertsExecutionResults): assert list(d.join(c).join(b).primary_key) == [b.c.id] assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id] + def test_join_pk_collapse_explicit(self): - """test that redundant columns in a join get 'collapsed' into a minimal primary key, - which is the root column along a chain of explicit join conditions.""" + """test that redundant columns in a join get 'collapsed' into a + minimal primary key, which is the root column along a chain of + explicit join conditions.""" meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer)) - c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True), Column('x', Integer)) - d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer)) - - print list(a.join(b, a.c.x==b.c.id).primary_key) - assert list(a.join(b, a.c.x==b.c.id).primary_key) == [a.c.id] - assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id] - assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id] - assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [b.c.id] - assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id] - assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [b.c.id] - assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id] - - assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id] + a = Table('a', meta, Column('id', Integer, primary_key=True), + Column('x', Integer)) + b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), + primary_key=True), Column('x', Integer)) + c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), + primary_key=True), Column('x', Integer)) + d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), + primary_key=True), Column('x', Integer)) + print list(a.join(b, a.c.x == b.c.id).primary_key) + assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id] + assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id] + assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) \ + == [a.c.id] + assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) \ + == [b.c.id] + assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) \ + == [b.c.id] + assert list(d.join(b, d.c.id == b.c.id).join(c, b.c.id + == c.c.x).primary_key) == [b.c.id] + assert list(a.join(b).join(c, c.c.id + == b.c.x).join(d).primary_key) == [a.c.id] + assert list(a.join(b, and_(a.c.id == b.c.id, a.c.x + == b.c.id)).primary_key) == [a.c.id] def test_init_doesnt_blowitaway(self): meta = MetaData() @@ -391,19 +443,17 @@ class PrimaryKeyTest(TestBase, AssertsExecutionResults): Column('id', Integer, primary_key= True), ) - engineer = Table( 'Engineer', metadata, - Column('id', Integer, ForeignKey( 'Employee.id', ), primary_key=True), - ) + engineer = Table('Engineer', metadata, + Column('id', Integer, + ForeignKey('Employee.id'), primary_key=True)) - eq_( - util.column_set(employee.join(engineer, employee.c.id==engineer.c.id).primary_key), - util.column_set([employee.c.id]) - ) - eq_( - util.column_set(employee.join(engineer, engineer.c.id==employee.c.id).primary_key), - util.column_set([employee.c.id]) - ) + eq_(util.column_set(employee.join(engineer, employee.c.id + == engineer.c.id).primary_key), + util.column_set([employee.c.id])) + eq_(util.column_set(employee.join(engineer, engineer.c.id + == employee.c.id).primary_key), + util.column_set([employee.c.id])) class ReduceTest(TestBase, AssertsExecutionResults): @@ -419,164 +469,171 @@ class ReduceTest(TestBase, AssertsExecutionResults): Column('t3id', Integer, ForeignKey('t2.t2id'), primary_key=True), Column('t3data', String(30))) - - eq_( - util.column_set(sql_util.reduce_columns([ - t1.c.t1id, t1.c.t1data, t2.c.t2id, - t2.c.t2data, t3.c.t3id, t3.c.t3data])), - util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data]) - ) + eq_(util.column_set(sql_util.reduce_columns([ + t1.c.t1id, + t1.c.t1data, + t2.c.t2id, + t2.c.t2data, + t3.c.t3id, + t3.c.t3data, + ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, + t3.c.t3data])) + def test_reduce_selectable(self): - metadata = MetaData() - - engineers = Table('engineers', metadata, - Column('engineer_id', Integer, primary_key=True), - Column('engineer_name', String(50)), - ) - - managers = Table('managers', metadata, - Column('manager_id', Integer, primary_key=True), - Column('manager_name', String(50)) - ) - - s = select([engineers, managers]).where(engineers.c.engineer_name==managers.c.manager_name) - - eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)), - util.column_set([s.c.engineer_id, s.c.engineer_name, s.c.manager_id]) - ) + metadata = MetaData() + engineers = Table('engineers', metadata, Column('engineer_id', + Integer, primary_key=True), + Column('engineer_name', String(50))) + managers = Table('managers', metadata, Column('manager_id', + Integer, primary_key=True), + Column('manager_name', String(50))) + s = select([engineers, + managers]).where(engineers.c.engineer_name + == managers.c.manager_name) + eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)), + util.column_set([s.c.engineer_id, s.c.engineer_name, + s.c.manager_id])) + def test_reduce_aliased_join(self): metadata = MetaData() - people = Table('people', metadata, - Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True), - Column('name', String(50)), - Column('type', String(30))) - - engineers = Table('engineers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), - Column('status', String(30)), - Column('engineer_name', String(50)), - Column('primary_language', String(50)), - ) - - managers = Table('managers', metadata, - Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), - Column('status', String(30)), - Column('manager_name', String(50)) - ) - - pjoin = people.outerjoin(engineers).\ - outerjoin(managers).select(use_labels=True).\ - alias('pjoin') - eq_( - util.column_set(sql_util.reduce_columns([ - pjoin.c.people_person_id, pjoin.c.engineers_person_id, - pjoin.c.managers_person_id])), - util.column_set([pjoin.c.people_person_id]) - ) + people = Table('people', metadata, Column('person_id', Integer, + Sequence('person_id_seq', optional=True), + primary_key=True), Column('name', String(50)), + Column('type', String(30))) + engineers = Table( + 'engineers', + metadata, + Column('person_id', Integer, ForeignKey('people.person_id' + ), primary_key=True), + Column('status', String(30)), + Column('engineer_name', String(50)), + Column('primary_language', String(50)), + ) + managers = Table('managers', metadata, Column('person_id', + Integer, ForeignKey('people.person_id'), + primary_key=True), Column('status', + String(30)), Column('manager_name', + String(50))) + pjoin = \ + people.outerjoin(engineers).outerjoin(managers).\ + select(use_labels=True).alias('pjoin' + ) + eq_(util.column_set(sql_util.reduce_columns([pjoin.c.people_person_id, + pjoin.c.engineers_person_id, pjoin.c.managers_person_id])), + util.column_set([pjoin.c.people_person_id])) def test_reduce_aliased_union(self): metadata = MetaData() - item_table = Table( - 'item', metadata, - Column('id', Integer, ForeignKey('base_item.id'), primary_key=True), - Column('dummy', Integer, default=0)) - base_item_table = Table( - 'base_item', metadata, - Column('id', Integer, primary_key=True), - Column('child_name', String(255), default=None)) - + item_table = Table('item', metadata, Column('id', Integer, + ForeignKey('base_item.id'), + primary_key=True), Column('dummy', Integer, + default=0)) + base_item_table = Table('base_item', metadata, Column('id', + Integer, primary_key=True), + Column('child_name', String(255), + default=None)) from sqlalchemy.orm.util import polymorphic_union - - item_join = polymorphic_union( { - 'BaseItem':base_item_table.select(base_item_table.c.child_name=='BaseItem'), - 'Item':base_item_table.join(item_table), - }, None, 'item_join') - - eq_( - util.column_set(sql_util.reduce_columns([ - item_join.c.id, item_join.c.dummy, item_join.c.child_name - ])), - util.column_set([item_join.c.id, item_join.c.dummy, item_join.c.child_name]) - ) + item_join = polymorphic_union({ + 'BaseItem': + base_item_table.select( + base_item_table.c.child_name + == 'BaseItem'), + 'Item': base_item_table.join(item_table)}, + None, 'item_join') + eq_(util.column_set(sql_util.reduce_columns([item_join.c.id, + item_join.c.dummy, item_join.c.child_name])), + util.column_set([item_join.c.id, item_join.c.dummy, + item_join.c.child_name])) + def test_reduce_aliased_union_2(self): metadata = MetaData() - - page_table = Table('page', metadata, - Column('id', Integer, primary_key=True), - ) + page_table = Table('page', metadata, Column('id', Integer, + primary_key=True)) magazine_page_table = Table('magazine_page', metadata, - Column('page_id', Integer, ForeignKey('page.id'), primary_key=True), - ) + Column('page_id', Integer, + ForeignKey('page.id'), + primary_key=True)) classified_page_table = Table('classified_page', metadata, - Column('magazine_page_id', Integer, - ForeignKey('magazine_page.page_id'), primary_key=True), - ) - - # this is essentially the union formed by the ORM's polymorphic_union function. - # we define two versions with different ordering of selects. + Column('magazine_page_id', Integer, + ForeignKey('magazine_page.page_id'), primary_key=True)) + + # this is essentially the union formed by the ORM's + # polymorphic_union function. we define two versions with + # different ordering of selects. + # + # the first selectable has the "real" column + # classified_page.magazine_page_id - # the first selectable has the "real" column classified_page.magazine_page_id pjoin = union( - select([ - page_table.c.id, - magazine_page_table.c.page_id, - classified_page_table.c.magazine_page_id - ]).select_from(page_table.join(magazine_page_table).join(classified_page_table)), - - select([ - page_table.c.id, - magazine_page_table.c.page_id, - cast(null(), Integer).label('magazine_page_id') - ]).select_from(page_table.join(magazine_page_table)), - ).alias('pjoin') - - eq_( - util.column_set(sql_util.reduce_columns([pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), - util.column_set([pjoin.c.id]) - ) + select([ + page_table.c.id, + magazine_page_table.c.page_id, + classified_page_table.c.magazine_page_id + ]). + select_from( + page_table.join(magazine_page_table). + join(classified_page_table)), + + select([ + page_table.c.id, + magazine_page_table.c.page_id, + cast(null(), Integer).label('magazine_page_id') + ]). + select_from(page_table.join(magazine_page_table)) + ).alias('pjoin') + eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id, + pjoin.c.page_id, pjoin.c.magazine_page_id])), + util.column_set([pjoin.c.id])) # the first selectable has a CAST, which is a placeholder for - # classified_page.magazine_page_id in the second selectable. reduce_columns - # needs to take into account all foreign keys derived from pjoin.c.magazine_page_id. - # the UNION construct currently makes the external column look like that of the first - # selectable only. - pjoin = union( - select([ - page_table.c.id, - magazine_page_table.c.page_id, - cast(null(), Integer).label('magazine_page_id') - ]).select_from(page_table.join(magazine_page_table)), - - select([ - page_table.c.id, - magazine_page_table.c.page_id, - classified_page_table.c.magazine_page_id - ]).select_from(page_table.join(magazine_page_table).join(classified_page_table)) - ).alias('pjoin') - - eq_( - util.column_set(sql_util.reduce_columns([ - pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), - util.column_set([pjoin.c.id]) - ) + # classified_page.magazine_page_id in the second selectable. + # reduce_columns needs to take into account all foreign keys + # derived from pjoin.c.magazine_page_id. the UNION construct + # currently makes the external column look like that of the + # first selectable only. + + pjoin = union(select([ + page_table.c.id, + magazine_page_table.c.page_id, + cast(null(), Integer).label('magazine_page_id') + ]). + select_from(page_table.join(magazine_page_table)), + + select([ + page_table.c.id, + magazine_page_table.c.page_id, + classified_page_table.c.magazine_page_id + ]). + select_from(page_table.join(magazine_page_table). + join(classified_page_table)) + ).alias('pjoin') + eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id, + pjoin.c.page_id, pjoin.c.magazine_page_id])), + util.column_set([pjoin.c.id])) class DerivedTest(TestBase, AssertsExecutionResults): def test_table(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) + + t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) + t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) assert t1.is_derived_from(t1) assert not t2.is_derived_from(t1) + def test_alias(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) + t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) + t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) assert t1.alias().is_derived_from(t1) assert not t2.alias().is_derived_from(t1) @@ -585,8 +642,11 @@ class DerivedTest(TestBase, AssertsExecutionResults): def test_select(self): meta = MetaData() - t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) - t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) + + t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) + t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), + Column('c2', String(30))) assert t1.select().is_derived_from(t1) assert not t2.select().is_derived_from(t1) @@ -623,14 +683,23 @@ class AnnotationsTest(TestBase): t1 = s1._annotate({}) t2 = s1 - # t1 needs to share the same _make_proxy() columns as t2, even though it's - # annotated. otherwise paths will diverge once they are corresponded against "inner" below. + # t1 needs to share the same _make_proxy() columns as t2, even + # though it's annotated. otherwise paths will diverge once they + # are corresponded against "inner" below. + assert t1.c is t2.c assert t1.c.col1 is t2.c.col1 inner = select([s1]) - assert inner.corresponding_column(t2.c.col1, require_embedded=False) is inner.corresponding_column(t2.c.col1, require_embedded=True) is inner.c.col1 - assert inner.corresponding_column(t1.c.col1, require_embedded=False) is inner.corresponding_column(t1.c.col1, require_embedded=True) is inner.c.col1 + + assert inner.corresponding_column(t2.c.col1, + require_embedded=False) \ + is inner.corresponding_column(t2.c.col1, + require_embedded=True) is inner.c.col1 + assert inner.corresponding_column(t1.c.col1, + require_embedded=False) \ + is inner.corresponding_column(t1.c.col1, + require_embedded=True) is inner.c.col1 def test_annotated_visit(self): table1 = table('table1', column("col1"), column("col2")) @@ -643,8 +712,10 @@ class AnnotationsTest(TestBase): b2 = visitors.cloned_traverse(bin, {}, {'binary':visit_binary}) assert str(b2) == "table1.col1 = table1.col2" - b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary':visit_binary}) - assert str(b3) == "table1.col1 = table1.col2" + + b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary' + : visit_binary}) + assert str(b3) == 'table1.col1 = table1.col2' def visit_binary(b): b.left = bindparam('bar') @@ -655,19 +726,20 @@ class AnnotationsTest(TestBase): b5 = visitors.cloned_traverse(b3, {}, {'binary':visit_binary}) assert str(b5) == ":bar = table1.col2" + def test_annotate_expressions(self): - table1 = table('table1', column("col1"), column("col2")) - - for expr, expected in [ - (table1.c.col1, "table1.col1"), - (table1.c.col1 == 5, "table1.col1 = :col1_1"), - (table1.c.col1.in_([2,3,4]), "table1.col1 IN (:col1_1, :col1_2, :col1_3)") - ]: + table1 = table('table1', column('col1'), column('col2')) + for expr, expected in [(table1.c.col1, 'table1.col1'), + (table1.c.col1 == 5, + 'table1.col1 = :col1_1'), + (table1.c.col1.in_([2, 3, 4]), + 'table1.col1 IN (:col1_1, :col1_2, ' + ':col1_3)')]: eq_(str(expr), expected) eq_(str(expr._annotate({})), expected) eq_(str(sql_util._deep_annotate(expr, {})), expected) - eq_(str(sql_util._deep_annotate(expr, {}, exclude=[table1.c.col1])), expected) - + eq_(str(sql_util._deep_annotate(expr, {}, + exclude=[table1.c.col1])), expected) def test_deannotate(self): table1 = table('table1', column("col1"), column("col2")) @@ -681,9 +753,10 @@ class AnnotationsTest(TestBase): for elem in (b2._annotations, b2.left._annotations): assert '_orm_adapt' in elem - for elem in (b3._annotations, b3.left._annotations, b4._annotations, b4.left._annotations): + for elem in b3._annotations, b3.left._annotations, \ + b4._annotations, b4.left._annotations: assert elem == {} - + assert b2.left is not bin.left assert b3.left is not b2.left is not bin.left assert b4.left is bin.left # since column is immutable