]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Undeclared SAWarnings are now fatal to tests as well.
authorJason Kirtland <jek@discorporate.us>
Sat, 12 Jan 2008 04:52:05 +0000 (04:52 +0000)
committerJason Kirtland <jek@discorporate.us>
Sat, 12 Jan 2008 04:52:05 +0000 (04:52 +0000)
- Fixed typo that was killing runs of individual named tests.

test/dialect/mysql.py
test/engine/reflection.py
test/orm/inheritance/basic.py
test/orm/mapper.py
test/orm/query.py
test/sql/generative.py
test/sql/selectable.py
test/sql/testtypes.py
test/testlib/testing.py

index 0a9153327082fa85b12b3b999912238666091349..df6a2547bec273fb9b269856e0a5d2a5cc76f5b3 100644 (file)
@@ -1,5 +1,5 @@
 import testbase
-import sets, warnings
+import sets
 from sqlalchemy import *
 from sqlalchemy import sql, exceptions
 from sqlalchemy.databases import mysql
@@ -627,6 +627,7 @@ class TypesTest(AssertMixin):
             def_table.drop()
 
     @testing.exclude('mysql', '<', (5, 0, 0))
+    @testing.uses_deprecated('Using String type with no length')
     def test_type_reflection(self):
         # (ask_for, roundtripped_as_if_different)
         specs = [( String(), mysql.MSText(), ),
@@ -665,13 +666,7 @@ class TypesTest(AssertMixin):
         m = MetaData(db)
         t_table = Table('mysql_types', m, *columns)
         try:
-            try:
-                warnings.filterwarnings('ignore',
-                                        'Using String type with no length.*')
-                m.create_all()
-            finally:
-                warnings.filterwarnings("always",
-                                        'Using String type with no length.*')
+            m.create_all()
 
             m2 = MetaData(db)
             rt = Table('mysql_types', m2, autoload=True)
index ea56e315b0674b2dbfac8793f59e6f4ace0ab786..c123b930a771e282ab70506c4377d4b8809d6f58 100644 (file)
@@ -1,6 +1,5 @@
 import testbase
-import pickle, StringIO, unicodedata, warnings
-
+import pickle, StringIO, unicodedata
 from sqlalchemy import *
 from sqlalchemy import exceptions
 from sqlalchemy import types as sqltypes
@@ -206,21 +205,20 @@ class ReflectionTest(PersistTest):
         dialect_module.ischema_names = {}
         try:
             try:
-                warnings.filterwarnings('error', 'Did not recognize type')
                 m2 = MetaData(testbase.db)
                 t2 = Table("test", m2, autoload=True)
                 assert False
-            except RuntimeWarning:
+            except exceptions.SAWarning:
                 assert True
 
-            warnings.filterwarnings('ignore', 'Did not recognize type')
-            m3 = MetaData(testbase.db)
-            t3 = Table("test", m3, autoload=True)
-            assert t3.c.foo.type.__class__ == sqltypes.NullType
+            @testing.emits_warning('Did not recognize type')
+            def warns():
+                m3 = MetaData(testbase.db)
+                t3 = Table("test", m3, autoload=True)
+                assert t3.c.foo.type.__class__ == sqltypes.NullType
 
         finally:
             dialect_module.ischema_names = ischema_names
-            warnings.filterwarnings('always', 'Did not recognize type')
             t.drop()
 
     def test_override_fkandpkcol(self):
index 1e46a3f6b7acbb7256b23ceb0567db11735ff88b..f2b7c6e4f8f5e685ef234f3abd2f2e47afc794da 100644 (file)
@@ -1,5 +1,4 @@
 import testbase
-import warnings
 from sqlalchemy import *
 from sqlalchemy import exceptions, util
 from sqlalchemy.orm import *
@@ -550,14 +549,12 @@ class DistinctPKTest(ORMTest):
         self._do_test(True)
 
     def test_explicit_composite_pk(self):
-        warnings.filterwarnings("error", r".*On mapper.*distinct primary key")
-
         person_mapper = mapper(Person, person_table)
         try:
             mapper(Employee, employee_table, inherits=person_mapper, primary_key=[person_table.c.id, employee_table.c.id])
             self._do_test(True)
             assert False
-        except RuntimeWarning, e:
+        except exceptions.SAWarning, e:
             assert str(e) == "On mapper Mapper|Employee|employees, primary key column 'employees.id' is being combined with distinct primary key column 'persons.id' in attribute 'id'.  Use explicit properties to give each column its own mapped attribute name.", str(e)
 
     def test_explicit_pk(self):
index ca2f832114a8f7fcc84acd6a8542400da8a24874..464979f4cabf75824fd9503dd02dbb56a0571744 100644 (file)
@@ -130,15 +130,12 @@ class MapperTest(MapperSuperTest):
         def bad_expunge(foo):
             raise Exception("this exception should be stated as a warning")
 
-        import warnings
-        warnings.filterwarnings("always", r".*this exception should be stated as a warning")
-
         sess.expunge = bad_expunge
         try:
             Foo(_sa_session=sess)
             assert False
         except Exception, e:
-            assert e is ex
+            assert isinstance(e, exceptions.SAWarning)
 
         clear_mappers()
 
index 233e4ac7f9bb111f8daae4d1f5dc9d1ad664786f..2d06be363f098d063b067c02f4cda1ca9028b82f 100644 (file)
@@ -12,7 +12,7 @@ from testlib.fixtures import *
 class QueryTest(FixtureTest):
     keep_mappers = True
     keep_data = True
-    
+
     def setUpAll(self):
         super(QueryTest, self).setUpAll()
         self.setup_mappers()
@@ -60,46 +60,41 @@ class GetTest(QueryTest):
         s.clear()
         u2 = s.query(User).get(7)
         assert u is not u2
-    
+
     def test_no_criterion(self):
         """test that get()/load() does not use preexisting filter/etc. criterion"""
 
         s = create_session()
-
-        import warnings
-        warnings.filterwarnings("error", r".*Query.*")
-        print "---------------------------------------"
         try:
             s.query(User).join('addresses').filter(Address.user_id==8).get(7)
             assert False
-        except RuntimeWarning, e:
+        except exceptions.SAWarning, e:
             assert str(e) == "Query.get() being called on a Query with existing criterion; criterion is being ignored."
 
-        warnings.filterwarnings("once", r".*Query.*")
-        
-        assert s.query(User).filter(User.id==7).get(19) is None
+        @testing.emits_warning('Query.*')
+        def warns():
+            assert s.query(User).filter(User.id==7).get(19) is None
 
-        u = s.query(User).get(7)
-        assert s.query(User).filter(User.id==9).get(7) is u
-        s.clear()
-        assert s.query(User).filter(User.id==9).get(7).id == u.id
+            u = s.query(User).get(7)
+            assert s.query(User).filter(User.id==9).get(7) is u
+            s.clear()
+            assert s.query(User).filter(User.id==9).get(7).id == u.id
 
-        # user 10 has no addresses
-        u = s.query(User).get(10)
-        assert s.query(User).join('addresses').get(10) is u
-        s.clear()
-        assert s.query(User).join('addresses').get(10).id == u.id
-        
-        u = s.query(User).get(7)
-        assert s.query(User).join('addresses').filter(Address.user_id==8).filter(User.id==7).first() is None
-        assert s.query(User).join('addresses').filter(Address.user_id==8).get(7) is u
-        s.clear()
-        assert s.query(User).join('addresses').filter(Address.user_id==8).get(7).id == u.id
-        
-        assert s.query(User).join('addresses').filter(Address.user_id==8).load(7).id == u.id
+            # user 10 has no addresses
+            u = s.query(User).get(10)
+            assert s.query(User).join('addresses').get(10) is u
+            s.clear()
+            assert s.query(User).join('addresses').get(10).id == u.id
 
+            u = s.query(User).get(7)
+            assert s.query(User).join('addresses').filter(Address.user_id==8).filter(User.id==7).first() is None
+            assert s.query(User).join('addresses').filter(Address.user_id==8).get(7) is u
+            s.clear()
+            assert s.query(User).join('addresses').filter(Address.user_id==8).get(7).id == u.id
+
+            assert s.query(User).join('addresses').filter(Address.user_id==8).load(7).id == u.id
+        warns()
 
-        
     def test_unique_param_names(self):
         class SomeUser(object):
             pass
@@ -250,7 +245,7 @@ class OperatorTest(QueryTest):
 
     def test_op(self):
         assert str(User.name.op('ilike')('17').compile(dialect=default.DefaultDialect())) == "users.name ilike :users_name_1"
-        
+
     def test_in(self):
          self._test(User.id.in_(['a', 'b']),
                     "users.id IN (:users_id_1, :users_id_2)")
@@ -391,7 +386,7 @@ class FilterTest(QueryTest):
 
         # one to many generates WHERE NOT EXISTS
         assert [User(name='chuck')] == sess.query(User).filter_by(addresses = None).all()
-        
+
 class AggregateTest(QueryTest):
     def test_sum(self):
         sess = create_session()
@@ -407,7 +402,7 @@ class AggregateTest(QueryTest):
         assert [User(name=u'ed',id=8)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)> 2).all()
 
         assert [User(name=u'jack',id=7), User(name=u'fred',id=9)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)< 2).all()
