# the select test now tests almost completely with TableClause/ColumnClause objects,
-# which are free-roaming table/column objects not attached to any database.
+# which are free-roaming table/column objects not attached to any database.
# so SQLAlchemy's SQL construction engine can be used with no database dependencies at all.
-table1 = table('mytable',
+table1 = table('mytable',
column('myid', Integer),
column('name', String),
column('description', String),
)
table2 = table(
- 'myothertable',
+ 'myothertable',
column('otherid', Integer),
column('othername', String),
)
table3 = table(
- 'thirdtable',
+ 'thirdtable',
column('userid', Integer),
column('otherstuff', String),
)
schema = 'remote_owner'
)
-users = table('users',
+users = table('users',
column('user_id'),
column('user_name'),
column('password'),
)
-addresses = table('addresses',
+addresses = table('addresses',
column('address_id'),
column('user_id'),
column('street'),
)
class SelectTest(SQLCompileTest):
-
+
def test_attribute_sanity(self):
assert hasattr(table1, 'c')
assert hasattr(table1.select(), 'c')
assert not hasattr(table1.select().c.myid, 'columns')
assert not hasattr(table1.alias().c.myid, 'columns')
assert not hasattr(table1.alias().c.myid, 'c')
-
+
def testtableselect(self):
self.assert_compile(table1.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable")
)
,
"SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable WHERE mytable.name = :mytable_name) WHERE myid = :myid")
-
+
sq = select([table1])
self.assert_compile(
sq.select(),
"SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)"
)
-
+
sq = select(
[table1],
).alias('sq')
self.assert_compile(
- sq.select(sq.c.myid == 7),
+ sq.select(sq.c.myid == 7),
"SELECT sq.myid, sq.name, sq.description FROM \
(SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :sq_myid"
)
-
+
sq = select(
[table1, table2],
and_(table1.c.myid ==7, table2.c.otherid==table1.c.myid),
use_labels = True
).alias('sq')
-
+
sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \
mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \
myothertable.othername AS myothertable_othername FROM mytable, myothertable \
(SELECT sq.mytable_myid AS sq_mytable_myid, sq.mytable_name AS sq_mytable_name, \
sq.mytable_description AS sq_mytable_description, sq.myothertable_otherid AS sq_myothertable_otherid, \
sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") AS sq) AS sq2")
-
+
def testmssql_noorderbyinsubquery(self):
"""test that the ms-sql dialect removes ORDER BY clauses from subqueries"""
dialect = mssql.dialect()
# TODO: this is probably incorrect; no "AS <foo>" is being applied to the table
self.assert_compile(table1.join(table4, table1.c.myid==table4.c.rem_id).select(), "SELECT mytable.myid, mytable.name, mytable.description, remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM mytable JOIN remote_owner.remotetable ON remotetable.rem_id = mytable.myid")
-
+
def testdontovercorrelate(self):
self.assert_compile(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""")
-
+
def testexistsascolumnclause(self):
self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid)", params={'mytable_myid':5})
self.assert_compile(select([table1, exists([1], from_obj=[table2])]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={})
self.assert_compile(select([table1, exists([1], from_obj=[table2]).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={})
-
+
def test_generative_exists(self):
self.assert_compile(
table1.select(exists([1], table2.c.otherid == table1.c.myid).correlate(table1)),
table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).replace_selectable(table2, table2.alias()),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable AS myothertable_1 ON mytable.myid = myothertable_1.otherid WHERE EXISTS (SELECT 1 FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)"
)
-
+
def testwheresubquery(self):
s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s')
self.assert_compile(
"""SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""")
self.assert_compile(
- table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')),
+ table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :mytable_name)"
)
-
+
self.assert_compile(
table1.select(table1.c.myid == select([table2.c.otherid], table1.c.name == table2.c.othername)),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT myothertable.otherid FROM myothertable WHERE mytable.name = myothertable.othername)"
self.assert_compile(
select([users, s.c.street], from_obj=[s]),
"""SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""")
-
+
# test constructing the outer query via append_column(), which occurs in the ORM's Query object
s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=[table1])
s.append_column(table1)
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)"
)
-
+
def testorderbysubquery(self):
self.assert_compile(
table1.select(order_by=[select([table2.c.otherid], table1.c.myid==table2.c.otherid)]),
table1.select(order_by=[desc(select([table2.c.otherid], table1.c.myid==table2.c.otherid))]),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid) DESC"
)
-
-
+
+
def test_scalar_select(self):
s = select([table1.c.myid], scalar=True, correlate=False)
self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable")
s.columns.foo
except exceptions.InvalidRequestError, err:
assert str(err) == 'Scalar Select expression has no columns; use this object directly within a column-level expression.'
-
+
zips = table('zips',
column('zipcode'),
column('latitude'),
zip = '12345'
qlat = select([zips.c.latitude], zips.c.zipcode == zip).correlate(None).as_scalar()
qlng = select([zips.c.longitude], zips.c.zipcode == zip).correlate(None).as_scalar()
-
+
q = select([places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label('dist')],
zips.c.zipcode==zip,
order_by = ['dist', places.c.nm]
self.assert_compile(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE "
"zips.zipcode = :zips_zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_1)) AS dist "
"FROM places, zips WHERE zips.zipcode = :zips_zipcode_2 ORDER BY dist, places.nm")
-
+
zalias = zips.alias('main_zip')
qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True)
qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode, scalar=True)
j1 = table1.join(table2, table1.c.myid==table2.c.otherid)
s2 = select([table1, s1], from_obj=[j1])
self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
-
+
def testlabelcomparison(self):
x = func.lala(table1.c.myid).label('foo')
self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :literal")
-
+
def testand(self):
self.assert_compile(
- select(['*'], and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()")),
+ select(['*'], and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()")),
"SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name AND myothertable.othername = :myothertable_othername AND sysdate() = today()"
)
select([table1], and_(
table1.c.myid == 12,
or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9),
- "sysdate() = today()",
+ "sysdate() = today()",
)),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND (myothertable.othername = :myothertable_othername OR myothertable.othername = :myothertable_othername_1 OR myothertable.otherid = :myothertable_otherid) AND sysdate() = today()",
checkparams = {'myothertable_othername': 'asdf', 'myothertable_othername_1':'foo', 'myothertable_otherid': 9, 'mytable_myid': 12}
self.assert_compile(
select([func.count(distinct(table1.c.myid))]), "SELECT count(DISTINCT mytable.myid) FROM mytable"
)
-
+
def testoperators(self):
# exercise arithmetic operators
for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'),
fwd_sql + "'\n or\n'" + rev_sql + "'")
self.assert_compile(
- table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
+ table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND mytable.name != :mytable_name"
)
self.assert_compile(
- table1.select((table1.c.myid != 12) & ~(table1.c.name.between('jack','john'))),
+ table1.select((table1.c.myid != 12) & ~(table1.c.name.between('jack','john'))),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name BETWEEN :mytable_name AND :mytable_name_1)"
)
self.assert_compile(
- table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')),
+ table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name = :mytable_name AND mytable.name = :mytable_name_1 AND mytable.name = :mytable_name_2)"
)
self.assert_compile(
- table1.select((table1.c.myid != 12) & ~table1.c.name),
+ table1.select((table1.c.myid != 12) & ~table1.c.name),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT mytable.name"
)
table1.select(table1.c.myid.op('hoho')(12)==14),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :literal"
)
-
+
# test that clauses can be pickled (operators need to be module-level, etc.)
clause = (table1.c.myid == 12) & table1.c.myid.between(15, 20) & table1.c.myid.like('hoho')
assert str(clause) == str(util.pickle.loads(util.pickle.dumps(clause)))
def testmultiparam(self):
self.assert_compile(
- select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf', table1.c.myid == 'foo')),
+ select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf', table1.c.myid == 'foo')),
"SELECT * FROM mytable WHERE mytable.myid = :mytable_myid OR mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2"
)
select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername], order_by = [table2.c.othername]),
"SELECT myothertable.othername, count(myothertable.otherid) FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
)
-
+
def testforupdate(self):
self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE")
-
+
self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE")
self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE NOWAIT", dialect=oracle.dialect())
self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect())
self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE", dialect=oracle.dialect())
-
+
def testalias(self):
# test the alias for a table1. column names stay the same, table name "changes" to "foo".
self.assert_compile(
# labels tablename_columnname, which become the column keys accessible off the Selectable object.
# also, only use one column from the second table and all columns from the first table1.
q = select([table1, table2.c.otherid], table1.c.myid == table2.c.otherid, use_labels = True)
-
+
# make an alias of the "selectable". column names stay the same (i.e. the labels), table name "changes" to "t2view".
a = alias(q, 't2view')
myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \
WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid"
)
-
-
+
+
def test_prefixes(self):
self.assert_compile(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"),
"SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable"
)
-
+
def testtext(self):
self.assert_compile(
text("select * from foo where lala = bar") ,
["foobar(a)", "pk_foo_bar(syslaal)"],
"a = 12",
from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"]
- ),
+ ),
"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
# test unicode
[u"foobar(a)", u"pk_foo_bar(syslaal)"],
u"a = 12",
from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"]
- ),
+ ),
u"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
# test building a select query programmatically with text
select(["column1", "column2"], from_obj=[table1]).alias('somealias').select(),
"SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias"
)
-
+
# test that use_labels doesnt interfere with literal columns
self.assert_compile(
select(["column1", "column2", table1.c.myid], from_obj=[table1], use_labels=True),
select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1], use_labels=True),
"SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable"
)
-
+
print "---------------------------------------------"
s1 = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1])
print "---------------------------------------------"
def testtextbinds(self):
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
- "select * from foo where lala=:bar and hoho=:whee",
+ text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
+ "select * from foo where lala=:bar and hoho=:whee",
checkparams={'bar':4, 'whee': 7},
)
self.assert_compile(
- text("select * from foo where clock='05:06:07'"),
- "select * from foo where clock='05:06:07'",
+ text("select * from foo where clock='05:06:07'"),
+ "select * from foo where clock='05:06:07'",
checkparams={},
params={},
)
dialect = postgres.dialect()
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
- "select * from foo where lala=%(bar)s and hoho=%(whee)s",
+ text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
+ "select * from foo where lala=%(bar)s and hoho=%(whee)s",
checkparams={'bar':4, 'whee': 7},
dialect=dialect
)
dialect = sqlite.dialect()
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
- "select * from foo where lala=? and hoho=?",
+ text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
+ "select * from foo where lala=? and hoho=?",
checkparams={'bar':4, 'whee':7},
dialect=dialect
)
-
+
def testtextmix(self):
self.assert_compile(select(
[table1, table2.c.otherid, "sysdate()", "foo, bar, lala"],
"datetime(foo) = Today",
table1.c.myid == table2.c.otherid,
)
- ),
+ ),
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \
FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid")
[alias(table1, 't'), "foo.f"],
"foo.f = t.id",
from_obj = ["(select f from bar where lala=heyhey) foo"]
- ),
+ ),
"SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
def testliteral(self):
- self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]),
+ self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]),
"SELECT :literal || :literal_1 FROM mytable")
def testcalculatedcolumns(self):
select([value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) /value_tbl.c.val1 > 2.0),
"SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :literal"
)
-
+
def testfunction(self):
"""tests the generation of functions using the func keyword"""
# test an expression with a function
- self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid,
+ self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid,
"lala(:lala, :lala_1, :literal, mytable.myid) * myothertable.otherid")
# test it in a SELECT
- self.assert_compile(select([func.count(table1.c.myid)]),
+ self.assert_compile(select([func.count(table1.c.myid)]),
"SELECT count(mytable.myid) FROM mytable")
# test a "dotted" function name
- self.assert_compile(select([func.foo.bar.lala(table1.c.myid)]),
+ self.assert_compile(select([func.foo.bar.lala(table1.c.myid)]),
"SELECT foo.bar.lala(mytable.myid) FROM mytable")
# test the bind parameter name with a "dotted" function name is only the name
# (limits the length of the bind param name)
- self.assert_compile(select([func.foo.bar.lala(12)]),
+ self.assert_compile(select([func.foo.bar.lala(12)]),
"SELECT foo.bar.lala(:lala)")
# test a dotted func off the engine itself
self.assert_compile(func.lala.hoho(7), "lala.hoho(:hoho)")
-
+
# test None becomes NULL
self.assert_compile(func.my_func(1,2,None,3), "my_func(:my_func, :my_func_1, NULL, :my_func_2)")
-
+
# test pickling
self.assert_compile(util.pickle.loads(util.pickle.dumps(func.my_func(1, 2, None, 3))), "my_func(:my_func, :my_func_1, NULL, :my_func_2)")
-
+
# assert func raises AttributeError for __bases__ attribute, since its not a class
# fixes pydoc
try:
def test_functions_with_cols(self):
from sqlalchemy.sql import column
users = table('users', column('id'), column('name'), column('fullname'))
- calculate = select([column('q'), column('z'), column('r')],
+ calculate = select([column('q'), column('z'), column('r')],
from_obj=[func.calculate(bindparam('x'), bindparam('y'))])
-
- self.assert_compile(select([users], users.c.id > calculate.c.z),
+
+ self.assert_compile(select([users], users.c.id > calculate.c.z),
"SELECT users.id, users.name, users.fullname "
"FROM users, (SELECT q, z, r "
"FROM calculate(:x, :y)) "
print "--------------------------------------------------"
s = select([users], users.c.id.between(
- calculate.alias('c1').unique_params(x=17, y=45).c.z,
+ calculate.alias('c1').unique_params(x=17, y=45).c.z,
calculate.alias('c2').unique_params(x=5, y=12).c.z))
- self.assert_compile(s,
+ self.assert_compile(s,
"SELECT users.id, users.name, users.fullname "
"FROM users, (SELECT q, z, r "
"FROM calculate(:x, :y)) AS c1, (SELECT q, z, r "
def testextract(self):
"""test the EXTRACT function"""
self.assert_compile(select([extract("month", table3.c.otherstuff)]), "SELECT extract(month FROM thirdtable.otherstuff) FROM thirdtable")
-
+
self.assert_compile(select([extract("day", func.to_date("03/20/2005", "MM/DD/YYYY"))]), "SELECT extract(day FROM to_date(:to_date, :to_date_1))")
-
+
def testjoin(self):
self.assert_compile(
join(table2, table1, table1.c.myid == table2.c.otherid).select(),
]),
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid"
)
-
+
self.assert_compile(
join(users, addresses, users.c.user_id==addresses.c.user_id).select(),
"SELECT users.user_id, users.user_name, users.password, addresses.address_id, addresses.user_id, addresses.street, addresses.city, addresses.state, addresses.zip FROM users JOIN addresses ON users.user_id = addresses.user_id"
)
-
+
def testmultijoin(self):
self.assert_compile(
select([table1, table2, table3],
-
+
from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid).outerjoin(table3, table1.c.myid==table3.c.userid)]
-
+
#from_obj = [outerjoin(join(table, table2, table1.c.myid == table2.c.otherid), table3, table1.c.myid==table3.c.userid)]
)
,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER JOIN thirdtable ON mytable.myid = thirdtable.userid"
)
,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable LEFT OUTER JOIN (myothertable JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON mytable.myid = myothertable.otherid"
)
-
+
def testunion(self):
x = union(
select([table1], table1.c.myid == 5),
select([table1], table1.c.myid == 12),
order_by = [table1.c.myid],
)
-
+
self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description \
FROM mytable WHERE mytable.myid = :mytable_myid UNION \
SELECT mytable.myid, mytable.name, mytable.description \
FROM mytable WHERE mytable.myid = :mytable_myid_1 ORDER BY mytable.myid")
-
+
self.assert_compile(
union(
select([table1]),
"SELECT mytable.myid, mytable.name, mytable.description \
FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \
FROM myothertable UNION SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable")
-
+
u = union(
select([table1]),
select([table2]),
select([table3])
)
assert u.corresponding_column(table2.c.otherid) is u.c.otherid
-
+
self.assert_compile(
union(
select([table1]),
FROM myothertable ORDER BY myid \
LIMIT 5 OFFSET 10"
)
-
+
self.assert_compile(
union(
select([table1.c.myid, table1.c.name, func.max(table1.c.description)], table1.c.name=='name2', group_by=[table1.c.myid, table1.c.name]),
"SELECT myothertable.otherid FROM myothertable EXCEPT SELECT thirdtable.userid FROM thirdtable \
UNION SELECT mytable.myid FROM mytable"
)
-
+
def testouterjoin(self):
query = select(
[table1, table2],
),
from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ]
)
- self.assert_compile(query,
+ self.assert_compile(query,
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \
WHERE mytable.name = %(mytable_name)s OR mytable.myid = %(mytable_myid)s OR \
stmt,
expected_named_stmt,
expected_positional_stmt,
- expected_default_params_dict,
+ expected_default_params_dict,
expected_default_params_list,
- test_param_dict,
+ test_param_dict,
expected_test_params_dict,
expected_test_params_list
) in [
{'myid':5, 'myid_1':6}, {'myid':5, 'myid_1':6}, [5,6]
),
]:
-
+
self.assert_compile(stmt, expected_named_stmt, params=expected_default_params_dict)
self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect())
nonpositional = stmt.compile()
assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict).get_raw_dict()))
pp = positional.get_params(**test_param_dict)
assert [pp[k] for k in positional.positiontup] == expected_test_params_list
-
+
# check that params() doesnt modify original statement
s = select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myotherid')))
s2 = s.params({'myid':8, 'myotherid':7})
assert s.compile().params == {'myid':None, 'myotherid':None}
assert s2.compile().params == {'myid':8, 'myotherid':7}
assert s3.compile().params == {'myid':9, 'myotherid':7}
-
-
+
+
# check that conflicts with "unique" params are caught
s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('mytable_myid')))
try:
assert False
except exceptions.CompileError, err:
assert str(err) == "Bind parameter 'mytable_myid_1' conflicts with unique bind parameter of the same name"
-
+
def testbindascol(self):
t = table('foo', column('id'))
s = select([t, literal('lala').label('hoho')])
self.assert_compile(s, "SELECT foo.id, :literal AS hoho FROM foo")
assert [str(c) for c in s.c] == ["id", "hoho"]
-
+
def testin(self):
self.assert_compile(select([table1], table1.c.myid.in_(['a'])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)")
WHERE mytable.myid IN (\
SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid \
UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1)")
-
+
# test that putting a select in an IN clause does not blow away its ORDER BY clause
self.assert_compile(
- select([table1, table2],
+ select([table1, table2],
table2.c.otherid.in_(
select([table2.c.otherid], order_by=[table2.c.othername], limit=10, correlate=False)
),
),
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid WHERE myothertable.otherid IN (SELECT myothertable.otherid FROM myothertable ORDER BY myothertable.othername LIMIT 10) ORDER BY mytable.myid"
)
-
+
# test empty in clause
self.assert_compile(select([table1], table1.c.myid.in_([])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)")
-
+
def test_in_deprecated_api(self):
self.assert_compile(select([table1], table1.c.myid.in_('abc')),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)")
-
+
self.assert_compile(select([table1], table1.c.myid.in_(1)),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)")
-
+
self.assert_compile(select([table1], table1.c.myid.in_(1,2)),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)")
-
+
self.assert_compile(select([table1], table1.c.myid.in_()),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)")
-
+
def testcast(self):
tbl = table('casttest',
column('id', Integer),
column('v2', Float),
column('ts', TIMESTAMP),
)
-
+
def check_results(dialect, expected_results, literal):
self.assertEqual(len(expected_results), 5, 'Incorrect number of expected results')
self.assertEqual(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), 'CAST(casttest.v1 AS %s)' %expected_results[0])
self.assertEqual(str(cast(tbl.c.ts, Date).compile(dialect=dialect)), 'CAST(casttest.ts AS %s)' %expected_results[2])
self.assertEqual(str(cast(1234, TEXT).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[3]))
self.assertEqual(str(cast('test', String(20)).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[4]))
- sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect)
- self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) \nFROM casttest")
+ # fixme: shoving all of this dialect-specific stuff in one test
+ # is now officialy completely ridiculous AND non-obviously omits
+ # coverage on other dialects.
+ sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect)
+ if isinstance(dialect, type(mysql.dialect())):
+ self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL(10, 2)) \nFROM casttest")
+ else:
+ self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) \nFROM casttest")
+
# first test with Postgres engine
check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(literal)s')
# then the sqlite engine
check_results(sqlite.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?')
- # MySQL seems to only support DATE types for cast
- self.assertEqual(str(cast(tbl.c.ts, Date).compile(dialect=mysql.dialect())), 'CAST(casttest.ts AS DATE)')
- self.assertEqual(str(cast(tbl.c.ts, Numeric).compile(dialect=mysql.dialect())), 'casttest.ts')
+ # then the MySQL engine
+ check_results(mysql.dialect(), ['DECIMAL(10, 2)', 'DECIMAL(12, 9)', 'DATE', 'CHAR', 'CHAR(20)'], '%s')
def testdatebetween(self):
import datetime
- table = Table('dt', metadata,
+ table = Table('dt', metadata,
Column('date', Date))
self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date AND :dt_date_1", checkparams={'dt_date':datetime.date(2006,6,1), 'dt_date_1':datetime.date(2006,6,5)})
self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :literal AND :literal_1", checkparams={'literal':datetime.date(2006,6,1), 'literal_1':datetime.date(2006,6,5)})
-
+
def test_operator_precedence(self):
table = Table('op', metadata,
Column('field', Integer))
# insert with user-supplied bind params for specific columns,
# cols provided literally
self.assert_compile(
- insert(table1, {table1.c.myid : bindparam('userid'), table1.c.name : bindparam('username')}),
+ insert(table1, {table1.c.myid : bindparam('userid'), table1.c.name : bindparam('username')}),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
-
+
# insert with user-supplied bind params for specific columns, cols
# provided as strings
self.assert_compile(
- insert(table1, dict(myid = 3, name = 'jack')),
+ insert(table1, dict(myid = 3, name = 'jack')),
"INSERT INTO mytable (myid, name) VALUES (:myid, :name)"
)
# test with a tuple of params instead of named
self.assert_compile(
- insert(table1, (3, 'jack', 'mydescription')),
+ insert(table1, (3, 'jack', 'mydescription')),
"INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)",
checkparams = {'myid':3, 'name':'jack', 'description':'mydescription'}
)
-
+
self.assert_compile(
insert(table1, values={table1.c.myid : bindparam('userid')}).values({table1.c.name : bindparam('username')}),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)"
)
-
+
def testinlineinsert(self):
metadata = MetaData()
- table = Table('sometable', metadata,
+ table = Table('sometable', metadata,
Column('id', Integer, primary_key=True),
Column('foo', Integer, default=func.foobar()))
- self.assert_compile(table.insert(values={}, inline=True), "INSERT INTO sometable (foo) VALUES (foobar())")
- self.assert_compile(table.insert(inline=True), "INSERT INTO sometable (foo) VALUES (foobar())", params={})
-
+ self.assert_compile(table.insert(values={}, inline=True), "INSERT INTO sometable (foo) VALUES (foobar())")
+ self.assert_compile(table.insert(inline=True), "INSERT INTO sometable (foo) VALUES (foobar())", params={})
+
def testinsertexpression(self):
self.assert_compile(insert(table1, values=dict(myid=func.lala())), "INSERT INTO mytable (myid) VALUES (lala())")
-
+
def testupdate(self):
self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {table1.c.name:'fred'})
self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {'name':'fred'})
c = s.compile(column_keys=['mytable_id', 'name'])
self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :mytable_name), description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'})
self.assert_(str(s) == str(c))
-
+
def testupdateexpression(self):
- self.assert_compile(update(table1,
+ self.assert_compile(update(table1,
(table1.c.myid == func.hoho(4)) &
(table1.c.name == literal('foo') + table1.c.name + literal('lala')),
values = {
table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
}), "UPDATE mytable SET myid=do_stuff(mytable.myid, :literal), name=(mytable.name || :mytable_name) "
"WHERE mytable.myid = hoho(:hoho) AND mytable.name = :literal_1 || mytable.name || :literal_2")
-
+
def testcorrelatedupdate(self):
# test against a straight text subquery
u = update(table1, values = {table1.c.name : text("(select name from mytable where id=mytable.id)")})
mt = table1.alias()
u = update(table1, values = {table1.c.name : select([mt.c.name], mt.c.myid==table1.c.myid)})
self.assert_compile(u, "UPDATE mytable SET name=(SELECT mytable_1.name FROM mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)")
-
+
# test against a regular constructed subquery
s = select([table2], table2.c.otherid == table1.c.myid)
u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s})
s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
u = table1.update(table1.c.name==s)
self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)")
-
+
def testdelete(self):
self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid")
-
+
def testcorrelateddelete(self):
# test a non-correlated WHERE clause
s = select([table2.c.othername], table2.c.otherid == 7)
m = MetaData()
foo = Table('foo', m,
Column('id', Integer))
-
+
t = Table('test', m,
Column('col1', Integer, default=func.foo(1)),
Column('col2', Integer, default=select([func.coalesce(func.max(foo.c.id))])),
)
self.assert_compile(t.update(inline=True, values={'col3':'foo'}), "UPDATE test SET col1=foo(:foo), col2=(SELECT coalesce(max(foo.id)) FROM foo), col3=:col3")
-
+
class SchemaTest(SQLCompileTest):
def testselect(self):
# these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables
def testalias(self):
a = alias(table4, 'remtable')
self.assert_compile(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable WHERE remtable.datatype_id = :remtable_datatype_id")
-
+
def testupdate(self):
self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id WHERE remotetable.value = :remotetable_value")
-
+
def testinsert(self):
self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES (:rem_id, :datatype_id, :value)")
-
+
if __name__ == "__main__":
testbase.main()