- Fixed typo that was killing runs of individual named tests.
import testbase
-import sets, warnings
+import sets
from sqlalchemy import *
from sqlalchemy import sql, exceptions
from sqlalchemy.databases import mysql
def_table.drop()
@testing.exclude('mysql', '<', (5, 0, 0))
+ @testing.uses_deprecated('Using String type with no length')
def test_type_reflection(self):
# (ask_for, roundtripped_as_if_different)
specs = [( String(), mysql.MSText(), ),
m = MetaData(db)
t_table = Table('mysql_types', m, *columns)
try:
- try:
- warnings.filterwarnings('ignore',
- 'Using String type with no length.*')
- m.create_all()
- finally:
- warnings.filterwarnings("always",
- 'Using String type with no length.*')
+ m.create_all()
m2 = MetaData(db)
rt = Table('mysql_types', m2, autoload=True)
import testbase
-import pickle, StringIO, unicodedata, warnings
-
+import pickle, StringIO, unicodedata
from sqlalchemy import *
from sqlalchemy import exceptions
from sqlalchemy import types as sqltypes
dialect_module.ischema_names = {}
try:
try:
- warnings.filterwarnings('error', 'Did not recognize type')
m2 = MetaData(testbase.db)
t2 = Table("test", m2, autoload=True)
assert False
- except RuntimeWarning:
+ except exceptions.SAWarning:
assert True
- warnings.filterwarnings('ignore', 'Did not recognize type')
- m3 = MetaData(testbase.db)
- t3 = Table("test", m3, autoload=True)
- assert t3.c.foo.type.__class__ == sqltypes.NullType
+ @testing.emits_warning('Did not recognize type')
+ def warns():
+ m3 = MetaData(testbase.db)
+ t3 = Table("test", m3, autoload=True)
+ assert t3.c.foo.type.__class__ == sqltypes.NullType
finally:
dialect_module.ischema_names = ischema_names
- warnings.filterwarnings('always', 'Did not recognize type')
t.drop()
def test_override_fkandpkcol(self):
import testbase
-import warnings
from sqlalchemy import *
from sqlalchemy import exceptions, util
from sqlalchemy.orm import *
self._do_test(True)
def test_explicit_composite_pk(self):
- warnings.filterwarnings("error", r".*On mapper.*distinct primary key")
-
person_mapper = mapper(Person, person_table)
try:
mapper(Employee, employee_table, inherits=person_mapper, primary_key=[person_table.c.id, employee_table.c.id])
self._do_test(True)
assert False
- except RuntimeWarning, e:
+ except exceptions.SAWarning, e:
assert str(e) == "On mapper Mapper|Employee|employees, primary key column 'employees.id' is being combined with distinct primary key column 'persons.id' in attribute 'id'. Use explicit properties to give each column its own mapped attribute name.", str(e)
def test_explicit_pk(self):
def bad_expunge(foo):
raise Exception("this exception should be stated as a warning")
- import warnings
- warnings.filterwarnings("always", r".*this exception should be stated as a warning")
-
sess.expunge = bad_expunge
try:
Foo(_sa_session=sess)
assert False
except Exception, e:
- assert e is ex
+ assert isinstance(e, exceptions.SAWarning)
clear_mappers()
class QueryTest(FixtureTest):
keep_mappers = True
keep_data = True
-
+
def setUpAll(self):
super(QueryTest, self).setUpAll()
self.setup_mappers()
s.clear()
u2 = s.query(User).get(7)
assert u is not u2
-
+
def test_no_criterion(self):
"""test that get()/load() does not use preexisting filter/etc. criterion"""
s = create_session()
-
- import warnings
- warnings.filterwarnings("error", r".*Query.*")
- print "---------------------------------------"
try:
s.query(User).join('addresses').filter(Address.user_id==8).get(7)
assert False
- except RuntimeWarning, e:
+ except exceptions.SAWarning, e:
assert str(e) == "Query.get() being called on a Query with existing criterion; criterion is being ignored."
- warnings.filterwarnings("once", r".*Query.*")
-
- assert s.query(User).filter(User.id==7).get(19) is None
+ @testing.emits_warning('Query.*')
+ def warns():
+ assert s.query(User).filter(User.id==7).get(19) is None
- u = s.query(User).get(7)
- assert s.query(User).filter(User.id==9).get(7) is u
- s.clear()
- assert s.query(User).filter(User.id==9).get(7).id == u.id
+ u = s.query(User).get(7)
+ assert s.query(User).filter(User.id==9).get(7) is u
+ s.clear()
+ assert s.query(User).filter(User.id==9).get(7).id == u.id
- # user 10 has no addresses
- u = s.query(User).get(10)
- assert s.query(User).join('addresses').get(10) is u
- s.clear()
- assert s.query(User).join('addresses').get(10).id == u.id
-
- u = s.query(User).get(7)
- assert s.query(User).join('addresses').filter(Address.user_id==8).filter(User.id==7).first() is None
- assert s.query(User).join('addresses').filter(Address.user_id==8).get(7) is u
- s.clear()
- assert s.query(User).join('addresses').filter(Address.user_id==8).get(7).id == u.id
-
- assert s.query(User).join('addresses').filter(Address.user_id==8).load(7).id == u.id
+ # user 10 has no addresses
+ u = s.query(User).get(10)
+ assert s.query(User).join('addresses').get(10) is u
+ s.clear()
+ assert s.query(User).join('addresses').get(10).id == u.id
+ u = s.query(User).get(7)
+ assert s.query(User).join('addresses').filter(Address.user_id==8).filter(User.id==7).first() is None
+ assert s.query(User).join('addresses').filter(Address.user_id==8).get(7) is u
+ s.clear()
+ assert s.query(User).join('addresses').filter(Address.user_id==8).get(7).id == u.id
+
+ assert s.query(User).join('addresses').filter(Address.user_id==8).load(7).id == u.id
+ warns()
-
def test_unique_param_names(self):
class SomeUser(object):
pass
def test_op(self):
assert str(User.name.op('ilike')('17').compile(dialect=default.DefaultDialect())) == "users.name ilike :users_name_1"
-
+
def test_in(self):
self._test(User.id.in_(['a', 'b']),
"users.id IN (:users_id_1, :users_id_2)")
# one to many generates WHERE NOT EXISTS
assert [User(name='chuck')] == sess.query(User).filter_by(addresses = None).all()
-
+
class AggregateTest(QueryTest):
def test_sum(self):
sess = create_session()
assert [User(name=u'ed',id=8)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)> 2).all()
assert [User(name=u'jack',id=7), User(name=u'fred',id=9)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)< 2).all()
-
+
class CountTest(QueryTest):
def test_basic(self):
assert 4 == create_session().query(User).count()
def test_joined(self):
"""test that orderbys from a joined table get placed into the columns clause when DISTINCT is used"""
-
+
sess = create_session()
q = sess.query(User).join('addresses').distinct().order_by(desc(Address.email_address))
assert [User(id=7), User(id=9), User(id=8)] == q.all()
sess.clear()
-
- # test that it works on embedded eagerload/LIMIT subquery
+
+ # test that it works on embedded eagerload/LIMIT subquery
q = sess.query(User).join('addresses').distinct().options(eagerload('addresses')).order_by(desc(Address.email_address)).limit(2)
def go():
assert [
User(id=7, addresses=[
Address(id=1)
- ]),
+ ]),
User(id=9, addresses=[
Address(id=5)
- ]),
+ ]),
] == q.all()
self.assert_sql_count(testbase.db, go, 1)
-
+
class YieldTest(QueryTest):
def test_basic(self):
assert False
except StopIteration:
pass
-
+
class TextTest(QueryTest):
def test_fulltext(self):
assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users").all()
# test against None for parent? this can't be done with the current API since we don't know
# what mapper to use
#assert sess.query(Order).with_parent(None, property='addresses').all() == [Order(description="order 5")]
-
+
def test_noparent(self):
sess = create_session()
q = sess.query(User)
class JoinTest(QueryTest):
-
+
def test_getjoinable_tables(self):
sess = create_session()
-
+
sel1 = select([users]).alias()
sel2 = select([users], from_obj=users.join(addresses)).alias()
-
+
j1 = sel1.join(users, sel1.c.id==users.c.id)
j2 = j1.join(addresses)
-
+
for from_obj, assert_cond in (
(users, [users]),
(users.join(addresses), [users, addresses]),
(sel1.join(users, sel1.c.id==users.c.id), [sel1, users]),
(sel2.join(users, sel2.c.id==users.c.id), [sel2, users]),
(j2, [j1, j2, sel1, users, addresses])
-
+
):
ret = set(sess.query(User).select_from(from_obj)._get_joinable_tables())
self.assertEquals(ret, set(assert_cond).union([from_obj]), [x.description for x in ret])
-
+
def test_overlapping_paths(self):
for aliased in (True,False):
# load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
self.assert_sql_count(testbase.db, go, 1)
sess.clear()
-
+
def go():
l = q.options(contains_alias('ulist'), contains_eager('addresses')).from_statement(query).all()
assert fixtures.user_address_result == l
self.assert_sql_count(testbase.db, go, 1)
sess.clear()
-
+
def go():
l = q.options(contains_eager('addresses')).from_statement(selectquery).all()
assert fixtures.user_address_result[0:3] == l
assert fixtures.user_address_result == l
self.assert_sql_count(testbase.db, go, 1)
sess.clear()
-
+
def go():
# test using the Alias object itself
l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute())
assert fixtures.user_address_result == l
self.assert_sql_count(testbase.db, go, 1)
-
+
sess.clear()
-
+
def decorate(row):
d = {}
for c in addresses.columns:
assert fixtures.user_address_result == l
self.assert_sql_count(testbase.db, go, 1)
sess.clear()
-
+
oalias = orders.alias('o1')
ialias = items.alias('i1')
query = users.outerjoin(oalias).outerjoin(order_items).outerjoin(ialias).select(use_labels=True).order_by(users.c.id).order_by(oalias.c.id).order_by(ialias.c.id)
self.assert_sql_count(testbase.db, go, 1)
sess.clear()
-
+
# test using Alias with more than one level deep
def go():
l = q.options(contains_eager('orders', alias=oalias), contains_eager('orders.items', alias=ialias)).instances(query.execute())
q = sess.query(User)
l = q.instances(selectquery.execute(), Address)
assert l == expected
-
+
sess.clear()
-
+
for aliased in (False, True):
q = sess.query(User)
q = sess.query(User, Address).join('addresses', aliased=aliased).options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com')
assert q.all() == [(user8, address3)]
sess.clear()
-
+
def test_aliased_multi_mappers(self):
sess = create_session()
assert l == expected
sess.clear()
-
+
q = sess.query(User).add_entity(Address, alias=adalias)
l = q.select_from(users.outerjoin(adalias)).filter(adalias.c.email_address=='ed@bettyboop.com').all()
assert l == [(user8, address3)]
sess = create_session()
expected = [(u, u.name) for u in sess.query(User).all()]
-
+
for add_col in (User.name, users.c.name, User.c.name):
assert sess.query(User).add_column(add_col).all() == expected
sess.clear()
-
+
try:
sess.query(User).add_column(object()).all()
assert False
except exceptions.InvalidRequestError, e:
assert "Invalid column expression" in str(e)
-
-
+
+
def test_multi_columns_2(self):
"""test aliased/nonalised joins with the usage of add_column()"""
sess = create_session()
l = q.all()
assert l == expected
sess.clear()
-
+
s = select([users, func.count(addresses.c.id).label('count')]).select_from(users.outerjoin(addresses)).group_by(*[c for c in users.c]).order_by(User.id)
q = sess.query(User)
l = q.add_column("count").from_statement(s).all()
assert l == expected
-
-
+
+
def test_two_columns(self):
sess = create_session()
(user7, user8, user9, user10) = sess.query(User).all()
q = create_session().query(User)
l = q.add_column("count").add_column("concat").from_statement(s).all()
assert l == expected
-
+
sess.clear()
-
+
# test with select_from()
q = create_session().query(User).add_column(func.count(addresses.c.id))\
.add_column(("Name:" + users.c.name)).select_from(users.outerjoin(addresses))\
assert q.all() == expected
sess.clear()
-
+
# test with outerjoin() both aliased and non
for aliased in (False, True):
q = create_session().query(User).add_column(func.count(addresses.c.id))\
class SelectFromTest(QueryTest):
keep_mappers = False
-
+
def setup_mappers(self):
pass
-
+
def test_replace_with_select(self):
mapper(User, users, properties = {
'addresses':relation(Address)
})
mapper(Address, addresses)
-
+
sel = users.select(users.c.id.in_([7, 8])).alias()
sess = create_session()
-
+
self.assertEquals(sess.query(User).select_from(sel).all(), [User(id=7), User(id=8)])
self.assertEquals(sess.query(User).select_from(sel).filter(User.c.id==8).all(), [User(id=8)])
self.assertEquals(sess.query(User).select_from(sel).order_by(asc(User.name)).all(), [
User(name='ed',id=8), User(name='jack',id=7)
])
-
- self.assertEquals(sess.query(User).select_from(sel).options(eagerload('addresses')).first(),
+
+ self.assertEquals(sess.query(User).select_from(sel).options(eagerload('addresses')).first(),
User(name='jack', addresses=[Address(id=1)])
)
sel = users.select(users.c.id.in_([7, 8]))
sess = create_session()
- self.assertEquals(sess.query(User).select_from(sel).join('addresses').add_entity(Address).order_by(User.id).order_by(Address.id).all(),
+ self.assertEquals(sess.query(User).select_from(sel).join('addresses').add_entity(Address).order_by(User.id).order_by(Address.id).all(),
[
- (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)),
- (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)),
+ (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)),
+ (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)),
(User(name='ed',id=8), Address(user_id=8,email_address='ed@bettyboop.com',id=3)),
(User(name='ed',id=8), Address(user_id=8,email_address='ed@lala.com',id=4))
]
)
- self.assertEquals(sess.query(User).select_from(sel).join('addresses', aliased=True).add_entity(Address).order_by(User.id).order_by(Address.id).all(),
+ self.assertEquals(sess.query(User).select_from(sel).join('addresses', aliased=True).add_entity(Address).order_by(User.id).order_by(Address.id).all(),
[
- (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)),
- (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)),
+ (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)),
+ (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)),
(User(name='ed',id=8), Address(user_id=8,email_address='ed@bettyboop.com',id=3)),
(User(name='ed',id=8), Address(user_id=8,email_address='ed@lala.com',id=4))
]
'keywords':relation(Keyword, secondary=item_keywords, order_by=keywords.c.id) #m2m
})
mapper(Keyword, keywords)
-
+
sel = users.select(users.c.id.in_([7, 8]))
sess = create_session()
-
+
self.assertEquals(sess.query(User).select_from(sel).join(['orders', 'items', 'keywords']).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
User(name=u'jack',id=7)
])
self.assertEquals(sess.query(User).select_from(sel).options(eagerload_all('orders.items.keywords')).join(['orders', 'items', 'keywords'], aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
User(name=u'jack',orders=[
Order(description=u'order 1',items=[
- Item(description=u'item 1',keywords=[Keyword(name=u'red'), Keyword(name=u'big'), Keyword(name=u'round')]),
- Item(description=u'item 2',keywords=[Keyword(name=u'red',id=2), Keyword(name=u'small',id=5), Keyword(name=u'square')]),
+ Item(description=u'item 1',keywords=[Keyword(name=u'red'), Keyword(name=u'big'), Keyword(name=u'round')]),
+ Item(description=u'item 2',keywords=[Keyword(name=u'red',id=2), Keyword(name=u'small',id=5), Keyword(name=u'square')]),
Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)])
- ]),
+ ]),
Order(description=u'order 3',items=[
- Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)]),
- Item(description=u'item 4',keywords=[],id=4),
+ Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)]),
+ Item(description=u'item 4',keywords=[],id=4),
Item(description=u'item 5',keywords=[],id=5)
- ]),
+ ]),
Order(description=u'order 5',items=[Item(description=u'item 5',keywords=[])])])
])
self.assert_sql_count(testbase.db, go, 1)
sess.clear()
sel2 = orders.select(orders.c.id.in_([1,2,3]))
self.assertEquals(sess.query(Order).select_from(sel2).join(['items', 'keywords']).filter(Keyword.name == 'red').all(), [
- Order(description=u'order 1',id=1),
- Order(description=u'order 2',id=2),
+ Order(description=u'order 1',id=1),
+ Order(description=u'order 2',id=2),
])
self.assertEquals(sess.query(Order).select_from(sel2).join(['items', 'keywords'], aliased=True).filter(Keyword.name == 'red').all(), [
- Order(description=u'order 1',id=1),
- Order(description=u'order 2',id=2),
+ Order(description=u'order 1',id=1),
+ Order(description=u'order 2',id=2),
])
-
-
+
+
def test_replace_with_eager(self):
mapper(User, users, properties = {
'addresses':relation(Address)
})
mapper(Address, addresses)
-
+
sel = users.select(users.c.id.in_([7, 8]))
sess = create_session()
-
+
def go():
- self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).all(),
+ self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).all(),
[
- User(id=7, addresses=[Address(id=1)]),
+ User(id=7, addresses=[Address(id=1)]),
User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])
]
)
self.assert_sql_count(testbase.db, go, 1)
sess.clear()
-
+
def go():
- self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.c.id==8).all(),
+ self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.c.id==8).all(),
[User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])]
)
self.assert_sql_count(testbase.db, go, 1)
def go():
self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel)[1], User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)]))
self.assert_sql_count(testbase.db, go, 1)
-
+
class CustomJoinTest(QueryTest):
keep_mappers = False
if __name__ == '__main__':
testbase.main()
-
-
class TraversalTest(AssertMixin):
"""test ClauseVisitor's traversal, particularly its ability to copy and modify
a ClauseElement in place."""
-
+
def setUpAll(self):
global A, B
-
+
# establish two ficticious ClauseElements.
# define deep equality semantics as well as deep identity semantics.
class A(ClauseElement):
def is_other(self, other):
return other is self
-
+
def __eq__(self, other):
return other.expr == self.expr
-
+
def __ne__(self, other):
return other.expr != self.expr
-
+
def __str__(self):
return "A(%s)" % repr(self.expr)
-
+
class B(ClauseElement):
def __init__(self, *items):
self.items = items
if i1 != i2:
return False
return True
-
+
def __ne__(self, other):
for i1, i2 in zip(self.items, other.items):
if i1 != i2:
return True
return False
-
- def _copy_internals(self, clone=_clone):
+
+ def _copy_internals(self, clone=_clone):
self.items = [clone(i) for i in self.items]
def get_children(self, **kwargs):
return self.items
-
+
def __str__(self):
return "B(%s)" % repr([str(i) for i in self.items])
-
+
def test_test_classes(self):
a1 = A("expr1")
struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
assert struct != struct3
assert not struct.is_other(struct2)
assert not struct.is_other(struct3)
-
- def test_clone(self):
+
+ def test_clone(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
-
+
class Vis(ClauseVisitor):
def visit_a(self, a):
pass
def visit_b(self, b):
pass
-
+
vis = Vis()
s2 = vis.traverse(struct, clone=True)
assert struct == s2
assert not struct.is_other(s2)
- def test_no_clone(self):
+ def test_no_clone(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
class Vis(ClauseVisitor):
s2 = vis.traverse(struct, clone=False)
assert struct == s2
assert struct.is_other(s2)
-
+
def test_change_in_place(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
struct2 = B(A("expr1"), A("expr2modified"), B(A("expr1b"), A("expr2b")), A("expr3"))
s3 = vis2.traverse(struct, clone=True)
assert struct != s3
assert struct3 == s3
-
-
+
+
class ClauseTest(SQLCompileTest):
"""test copy-in-place behavior of various ClauseElements."""
-
+
def setUpAll(self):
global t1, t2
- t1 = table("table1",
+ t1 = table("table1",
column("col1"),
column("col2"),
column("col3"),
)
- t2 = table("table2",
+ t2 = table("table2",
column("col1"),
column("col2"),
column("col3"),
)
-
+
def test_binary(self):
clause = t1.c.col2 == t2.c.col2
assert str(clause) == ClauseVisitor().traverse(clause, clone=True)
-
+
def test_join(self):
clause = t1.join(t2, t1.c.col2==t2.c.col2)
c1 = str(clause)
assert str(clause) == str(ClauseVisitor().traverse(clause, clone=True))
-
+
class Vis(ClauseVisitor):
def visit_binary(self, binary):
binary.right = t2.c.col3
-
+
clause2 = Vis().traverse(clause, clone=True)
assert c1 == str(clause)
assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3))
-
+
def test_text(self):
clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')])
c1 = str(clause)
def visit_textclause(self, text):
text.text = text.text + " SOME MODIFIER=:lala"
text.bindparams['lala'] = bindparam('lala')
-
+
clause2 = Vis().traverse(clause, clone=True)
assert c1 == str(clause)
assert str(clause2) == c1 + " SOME MODIFIER=:lala"
assert clause.bindparams.keys() == ['bar']
assert util.Set(clause2.bindparams.keys()) == util.Set(['bar', 'lala'])
-
+
def test_select(self):
s2 = select([t1])
s2_assert = str(s2)
assert str(s2) == s3_assert
print "------------------"
-
+
s4_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col3==9)))
class Vis(ClauseVisitor):
def visit_select(self, select):
print str(s4)
assert str(s4) == s4_assert
assert str(s3) == s3_assert
-
+
print "------------------"
s5_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col1==9)))
class Vis(ClauseVisitor):
print str(s5)
assert str(s5) == s5_assert
assert str(s4) == s4_assert
-
+
def test_binds(self):
"""test that unique bindparams change their name upon clone() to prevent conflicts"""
-
+
s = select([t1], t1.c.col1==bindparam(None, unique=True)).alias()
s2 = ClauseVisitor().traverse(s, clone=True).alias()
s3 = select([s], s.c.col2==s2.c.col2)
"table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) AS anon_1, "\
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_2 "\
"WHERE anon_1.col2 = anon_2.col2")
-
+
s = select([t1], t1.c.col1==4).alias()
s2 = ClauseVisitor().traverse(s, clone=True).alias()
s3 = select([s], s.c.col2==s2.c.col2)
"table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_1) AS anon_1, "\
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_2) AS anon_2 "\
"WHERE anon_1.col2 = anon_2.col2")
-
+
+ @testing.emits_warning('.*replaced by another column with the same key')
def test_alias(self):
subq = t2.select().alias('subq')
s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)])
s4 = sql_util.ClauseAdapter(table('foo')).traverse(s3, clone=True)
assert orig == str(s) == str(s3) == str(s4)
-
+
def test_correlated_select(self):
s = select(['*'], t1.c.col1==t2.c.col1, from_obj=[t1, t2]).correlate(t2)
class Vis(ClauseVisitor):
def visit_select(self, select):
select.append_whereclause(t1.c.col2==7)
-
+
self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2_1")
class ClauseAdapterTest(SQLCompileTest):
def setUpAll(self):
global t1, t2
- t1 = table("table1",
+ t1 = table("table1",
column("col1"),
column("col2"),
column("col3"),
)
- t2 = table("table2",
+ t2 = table("table2",
column("col1"),
column("col2"),
column("col3"),
)
-
+
def test_table_to_alias(self):
-
+
t1alias = t1.alias('t1alias')
-
+
vis = sql_util.ClauseAdapter(t1alias)
ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
assert ff._get_from_objects() == [t1alias]
-
+
self.assert_compile(vis.traverse(select(['*'], from_obj=[t1]), clone=True), "SELECT * FROM table1 AS t1alias")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
self.assert_compile(ff, "count(t1alias.col1) AS foo")
assert ff._get_from_objects() == [t1alias]
-
+
# TODO:
# self.assert_compile(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias")
-
+
t2alias = t2.alias('t2alias')
vis.chain(sql_util.ClauseAdapter(t2alias))
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2")
-
+
def test_include_exclude(self):
m = MetaData()
a=Table( 'a',m,
e = sql_util.ClauseAdapter( b, include= set([ a.c.id ]),
equivalents= { a.c.id: set([ a.c.id]) }
).traverse( e)
-
+
assert str(e) == "a_1.id = a.xxx_id"
def test_join_to_alias(self):
" LEFT OUTER JOIN d ON a_id = d.aid")
j5 = j3.alias('foo')
j6 = sql_util.ClauseAdapter(j5).copy_and_process([j4])[0]
-
+
# this statement takes c join(a join b), wraps it inside an aliased "select * from c join(a join b) AS foo".
# the outermost right side "left outer join d" stays the same, except "d" joins against foo.a_id instead
# of plain "a_id"
"c JOIN (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid FROM a LEFT OUTER JOIN b ON a.id = b.aid) "
"ON b_id = c.bid) AS foo"
" LEFT OUTER JOIN d ON foo.a_id = d.aid")
-
+
def test_derived_from(self):
assert select([t1]).is_derived_from(t1)
assert not select([t2]).is_derived_from(t1)
assert not t1.is_derived_from(select([t1]))
assert t1.alias().is_derived_from(t1)
-
-
+
+
s1 = select([t1, t2]).alias('foo')
s2 = select([s1]).limit(5).offset(10).alias()
assert s2.is_derived_from(s1)
s2 = s2._clone()
assert s2.is_derived_from(s1)
-
+
def test_aliasedselect_to_aliasedselect(self):
# original issue from ticket #904
s1 = select([t1]).alias('foo')
s2 = select([s1]).limit(5).offset(10).alias()
- self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1),
+ self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1),
"SELECT foo.col1, foo.col2, foo.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10")
-
+
j = s1.outerjoin(t2, s1.c.col1==t2.c.col1)
- self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(),
+ self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(),
"SELECT anon_1.col1, anon_1.col2, anon_1.col3, table2.col1, table2.col2, table2.col3 FROM "\
"(SELECT foo.col1 AS col1, foo.col2 AS col2, foo.col3 AS col3 FROM "\
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10) AS anon_1 "\
talias = t1.alias('bar')
j = s1.outerjoin(talias, s1.c.col1==talias.c.col1)
- self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(),
+ self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(),
"SELECT anon_1.col1, anon_1.col2, anon_1.col3, bar.col1, bar.col2, bar.col3 FROM "\
"(SELECT foo.col1 AS col1, foo.col2 AS col2, foo.col3 AS col3 FROM "\
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10) AS anon_1 "\
"LEFT OUTER JOIN table1 AS bar ON anon_1.col1 = bar.col1")
-
-
+
+
class SelectTest(SQLCompileTest):
"""tests the generative capability of Select"""
def setUpAll(self):
global t1, t2
- t1 = table("table1",
+ t1 = table("table1",
column("col1"),
column("col2"),
column("col3"),
)
- t2 = table("table2",
+ t2 = table("table2",
column("col1"),
column("col2"),
column("col3"),
)
-
+
def test_select(self):
- self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3),
+ self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3),
"SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1_1 ORDER BY table1.col3")
-
- self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3),
+
+ self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3),
"SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\
"FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3")
-
+
s = select([t2], t2.c.col1==t1.c.col1, correlate=False)
s = s.correlate(t1).order_by(t2.c.col3)
- self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3),
+ self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3),
"SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\
"FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3")
self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
select_copy = s.column('yyy')
self.assert_compile(select_copy, "SELECT table1.col1, table1.col2, table1.col3, yyy FROM table1")
- assert s.columns is not select_copy.columns
+ assert s.columns is not select_copy.columns
assert s._columns is not select_copy._columns
assert s._raw_columns is not select_copy._raw_columns
self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
self.assert_compile(s2, "SELECT table1.col1, table1.col2, table1.col3 FROM table1, "
"(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 "
"WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2")
-
- s3 = s.correlate(None)
+
+ s3 = s.correlate(None)
self.assert_compile(select([t1], t1.c.col2==s3.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, "
"(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2, table1 "
"WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2")
self.assert_compile(select([t1], t1.c.col2==s3.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, "
"(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2, table1 "
"WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2")
-
+
def test_prefixes(self):
s = t1.select()
self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
if __name__ == '__main__':
- testbase.main()
+ testbase.main()
"""tests that various From objects properly export their columns, as well as
useable primary keys and foreign keys. Full relational algebra depends on
every selectable unit behaving nicely with others.."""
-
+
import testbase
from sqlalchemy import *
from testlib import *
metadata = MetaData()
-table = Table('table1', metadata,
+table = Table('table1', metadata,
Column('col1', Integer, primary_key=True),
Column('col2', String(20)),
Column('col3', Integer),
Column('colx', Integer),
-
+
)
table2 = Table('table2', metadata,
#assert s.corresponding_column(table.c.col1) is s.c.col1
assert s.corresponding_column(s.c.col1) is s.c.col1
assert s.corresponding_column(s.c.c1) is s.c.c1
-
+
def testjoinagainstself(self):
jj = select([table.c.col1.label('bar_col1')])
jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
-
+
# test column directly agaisnt itself
assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1
-
- # test alias of the join, targets the column with the least
+
+ # test alias of the join, targets the column with the least
# "distance" between the requested column and the returned column
# (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than
# there is from j2.c.bar_col1 to table.c.col1)
j2 = jjj.alias('foo')
assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1
-
+
def testselectontable(self):
sel = select([table, table2], use_labels=True)
assert sel.corresponding_column(table.c.col1) is sel.c.table1_col1
assert sel.corresponding_column(table.c.col1, require_embedded=True) is sel.c.table1_col1
assert table.corresponding_column(sel.c.table1_col1) is table.c.col1
assert table.corresponding_column(sel.c.table1_col1, require_embedded=True) is None
-
+
def testjoinagainstjoin(self):
j = outerjoin(table, table2, table.c.col1==table2.c.col2)
jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
-
+
j2 = jjj.alias('foo')
print j2.corresponding_column(jjj.c.table1_col1)
assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1
-
+
assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
-
+
def testtablealias(self):
a = table.alias('a')
-
+
j = join(a, table2)
-
+
criterion = a.c.col1 == table2.c.col2
print
print str(j)
j1 = table.join(table2)
assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
-
+
def testjoin(self):
a = join(table, table2)
print str(a.select(use_labels=True))
a = table.select().alias('a')
print str(a.select())
j = join(a, table2)
-
+
criterion = a.c.col1 == table2.c.col2
print criterion
print j.onclause
a = table.select(use_labels=True)
print str(a.select())
j = join(a, table2)
-
+
criterion = a.c.table1_col1 == table2.c.col2
print
print str(j)
criterion = a.c.acol1 == table2.c.col2
print str(j)
self.assert_(criterion.compare(j.onclause))
-
+
def testselectaliaslabels(self):
a = table2.select(use_labels=True).alias('a')
print str(a.select())
j = join(a, table)
-
+
criterion = table.c.col1 == a.c.table2_col2
print str(criterion)
print str(j.onclause)
self.assert_(criterion.compare(j.onclause))
-
+
def testtablejoinedtoselectoftable(self):
metadata = MetaData()
a = Table('a', metadata,
assert j4.corresponding_column(j2.c.aid) is j4.c.aid
assert j4.corresponding_column(a.c.id) is j4.c.id
+ @testing.emits_warning('.*replaced by another column with the same key')
def test_oid(self):
# the oid column of a selectable currently proxies all
- # oid columns found within.
+ # oid columns found within.
s = table.select()
s2 = table2.select()
s3 = select([s, s2])
assert s3.corresponding_column(table2.oid_column) is s3.oid_column
assert s3.corresponding_column(s.oid_column) is s3.oid_column
assert s3.corresponding_column(s2.oid_column) is s3.oid_column
-
+
u = s.union(s2)
assert u.corresponding_column(table.oid_column) is u.oid_column
assert u.corresponding_column(table2.oid_column) is u.oid_column
assert u.corresponding_column(s.oid_column) is u.oid_column
assert u.corresponding_column(s2.oid_column) is u.oid_column
-
+
class PrimaryKeyTest(AssertMixin):
def test_join_pk_collapse_implicit(self):
- """test that redundant columns in a join get 'collapsed' into a minimal primary key,
+ """test that redundant columns in a join get 'collapsed' into a minimal primary key,
which is the root column along a chain of foreign key relationships."""
-
+
meta = MetaData()
a = Table('a', meta, Column('id', Integer, primary_key=True))
b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True))
assert c.c.id.references(b.c.id)
assert not d.c.id.references(a.c.id)
-
+
assert list(a.join(b).primary_key) == [a.c.id]
assert list(b.join(c).primary_key) == [b.c.id]
assert list(a.join(b).join(c).primary_key) == [a.c.id]
assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]
def test_join_pk_collapse_explicit(self):
- """test that redundant columns in a join get 'collapsed' into a minimal primary key,
+ """test that redundant columns in a join get 'collapsed' into a minimal primary key,
which is the root column along a chain of explicit join conditions."""
meta = MetaData()
assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id]
assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id]
assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id]
-
+
assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id]
-
+
def test_init_doesnt_blowitaway(self):
meta = MetaData()
a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
j = a.join(b)
assert list(j.primary_key) == [a.c.id]
-
+
j.foreign_keys
assert list(j.primary_key) == [a.c.id]
meta = MetaData()
t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
-
+
assert t1.is_derived_from(t1)
assert not t2.is_derived_from(t1)
-
- def test_alias(self):
+
+ def test_alias(self):
meta = MetaData()
t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
-
+
assert t1.alias().is_derived_from(t1)
assert not t2.alias().is_derived_from(t1)
assert not t1.is_derived_from(t1.alias())
assert not t1.is_derived_from(t2.alias())
-
+
def test_select(self):
meta = MetaData()
t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
if __name__ == "__main__":
testbase.main()
-
import testbase
-import pickleable
-import datetime, os, re
+import datetime, os, pickleable, re
from sqlalchemy import *
from sqlalchemy import types, exceptions
from sqlalchemy.sql import operators
self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata)
def testassert(self):
- import warnings
-
- warnings.filterwarnings("always", r".*non-unicode bind")
-
- ## test that data still goes in if warning is emitted....
- unicode_table.insert().execute(unicode_varchar='not unicode')
- assert (select([unicode_table.c.unicode_varchar]).execute().fetchall()
- == [('not unicode', )])
-
- warnings.filterwarnings("error", r".*non-unicode bind")
try:
- try:
- unicode_table.insert().execute(unicode_varchar='not unicode')
- assert False
- except RuntimeWarning, e:
- assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e)
- finally:
- warnings.filterwarnings("always", r".*non-unicode bind")
+ unicode_table.insert().execute(unicode_varchar='not unicode')
+ assert False
+ except exceptions.SAWarning, e:
+ assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e)
unicode_engine = engines.utf8_engine(options={'convert_unicode':True,
'assert_unicode':True})
assert False
except exceptions.InvalidRequestError, e:
assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'"
+
+ @testing.emits_warning('.*non-unicode bind')
+ def warns():
+ # test that data still goes in if warning is emitted....
+ unicode_table.insert().execute(unicode_varchar='not unicode')
+ assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )])
+ warns()
+
finally:
unicode_engine.dispose()
foo =Table('foo', metadata,
Column('one', String))
- import warnings
- from sqlalchemy.logging import SADeprecationWarning
-
- warnings.filterwarnings("error", r"Using String type with no length.*")
-
# no warning
select([func.count("*")], bind=testbase.db).execute()
# warning during CREATE
foo.create()
assert False
- except SADeprecationWarning, e:
+ except exceptions.SADeprecationWarning, e:
assert "Using String type with no length" in str(e)
assert re.search(r'\bone\b', str(e))
select([func.count("*")], from_obj=bar).execute()
finally:
bar.drop()
- warnings.filterwarnings("always", r"Using String type with no length.*")
-
class NumericTest(AssertMixin):
def setUpAll(self):
from cStringIO import StringIO
import testlib.config as config
sql, MetaData, clear_mappers, Session, util = None, None, None, None, None
-salogging = None
+sa_exceptions = None
__all__ = ('PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest')
bind = config.db
return bind.dialect.server_version_info(bind.contextual_connect())
+def emits_warning(*messages):
+ """Mark a test as emitting a warning.
+
+ With no arguments, squelches all SAWarning failures. Or pass one or more
+ strings; these will be matched to the root of the warning description by
+ warnings.filterwarnings().
+ """
+
+ # TODO: it would be nice to assert that a named warning was
+ # emitted. should work with some monkeypatching of warnings,
+ # and may work on non-CPython if they keep to the spirit of
+ # warnings.showwarning's docstring.
+ # - update: jython looks ok, it uses cpython's module
+ def decorate(fn):
+ def safe(*args, **kw):
+ global sa_exceptions
+ if sa_exceptions is None:
+ import sqlalchemy.exceptions as sa_exceptions
+
+ if not messages:
+ filters = [dict(action='ignore',
+ category=sa_exceptions.SAWarning)]
+ else:
+ filters = [dict(action='ignore',
+ message=message,
+ category=sa_exceptions.SAWarning)
+ for message in messages ]
+ for f in filters:
+ warnings.filterwarnings(**f)
+ try:
+ return fn(*args, **kw)
+ finally:
+ resetwarnings()
+ try:
+ safe.__name__ = fn.__name__
+ except:
+ pass
+ return safe
+ return decorate
+
def uses_deprecated(*messages):
"""Mark a test as immune from fatal deprecation warnings.
def decorate(fn):
def safe(*args, **kw):
- global salogging
- if salogging is None:
- from sqlalchemy import logging as salogging
+ global sa_exceptions
+ if sa_exceptions is None:
+ import sqlalchemy.exceptions as sa_exceptions
if not messages:
filters = [dict(action='ignore',
- category=salogging.SADeprecationWarning)]
+ category=sa_exceptions.SADeprecationWarning)]
else:
filters = [dict(action='ignore',
message=message,
- category=salogging.SADeprecationWarning)
+ category=sa_exceptions.SADeprecationWarning)
for message in
[ (m.startswith('//') and
('Call to deprecated function ' + m[2:]) or m)
finally:
resetwarnings()
try:
- safe.__name__ = fn.name
+ safe.__name__ = fn.__name__
except:
pass
return safe
def resetwarnings():
"""Reset warning behavior to testing defaults."""
- global salogging
- if salogging is None:
- from sqlalchemy import logging as salogging
+ global sa_exceptions
+ if sa_exceptions is None:
+ import sqlalchemy.exceptions as sa_exceptions
warnings.resetwarnings()
- warnings.filterwarnings('error', category=salogging.SADeprecationWarning)
+ warnings.filterwarnings('error', category=sa_exceptions.SADeprecationWarning)
+ warnings.filterwarnings('error', category=sa_exceptions.SAWarning)
def against(*queries):
"""Boolean predicate, compares to testing database configuration.