-        
+
 class CountTest(QueryTest):
     def test_basic(self):
         assert 4 == create_session().query(User).count()
@@ -421,28 +416,28 @@ class DistinctTest(QueryTest):
 
     def test_joined(self):
         """test that orderbys from a joined table get placed into the columns clause when DISTINCT is used"""
-        
+
         sess = create_session()
         q = sess.query(User).join('addresses').distinct().order_by(desc(Address.email_address))
 
         assert [User(id=7), User(id=9), User(id=8)] == q.all()
 
         sess.clear()
-        
-        # test that it works on embedded eagerload/LIMIT subquery 
+
+        # test that it works on embedded eagerload/LIMIT subquery
         q = sess.query(User).join('addresses').distinct().options(eagerload('addresses')).order_by(desc(Address.email_address)).limit(2)
 
         def go():
             assert [
                 User(id=7, addresses=[
                     Address(id=1)
-                ]), 
+                ]),
                 User(id=9, addresses=[
                     Address(id=5)
-                ]), 
+                ]),
             ] == q.all()
         self.assert_sql_count(testbase.db, go, 1)
-        
+
 
 class YieldTest(QueryTest):
     def test_basic(self):
@@ -463,7 +458,7 @@ class YieldTest(QueryTest):
             assert False
         except StopIteration:
             pass
-        
+
 class TextTest(QueryTest):
     def test_fulltext(self):
         assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users").all()
@@ -505,7 +500,7 @@ class ParentTest(QueryTest):
         # test against None for parent? this can't be done with the current API since we don't know
         # what mapper to use
         #assert sess.query(Order).with_parent(None, property='addresses').all() == [Order(description="order 5")]
-        
+
     def test_noparent(self):
         sess = create_session()
         q = sess.query(User)
@@ -526,16 +521,16 @@ class ParentTest(QueryTest):
 
 
 class JoinTest(QueryTest):
-    
+
     def test_getjoinable_tables(self):
         sess = create_session()
-        
+
         sel1 = select([users]).alias()
         sel2 = select([users], from_obj=users.join(addresses)).alias()
-        
+
         j1 = sel1.join(users, sel1.c.id==users.c.id)
         j2 = j1.join(addresses)
-        
+
         for from_obj, assert_cond in (
             (users, [users]),
             (users.join(addresses), [users, addresses]),
@@ -544,11 +539,11 @@ class JoinTest(QueryTest):
             (sel1.join(users, sel1.c.id==users.c.id), [sel1, users]),
             (sel2.join(users, sel2.c.id==users.c.id), [sel2, users]),
             (j2, [j1, j2, sel1, users, addresses])
-            
+
         ):
             ret = set(sess.query(User).select_from(from_obj)._get_joinable_tables())
             self.assertEquals(ret, set(assert_cond).union([from_obj]), [x.description for x in ret])
-        
+
     def test_overlapping_paths(self):
         for aliased in (True,False):
             # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
@@ -740,7 +735,7 @@ class InstancesTest(QueryTest):
         self.assert_sql_count(testbase.db, go, 1)
 
         sess.clear()
-        
+
         def go():
             l = q.options(contains_alias('ulist'), contains_eager('addresses')).from_statement(query).all()
             assert fixtures.user_address_result == l
@@ -758,7 +753,7 @@ class InstancesTest(QueryTest):
         self.assert_sql_count(testbase.db, go, 1)
 
         sess.clear()
-        
+
         def go():
             l = q.options(contains_eager('addresses')).from_statement(selectquery).all()
             assert fixtures.user_address_result[0:3] == l
@@ -776,15 +771,15 @@ class InstancesTest(QueryTest):
             assert fixtures.user_address_result == l
         self.assert_sql_count(testbase.db, go, 1)
         sess.clear()
-        
+
         def go():
             # test using the Alias object itself
             l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute())
             assert fixtures.user_address_result == l
         self.assert_sql_count(testbase.db, go, 1)
-        
+
         sess.clear()
-        
+
         def decorate(row):
             d = {}
             for c in addresses.columns:
@@ -797,7 +792,7 @@ class InstancesTest(QueryTest):
             assert fixtures.user_address_result == l
         self.assert_sql_count(testbase.db, go, 1)
         sess.clear()
-        
+
         oalias = orders.alias('o1')
         ialias = items.alias('i1')
         query = users.outerjoin(oalias).outerjoin(order_items).outerjoin(ialias).select(use_labels=True).order_by(users.c.id).order_by(oalias.c.id).order_by(ialias.c.id)
@@ -809,7 +804,7 @@ class InstancesTest(QueryTest):
         self.assert_sql_count(testbase.db, go, 1)
 
         sess.clear()
-        
+
         # test using Alias with more than one level deep
         def go():
             l = q.options(contains_eager('orders', alias=oalias), contains_eager('orders.items', alias=ialias)).instances(query.execute())
@@ -839,9 +834,9 @@ class InstancesTest(QueryTest):
         q = sess.query(User)
         l = q.instances(selectquery.execute(), Address)
         assert l == expected
-        
+
         sess.clear()
-        
+
         for aliased in (False, True):
             q = sess.query(User)
 
@@ -862,7 +857,7 @@ class InstancesTest(QueryTest):
             q = sess.query(User, Address).join('addresses', aliased=aliased).options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com')
             assert q.all() == [(user8, address3)]
             sess.clear()
-            
+
     def test_aliased_multi_mappers(self):
         sess = create_session()
 
@@ -884,7 +879,7 @@ class InstancesTest(QueryTest):
         assert l == expected
 
         sess.clear()
-        
+
         q = sess.query(User).add_entity(Address, alias=adalias)
         l = q.select_from(users.outerjoin(adalias)).filter(adalias.c.email_address=='ed@bettyboop.com').all()
         assert l == [(user8, address3)]
@@ -893,18 +888,18 @@ class InstancesTest(QueryTest):
         sess = create_session()
 
         expected = [(u, u.name) for u in sess.query(User).all()]
-        
+
         for add_col in (User.name, users.c.name, User.c.name):
             assert sess.query(User).add_column(add_col).all() == expected
             sess.clear()
-            
+
         try:
             sess.query(User).add_column(object()).all()
             assert False
         except exceptions.InvalidRequestError, e:
             assert "Invalid column expression" in str(e)
-            
-        
+
+
     def test_multi_columns_2(self):
         """test aliased/nonalised joins with the usage of add_column()"""
         sess = create_session()
@@ -922,13 +917,13 @@ class InstancesTest(QueryTest):
             l = q.all()
             assert l == expected
             sess.clear()
-            
+
         s = select([users, func.count(addresses.c.id).label('count')]).select_from(users.outerjoin(addresses)).group_by(*[c for c in users.c]).order_by(User.id)
         q = sess.query(User)
         l = q.add_column("count").from_statement(s).all()
         assert l == expected
-    
-        
+
+
     def test_two_columns(self):
         sess = create_session()
         (user7, user8, user9, user10) = sess.query(User).all()
@@ -943,9 +938,9 @@ class InstancesTest(QueryTest):
         q = create_session().query(User)
         l = q.add_column("count").add_column("concat").from_statement(s).all()
         assert l == expected
-        
+
         sess.clear()
-        
+
         # test with select_from()
         q = create_session().query(User).add_column(func.count(addresses.c.id))\
             .add_column(("Name:" + users.c.name)).select_from(users.outerjoin(addresses))\
@@ -953,7 +948,7 @@ class InstancesTest(QueryTest):
 
         assert q.all() == expected
         sess.clear()
-        
+
         # test with outerjoin() both aliased and non
         for aliased in (False, True):
             q = create_session().query(User).add_column(func.count(addresses.c.id))\
@@ -966,19 +961,19 @@ class InstancesTest(QueryTest):
 
 class SelectFromTest(QueryTest):
     keep_mappers = False
-    
+
     def setup_mappers(self):
         pass
-        
+
     def test_replace_with_select(self):
         mapper(User, users, properties = {
             'addresses':relation(Address)
         })
         mapper(Address, addresses)
-    
+
         sel = users.select(users.c.id.in_([7, 8])).alias()
         sess = create_session()
-    
+
         self.assertEquals(sess.query(User).select_from(sel).all(), [User(id=7), User(id=8)])
 
         self.assertEquals(sess.query(User).select_from(sel).filter(User.c.id==8).all(), [User(id=8)])
@@ -990,8 +985,8 @@ class SelectFromTest(QueryTest):
         self.assertEquals(sess.query(User).select_from(sel).order_by(asc(User.name)).all(), [
             User(name='ed',id=8), User(name='jack',id=7)
         ])
-        
-        self.assertEquals(sess.query(User).select_from(sel).options(eagerload('addresses')).first(), 
+
+        self.assertEquals(sess.query(User).select_from(sel).options(eagerload('addresses')).first(),
             User(name='jack', addresses=[Address(id=1)])
         )
 
@@ -1004,19 +999,19 @@ class SelectFromTest(QueryTest):
         sel = users.select(users.c.id.in_([7, 8]))
         sess = create_session()
 
-        self.assertEquals(sess.query(User).select_from(sel).join('addresses').add_entity(Address).order_by(User.id).order_by(Address.id).all(), 
+        self.assertEquals(sess.query(User).select_from(sel).join('addresses').add_entity(Address).order_by(User.id).order_by(Address.id).all(),
             [
-                (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)), 
-                (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)), 
+                (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)),
+                (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)),
                 (User(name='ed',id=8), Address(user_id=8,email_address='ed@bettyboop.com',id=3)),
                 (User(name='ed',id=8), Address(user_id=8,email_address='ed@lala.com',id=4))
             ]
         )
 
-        self.assertEquals(sess.query(User).select_from(sel).join('addresses', aliased=True).add_entity(Address).order_by(User.id).order_by(Address.id).all(), 
+        self.assertEquals(sess.query(User).select_from(sel).join('addresses', aliased=True).add_entity(Address).order_by(User.id).order_by(Address.id).all(),
             [
-                (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)), 
-                (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)), 
+                (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)),
+                (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)),
                 (User(name='ed',id=8), Address(user_id=8,email_address='ed@bettyboop.com',id=3)),
                 (User(name='ed',id=8), Address(user_id=8,email_address='ed@lala.com',id=4))
             ]
@@ -1033,10 +1028,10 @@ class SelectFromTest(QueryTest):
             'keywords':relation(Keyword, secondary=item_keywords, order_by=keywords.c.id) #m2m
         })
         mapper(Keyword, keywords)
-        
+
         sel = users.select(users.c.id.in_([7, 8]))
         sess = create_session()
-        
+
         self.assertEquals(sess.query(User).select_from(sel).join(['orders', 'items', 'keywords']).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
             User(name=u'jack',id=7)
         ])
@@ -1049,15 +1044,15 @@ class SelectFromTest(QueryTest):
             self.assertEquals(sess.query(User).select_from(sel).options(eagerload_all('orders.items.keywords')).join(['orders', 'items', 'keywords'], aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
                 User(name=u'jack',orders=[
                     Order(description=u'order 1',items=[
-                        Item(description=u'item 1',keywords=[Keyword(name=u'red'), Keyword(name=u'big'), Keyword(name=u'round')]), 
-                        Item(description=u'item 2',keywords=[Keyword(name=u'red',id=2), Keyword(name=u'small',id=5), Keyword(name=u'square')]), 
+                        Item(description=u'item 1',keywords=[Keyword(name=u'red'), Keyword(name=u'big'), Keyword(name=u'round')]),
+                        Item(description=u'item 2',keywords=[Keyword(name=u'red',id=2), Keyword(name=u'small',id=5), Keyword(name=u'square')]),
                         Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)])
-                    ]), 
+                    ]),
                     Order(description=u'order 3',items=[
-                        Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)]), 
-                        Item(description=u'item 4',keywords=[],id=4), 
+                        Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)]),
+                        Item(description=u'item 4',keywords=[],id=4),
                         Item(description=u'item 5',keywords=[],id=5)
-                        ]), 
+                        ]),
                     Order(description=u'order 5',items=[Item(description=u'item 5',keywords=[])])])
                 ])
         self.assert_sql_count(testbase.db, go, 1)
@@ -1065,36 +1060,36 @@ class SelectFromTest(QueryTest):
         sess.clear()
         sel2 = orders.select(orders.c.id.in_([1,2,3]))
         self.assertEquals(sess.query(Order).select_from(sel2).join(['items', 'keywords']).filter(Keyword.name == 'red').all(), [
-            Order(description=u'order 1',id=1), 
-            Order(description=u'order 2',id=2), 
+            Order(description=u'order 1',id=1),
+            Order(description=u'order 2',id=2),
         ])
         self.assertEquals(sess.query(Order).select_from(sel2).join(['items', 'keywords'], aliased=True).filter(Keyword.name == 'red').all(), [
-            Order(description=u'order 1',id=1), 
-            Order(description=u'order 2',id=2), 
+            Order(description=u'order 1',id=1),
+            Order(description=u'order 2',id=2),
         ])
-        
-        
+
+
     def test_replace_with_eager(self):
         mapper(User, users, properties = {
             'addresses':relation(Address)
         })
         mapper(Address, addresses)
-    
+
         sel = users.select(users.c.id.in_([7, 8]))
         sess = create_session()
-    
+
         def go():
-            self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).all(), 
+            self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).all(),
                 [
-                    User(id=7, addresses=[Address(id=1)]), 
+                    User(id=7, addresses=[Address(id=1)]),
                     User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])
                 ]
             )
         self.assert_sql_count(testbase.db, go, 1)
         sess.clear()
-        
+
         def go():
-            self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.c.id==8).all(), 
+            self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.c.id==8).all(),
                 [User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])]
             )
         self.assert_sql_count(testbase.db, go, 1)
@@ -1103,7 +1098,7 @@ class SelectFromTest(QueryTest):
         def go():
             self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel)[1], User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)]))
         self.assert_sql_count(testbase.db, go, 1)
-    
+
 class CustomJoinTest(QueryTest):
     keep_mappers = False
 
@@ -1244,5 +1239,3 @@ class ExternalColumnsTest(QueryTest):
 
 if __name__ == '__main__':
     testbase.main()
-
-
index 41b4caebf7ab3fa12e29f6f82070312d781e713a..c787d3404d4a05229650c9cb9e5fc6ed81b1d63f 100644 (file)
@@ -11,10 +11,10 @@ from sqlalchemy.sql import util as sql_util
 class TraversalTest(AssertMixin):
     """test ClauseVisitor's traversal, particularly its ability to copy and modify
     a ClauseElement in place."""
-    
+
     def setUpAll(self):
         global A, B
-        
+
         # establish two ficticious ClauseElements.
         # define deep equality semantics as well as deep identity semantics.
         class A(ClauseElement):
@@ -23,16 +23,16 @@ class TraversalTest(AssertMixin):
 
             def is_other(self, other):
                 return other is self
-            
+
             def __eq__(self, other):
                 return other.expr == self.expr
-            
+
             def __ne__(self, other):
                 return other.expr != self.expr
-                
+
             def __str__(self):
                 return "A(%s)" % repr(self.expr)
-                
+
         class B(ClauseElement):
             def __init__(self, *items):
                 self.items = items
@@ -50,22 +50,22 @@ class TraversalTest(AssertMixin):
                     if i1 != i2:
                         return False
                 return True
-            
+
             def __ne__(self, other):
                 for i1, i2 in zip(self.items, other.items):
                     if i1 != i2:
                         return True
                 return False
-            
-            def _copy_internals(self, clone=_clone):    
+
+            def _copy_internals(self, clone=_clone):
                 self.items = [clone(i) for i in self.items]
 
             def get_children(self, **kwargs):
                 return self.items
-            
+
             def __str__(self):
                 return "B(%s)" % repr([str(i) for i in self.items])
-    
+
     def test_test_classes(self):
         a1 = A("expr1")
         struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
@@ -78,22 +78,22 @@ class TraversalTest(AssertMixin):
         assert struct != struct3
         assert not struct.is_other(struct2)
         assert not struct.is_other(struct3)
-        
-    def test_clone(self):    
+
+    def test_clone(self):
         struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
-        
+
         class Vis(ClauseVisitor):
             def visit_a(self, a):
                 pass
             def visit_b(self, b):
                 pass
-                
+
         vis = Vis()
         s2 = vis.traverse(struct, clone=True)
         assert struct == s2
         assert not struct.is_other(s2)
 
-    def test_no_clone(self):    
+    def test_no_clone(self):
         struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
 
         class Vis(ClauseVisitor):
@@ -106,7 +106,7 @@ class TraversalTest(AssertMixin):
         s2 = vis.traverse(struct, clone=False)
         assert struct == s2
         assert struct.is_other(s2)
-        
+
     def test_change_in_place(self):
         struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
         struct2 = B(A("expr1"), A("expr2modified"), B(A("expr1b"), A("expr2b")), A("expr3"))
@@ -136,41 +136,41 @@ class TraversalTest(AssertMixin):
         s3 = vis2.traverse(struct, clone=True)
         assert struct != s3
         assert struct3 == s3
-    
-        
+
+
 class ClauseTest(SQLCompileTest):
     """test copy-in-place behavior of various ClauseElements."""
-    
+
     def setUpAll(self):
         global t1, t2
-        t1 = table("table1", 
+        t1 = table("table1",
             column("col1"),
             column("col2"),
             column("col3"),
             )
-        t2 = table("table2", 
+        t2 = table("table2",
             column("col1"),
             column("col2"),
             column("col3"),
             )
-            
+
     def test_binary(self):
         clause = t1.c.col2 == t2.c.col2
         assert str(clause) == ClauseVisitor().traverse(clause, clone=True)
-    
+
     def test_join(self):
         clause = t1.join(t2, t1.c.col2==t2.c.col2)
         c1 = str(clause)
         assert str(clause) == str(ClauseVisitor().traverse(clause, clone=True))
-    
+
         class Vis(ClauseVisitor):
             def visit_binary(self, binary):
                 binary.right = t2.c.col3
-                
+
         clause2 = Vis().traverse(clause, clone=True)
         assert c1 == str(clause)
         assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3))
-        
+
     def test_text(self):
         clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')])
         c1 = str(clause)
@@ -178,13 +178,13 @@ class ClauseTest(SQLCompileTest):
             def visit_textclause(self, text):
                 text.text = text.text + " SOME MODIFIER=:lala"
                 text.bindparams['lala'] = bindparam('lala')
-                
+
         clause2 = Vis().traverse(clause, clone=True)
         assert c1 == str(clause)
         assert str(clause2) == c1 + " SOME MODIFIER=:lala"
         assert clause.bindparams.keys() == ['bar']
         assert util.Set(clause2.bindparams.keys()) == util.Set(['bar', 'lala'])
-                
+
     def test_select(self):
         s2 = select([t1])
         s2_assert = str(s2)
@@ -201,7 +201,7 @@ class ClauseTest(SQLCompileTest):
         assert str(s2) == s3_assert
 
         print "------------------"
-        
+
         s4_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col3==9)))
         class Vis(ClauseVisitor):
             def visit_select(self, select):
@@ -211,7 +211,7 @@ class ClauseTest(SQLCompileTest):
         print str(s4)
         assert str(s4) == s4_assert
         assert str(s3) == s3_assert
-        
+
         print "------------------"
         s5_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col1==9)))
         class Vis(ClauseVisitor):
@@ -224,10 +224,10 @@ class ClauseTest(SQLCompileTest):
         print str(s5)
         assert str(s5) == s5_assert
         assert str(s4) == s4_assert
-    
+
     def test_binds(self):
         """test that unique bindparams change their name upon clone() to prevent conflicts"""
-        
+
         s = select([t1], t1.c.col1==bindparam(None, unique=True)).alias()
         s2 = ClauseVisitor().traverse(s, clone=True).alias()
         s3 = select([s], s.c.col2==s2.c.col2)
@@ -236,7 +236,7 @@ class ClauseTest(SQLCompileTest):
         "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) AS anon_1, "\
         "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_2 "\
         "WHERE anon_1.col2 = anon_2.col2")
-        
+
         s = select([t1], t1.c.col1==4).alias()
         s2 = ClauseVisitor().traverse(s, clone=True).alias()
         s3 = select([s], s.c.col2==s2.c.col2)
@@ -244,7 +244,8 @@ class ClauseTest(SQLCompileTest):
         "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_1) AS anon_1, "\
         "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_2) AS anon_2 "\
         "WHERE anon_1.col2 = anon_2.col2")
-        
+
+    @testing.emits_warning('.*replaced by another column with the same key')
     def test_alias(self):
         subq = t2.select().alias('subq')
         s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)])
@@ -260,38 +261,38 @@ class ClauseTest(SQLCompileTest):
 
         s4 = sql_util.ClauseAdapter(table('foo')).traverse(s3, clone=True)
         assert orig == str(s) == str(s3) == str(s4)
-        
+
     def test_correlated_select(self):
         s = select(['*'], t1.c.col1==t2.c.col1, from_obj=[t1, t2]).correlate(t2)
         class Vis(ClauseVisitor):
             def visit_select(self, select):
                 select.append_whereclause(t1.c.col2==7)
-                
+
         self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2_1")
 
 class ClauseAdapterTest(SQLCompileTest):
     def setUpAll(self):
         global t1, t2
-        t1 = table("table1", 
+        t1 = table("table1",
             column("col1"),
             column("col2"),
             column("col3"),
             )
-        t2 = table("table2", 
+        t2 = table("table2",
             column("col1"),
             column("col2"),
             column("col3"),
             )
-            
+
 
     def test_table_to_alias(self):
-        
+
         t1alias = t1.alias('t1alias')
-        
+
         vis = sql_util.ClauseAdapter(t1alias)
         ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
         assert ff._get_from_objects() == [t1alias]
-        
+
         self.assert_compile(vis.traverse(select(['*'], from_obj=[t1]), clone=True), "SELECT * FROM table1 AS t1alias")
         self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
         self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
@@ -306,17 +307,17 @@ class ClauseAdapterTest(SQLCompileTest):
         ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
         self.assert_compile(ff, "count(t1alias.col1) AS foo")
         assert ff._get_from_objects() == [t1alias]
-        
+
 # TODO:
     #    self.assert_compile(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias")
-        
+
         t2alias = t2.alias('t2alias')
         vis.chain(sql_util.ClauseAdapter(t2alias))
         self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
         self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
         self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
         self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2")
-    
+
     def test_include_exclude(self):
         m = MetaData()
         a=Table( 'a',m,
@@ -331,7 +332,7 @@ class ClauseAdapterTest(SQLCompileTest):
         e = sql_util.ClauseAdapter( b, include= set([ a.c.id ]),
           equivalents= { a.c.id: set([ a.c.id]) }
         ).traverse( e)
-        
+
         assert str(e) == "a_1.id = a.xxx_id"
 
     def test_join_to_alias(self):
@@ -363,7 +364,7 @@ class ClauseAdapterTest(SQLCompileTest):
                                  " LEFT OUTER JOIN d ON a_id = d.aid")
         j5 = j3.alias('foo')
         j6 = sql_util.ClauseAdapter(j5).copy_and_process([j4])[0]
-        
+
         # this statement takes c join(a join b), wraps it inside an aliased "select * from c join(a join b) AS foo".
         # the outermost right side "left outer join d" stays the same, except "d" joins against foo.a_id instead
         # of plain "a_id"
@@ -371,30 +372,30 @@ class ClauseAdapterTest(SQLCompileTest):
                                 "c JOIN (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid FROM a LEFT OUTER JOIN b ON a.id = b.aid) "
                                 "ON b_id = c.bid) AS foo"
                                 " LEFT OUTER JOIN d ON foo.a_id = d.aid")
-    
+
     def test_derived_from(self):
         assert select([t1]).is_derived_from(t1)
         assert not select([t2]).is_derived_from(t1)
         assert not t1.is_derived_from(select([t1]))
         assert t1.alias().is_derived_from(t1)
-        
-        
+
+
         s1 = select([t1, t2]).alias('foo')
         s2 = select([s1]).limit(5).offset(10).alias()
         assert s2.is_derived_from(s1)
         s2 = s2._clone()
         assert s2.is_derived_from(s1)
-        
+
     def test_aliasedselect_to_aliasedselect(self):
         # original issue from ticket #904
         s1 = select([t1]).alias('foo')
         s2 = select([s1]).limit(5).offset(10).alias()
 
-        self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1), 
+        self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1),
             "SELECT foo.col1, foo.col2, foo.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo  LIMIT 5 OFFSET 10")
-        
+
         j = s1.outerjoin(t2, s1.c.col1==t2.c.col1)
-        self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), 
+        self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(),
             "SELECT anon_1.col1, anon_1.col2, anon_1.col3, table2.col1, table2.col2, table2.col3 FROM "\
             "(SELECT foo.col1 AS col1, foo.col2 AS col2, foo.col3 AS col3 FROM "\
             "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo  LIMIT 5 OFFSET 10) AS anon_1 "\
@@ -402,40 +403,40 @@ class ClauseAdapterTest(SQLCompileTest):
 
         talias = t1.alias('bar')
         j = s1.outerjoin(talias, s1.c.col1==talias.c.col1)
-        self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), 
+        self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(),
             "SELECT anon_1.col1, anon_1.col2, anon_1.col3, bar.col1, bar.col2, bar.col3 FROM "\
             "(SELECT foo.col1 AS col1, foo.col2 AS col2, foo.col3 AS col3 FROM "\
             "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo  LIMIT 5 OFFSET 10) AS anon_1 "\
             "LEFT OUTER JOIN table1 AS bar ON anon_1.col1 = bar.col1")
-        
-        
+
+
 class SelectTest(SQLCompileTest):
     """tests the generative capability of Select"""
 
     def setUpAll(self):
         global t1, t2
-        t1 = table("table1", 
+        t1 = table("table1",
             column("col1"),
             column("col2"),
             column("col3"),
             )
-        t2 = table("table2", 
+        t2 = table("table2",
             column("col1"),
             column("col2"),
             column("col3"),
             )
-    
+
     def test_select(self):
-        self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3), 
+        self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3),
         "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1_1 ORDER BY table1.col3")
-    
-        self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3), 
+
+        self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3),
             "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\
             "FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3")
-        
+
         s = select([t2], t2.c.col1==t1.c.col1, correlate=False)
         s = s.correlate(t1).order_by(t2.c.col3)
-        self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3), 
+        self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3),
             "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\
             "FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3")
 
@@ -444,7 +445,7 @@ class SelectTest(SQLCompileTest):
         self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
         select_copy = s.column('yyy')
         self.assert_compile(select_copy, "SELECT table1.col1, table1.col2, table1.col3, yyy FROM table1")
-        assert s.columns is not select_copy.columns 
+        assert s.columns is not select_copy.columns
         assert s._columns is not select_copy._columns
         assert s._raw_columns is not select_copy._raw_columns
         self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
@@ -464,8 +465,8 @@ class SelectTest(SQLCompileTest):
         self.assert_compile(s2, "SELECT table1.col1, table1.col2, table1.col3 FROM table1, "
                 "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 "
                 "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2")
-                
-        s3 = s.correlate(None)        
+
+        s3 = s.correlate(None)
         self.assert_compile(select([t1], t1.c.col2==s3.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, "
                 "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2, table1 "
                 "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2")
@@ -479,7 +480,7 @@ class SelectTest(SQLCompileTest):
         self.assert_compile(select([t1], t1.c.col2==s3.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, "
                 "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2, table1 "
                 "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2")
-    
+
     def test_prefixes(self):
         s = t1.select()
         self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
@@ -488,4 +489,4 @@ class SelectTest(SQLCompileTest):
         self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
 
 if __name__ == '__main__':
-    testbase.main()        
+    testbase.main()
index 1b9959ec431966a0b8534e0de3938b7c6831f6eb..850e29e168fb459e743dba4a33ee8bdab3b304bf 100755 (executable)
@@ -1,18 +1,18 @@
 """tests that various From objects properly export their columns, as well as
 useable primary keys and foreign keys.  Full relational algebra depends on
 every selectable unit behaving nicely with others.."""
+
 import testbase
 from sqlalchemy import *
 from testlib import *
 
 metadata = MetaData()
-table = Table('table1', metadata, 
+table = Table('table1', metadata,
     Column('col1', Integer, primary_key=True),
     Column('col2', String(20)),
     Column('col3', Integer),
     Column('colx', Integer),
-    
+
 )
 
 table2 = Table('table2', metadata,
@@ -31,47 +31,47 @@ class SelectableTest(AssertMixin):
         #assert s.corresponding_column(table.c.col1) is s.c.col1
         assert s.corresponding_column(s.c.col1) is s.c.col1
         assert s.corresponding_column(s.c.c1) is s.c.c1
-        
+
     def testjoinagainstself(self):
         jj = select([table.c.col1.label('bar_col1')])
         jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
-        
+
         # test column directly agaisnt itself
         assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
 
         assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1
-        
-        # test alias of the join, targets the column with the least 
+
+        # test alias of the join, targets the column with the least
         # "distance" between the requested column and the returned column
         # (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than
         # there is from j2.c.bar_col1 to table.c.col1)
         j2 = jjj.alias('foo')
         assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1
-        
+
     def testselectontable(self):
         sel = select([table, table2], use_labels=True)
         assert sel.corresponding_column(table.c.col1) is sel.c.table1_col1
         assert sel.corresponding_column(table.c.col1, require_embedded=True) is sel.c.table1_col1
         assert table.corresponding_column(sel.c.table1_col1) is table.c.col1
         assert table.corresponding_column(sel.c.table1_col1, require_embedded=True) is None
-        
+
     def testjoinagainstjoin(self):
         j  = outerjoin(table, table2, table.c.col1==table2.c.col2)
         jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
         jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
         assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
-        
+
         j2 = jjj.alias('foo')
         print j2.corresponding_column(jjj.c.table1_col1)
         assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1
-        
+
         assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
-        
+
     def testtablealias(self):
         a = table.alias('a')
-        
+
         j = join(a, table2)
-        
+
         criterion = a.c.col1 == table2.c.col2
         print
         print str(j)
@@ -124,7 +124,7 @@ class SelectableTest(AssertMixin):
         j1 = table.join(table2)
         assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
         assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
-        
+
     def testjoin(self):
         a = join(table, table2)
         print str(a.select(use_labels=True))
@@ -138,7 +138,7 @@ class SelectableTest(AssertMixin):
         a = table.select().alias('a')
         print str(a.select())
         j = join(a, table2)
-        
+
         criterion = a.c.col1 == table2.c.col2
         print criterion
         print j.onclause
@@ -148,7 +148,7 @@ class SelectableTest(AssertMixin):
         a = table.select(use_labels=True)
         print str(a.select())
         j = join(a, table2)
-        
+
         criterion = a.c.table1_col1 == table2.c.col2
         print
         print str(j)
@@ -163,17 +163,17 @@ class SelectableTest(AssertMixin):
         criterion = a.c.acol1 == table2.c.col2
         print str(j)
         self.assert_(criterion.compare(j.onclause))
-        
+
     def testselectaliaslabels(self):
         a = table2.select(use_labels=True).alias('a')
         print str(a.select())
         j = join(a, table)
-        
+
         criterion =  table.c.col1 == a.c.table2_col2
         print str(criterion)
         print str(j.onclause)
         self.assert_(criterion.compare(j.onclause))
-    
+
     def testtablejoinedtoselectoftable(self):
         metadata = MetaData()
         a = Table('a', metadata,
@@ -195,9 +195,10 @@ class SelectableTest(AssertMixin):
         assert j4.corresponding_column(j2.c.aid) is j4.c.aid
         assert j4.corresponding_column(a.c.id) is j4.c.id
 
+    @testing.emits_warning('.*replaced by another column with the same key')
     def test_oid(self):
         # the oid column of a selectable currently proxies all
-        # oid columns found within.  
+        # oid columns found within.
         s = table.select()
         s2 = table2.select()
         s3 = select([s, s2])
@@ -205,18 +206,18 @@ class SelectableTest(AssertMixin):
         assert s3.corresponding_column(table2.oid_column) is s3.oid_column
         assert s3.corresponding_column(s.oid_column) is s3.oid_column
         assert s3.corresponding_column(s2.oid_column) is s3.oid_column
-        
+
         u = s.union(s2)
         assert u.corresponding_column(table.oid_column) is u.oid_column
         assert u.corresponding_column(table2.oid_column) is u.oid_column
         assert u.corresponding_column(s.oid_column) is u.oid_column
         assert u.corresponding_column(s2.oid_column) is u.oid_column
-        
+
 class PrimaryKeyTest(AssertMixin):
     def test_join_pk_collapse_implicit(self):
-        """test that redundant columns in a join get 'collapsed' into a minimal primary key, 
+        """test that redundant columns in a join get 'collapsed' into a minimal primary key,
         which is the root column along a chain of foreign key relationships."""
-        
+
         meta = MetaData()
         a = Table('a', meta, Column('id', Integer, primary_key=True))
         b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True))
@@ -225,7 +226,7 @@ class PrimaryKeyTest(AssertMixin):
 
         assert c.c.id.references(b.c.id)
         assert not d.c.id.references(a.c.id)
-        
+
         assert list(a.join(b).primary_key) == [a.c.id]
         assert list(b.join(c).primary_key) == [b.c.id]
         assert list(a.join(b).join(c).primary_key) == [a.c.id]
@@ -234,7 +235,7 @@ class PrimaryKeyTest(AssertMixin):
         assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]
 
     def test_join_pk_collapse_explicit(self):
-        """test that redundant columns in a join get 'collapsed' into a minimal primary key, 
+        """test that redundant columns in a join get 'collapsed' into a minimal primary key,
         which is the root column along a chain of explicit join conditions."""
 
         meta = MetaData()
@@ -251,9 +252,9 @@ class PrimaryKeyTest(AssertMixin):
         assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id]
         assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id]
         assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id]
-        
+
         assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id]
-    
+
     def test_init_doesnt_blowitaway(self):
         meta = MetaData()
         a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
@@ -261,7 +262,7 @@ class PrimaryKeyTest(AssertMixin):
 
         j = a.join(b)
         assert list(j.primary_key) == [a.c.id]
-        
+
         j.foreign_keys
         assert list(j.primary_key) == [a.c.id]
 
@@ -279,20 +280,20 @@ class DerivedTest(AssertMixin):
         meta = MetaData()
         t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
         t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
-        
+
         assert t1.is_derived_from(t1)
         assert not t2.is_derived_from(t1)
-        
-    def test_alias(self):    
+
+    def test_alias(self):
         meta = MetaData()
         t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
         t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
-        
+
         assert t1.alias().is_derived_from(t1)
         assert not t2.alias().is_derived_from(t1)
         assert not t1.is_derived_from(t1.alias())
         assert not t1.is_derived_from(t2.alias())
-    
+
     def test_select(self):
         meta = MetaData()
         t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
@@ -309,4 +310,3 @@ class DerivedTest(AssertMixin):
 
 if __name__ == "__main__":
     testbase.main()
-
index c11f8fcd18399af8522025c9219938cc864bcc0f..8f1cc461ea608248ba51c838a0f423f691727987 100644 (file)
@@ -1,6 +1,5 @@
 import testbase
-import pickleable
-import datetime, os, re
+import datetime, os, pickleable, re
 from sqlalchemy import *
 from sqlalchemy import types, exceptions
 from sqlalchemy.sql import operators
@@ -341,24 +340,11 @@ class UnicodeTest(AssertMixin):
             self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata)
 
     def testassert(self):
-        import warnings
-
-        warnings.filterwarnings("always", r".*non-unicode bind")
-
-        ## test that data still goes in if warning is emitted....
-        unicode_table.insert().execute(unicode_varchar='not unicode')
-        assert (select([unicode_table.c.unicode_varchar]).execute().fetchall()
-                == [('not unicode', )])
-
-        warnings.filterwarnings("error", r".*non-unicode bind")
         try:
-            try:
-                unicode_table.insert().execute(unicode_varchar='not unicode')
-                assert False
-            except RuntimeWarning, e:
-                assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e)
-        finally:
-            warnings.filterwarnings("always", r".*non-unicode bind")
+            unicode_table.insert().execute(unicode_varchar='not unicode')
+            assert False
+        except exceptions.SAWarning, e:
+            assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e)
 
         unicode_engine = engines.utf8_engine(options={'convert_unicode':True,
                                                       'assert_unicode':True})
@@ -368,6 +354,14 @@ class UnicodeTest(AssertMixin):
                 assert False
             except exceptions.InvalidRequestError, e:
                 assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'"
+
+            @testing.emits_warning('.*non-unicode bind')
+            def warns():
+                # test that data still goes in if warning is emitted....
+                unicode_table.insert().execute(unicode_varchar='not unicode')
+                assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )])
+            warns()
+
         finally:
             unicode_engine.dispose()
 
@@ -674,11 +668,6 @@ class StringTest(AssertMixin):
         foo =Table('foo', metadata,
             Column('one', String))
 
-        import warnings
-        from sqlalchemy.logging import SADeprecationWarning
-
-        warnings.filterwarnings("error", r"Using String type with no length.*")
-
         # no warning
         select([func.count("*")], bind=testbase.db).execute()
 
@@ -686,7 +675,7 @@ class StringTest(AssertMixin):
             # warning during CREATE
             foo.create()
             assert False
-        except SADeprecationWarning, e:
+        except exceptions.SADeprecationWarning, e:
             assert "Using String type with no length" in str(e)
             assert re.search(r'\bone\b', str(e))
 
@@ -700,8 +689,6 @@ class StringTest(AssertMixin):
             select([func.count("*")], from_obj=bar).execute()
         finally:
             bar.drop()
-            warnings.filterwarnings("always", r"Using String type with no length.*")
-
 
 class NumericTest(AssertMixin):
     def setUpAll(self):
index 526700c2a4f60b492e1c40dc3e00e7ab997aeb5e..1231dc1264b8e94cb64586aa6ae563af8054f234 100644 (file)
@@ -7,7 +7,7 @@ import itertools, unittest, re, sys, os, operator, warnings
 from cStringIO import StringIO
 import testlib.config as config
 sql, MetaData, clear_mappers, Session, util = None, None, None, None, None
-salogging = None
+sa_exceptions = None
 
 __all__ = ('PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest')
 
@@ -169,6 +169,46 @@ def _server_version(bind=None):
         bind = config.db
     return bind.dialect.server_version_info(bind.contextual_connect())
 
+def emits_warning(*messages):
+    """Mark a test as emitting a warning.
+
+    With no arguments, squelches all SAWarning failures.  Or pass one or more
+    strings; these will be matched to the root of the warning description by
+    warnings.filterwarnings().
+    """
+
+    # TODO: it would be nice to assert that a named warning was
+    # emitted. should work with some monkeypatching of warnings,
+    # and may work on non-CPython if they keep to the spirit of
+    # warnings.showwarning's docstring.
+    # - update: jython looks ok, it uses cpython's module
+    def decorate(fn):
+        def safe(*args, **kw):
+            global sa_exceptions
+            if sa_exceptions is None:
+                import sqlalchemy.exceptions as sa_exceptions
+
+            if not messages:
+                filters = [dict(action='ignore',
+                                category=sa_exceptions.SAWarning)]
+            else:
+                filters = [dict(action='ignore',
+                                message=message,
+                                category=sa_exceptions.SAWarning)
+                           for message in messages ]
+            for f in filters:
+                warnings.filterwarnings(**f)
+            try:
+                return fn(*args, **kw)
+            finally:
+                resetwarnings()
+        try:
+            safe.__name__ = fn.__name__
+        except:
+            pass
+        return safe
+    return decorate
+
 def uses_deprecated(*messages):
     """Mark a test as immune from fatal deprecation warnings.
 
@@ -183,17 +223,17 @@ def uses_deprecated(*messages):
 
     def decorate(fn):
         def safe(*args, **kw):
-            global salogging
-            if salogging is None:
-                from sqlalchemy import logging as salogging
+            global sa_exceptions
+            if sa_exceptions is None:
+                import sqlalchemy.exceptions as sa_exceptions
 
             if not messages:
                 filters = [dict(action='ignore',
-                                category=salogging.SADeprecationWarning)]
+                                category=sa_exceptions.SADeprecationWarning)]
             else:
                 filters = [dict(action='ignore',
                                 message=message,
-                                category=salogging.SADeprecationWarning)
+                                category=sa_exceptions.SADeprecationWarning)
                            for message in
                            [ (m.startswith('//') and
                               ('Call to deprecated function ' + m[2:]) or m)
@@ -206,7 +246,7 @@ def uses_deprecated(*messages):
             finally:
                 resetwarnings()
         try:
-            safe.__name__ = fn.name
+            safe.__name__ = fn.__name__
         except:
             pass
         return safe
@@ -215,11 +255,12 @@ def uses_deprecated(*messages):
 def resetwarnings():
     """Reset warning behavior to testing defaults."""
 
-    global salogging
-    if salogging is None:
-        from sqlalchemy import logging as salogging
+    global sa_exceptions
+    if sa_exceptions is None:
+        import sqlalchemy.exceptions as sa_exceptions
     warnings.resetwarnings()
-    warnings.filterwarnings('error', category=salogging.SADeprecationWarning)
+    warnings.filterwarnings('error', category=sa_exceptions.SADeprecationWarning)
+    warnings.filterwarnings('error', category=sa_exceptions.SAWarning)
 
 def against(*queries):
     """Boolean predicate, compares to testing database configuration.